microblocks_messaging_library

MicroBlocks and Python Communication with Messages.

Install

# Python3
python -m pip install microblocks

Usage

MicroBlocks demo

wireless programming (BLE)

import time
from microblocks import Message

m = Message()
# found_devices = m.discover() # Discover MicroBlocks devices
m.connect('MicroBlocks KCY') # replace the string with the device name

# broadcast message from Python to MicroBlocks
m.send('happy')
time.sleep(1)
m.send('sad')

# receive broadcasts from MicroBlocks
while True:
    message = m.receive()
    if message:
        print(message)

# Alternative: receive broadcasts without blocking. Set m.on_message = <a function>, then call:
# m.loopStart()
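
The receive loop above runs forever and polls as fast as it can. Here is a minimal sketch of a bounded polling loop, assuming (as the `if message:` check suggests) that `receive()` returns a falsy value when nothing has arrived. `poll_messages` and its parameters are hypothetical helper names, not part of the microblocks package. It accepts any zero-argument callable, so `m.receive` can be passed in.

```python
import time


def poll_messages(receive, stop_on=None, max_polls=100, interval=0.05):
    """Collect messages from `receive` until `stop_on` arrives or polls run out.

    `receive` is any zero-argument callable (for example `m.receive`).
    Assumption: it returns a falsy value when no message is waiting.
    """
    received = []
    for _ in range(max_polls):
        message = receive()
        if message:
            received.append(message)
            if message == stop_on:
                break  # the stop message itself is kept in the result
        else:
            time.sleep(interval)  # avoid spinning the CPU while idle
    return received
```

With a connected device this might be called as `poll_messages(m.receive, stop_on='bye')`, where `'bye'` is an illustrative broadcast name chosen on the MicroBlocks side.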

serial

import time
from microblocks import SerialMessage

m = SerialMessage()
m.connect('/dev/tty.usbmodem1402') # replace the string with your micro:bit's serial port

# broadcast message from Python to MicroBlocks
m.send('happy')
time.sleep(1)
m.send('sad')

# receive broadcasts from MicroBlocks
while True:
    message = m.receive()
    if message:
        print(message)

# Alternative: receive broadcasts without blocking. Set m.on_message = <a function>, then call:
# m.loopStart()
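
The function assigned to `m.on_message` could route each broadcast to its own handler. This is a sketch only: it assumes the callback is invoked with the broadcast text as its single argument, which the README does not confirm. `make_dispatcher` and the handlers are hypothetical names, not part of the microblocks package.

```python
def make_dispatcher(handlers, default=None):
    """Build a callback that routes each message to the handler registered for it.

    Assumption: the callback is called with the broadcast text as its only argument.
    """
    def on_message(message):
        handler = handlers.get(message, default)
        if handler is not None:
            return handler(message)
        return None  # messages without a handler are ignored

    return on_message


# Example wiring with illustrative handlers.
seen = []
callback = make_dispatcher(
    {
        "happy": lambda msg: seen.append(("smile", msg)),
        "sad": lambda msg: seen.append(("frown", msg)),
    }
)
callback("happy")
callback("unknown")  # no handler registered, silently skipped
print(seen)  # [('smile', 'happy')]
```

Under the same assumption, `m.on_message = callback` followed by `m.loopStart()` would hook the dispatcher into the non-blocking receive path mentioned in the comments above.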

Use the Python scripts above together with the following MicroBlocks code (you can save this PNG file, then drag it into MicroBlocks to load the scripts):

message flow diagram

MicroBlocks interoperability

MicroBlocksClient

Save this project (same as Snap!) to your MicroBlocks device:

# pip install -U microblocks
from microblocks import MicroblocksClient

client = MicroblocksClient()
found_devices = client.discover(timeout=3) # use timeout=5 on Windows
print("found devices:", found_devices)
client.connect("MicroBlocks AZB", timeout=3)

client.display_character("f")

while True:
    tiltX = client.tiltX
    print(tiltX)
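
The loop above prints readings as fast as the property can be read. A minimal sketch of taking a fixed number of paced readings instead; `sample_readings` and `average` are hypothetical helpers, not part of the microblocks package, and they work with any zero-argument callable.

```python
import time


def sample_readings(read, count=10, interval=0.1):
    """Take `count` readings from `read`, pausing `interval` seconds between them."""
    readings = []
    for i in range(count):
        readings.append(read())
        if i < count - 1:
            time.sleep(interval)  # no pause needed after the last reading
    return readings


def average(values):
    """Arithmetic mean of a non-empty sequence of numbers."""
    return sum(values) / len(values)
```

With the client from the example above, this might be used as `average(sample_readings(lambda: client.tiltX, count=20))` to smooth out jitter in the tilt value.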

ref: MicroBlocks Client

Connect multiple devices

from microblocks import MicroblocksClient

client = MicroblocksClient()
found_devices = client.discover(timeout=3) # use timeout=5 on Windows
print("found devices:", found_devices)
# connect all discovered devices
devices = [MicroblocksClient(i) for i in found_devices]

for index, device in enumerate(devices):
    device.display_character(str(index))  # pass a string, as in the single-device example

Parallel Control

from concurrent.futures import ThreadPoolExecutor

# Launching parallel tasks: https://docs.python.org/3/library/concurrent.futures.html
with ThreadPoolExecutor() as executor:
    for client in devices:
        executor.submit(client.scroll_text, "hello")
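
`executor.submit` returns a future, and an exception raised inside a submitted call stays hidden unless that future is inspected. A minimal sketch that keeps the futures and reports one outcome per device; `call_on_all` is a hypothetical helper, not part of the microblocks package, and it only assumes each device object has a method with the given name.

```python
from concurrent.futures import ThreadPoolExecutor


def call_on_all(devices, method_name, *args):
    """Call `method_name(*args)` on every device in parallel.

    Returns a list of (device, result, error) tuples in input order;
    `error` is None when the call succeeded.
    """
    devices = list(devices)
    with ThreadPoolExecutor(max_workers=max(1, len(devices))) as executor:
        futures = [
            executor.submit(getattr(device, method_name), *args)
            for device in devices
        ]
    # Leaving the `with` block waits for every submitted call to finish.
    outcomes = []
    for device, future in zip(devices, futures):
        error = future.exception()
        result = future.result() if error is None else None
        outcomes.append((device, result, error))
    return outcomes
```

For the device list built earlier, `call_on_all(devices, "scroll_text", "hello")` would scroll the text on all devices at once and let the caller check which ones failed.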

demo

ref: notebooks