Primary language: Python. License: Apache License 2.0 (Apache-2.0).

pycluon

A Python wrapper around libcluon. pycluon aims to follow the libcluon API as closely as possible to avoid surprises.

So far, pycluon wraps the following concepts from libcluon:

  • Envelope
  • OD4Session
  • UDPSender
  • UDPReceiver
  • TCPConnection
  • TCPServer
  • SharedMemory

It also bundles the following command-line applications:

  • protoc
  • cluon-msc
  • cluon-OD4toStdout
  • cluon-OD4toJSON
  • cluon-LCMtoJSON
  • cluon-filter
  • cluon-livefeed
  • cluon-rec2csv
  • cluon-replay

Versioning

pycluon version    libcluon version
0.1.0              0.0.140
0.1.1              0.0.140
0.1.2              0.0.140
0.1.3              0.0.140
0.2.0              0.0.140

Installation

pycluon is available on PyPI:

pip install pycluon

Examples

Import an odvd specification into a Python module

from pycluon.importer import import_odvd

my_module = import_odvd("/path/to/my/odvd/specification.odvd")

Send an envelope

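import time
from pycluon import OD4Session, Envelope

# 111 is the conference ID (CID) shared by sender and receiver
session = OD4Session(111)

# my_module is the module returned by import_odvd in the previous example
message = my_module.MyMessage()

envelope = Envelope()
# Timestamps are given in seconds since the epoch
envelope.sent = envelope.sampled = time.time()
envelope.serialized_data = message.SerializeToString()
# The receiving example registers its data trigger on the same data type (13)
envelope.data_type = 13
envelope.sender_stamp = 13

session.send(envelope)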

Receive an envelope

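import time
from pycluon import OD4Session

# Use the same conference ID (CID) as the sender
session = OD4Session(111)

def callback(envelope):
    # Decode the payload into the message type from the imported odvd module
    message = my_module.MyMessage()
    message.ParseFromString(envelope.serialized_data)
    print(f"Received at {envelope.received} seconds since epoch")

# Call callback for every envelope whose data type is 13
session.add_data_trigger(13, callback)

# Keep the process alive while the session is running
while session.is_running():
    time.sleep(0.01)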
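
The polling loop above keeps the process alive indefinitely. When only a single envelope is needed, the callback can signal a `threading.Event` instead. This is a minimal sketch under assumptions: `wait_for_first` is a hypothetical helper, it relies only on `add_data_trigger` as shown above, and it assumes the callback is invoked from a thread other than the caller's. `_FakeSession` is a stand-in so the sketch runs without pycluon.

```python
import threading


def wait_for_first(session, data_type, timeout=5.0):
    """Return the first envelope of data_type, or None if the timeout passes."""
    received = []
    done = threading.Event()

    def _on_envelope(envelope):
        received.append(envelope)
        done.set()  # wake up the waiting caller

    session.add_data_trigger(data_type, _on_envelope)
    done.wait(timeout)
    return received[0] if received else None


class _FakeSession:
    """Stand-in that delivers one fake envelope from a background thread."""

    def add_data_trigger(self, data_type, callback):
        threading.Timer(0.05, callback, args=(f"envelope-{data_type}",)).start()


first = wait_for_first(_FakeSession(), 13, timeout=2.0)
print(first)
```

Note that the trigger stays registered after the helper returns, so later envelopes still reach `_on_envelope` and are appended to the list.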

Write to a shared memory area

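from datetime import datetime
from pycluon import SharedMemory

# Create the shared memory area with the given name and size
sm = SharedMemory("frame.argb", 640*480)

# Hold the lock while updating the timestamp and the data
sm.lock()
sm.timestamp = datetime.now()
sm.data = b"<bytes>"
sm.unlock()
# Wake up processes waiting on this area
sm.notify_all()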

Read from an existing shared memory area

from pycluon import SharedMemory

sm = SharedMemory("frame.argb")

sm.wait() # Wait for notification from writing process
sm.lock()
print(sm.timestamp)
print(sm.data)
sm.unlock()

See the tests for usage of UDPSender, UDPReceiver, TCPConnection, and TCPServer.