Last active
August 9, 2022 20:10
-
-
Save JoeKarlsson/011d893c3283728390876374803f3c22 to your computer and use it in GitHub Desktop.
Viam Dofbot Config
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from viam.components.arm import ArmClient | |
from viam.components.gripper import GripperClient | |
from viam.proto.api.component.arm import JointPositions | |
from viam.robot.client import RobotClient | |
from viam.components.camera import CameraClient, Camera | |
from viam.services.vision import DetectorConfig, DetectorType, Detection | |
from viam.services.types import ServiceType | |
from viam.rpc.dial import Credentials, DialOptions | |
import time | |
import asyncio | |
import os | |
from math import degrees | |
from PIL import Image, ImageDraw | |
# Connect our robot to the Viam app in the cloud | |
async def setupRobot(): | |
ADDRESS_SECRET = "Dofbot-main.dt0gzquiy2.local.viam.cloud:8080" | |
PAYLOAD_SECRET = "8r7b0za66679nxqe58wxkciys9slsl7p5q7rudg541u2d6b3" | |
# ADDRESS_SECRET = "[ADD YOUR ROBOT ADDRESS HERE. YOU CAN FIND THIS ON THE CONNECT TAB]" | |
# PAYLOAD_SECRET = "[PLEASE ADD YOUR SECRET HERE. YOU CAN FIND THIS ON THE CONNECT TAB]" | |
creds = Credentials(type='robot-location-secret', payload=PAYLOAD_SECRET) | |
opts = RobotClient.Options(refresh_interval=0, dial_options=DialOptions(credentials=creds)) | |
robot = await RobotClient.at_address(ADDRESS_SECRET, opts) | |
# print(robot.resource_names) | |
return robot | |
# initialize the robot arm in a vertical straight position | |
async def resetRobotArmPosition(robot: RobotClient, arm: ArmClient, gripper: GripperClient): | |
init_position = JointPositions(values=[0, -90, 45, 45, 0]) # bent at 90 degree angle | |
await arm.move_to_joint_positions(init_position) | |
await gripper.open() | |
return robot | |
async def cleanUpOldPhotos(): | |
mydir = "data" | |
for f in os.listdir(mydir): | |
if not f.endswith(".png"): | |
continue | |
os.remove(os.path.join(mydir, f)) | |
# Moves the arm left or right when the triggers are pressed and held | |
async def handleLeftRightTrigger(arm: ArmClient, leftOrRight): | |
currentPosition = await arm.get_joint_positions() | |
speed = 11.0 | |
if leftOrRight == 'left': | |
newServoOnePosition = (currentPosition.values[0]) + speed | |
else: | |
newServoOnePosition = (currentPosition.values[0]) - speed | |
if newServoOnePosition >= -90 and newServoOnePosition <= 90: | |
newArmPositions = [newServoOnePosition, | |
currentPosition.values[1], | |
currentPosition.values[2], | |
currentPosition.values[3], | |
currentPosition.values[4] | |
] | |
newArmJointPositions = JointPositions(values=newArmPositions) | |
await arm.move_to_joint_positions(newArmJointPositions) | |
# Returns the vision service with an added detector | |
async def getVisionService(robot: RobotClient): | |
vis = robot.get_service(ServiceType.VISION) | |
detectors = await vis.get_detector_names() | |
paramDict = {"model_path": "/home/viamRobot1/models/effdet0.tflite", "num_threads": 2, "label_path": "/home/viamRobot1/models/labels.txt"} | |
tflite = DetectorConfig(name="ml-detector", type=DetectorType("tflite"), parameters=paramDict) | |
await vis.add_detector(tflite) | |
return vis | |
# takePic returns a PIL image from the camera | |
async def takePic(robot): | |
camera = Camera.from_robot(robot, "camera") | |
frame = await camera.get_frame() | |
return frame | |
# Saves the photo as a PNG to local storage | |
def savePic(frame): | |
timestr = time.strftime("%Y%m%d-%H%M%S") | |
frame.save("data/lastFrame" + timestr + ".png") | |
width, height = frame.size | |
frame.show() | |
return frame | |
# Draws the detection box on the frame | |
async def drawFancyBox(d, frame): | |
w, h = frame.size | |
rect = [d.x_min, d.y_min , d.x_max, d.y_max] | |
img1 = ImageDraw.Draw(frame) | |
img1.rectangle(rect, outline="red") | |
img1.text((d.x_min+10,d.y_min+10),d.class_name) | |
return frame | |
# Takes a pic, and uses the ml-detector to see what's in it | |
# Only considers the top 5 (according to model) most likely detections. | |
async def takePhotoAndDetectImageContents(robot: RobotClient): | |
vis = await getVisionService(robot) | |
frame = await takePic(robot) | |
cleanImage = frame.convert('RGB') | |
detections = await vis.get_detections_from_camera('camera', "ml-detector") | |
for d in detections[0:5]: | |
frame2 = await drawFancyBox(d,frame) | |
# savePic(frame2) | |
return frame2 | |
# Determines whether the arm should move left or right based on the largest detection box | |
async def leftOrRight(robot, detections, midpoint): | |
# Get largest box | |
largest_personbox_area = 0 | |
largest = Detection() | |
if not detections: | |
print("Joe - it isnt detecting") | |
return -1 | |
for d in detections: | |
a = (d.x_max - d.x_min) * (d.y_max-d.y_min) | |
if d.class_name == 'Person': | |
if a > largest_personbox_area: | |
a = largest_personbox_area | |
largest = d | |
largestCenterX = (largest.x_min + largest.x_max) / 2 | |
leftMidPointX = midpoint - (midpoint / 6) | |
rightMidPointX = midpoint + (midpoint / 6) | |
if largestCenterX > leftMidPointX and largestCenterX < rightMidPointX: | |
return 1 # basically centered | |
if largestCenterX < rightMidPointX: | |
return 0 # on the right | |
else: | |
return 2 # on the left | |
async def findAndFollowMe(robot: RobotClient, arm: ArmClient): | |
vis = await getVisionService(robot) | |
vis = await getVisionService(robot) | |
while (True): | |
frame = await takePic(robot) | |
cleanImage = frame.convert('RGB') | |
detections = await vis.get_detections(cleanImage, "ml-detector") | |
await takePhotoAndDetectImageContents(robot) | |
answer = await leftOrRight(robot, detections, frame.size[0] / 2) | |
if answer == 0: | |
print("left") | |
await handleLeftRightTrigger(arm, 'left') | |
if answer == 1: | |
print("center") | |
if answer == 2: | |
print("right") | |
await handleLeftRightTrigger(arm, 'right') | |
await asyncio.sleep(0.05) | |
async def main(): | |
robot = await setupRobot() | |
await cleanUpOldPhotos() | |
arm = ArmClient.from_robot(robot=robot, name='myarm') | |
gripper = GripperClient.from_robot(robot=robot, name='mygripper') | |
await resetRobotArmPosition(robot, arm, gripper) | |
await findAndFollowMe(robot, arm) | |
await robot.close() | |
if __name__ == '__main__': | |
print("Starting up... ") | |
asyncio.run(main()) | |
print("Done.") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from viam.components.arm import ArmClient | |
from viam.components.gripper import GripperClient | |
from viam.proto.api.component.arm import JointPositions | |
import viam.proto.api.component.inputcontroller as ic | |
from viam.robot.client import RobotClient | |
from viam.components.camera import CameraClient, Camera | |
from viam.services.vision import DetectorConfig, DetectorType, Detection | |
from viam.services.motion import MotionClient | |
from viam.services.types import ServiceType | |
from viam.proto.api.common import Pose, PoseInFrame, WorldState, GeometriesInFrame, Geometry, RectangularPrism, ResourceName | |
from viam.rpc.dial import Credentials, DialOptions | |
import time | |
import asyncio | |
import random | |
from math import degrees | |
from PIL import Image, ImageDraw | |
# Connect our robot to the Viam app in the cloud | |
async def setupRobot(): | |
ADDRESS_SECRET = "Dofbot-main.dt0gzquiy2.local.viam.cloud:8080" | |
PAYLOAD_SECRET = "8r7b0za66679nxqe58wxkciys9slsl7p5q7rudg541u2d6b3" | |
# ADDRESS_SECRET = "[ADD YOUR ROBOT ADDRESS HERE. YOU CAN FIND THIS ON THE CONNECT TAB]" | |
# PAYLOAD_SECRET = "[PLEASE ADD YOUR SECRET HERE. YOU CAN FIND THIS ON THE CONNECT TAB]" | |
creds = Credentials(type='robot-location-secret', payload=PAYLOAD_SECRET) | |
opts = RobotClient.Options(refresh_interval=0, dial_options=DialOptions(credentials=creds)) | |
robot = await RobotClient.at_address(ADDRESS_SECRET, opts) | |
# print(robot.resource_names) | |
return robot | |
# initialize the robot arm in a vertical straight position | |
async def resetRobotArmPosition(robot: RobotClient, arm: ArmClient, gripper: GripperClient): | |
init_position = JointPositions(values=[0, -90, 45, 45, 0]) # bent at 90 degree angle | |
# init_position = JointPositions(values=[0, -90, 0, 0, 0]) # stick straight up in the air | |
await arm.move_to_joint_positions(init_position) | |
await gripper.open() | |
return robot | |
# Opens the gripper and closes it again | |
async def openAndCloseGripper(gripper: GripperClient): | |
await gripper.open() | |
await asyncio.sleep(0.5) | |
await gripper.grab() | |
# Moves the arm left or right when the triggers are pressed and held | |
async def handleLeftRightTrigger(arm: ArmClient, leftOrRight): | |
currentPosition = await arm.get_joint_positions() | |
speed = 20.0 | |
if leftOrRight == 'left': | |
newServoOnePosition = (currentPosition.values[0]) + speed | |
else: | |
newServoOnePosition = (currentPosition.values[0]) - speed | |
if newServoOnePosition >= -90 and newServoOnePosition <= 90: | |
newArmPositions = [newServoOnePosition, | |
currentPosition.values[1], | |
currentPosition.values[2], | |
currentPosition.values[3], | |
currentPosition.values[4] | |
] | |
newArmJointPositions = JointPositions(values=newArmPositions) | |
await arm.move_to_joint_positions(newArmJointPositions) | |
# generic function to handle all the buttons on a 8bit controller and maps them to the robot arm | |
async def handleController(robot: RobotClient, arm: ArmClient, gripper: GripperClient): | |
controller = ic.InputControllerServiceStub(robot._channel) | |
resp: ic.GetControlsResponse = await controller.GetControls(ic.GetControlsRequest(controller="8bit-do-controller")) | |
# Show the input controller's buttons/axes | |
print(f'Controls:\n{resp.controls}') | |
# Subscribe to AllEvents on All Controls from above. | |
subEvents = [ic.StreamEventsRequest.Events(events=["AllEvents"], control=c) for c in resp.controls] | |
request = ic.StreamEventsRequest(controller="8bit-do-controller", events=subEvents) | |
async with controller.StreamEvents.open() as stream: | |
await stream.send_message(request, end=True) | |
async for reply in stream: | |
# Pretty print the event (time is UTC) | |
print(f'{reply.event.time.ToDatetime()} - {reply.event.control} - {reply.event.event} - {reply.event.value}') | |
if reply.event.control == "ButtonStart": | |
if reply.event.event == "ButtonPress": | |
await resetRobotArmPosition(robot, arm, gripper) | |
if reply.event.control == "ButtonSelect": | |
if reply.event.event == "ButtonPress": | |
await openAndCloseGripper(gripper) | |
if reply.event.control == "ButtonLT": | |
if reply.event.event == "ButtonHold": | |
await handleLeftRightTrigger(arm, 'left') | |
# elif reply.event.event == "ButtonRelease": | |
if reply.event.control == "ButtonRT": | |
if reply.event.event == "ButtonHold": | |
await handleLeftRightTrigger(arm, 'right') | |
# elif reply.event.event == "ButtonRelease": | |
async def main(): | |
robot = await setupRobot() | |
arm = ArmClient.from_robot(robot=robot, name='myarm') | |
gripper = GripperClient.from_robot(robot=robot, name='mygripper') | |
await resetRobotArmPosition(robot, arm, gripper) | |
await handleController(robot, arm, gripper) | |
await robot.close() | |
if __name__ == '__main__': | |
print("Starting up... ") | |
asyncio.run(main()) | |
print("Done.") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from viam.components.arm import ArmClient | |
from viam.components.gripper import GripperClient | |
from viam.proto.api.component.arm import JointPositions | |
from viam.robot.client import RobotClient | |
from viam.services.types import ServiceType | |
from viam.proto.api.common import Pose, PoseInFrame, WorldState, GeometriesInFrame, Geometry, RectangularPrism, ResourceName | |
from viam.rpc.dial import Credentials, DialOptions | |
import time | |
import asyncio | |
import random | |
from math import degrees | |
from PIL import Image, ImageDraw | |
# Connect our robot to the Viam app in the cloud | |
async def setupRobot(): | |
ADDRESS_SECRET = "Dofbot-main.dt0gzquiy2.local.viam.cloud:8080" | |
PAYLOAD_SECRET = "8r7b0za66679nxqe58wxkciys9slsl7p5q7rudg541u2d6b3" | |
# ADDRESS_SECRET = "[ADD YOUR ROBOT ADDRESS HERE. YOU CAN FIND THIS ON THE CONNECT TAB]" | |
# PAYLOAD_SECRET = "[PLEASE ADD YOUR SECRET HERE. YOU CAN FIND THIS ON THE CONNECT TAB]" | |
creds = Credentials(type='robot-location-secret', payload=PAYLOAD_SECRET) | |
opts = RobotClient.Options(refresh_interval=0, dial_options=DialOptions(credentials=creds)) | |
robot = await RobotClient.at_address(ADDRESS_SECRET, opts) | |
# print(robot.resource_names) | |
return robot | |
# initialize the robot arm in a vertical straight position | |
async def resetRobotArmPosition(robot: RobotClient, arm: ArmClient, gripper: GripperClient): | |
init_position = JointPositions(values=[0, -90, 45, 45, 0]) # bent at 90 degree angle | |
await arm.move_to_joint_positions(init_position) | |
await gripper.open() | |
return robot | |
# Creates a virtual rectangular obstacle in the dofbots world, and moves around while avoiding this virtual obstacle | |
async def motionServiceTest(robot: RobotClient, arm: ArmClient): | |
motionServ = robot.get_service(ServiceType.MOTION) | |
# That will create a 1x1x1 obstacle 5mm below wherever your arm currently is | |
geom = Geometry(center=Pose(x=410, y=-110, z=527), box=RectangularPrism(width_mm =2, length_mm =5, depth_mm =5)) | |
geomFrame = GeometriesInFrame(reference_frame="myarm", geometries=[geom]) | |
worldstate = WorldState(obstacles=[geomFrame]) | |
# Coordinates of the change in the DOFBOT's arm's pose | |
x = 0 | |
y = 0 | |
z = 30 | |
# orientation or theta | |
o_x = 0 | |
o_y = 0 | |
o_z = 1 | |
pos = await arm.get_end_position() | |
print(pos) | |
pos.x += x | |
pos.y += y | |
pos.z += z | |
pos.o_x += o_x | |
pos.o_y += o_y | |
pos.o_z += o_z | |
for resname in robot.resource_names: | |
if resname.name == "myarm": | |
armRes = resname | |
# We pass the WorldState above with the geometry. The arm should successfully route around it. | |
await motionServ.move(component_name=armRes, destination=PoseInFrame(reference_frame="world", pose=pos), world_state=worldstate) | |
pos = await arm.get_end_position() | |
pos.x += x | |
pos.y += y | |
pos.z += z | |
pos.o_x += o_x | |
pos.o_y += o_y | |
pos.o_z += o_z | |
await asyncio.sleep(1) | |
await motionServ.move(component_name=armRes, destination=PoseInFrame(reference_frame="world", pose=pos), world_state=worldstate) | |
async def main(): | |
robot = await setupRobot() | |
arm = ArmClient.from_robot(robot=robot, name='myarm') | |
gripper = GripperClient.from_robot(robot=robot, name='mygripper') | |
await resetRobotArmPosition(robot, arm, gripper) | |
await motionServiceTest(robot, arm) | |
await robot.close() | |
if __name__ == '__main__': | |
print("Starting up... ") | |
asyncio.run(main()) | |
print("Done.") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from viam.components.arm import ArmClient | |
from viam.components.gripper import GripperClient | |
from viam.proto.api.component.arm import JointPositions | |
from viam.robot.client import RobotClient | |
from viam.rpc.dial import Credentials, DialOptions | |
import time | |
import asyncio | |
import random | |
from math import degrees | |
from PIL import Image, ImageDraw | |
# Connect our robot to the Viam app in the cloud | |
async def setupRobot(): | |
ADDRESS_SECRET = "Dofbot-main.dt0gzquiy2.local.viam.cloud:8080" | |
PAYLOAD_SECRET = "8r7b0za66679nxqe58wxkciys9slsl7p5q7rudg541u2d6b3" | |
# ADDRESS_SECRET = "[ADD YOUR ROBOT ADDRESS HERE. YOU CAN FIND THIS ON THE CONNECT TAB]" | |
# PAYLOAD_SECRET = "[PLEASE ADD YOUR SECRET HERE. YOU CAN FIND THIS ON THE CONNECT TAB]" | |
creds = Credentials(type='robot-location-secret', payload=PAYLOAD_SECRET) | |
opts = RobotClient.Options(refresh_interval=0, dial_options=DialOptions(credentials=creds)) | |
robot = await RobotClient.at_address(ADDRESS_SECRET, opts) | |
# print(robot.resource_names) | |
return robot | |
# initialize the robot arm in a vertical straight position | |
async def resetRobotArmPosition(robot: RobotClient, arm: ArmClient, gripper: GripperClient): | |
init_position = JointPositions(values=[0, -90, 45, 45, 0]) # bent at 90 degree angle | |
# init_position = JointPositions(values=[0, -90, 0, 0, 0]) # stick straight up in the air | |
await arm.move_to_joint_positions(init_position) | |
await gripper.open() | |
return robot | |
# Get a random position of each servo that is within the range of movement that won't completly destroy the arm | |
def getRandoms(): | |
return [random.randint(-90, 90), | |
random.randint(-120, -45), | |
random.randint(-45, 45), | |
random.randint(-45, 45), | |
random.randint(-45, 45)] | |
# Opens the gripper and closes it again | |
async def openAndCloseGripper(gripper: GripperClient): | |
await gripper.open() | |
await asyncio.sleep(0.5) | |
await gripper.grab() | |
# Moves the arm into a new random position every second | |
async def randomMovement(robot: RobotClient, arm: ArmClient, gripper: GripperClient): | |
while (True): | |
await openAndCloseGripper(gripper) | |
randomPositions = getRandoms() | |
newRandomArmJointPositions = JointPositions(values=randomPositions) | |
await arm.move_to_joint_positions(newRandomArmJointPositions) | |
await asyncio.sleep(1) | |
return | |
async def main(): | |
robot = await setupRobot() | |
arm = ArmClient.from_robot(robot=robot, name='myarm') | |
gripper = GripperClient.from_robot(robot=robot, name='mygripper') | |
await resetRobotArmPosition(robot, arm, gripper) | |
await randomMovement(robot, arm, gripper) | |
await robot.close() | |
if __name__ == '__main__': | |
print("Starting up... ") | |
asyncio.run(main()) | |
print("Done.") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"components": [ | |
{ | |
"attributes": { | |
"i2cs": [ | |
{ | |
"bus": "1", | |
"name": "bus1" | |
} | |
] | |
}, | |
"depends_on": [], | |
"model": "pi", | |
"name": "local", | |
"service_config": [], | |
"type": "board" | |
}, | |
{ | |
"attributes": { | |
"camera_parameters": { | |
"distortion": { | |
"rk1": 0, | |
"rk2": 0, | |
"rk3": 0, | |
"tp1": 0, | |
"tp2": 0 | |
}, | |
"fx": 0, | |
"fy": 0, | |
"height": 720, | |
"ppx": 0, | |
"ppy": 0, | |
"width": 720 | |
}, | |
"debug": false, | |
"dump": false, | |
"format": "", | |
"height": 0, | |
"hide": false, | |
"path": "/dev/video0", | |
"path_pattern": "", | |
"source": "", | |
"stream": "", | |
"width": 0 | |
}, | |
"depends_on": [], | |
"model": "webcam", | |
"name": "camera", | |
"service_config": [ | |
{ | |
"attributes": { | |
"capture_methods": [] | |
}, | |
"type": "data_manager" | |
} | |
], | |
"type": "camera" | |
}, | |
{ | |
"attributes": { | |
"board": "local", | |
"i2c": "bus1" | |
}, | |
"depends_on": [ | |
"local" | |
], | |
"model": "yahboom-dofbot", | |
"name": "myarm", | |
"service_config": [], | |
"type": "arm", | |
"frame": { | |
"parent": "world" | |
} | |
}, | |
{ | |
"attributes": { | |
"arm": "myarm" | |
}, | |
"depends_on": [ | |
"myarm" | |
], | |
"model": "yahboom-dofbot", | |
"name": "mygripper", | |
"service_config": [], | |
"type": "gripper" | |
}, | |
{ | |
"attributes": { | |
"auto_reconnect": true | |
}, | |
"depends_on": [], | |
"model": "gamepad", | |
"name": "8bit-do-controller", | |
"service_config": [], | |
"type": "input_controller" | |
} | |
], | |
"services": [ | |
{ | |
"attributes": { | |
"base": "yahboom-base", | |
"input_controller": "8bit" | |
}, | |
"name": "yahboom_gamepad_control", | |
"type": "base_remote_control" | |
}, | |
{ | |
"attributes": {}, | |
"name": "motion", | |
"type": "motion" | |
} | |
] | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment