Python code for Toddler Following Rover
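The script below drives a MegaPi-based rover that follows a person: each Pi camera frame is run through a TensorFlow Lite SSD detector, the rover turns until the detected person sits in the middle band of the image, and an ultrasonic sensor gates how far it drives forward. A Flask app streams the annotated camera feed and exposes simple HTTP routes for manual control. Run the script with python3 on the rover and open http://<rover-address>:8000/ in a browser (the control page, index.html, is not part of this gist).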
# -*- coding: utf-8 -*-
# mode: 1..hand control, 2..find green area, 3..find squares and recognize, 4..found
# 41..right_step1, 42..rs2, 43..rs3, ...
# 51..left_step1, 52..ls2, 53..ls3, ...
# 61, 62, 63 ... back to the middle
from megapi_python3 import *
import numpy as np
import cv2
from imutils.video import VideoStream
from flask import Response
from flask import Flask
from flask import render_template
import threading
import time
from time import sleep
import sys
import os
from tflite_runtime.interpreter import Interpreter
MODEL_NAME = 'Sample_TFLite_model'
GRAPH_NAME = 'detect.tflite'
LABELMAP_NAME = 'labelmap.txt'
min_conf_threshold = 0.6
imW, imH = 640, 480

# Get path to current working directory
CWD_PATH = os.getcwd()

# Path to .tflite file, which contains the model that is used for object detection
PATH_TO_CKPT = os.path.join(CWD_PATH, MODEL_NAME, GRAPH_NAME)

# Path to label map file
PATH_TO_LABELS = os.path.join(CWD_PATH, MODEL_NAME, LABELMAP_NAME)

# Load the label map
with open(PATH_TO_LABELS, 'r') as f:
    labels = [line.strip() for line in f.readlines()]

# Have to do a weird fix for the label map if using the COCO "starter model" from
# https://www.tensorflow.org/lite/models/object_detection/overview
# First label is '???', which has to be removed.
if labels[0] == '???':
    del labels[0]

interpreter = Interpreter(model_path=PATH_TO_CKPT)
interpreter.allocate_tensors()

# Get model details
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
height = input_details[0]['shape'][1]
width = input_details[0]['shape'][2]
floating_model = (input_details[0]['dtype'] == np.float32)

input_mean = 127.5
input_std = 127.5

# Initialize frame rate calculation
frame_rate_calc = 1
freq = cv2.getTickFrequency()
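# Note on input_mean/input_std: the (pixel - 127.5) / 127.5 scaling applied in
# dete() for floating (non-quantized) models maps uint8 pixels to roughly
# [-1, 1]; quantized models such as the COCO starter model take raw uint8
# input, so no scaling is applied for them.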
# initialize the output frame and a lock used to ensure thread-safe
# exchanges of the output frames (useful when multiple browsers/tabs
# are viewing the stream)
outputFrame = None
lock = threading.Lock()
detect = 1
dist = 400  # last ultrasonic reading in cm; 400 doubles as "no/invalid reading"

# initialize a flask object
app = Flask(__name__, static_url_path='/static')
@app.route("/") | |
def index(): | |
# return the rendered template | |
return render_template("index.html") | |
@app.route("/forward") | |
def forward( ): | |
global key | |
key=82 | |
return 'OK' | |
@app.route("/stop") | |
def stop( ): | |
global key | |
key = ord('t') | |
return 'OK' | |
@app.route("/backward") | |
def backward( ): | |
global key | |
key=84 | |
return 'OK' | |
@app.route("/right") | |
def right( ): | |
global key | |
key = 83 | |
return 'OK' | |
@app.route("/left") | |
def left( ): | |
global key | |
key=81 | |
return 'OK' | |
@app.route("/hand") | |
def hand( ): | |
global key | |
key = ord('h') | |
return 'OK' | |
@app.route("/auto") | |
def auto( ): | |
global key | |
key=ord('s') | |
return 'OK' | |
@app.route("/quit") | |
def quit( ): | |
global key | |
key = ord('q') | |
return 'OK' | |
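# The routes above are normally hit by buttons in index.html, but any HTTP
# client works. Sketch (assumption: the rover is reachable at 192.168.1.10):
#   import requests
#   requests.get('http://192.168.1.10:8000/forward')  # start driving forward
#   requests.get('http://192.168.1.10:8000/stop')     # stop the motors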
def onRead(v):
    global dist
    dist = v

def onEnd(v):
    stop()

# NOTE: stop/right/left below reuse the names of the Flask handlers above.
# Flask keeps references to the handler functions it registered, so the routes
# still work, but any later bare call to stop()/right()/left() resolves to
# these motor functions.
def stop():
    bot.encoderMotorRun(1, 0)
    bot.encoderMotorRun(2, 0)

def right(step):
    bot.encoderMotorMove(1, 120, -step, onEnd)
    bot.encoderMotorMove(2, 120, -step, onEnd)

def left(step):
    bot.encoderMotorMove(1, 120, step, onEnd)
    bot.encoderMotorMove(2, 120, step, onEnd)

def forw(step):
    bot.encoderMotorMove(1, 150, step, onEnd)
    bot.encoderMotorMove(2, 150, -step, onEnd)

def back(step):
    bot.encoderMotorMove(1, 150, -step, onEnd)
    bot.encoderMotorMove(2, 150, step, onEnd)

def s_forw():
    bot.encoderMotorRun(1, 150)
    bot.encoderMotorRun(2, -150)

def s_left():
    bot.encoderMotorRun(1, 120)
    bot.encoderMotorRun(2, 120)

def s_right():
    bot.encoderMotorRun(1, -120)
    bot.encoderMotorRun(2, -120)

def s_back():
    bot.encoderMotorRun(1, -150)
    bot.encoderMotorRun(2, 150)
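# Drive conventions implied by the signs above: the two encoder motors face
# opposite directions on the chassis, so driving straight (forw/back and
# s_forw/s_back) uses opposite signs on slots 1 and 2, while turning in place
# (left/right and s_left/s_right) uses the same sign on both.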
def dete(im):  # detection pass, based on Adrian R.'s example
    global dist, detect, hold_time_start, end_time, start_time, mode, person
    print('Detection begins')
    frame_rgb = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
    frame_resized = cv2.resize(frame_rgb, (width, height))
    input_data = np.expand_dims(frame_resized, axis=0)

    # Normalize pixel values if using a floating model (i.e. if model is non-quantized)
    if floating_model:
        input_data = (np.float32(input_data) - input_mean) / input_std

    start_time = time.time()
    # Perform the actual detection by running the model with the image as input
    interpreter.set_tensor(input_details[0]['index'], input_data)
    interpreter.invoke()
    end_time = time.time()

    # Retrieve detection results
    boxes = interpreter.get_tensor(output_details[0]['index'])[0]    # Bounding box coordinates of detected objects
    classes = interpreter.get_tensor(output_details[1]['index'])[0]  # Class index of detected objects
    scores = interpreter.get_tensor(output_details[2]['index'])[0]   # Confidence of detected objects

    person = 0
    x = 0  # horizontal centre of the last detected person; 0 means nothing found
    for i in range(len(scores)):
        if ((scores[i] > min_conf_threshold) and (scores[i] <= 1.0) and int(classes[i]) == 0):
            print('Confidence greater than 0.6 for class person')
            # Get bounding box coordinates and draw box.
            # Interpreter can return coordinates that are outside of image
            # dimensions, need to force them to be within image using max() and min()
            ymin = int(max(1, (boxes[i][0] * imH)))
            xmin = int(max(1, (boxes[i][1] * imW)))
            ymax = int(min(imH, (boxes[i][2] * imH)))
            xmax = int(min(imW, (boxes[i][3] * imW)))
            cv2.rectangle(im, (xmin, ymin), (xmax, ymax), (10, 255, 0), 2)
            print('found person')
            person = 1
            x = (xmin + xmax) / 2

    print('x', x)
    if x == 0:  # nothing found
        left(50)  # turn left
        print('nothing found')
    elif x < 240:  # M-point too far left
        y = 240 - int(x)
        # y = y*2/3
        print('y', y)
        left(int(y))
        sleep(1)
        print('Turning left to centre camera')
    elif x > 400:  # M-point too far right
        z = int(x) - 400
        # z = z*2/3
        right(int(z))
        sleep(1)
        print('Turning right to centre camera')
    else:  # in the centre band (240 ... 400)
        print('found person in centre')
        bot.ultrasonicSensorRead(7, onRead)
        sleep(0.1)
        a = round(dist)
        print('Distance =', a)
        hold_time_start = time.time()
        detect = 0
        if a == 400:
            print('Wrong reading')
            sleep(1)
        elif a > 200:
            forw(250)
            print('Forwarding 250 steps')
            sleep(1)
    return im
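# dete() keeps the person centred by splitting the 640-px-wide frame into
# three bands: left of x=240 it turns left, right of x=400 it turns right
# (step size proportional to the pixel offset), and in the centre band it
# reads the ultrasonic sensor and steps forward while the person is more than
# 200 cm away; a reading of 400 is treated as invalid.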
# Main Program
camera = VideoStream(resolution=(imW, imH), framerate=30).start()
time.sleep(2.0)
bot = MegaPi()
bot.start()
stop()
mode = 1    # hand control
frcnt = 40  # camera initialisation time 4 s (40 x 0.1 s)
key = ord('h')
person = 0
hold_time_start = time.time()
def main():
    global frcnt, mode, lock, outputFrame, key, detect, person
    while True:  # loop takes 0.1 s
        # start_time = time.time()  # Seconds display
        frame = camera.read()
        frame = cv2.flip(frame, -1)
        frcnt = frcnt - 1  # show some frames without activity
        if frcnt < 1:
            # print('mode', mode)
            if mode == 1:  # hand control
                frcnt = 1
                if key == 82:  # arrow keys
                    s_forw()
                    bot.ultrasonicSensorRead(7, onRead)
                    sleep(0.1)
                    a = round(dist)
                    if a < 20:  # obstacle closer than 20 cm: stop
                        key = ord('t')
                elif key == 84:
                    s_back()
                elif key == ord('t'):
                    stop()
                elif key == 81:
                    s_left()
                elif key == 83:
                    s_right()
                with lock:
                    outputFrame = frame.copy()
            elif mode == 62:  # person lost: sweep left, then look again
                left(150)
                mode = 4
                frcnt = 30
                with lock:
                    outputFrame = frame.copy()
            elif mode == 4:  # find person
                stop()
                current_time = time.time()
                hold_time = current_time - hold_time_start
                if hold_time >= 10:
                    detect = 1
                if detect == 1:
                    frame = dete(frame)  # recognize the picture now
                    zyc = end_time - start_time
                    label = "{:.1f}".format(zyc)
                    cv2.putText(frame, label, (5, 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
                    if person == 1:
                        mode = 4
                    else:
                        mode = 62
                    person = 0
                with lock:
                    outputFrame = frame.copy()
        if key == ord('q'):
            break
        elif key == ord('h'):
            mode = 1
            print('hand control')
        elif key == ord('s'):
            key = 0
            mode = 4
            print('find Person')
    stop()
    bot.close()
    sys.exit()
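# Threading model: main() runs as a daemon thread doing camera capture,
# detection and motor control, while Flask serves the MJPEG stream from the
# main thread; `lock` guards outputFrame, the single frame they share.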
def generate():
    # grab global references to the output frame and lock variables
    global outputFrame, lock
    # loop over frames from the output stream
    while True:
        # wait until the lock is acquired
        with lock:
            # check if the output frame is available, otherwise skip
            # the iteration of the loop
            if outputFrame is None:
                continue
            # encode the frame in JPEG format
            (flag, encodedImage) = cv2.imencode(".jpg", outputFrame)
            # ensure the frame was successfully encoded
            if not flag:
                continue
        # yield the output frame in the byte format
        yield(b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' +
              bytearray(encodedImage) + b'\r\n')

@app.route("/video_feed")
def video_feed():
    # return the response generated along with the specific media
    # type (mime type)
    return Response(generate(),
                    mimetype="multipart/x-mixed-replace; boundary=frame")
if __name__ == '__main__':
    print('[INFO] Abort with Ctrl+c')
    print('[INFO] (megapi-Threads terminate)')
    t = threading.Thread(target=main)
    t.daemon = True
    t.start()
    # start the flask app; app.run() blocks until the server exits
    app.run(host='0.0.0.0', port=8000, debug=True,
            threaded=True, use_reloader=False)
    camera.stop()
    sys.exit()