- python 3.7+ implementation of the misty II REST API
- use python 3.7
- should be able to install with just
pip3 install git+https://github.com/acushner-xaxis/misty_py.git
- run
jupyter console
, import the api, and then, from there you can do things likehelp(MistyAPI)
- additionally the nice thing is that jupyter's already in an event loop, so you can do things like
await api.movement.move_head(0, 0, 0)
directly in the console
- additionally the nice thing is that jupyter's already in an event loop, so you can do things like
Alternately, you can use the install shell script (if you are using Windows, you'll need WSL):
Make sure Python 3.7 is installed.
curl https://raw.githubusercontent.com/acushner-xaxis/misty_py/master/install.sh | bash
Or check out this repository:
git clone https://github.com/acushner-xaxis/misty_py.git
cd misty_py
./install.sh
Then you can use the environment with:
source .virtualenv/bin/activate
Or just start Jupyter Notebook with:
./run-jupyter
- first, export
MISTY_IP=<your_misty_ip>
in your env
a simple face training example:
import asyncio
from misty_py.api import MistyAPI
from misty_py.utils.color import RGB
api = MistyAPI()
await asyncio.gather(
api.images.set_led(RGB(0, 255, 0)),
api.faces.wait_for_training('name')
)
await api.images.set_led()
subscriptions are more complicated than normal rest calls
there are a few ways to subscribe:
- via
SubType
- higher-level subscription types (e.g.SubType.face_recognition
) - via
LLSubType
- lower-level subscription types (e.g.Actuator.yaw
) - via
Sub
- combination ofSubType
andEventCondition
s
each time you subscribe you get a SubId
object which can be used to interact with a given subscription
here are some examples:
from misty_py.subscriptions import SubPayload, SubType, Bump
from misty_py.api import MistyAPI
api = MistyAPI()
async def debug_handler(sp: SubPayload):
print(sp)
# subscribe to one single higher-level subscription
# (get a self_state message every 2000 ms)
await api.ws.subscribe(SubType.self_state, debug_handler, 2000)
# subscribe to a single lower-level subscription
await api.ws.subscribe(Bump.front_right, debug_handler)
# subscribe via a higher-level `SubType` to all its lower-level `LLSubType`
# this will generate 4 separate subscriptions, one for each of `LLSubType`: `Bump`
# - all will share the same handler
await api.ws.subscribe(SubType.bump_sensor, debug_handler, 2000)
there are 3 ways to unsubscribe:
- via the
SubId
you got when you created your subscription - via higher-level
SubType
- via
event_name
, which will generally only need to be used in debugging
await api.ws.unsubscribe(SubType.self_state)
- object that combines a callback with an event.
- when the callback returns a truthy value, the event will be set.
- useful for figuring out when something's done
- works great as a handler for subscribing to messages
# example of waiting for an audio file with a certain name to complete playing
async def _handle_audio_complete(self, name):
"""subscribe and wait for an audio complete event"""
async def _wait_name(sp: SubPayload):
return sp.data.message.metaData.name == name
event = EventCallback(_wait_name)
try:
async with self.api.ws.sub_unsub(SubType.audio_play_complete, event):
await event
except asyncio.CancelledError:
event.set()