🔥 A library for managing concurrent socketio, cv2, and pyserial processes. Useful for making robots or devices with Arduinos and Raspberry Pi. 💻

REMIO

Table of Contents

  1. Introduction
  2. Features
  3. Installation
  4. Development
  5. Single Camera API (Multithread)
  6. Single Camera API (not Multithread)
  7. Background Processing API
  8. Multiple Cameras API
  9. Multiple Serial API
  10. Simplejpeg API
  11. Simple MJPEG Server
  12. Examples

Introduction

REMIO is a library for managing concurrent socketio, cv2, and pyserial processes. It is useful for building robots or devices with Arduinos and Raspberry Pi. It was born in the context of remote laboratories (hence its name), where I developed several prototypes whose code started to repeat itself. I extracted the modules of this library from that code. The hardware architecture I typically used was the following:

[Figure: hardware architecture]

So I programmed the following architecture:

[Figure: programmed architecture]

Features

  • Multiple Camera API
  • Multiple Serial API
  • Event-driven programming API for Serial.
  • Event-driven programming API for Cameras.
  • MJPEG streamer with SocketIO

Installation

First you need to create a virtualenv:

python3 -m venv venv

Then activate it:

source venv/bin/activate

Then choose one of the options for installing remio, for example using pip:

# PyPI source
pip install remio

# GitHub source
pip install "git+https://github.com/Hikki12/remio"

Or if you prefer, clone the repository:

git clone https://github.com/Hikki12/remio

cd remio

pip install .
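
After installing, it can be handy to confirm that the package is visible to the interpreter of the active virtualenv. The following is a minimal sketch using only the standard library; `is_installed` is a hypothetical helper name, not part of remio.

```python
import importlib.util


def is_installed(package: str) -> bool:
    """Return True if a top-level `package` can be found on the import path."""
    return importlib.util.find_spec(package) is not None


# Prints True only when remio is importable from the current environment.
print("remio installed:", is_installed("remio"))
```

Run it with the same `python3` that created the virtualenv; a `False` result usually means the virtualenv was not activated before running pip.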

Development

If you are a developer, install the library in editable mode as follows:

pip install -e .

Single Camera API

A Camera instance creates a background loop for reading and processing frames when you call its start() method.

import time
from remio import Camera

# Initialize Single Camera device
camera = Camera(name="webcam", src=0, size=[400, 400])
camera.start()

# Alternatively, you can chain the call:
# camera = Camera(name="webcam", src=0, size=[400, 400]).start()


while True:
    print("Doing some other tasks...")
    time.sleep(2)
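
To make the idea of a background read loop concrete, here is a minimal sketch in plain Python. It is not remio's implementation: `BackgroundReader` and its `source` callable are hypothetical names, and a simple counter stands in for a camera so the example runs without any hardware.

```python
import itertools
import threading
import time
from typing import Callable, Optional


class BackgroundReader:
    """Hypothetical helper: polls `source` on a daemon thread and keeps the latest value."""

    def __init__(self, source: Callable[[], object], period: float = 0.01) -> None:
        self._source = source
        self._period = period
        self._latest: Optional[object] = None
        self._lock = threading.Lock()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._loop, daemon=True)

    def start(self) -> "BackgroundReader":
        self._thread.start()
        return self  # returning self allows BackgroundReader(...).start()

    def _loop(self) -> None:
        while not self._stop.is_set():
            value = self._source()
            with self._lock:
                self._latest = value
            self._stop.wait(self._period)  # sleeps, but wakes early on stop()

    def read(self) -> Optional[object]:
        """Return the most recent value, or None if nothing has been read yet."""
        with self._lock:
            return self._latest

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()


counter = itertools.count()  # stand-in for a camera producing frames
reader = BackgroundReader(lambda: next(counter)).start()
time.sleep(0.1)  # the main thread is free to do other work here
latest = reader.read()
reader.stop()
```

The main thread only ever sees the latest value, which is the usual trade-off for this pattern: readers never block on the device, but intermediate frames may be skipped.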

Single Camera API not Multithread

If you prefer, you can handle reading and processing on the main thread instead. To do so, call loadDevice() in place of start().

import time
from remio import Camera

# Initialize Single Camera device
camera = Camera(name="webcam", src=0, size=[400, 400])

# Loads camera device but doesn't start background read loop
camera.loadDevice()

while True:
    # Now the processing occurs on the main thread.
    frame = camera.read()
    if frame is not None:
        print("A frame is available!")
    time.sleep(1 / 10)

Background Processing API

You can set a processing function that runs in the background. Use the setProcessing() method for that.

import time
import cv2
from remio import Camera


def processing(frame, *args, **kwargs):
    """Applies some image processing.

    Args:
        frame: a numpy array
    """

    if 'color' in kwargs: # if you pass a param named `color` it will print it
        print(kwargs['color'])

    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    return frame


# Initialize Camera manager
camera = Camera(src=0, size=[600, 400])

# Set the processing function, passing it some params
camera.setProcessing(processing, color='red')

# Alternatively, without extra params:
# camera.setProcessing(processing)

# Start the device read loop in the background
camera.start()

# Set the frame rate used to display image(s)
FPS = 20
T = 1 / FPS  # Sampling period


while True:

    t0 = time.time()

    frame = camera.read()
    camera.clearFrame()  # to avoid repeated frames; this line is optional

    if frame is not None:
        cv2.imshow("webcam1", frame)

    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

    t1 = time.time()

    readtime = t1 - t0  # reading and display time

    # Sleep for the rest of the sampling period so that T = readtime + delay.
    # If the iteration already took longer than T, do not sleep at all.
    delay = max(0.0, T - readtime)
    time.sleep(delay)


# Close all Windows
cv2.destroyAllWindows()

# Stop all Running devices
camera.stop()
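
The display loop above aims for a fixed sampling period, T = readtime + delay. When an iteration takes longer than T, the difference T - readtime becomes negative, so the delay has to be clamped to zero. The sketch below isolates that pacing logic; `remaining_delay` is a hypothetical helper name, and a short sleep stands in for reading and displaying a frame.

```python
import time


def remaining_delay(period: float, elapsed: float) -> float:
    """Time left to sleep so that elapsed + delay equals period (never negative)."""
    return max(0.0, period - elapsed)


FPS = 20
T = 1 / FPS  # sampling period: 0.05 s

t0 = time.perf_counter()
time.sleep(0.01)  # stand-in for reading and displaying a frame
elapsed = time.perf_counter() - t0

time.sleep(remaining_delay(T, elapsed))
total = time.perf_counter() - t0  # roughly T for a fast iteration
```

`time.perf_counter()` is used here because it is monotonic, so the measured interval is not affected by system clock adjustments.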

Multiple Cameras API

import time
import cv2
from remio import Cameras


# Define devices
devices = {
    "webcam1": {
        "src": 0,
        "size": [400, 300],
        "fps": None,
        "reconnectDelay": 5,
        "backgroundIsEnabled": True,
        "emitterIsEnabled": False,
    },
    "webcam2": {
        "src": "http://192.168.100.70:3000/video/mjpeg",
        "size": [400, 300],
        "fps": None,
        "reconnectDelay": 5,
        "backgroundIsEnabled": True,
        "emitterIsEnabled": False,
    },
}

# Initialize Cameras manager
camera = Cameras(devices=devices)

# Start device(s) connection in the background
camera.startAll()

# Set the frame rate used to display image(s)
FPS = 20
T = 1 / FPS

while True:

    t0 = time.time()

    webcam1, webcam2 = camera.read(asDict=False)
    camera.clearAllFrames()  # to avoid repeated frames

    if webcam1 is not None:
        cv2.imshow("webcam1", webcam1)

    if webcam2 is not None:
        cv2.imshow("webcam2", webcam2)

    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

    t1 = time.time()

    # Sleep for the rest of the period so that (t1 - t0) + delay = T,
    # without sleeping at all if the iteration already took longer than T.
    delay = max(0.0, T - (t1 - t0))
    time.sleep(delay)


# Close all Windows
cv2.destroyAllWindows()

# Stop all Running devices
camera.stopAll()
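
When several cameras share most settings, the devices dictionary can be built from a set of defaults. This is a small sketch in plain Python: the keys are copied from the example above, while `DEFAULTS` and `camera_entry` are hypothetical names that are not part of remio.

```python
from typing import Any, Dict, Union

# Default values copied from the devices example; adjust as needed.
DEFAULTS: Dict[str, Any] = {
    "size": [400, 300],
    "fps": None,
    "reconnectDelay": 5,
    "backgroundIsEnabled": True,
    "emitterIsEnabled": False,
}


def camera_entry(src: Union[int, str], **overrides: Any) -> Dict[str, Any]:
    """Build one device entry: shared defaults, plus `src`, plus any overrides."""
    entry = {**DEFAULTS, "src": src, **overrides}
    entry["size"] = list(entry["size"])  # copy so entries never share one list
    return entry


devices = {
    "webcam1": camera_entry(0),
    "webcam2": camera_entry("http://192.168.100.70:3000/video/mjpeg", reconnectDelay=10),
}
```

The resulting dictionary has the same shape as the hand-written one, so it could be passed to `Cameras(devices=devices)` in the same way.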

Multiple Serial API

"""Multiple serial devices management."""
import time
from remio import Serials


# Define devices
devices = {
    "arduino1": {
        "port": "/dev/cu.usbserial-1440",
        "baudrate": 9600,
        "emitterIsEnabled": True,  # Enable on/emit callbacks
        "reconnectDelay": 5,
    },
    "arduino2": {
        "port": "COM2",
        "baudrate": 9600,
        "emitterIsEnabled": True,
        "reconnectDelay": 5,
    },
}

# Initialize Serial manager
serial = Serials(devices=devices)

# Configure callbacks
serial.on("connection", lambda status: print(f"serial connected: {status}"))

# Start device(s) connection in the background
serial.startAll()


while True:
    print("Doing some tasks...")
    time.sleep(1)
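
The `on("connection", ...)` call above follows the common event-emitter pattern: handlers are registered under an event name and called whenever that event is emitted. The sketch below shows the pattern in isolation. `Emitter` is a hypothetical class written for illustration, not remio's own emitter, and the arguments remio actually passes to each callback are an assumption based on the example.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class Emitter:
    """Hypothetical minimal event emitter with on() and emit()."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Callable[..., None]]] = defaultdict(list)

    def on(self, event: str, handler: Callable[..., None]) -> None:
        """Register `handler` to be called whenever `event` is emitted."""
        self._handlers[event].append(handler)

    def emit(self, event: str, *args, **kwargs) -> None:
        """Call every handler registered for `event`, in registration order."""
        for handler in list(self._handlers.get(event, [])):
            handler(*args, **kwargs)


received = []
emitter = Emitter()
emitter.on("connection", lambda status: received.append(status))

emitter.emit("connection", True)   # a device connected
emitter.emit("connection", False)  # the device dropped
emitter.emit("data", "ignored")    # no handler registered, nothing happens
```

Decoupling the device loop from the code that reacts to it is what lets the main loop keep "doing some tasks" while connection changes are handled in callbacks.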

Simplejpeg API

REMIO uses the simplejpeg library to encode camera images. You can use its API as follows:

import time
from remio import Camera

# Initialize camera device
camera = Camera(src=0, fps=15, size=[800, 600], flipX=True)

while True:
    jpeg = camera.jpeg()
    time.sleep(1 / 10)

Simple MJPEG Server

You can serve your camera images with the MJPEG server in a few lines:

"""A simple MJPEG server."""
from remio import Camera, MJPEGServer


encoderParams = {
    "quality": 90,
    "colorspace": "bgr",
    "colorsubsampling": "422",
    "fastdct": True,
}


# Initialize camera device
camera = Camera(src=0, fps=15, size=[800, 600], flipX=True, encoderParams=encoderParams)

# Configure MJPEG Server
server = MJPEGServer(
    camera=camera, ip="0.0.0.0", port=8080, endpoint="/video/mjpeg", fps=15
)

try:
    server.run(display_url=True, start_camera=True)
except KeyboardInterrupt:
    server.stop(stop_camera=True)

# The video should be accessible through the printed link:
# >> MJPEG server running on http://0.0.0.0:8080/video/mjpeg
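
For context, MJPEG over HTTP is commonly delivered as a `multipart/x-mixed-replace` response in which each part carries one JPEG image. How MJPEGServer builds its response internally is not shown in this README, so the sketch below only illustrates the general technique; `BOUNDARY`, `mjpeg_part`, and `mjpeg_stream` are hypothetical names.

```python
from typing import Iterable, Iterator

BOUNDARY = "frame"  # hypothetical boundary string
# Value for the HTTP response's Content-Type header.
CONTENT_TYPE = f"multipart/x-mixed-replace; boundary={BOUNDARY}"


def mjpeg_part(jpeg: bytes, boundary: str = BOUNDARY) -> bytes:
    """Wrap one JPEG image as a single part of a multipart/x-mixed-replace body."""
    header = (
        f"--{boundary}\r\n"
        "Content-Type: image/jpeg\r\n"
        f"Content-Length: {len(jpeg)}\r\n"
        "\r\n"
    ).encode("ascii")
    return header + jpeg + b"\r\n"


def mjpeg_stream(frames: Iterable[bytes]) -> Iterator[bytes]:
    """Yield one multipart chunk per encoded frame."""
    for jpeg in frames:
        yield mjpeg_part(jpeg)


# Placeholder bytes with JPEG start/end markers, standing in for camera.jpeg() output.
fake_jpeg = b"\xff\xd8" + b"\x00" * 4 + b"\xff\xd9"
part = mjpeg_part(fake_jpeg)
```

A browser that receives such a stream replaces the displayed image each time a new part arrives, which is why a plain `<img>` tag pointing at the endpoint is usually enough to view it.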

Examples

You can find more examples in the project repository: https://github.com/Hikki12/remio

Copyright

Copyright © hikki12 2022
This library is released under the Apache 2.0 License.