Note 1: See my write-up on Home Security Automation which provides an architectural overview of how this project works with others in my collection.
Note 2: While I use the word Automation in these projects, there is no integration with sensible frameworks like openHAB or Home Assistant... not yet at least. The goal behind this project was a learning opportunity by employing some specific technologies and my opinion on design. The parts you'll most likely find useful are touch-points with third-party libraries like Flask, ZeroMQ and RabbitMQ, because seamless behavior comes after much trial and error.
This project hosts a basic event hub for my various automation applications. Others can be found in my Snapshot Processor and Remote Monitor projects. It is designed with resource constraints in mind and so can run with no issues on a Raspberry Pi.
The application is responsible for the following functions:
- Collects heartbeat messages from devices on a configured RabbitMQ exchange to automatically register the device as an event input or output. A device can advertise both input and output devices on the same event source.
- Shows available output devices on a Web UI dashboard. Devices can be enabled (i.e. will trigger configured outputs) via either a button on the web page, or a configured schedule. The Web UI dashboard can be configured for reachability across either a free-tier ngrok tunnel or via Tailscale if your network already has this set up.
- Shows available input-to-output linking options on the Web UI. Any known input/output combination can be created meaning that for a given set of inputs, a list of possible outputs is possible. Configuration is saved to a local SQLite database, the schema for which can be found here. Database backups are created periodically on a cron job and are written to Amazon S3 with this script.
- Processes input messages and directs output triggers according to the configuration.
- A built-in sms output device type exists in the form of a Telegram bot. Optional fallback via SNS is supported.
- Supports MQTT messages with mDash device discovery.
- A crude audit trail exists in the local database.
- Updates from meter device types are written to a configured InfluxDB bucket.
This application extends my own boilerplate application hosted in docker hub and takes its own git submodule dependency on my own package.
This application has grown organically over a number of years, and so a fair amount of code has been factored out. The diagrams below show both the class inheritance structure. Here is the relationship between this project and my pylib submodule. For brevity, not everything is included such as the SQLAlchemy data-access-object classes, but those are mostly self-explanatory. These are the non-obvious relationships.
EventProcessor
is responsible for the main application loop and inherits RabbitMQ message handling fromMQConnection
.MQConnection
is responsible for connection to the RabbitMQ exchange and does channel management, error handing and shutdown. It inheritsAppThread
for thread tracking andClosable
to track and shutdown all ZeroMQ sockets.TBot
: Represents a Telegram bot wrapper that conforms to theirasyncio
paradigm.
See the diagram below for an example about how ZeroMQ is used as a message relay between threads.
HeartbeatFilter
: This is one of a variety of uses of ZeroMQ for internal message passing between threads in my applications. My post goes into more detail. The filter is used as a relay for incoming devices messages: A message arrives from an MQTT or RabbitMQ event source, and then is forwarded to the filter to update its inventory of device-specific hearbeats. Thereafter, the message is forwarded to theEventProcessor
instance for normal processing.
Technologies that help make this project useful:
Also:
Here is some detail about the intended use of this project.
Beyond the Python dependencies defined in the Poetry configuration, the project carries hardcoded dependencies on Sentry and 1Password in order to function.
Install these tools and make sure that they are on the environment $PATH
.
-
task
for project build orchestration: https://taskfile.dev/installation/#install-script -
docker
anddocker-compose
for container builds and execution: https://docs.docker.com/engine/install/ -
mvn
Maven for Java build orchestration: https://maven.apache.org/download.cgi -
poetry
for Python dependency management: https://python-poetry.org/docs/#installation -
java
andjavac
for Java build and runtime: https://aws.amazon.com/corretto/ -
python
ispython3
for Python runtime: https://www.python.org/downloa
- 🛑 This project uses 1Password Secrets Automation to store both application key-value pairs as well as runtime secrets. It is assumed that the connect server containers are already running on your environment. If you do not want to use this, then you'll need to fork this package and make the changes as appropriate. It's actually very easy to set up, but note that 1Password is a paid product with a free-tier for secrets automation. Here is an example of how this looks for my application and the generation of the docker-compose.yml relies on this step. Your secrets automation vault must contain an entry called
ENV.event-processor
with these keys:
Variable | Description | Example |
---|---|---|
APP_FLASK_DEBUG |
Web server debug | false |
APP_FLASK_HTTP_PORT |
Web server port | 8080 |
APP_NAME |
Application name used in logging and metrics | event-processor |
AWS_CONFIG_FILE |
AWS client configuration file | /home/app/.aws/config |
AWS_DEFAULT_REGION |
AWS region | us-east-1 |
BACKUP_S3_BUCKET |
Bucket name for database backup | project specific |
CRONITOR_MONITOR_KEY |
Cronitor configuration key | project specific |
DEVICE_NAME |
Used for container host name. | event-processor-a |
HC_PING_URL |
Healthchecks URL | project specific |
HEALTHCHECKS_BADGE_CSV |
Healthchecks badge | project specific |
INFLUXDB_BUCKET |
InfluxDB bucket for meter metrics | meter |
LEADER_ELECTION_ENABLED |
Use leader election for other instances of this application | false |
MDASH_API_BASE_URL |
mDash discovery API | https://mdash.net/api/v2/devices |
MDASH_APP_CONFIG_MQTT_PUB_TOPIC |
Application publish topic | app.mqtt_pub_topic |
MDASH_DEVICE_TAGS_CSV |
Only register devices with these mDash tags | meter,sensor |
MQTT_METER_RESET_TOPIC |
Topic to control reset of meter register | meter/electricity/control |
MQTT_PUB_TOPIC_CSV |
MQTT publication topics | meter/electricity/#,sensor/garage/# |
MQTT_SERVER_ADDRESS |
IP address of MQTT broker | network specific |
NGROK_CLIENT_API_PORT |
ngrok management port | 4040 |
NGROK_ENABLED |
Create ngrok tunnel with container | true |
NGROK_TUNNEL_NAME |
Tunnel name in configuration | frontend |
OP_CONNECT_HOST |
1Password connect server URL | network specific |
OP_CONNECT_TOKEN |
1Password connect server token | project specific |
OP_VAULT |
1Password vault | project specific |
OUTPUT_TYPE_BLUETOOTH |
Output type representing Bluetooth L2 ping | l2ping |
OUTPUT_TYPE_SNAPSHOT |
Output type for snapshots | Camera |
OUTPUT_TYPE_SWITCH |
Output types for switches | switch,Buzzer |
OUTPUT_TYPE_TTS |
Text-to-speech output type | TTS |
RABBITMQ_EXCHANGE |
Name of RabbitMQ exchange | home_automation |
RABBITMQ_SERVER_ADDRESS |
IP address of RabbitMQ exchange | network specific |
SNS_CONTROL_ENABLED |
Enable control messages from SQS | false |
SQS_QUEUE |
SQS queue name | automation-control |
TABLESPACE_PATH |
SQLite database for configuration | /data/event-processor.db |
TELEGRAM_CHAT_ROOM |
Telegram chat room ID | project specific |
TELEGRAM_IMAGE_SEND_ONLY_WITH_PEOPLE |
Send images only to humans | true |
TELEGRAM_SMS_FALLBACK_ENABLED |
Fall back to AWS SNS (SMS) | false |
TELEGRAM_USERS_CSV |
Permitted Telegram users to interact with bot (CSV) | project specific |
USER_TZ |
Set to override from pytz.all_timezones if not UTC. |
project specific |
With these configured, you are now able to build the application. Any variables referenced in the application configuration will be automatically replaced.
In addition to this, additional runtime configuration is used by the application, and also need to be contained within the secrets vault. With these configured, you are now able to run the application.
- Clone the repo
git clone https://github.com/tailucas/event-processor.git
- Verify that the git submodule is present.
git submodule init git submodule update
- Make the Docker runtime user and set directory permissions. ✋ Be sure to first review the Makefile contents for assumptions around user IDs for Docker.
task user
- Now generate the docker-compose.yml:
task setup
- And generate the Docker image:
task build
- If successful and the local environment is running the 1Password connect containers, run the application. For foreground:
For background:
task run
task rund
Running the application will:
- Start the RabbitMQ client.
- Start the broker client.
- Start the auto-scheduler for devices that enable/disable on a desired schedule.
- Start the main application loop.
- Start discovery of MQTT event sources from mDash.
- Start the
asyncio
Telegram bot. - Start an instance of
pylib.threads.thread_nanny
which will notice and report on any thread death and will also help move the application to debug logging after a prolonged shutdown. Shutdown delay is normally as a result of failure to properly close all ZMQ sockets. - Start the Flask web server.
Suggested Java VM arguments to enable JMX profiling:
-Djava.net.preferIPv4Stack=true -Dcom.sun.management.jmxremote.host=127.0.0.1 -Dcom.sun.management.jmxremote.port=3333 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false
Distributed under the MIT License. See LICENSE for more information.