/event-processor

About The Project

Overview

Note 1: See my write-up on Home Security Automation which provides an architectural overview of how this project works with others in my collection.

Note 2: While I use the word Automation in these projects, there is no integration with established frameworks like openHAB or Home Assistant... not yet at least. The goal behind this project was a learning opportunity, employing some specific technologies and my own opinions about design. The parts you'll most likely find useful are the touch-points with third-party libraries like Flask, ZeroMQ and RabbitMQ, because their seamless behavior came only after much trial and error.

This project hosts a basic event hub for my various automation applications. Others can be found in my Snapshot Processor and Remote Monitor projects. It is designed with resource constraints in mind and so can run with no issues on a Raspberry Pi.

The application is responsible for the following functions:

  • Collects heartbeat messages from devices on a configured RabbitMQ exchange in order to automatically register each device as an event input or output. A single event source can advertise both input and output devices (see the sketch after this list).
  • Shows available output devices on a Web UI dashboard. Devices can be enabled (i.e. will trigger configured outputs) via either a button on the web page or a configured schedule. The Web UI dashboard can be made reachable either over a free-tier ngrok tunnel or via Tailscale if your network already has this set up.
  • Shows available input-to-output linking options on the Web UI. Any known input/output combination can be linked, meaning that a given set of inputs can trigger any number of configured outputs. Configuration is saved to a local SQLite database, the schema for which can be found here. Database backups are created periodically by a cron job and written to Amazon S3 with this script.
  • Processes input messages and directs output triggers according to the configuration.
  • A built-in SMS output device type exists in the form of a Telegram bot, with optional fallback via AWS SNS.
  • Supports MQTT messages with mDash device discovery.
  • A crude audit trail exists in the local database.
  • Updates from meter device types are written to a configured InfluxDB bucket.
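
To make the registration flow concrete, here is a minimal device-side sketch using pika. It assumes a topic exchange named home_automation (RABBITMQ_EXCHANGE) and a hypothetical JSON payload; the actual wire format is defined by the application and its devices.

    import json
    import pika

    # Connect to the broker at RABBITMQ_SERVER_ADDRESS (placeholder host below).
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.1.10"))
    channel = connection.channel()
    channel.exchange_declare(exchange="home_automation", exchange_type="topic")

    # Hypothetical heartbeat payload advertising one input and one output device.
    heartbeat = {
        "device_name": "garage-node",
        "inputs": ["garage_door_sensor"],
        "outputs": ["garage_buzzer"],
    }
    channel.basic_publish(
        exchange="home_automation",
        routing_key="event.heartbeat.garage-node",
        body=json.dumps(heartbeat).encode(),
    )
    connection.close()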

This application extends my own boilerplate application hosted on Docker Hub and takes a git submodule dependency on my own pylib package.

Package Structure

This application has grown organically over a number of years, and so a fair amount of code has been factored out. The diagrams below show the class inheritance structure and the relationship between this project and my pylib submodule. For brevity, not everything is included, such as the SQLAlchemy data-access-object classes, but those are mostly self-explanatory. These are the non-obvious relationships.

classes

  • EventProcessor is responsible for the main application loop and inherits RabbitMQ message handling from MQConnection.
  • MQConnection is responsible for the connection to the RabbitMQ exchange and handles channel management, error handling and shutdown. It inherits AppThread for thread tracking and Closable to track and shut down all ZeroMQ sockets (a simplified skeleton follows this list).
  • TBot is a Telegram bot wrapper that conforms to the library's asyncio paradigm.
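
As a rough illustration of this hierarchy, here is a skeleton only (not the actual pylib or application code):

    import threading
    import zmq

    class AppThread(threading.Thread):
        """Base class that registers application threads for tracking (stub)."""

    class Closable:
        """Tracks ZeroMQ sockets so that they can all be closed at shutdown (stub)."""
        def __init__(self, zmq_context: zmq.Context):
            self.zmq_context = zmq_context
            self.sockets = []

        def get_socket(self, socket_type):
            socket = self.zmq_context.socket(socket_type)
            self.sockets.append(socket)
            return socket

        def close(self):
            for socket in self.sockets:
                socket.close()

    class MQConnection(AppThread, Closable):
        """Owns the RabbitMQ connection: channel management, error handling, shutdown."""

    class EventProcessor(MQConnection):
        """Main application loop; RabbitMQ message handling is inherited from MQConnection."""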

See the diagram below for an example of how ZeroMQ is used as a message relay between threads.

comms

  • HeartbeatFilter: This is one of a variety of uses of ZeroMQ for internal message passing between threads in my applications; my post goes into more detail. The filter acts as a relay for incoming device messages: a message arrives from an MQTT or RabbitMQ event source and is forwarded to the filter, which updates its inventory of device-specific heartbeats. The message is then forwarded to the EventProcessor instance for normal processing (a simplified relay sketch follows).
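
Here is a minimal sketch of this kind of in-process relay. It is illustrative only; the inproc addresses and payloads below are hypothetical rather than the application's actual socket names.

    import threading
    import time
    import zmq

    context = zmq.Context.instance()

    def heartbeat_filter():
        pull = context.socket(zmq.PULL)
        pull.bind("inproc://heartbeat")
        push = context.socket(zmq.PUSH)
        push.connect("inproc://event-processor")
        while True:
            message = pull.recv_json()  # from an MQTT or RabbitMQ source thread
            # ...update the per-device heartbeat inventory here...
            push.send_json(message)     # forward for normal processing

    def event_processor():
        pull = context.socket(zmq.PULL)
        pull.bind("inproc://event-processor")
        while True:
            print("processing", pull.recv_json())

    threading.Thread(target=heartbeat_filter, daemon=True).start()
    threading.Thread(target=event_processor, daemon=True).start()
    time.sleep(0.5)  # give the relay threads time to bind their sockets

    # A source thread then pushes device messages into the relay:
    source = context.socket(zmq.PUSH)
    source.connect("inproc://heartbeat")
    source.send_json({"device": "garage-node", "event": "heartbeat"})
    time.sleep(0.5)  # allow the message to flow before the process exits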

(back to top)

Built With

Technologies that help make this project useful:

1Password AWS Bootstrap Font Awesome Docker InfluxDB ngrok MQTT RabbitMQ Poetry Python Flask Sentry SQLite Telegram ZeroMQ

(back to top)

Getting Started

Here is some detail about the intended use of this project.

Prerequisites

Beyond the Python dependencies defined in the Poetry configuration, the project carries hardcoded dependencies on Sentry and 1Password in order to function.
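
For context, Sentry initialization in Python typically looks like the snippet below. The DSN environment variable name is a placeholder, and the real wiring is handled by the boilerplate and pylib code rather than anything you need to add yourself.

    import os
    import sentry_sdk

    # Error reporting only; the DSN is assumed to be provided via the secrets vault.
    sentry_sdk.init(
        dsn=os.environ["SENTRY_DSN"],
        traces_sample_rate=0.0,
    )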

Required Tools

Install these tools and make sure that they are available on the environment's $PATH.

Installation

  1. 🛑 This project uses 1Password Secrets Automation to store both application key-value pairs and runtime secrets. It is assumed that the connect server containers are already running in your environment. If you do not want to use this, then you'll need to fork this package and make the appropriate changes. It's actually very easy to set up, but note that 1Password is a paid product with a free tier for secrets automation. Here is an example of how this looks for my application; the generation of the docker-compose.yml relies on this step. Your secrets automation vault must contain an entry called ENV.event-processor with these keys:
Variable Description Example
APP_FLASK_DEBUG Web server debug false
APP_FLASK_HTTP_PORT Web server port 8080
APP_NAME Application name used in logging and metrics event-processor
AWS_CONFIG_FILE AWS client configuration file /home/app/.aws/config
AWS_DEFAULT_REGION AWS region us-east-1
BACKUP_S3_BUCKET Bucket name for database backup project specific
CRONITOR_MONITOR_KEY Cronitor configuration key project specific
DEVICE_NAME Used for the container host name event-processor-a
HC_PING_URL Healthchecks URL project specific
HEALTHCHECKS_BADGE_CSV Healthchecks badge project specific
INFLUXDB_BUCKET InfluxDB bucket for meter metrics meter
LEADER_ELECTION_ENABLED Use leader election for other instances of this application false
MDASH_API_BASE_URL mDash discovery API https://mdash.net/api/v2/devices
MDASH_APP_CONFIG_MQTT_PUB_TOPIC Application publish topic app.mqtt_pub_topic
MDASH_DEVICE_TAGS_CSV Only register devices with these mDash tags meter,sensor
MQTT_METER_RESET_TOPIC Topic to control reset of meter register meter/electricity/control
MQTT_PUB_TOPIC_CSV MQTT publication topics meter/electricity/#,sensor/garage/#
MQTT_SERVER_ADDRESS IP address of MQTT broker network specific
NGROK_CLIENT_API_PORT ngrok management port 4040
NGROK_ENABLED Create ngrok tunnel with container true
NGROK_TUNNEL_NAME Tunnel name in configuration frontend
OP_CONNECT_HOST 1Password connect server URL network specific
OP_CONNECT_TOKEN 1Password connect server token project specific
OP_VAULT 1Password vault project specific
OUTPUT_TYPE_BLUETOOTH Output type representing Bluetooth L2 ping l2ping
OUTPUT_TYPE_SNAPSHOT Output type for snapshots Camera
OUTPUT_TYPE_SWITCH Output types for switches switch,Buzzer
OUTPUT_TYPE_TTS Text-to-speech output type TTS
RABBITMQ_EXCHANGE Name of RabbitMQ exchange home_automation
RABBITMQ_SERVER_ADDRESS IP address of RabbitMQ exchange network specific
SNS_CONTROL_ENABLED Enable control messages from SQS false
SQS_QUEUE SQS queue name automation-control
TABLESPACE_PATH SQLite database for configuration /data/event-processor.db
TELEGRAM_CHAT_ROOM Telegram chat room ID project specific
TELEGRAM_IMAGE_SEND_ONLY_WITH_PEOPLE Send images only to humans true
TELEGRAM_SMS_FALLBACK_ENABLED Fall back to AWS SNS (SMS) false
TELEGRAM_USERS_CSV Permitted Telegram users to interact with bot (CSV) project specific
USER_TZ Timezone override from pytz.all_timezones if not UTC project specific

With these configured, you are now able to build the application. Any variables referenced in the application configuration will be automatically replaced.

In addition, the application uses further runtime configuration, which also needs to be contained within the secrets vault. With these configured, you are now able to run the application; a sketch of how these values might be consumed appears after the installation steps below.

  2. Clone the repo
    git clone https://github.com/tailucas/event-processor.git
  3. Verify that the git submodule is present.
    git submodule init
    git submodule update
  4. Create the Docker runtime user and set directory permissions. ✋ Be sure to first review the Makefile contents for assumptions around user IDs for Docker.
    task user
  5. Now generate the docker-compose.yml:
    task setup
  6. Build the Docker image:
    task build
  7. If successful and the local environment is running the 1Password connect containers, run the application. For foreground:
    task run
    For background:
    task rund
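
Once running, the ENV.event-processor values from the table above are available to the application as environment variables. As a rough illustration only (not the application's actual configuration code), the CSV-style values might be consumed like this:

    import os

    mqtt_topics = [t.strip() for t in os.environ["MQTT_PUB_TOPIC_CSV"].split(",")]
    switch_types = [t.strip() for t in os.environ["OUTPUT_TYPE_SWITCH"].split(",")]
    telegram_users = [u.strip() for u in os.environ["TELEGRAM_USERS_CSV"].split(",")]
    flask_port = int(os.environ.get("APP_FLASK_HTTP_PORT", "8080"))
    sns_fallback = os.environ.get("TELEGRAM_SMS_FALLBACK_ENABLED", "false").lower() == "true"

    print(mqtt_topics, switch_types, telegram_users, flask_port, sns_fallback)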

(back to top)

Usage

Running the application will:

  • Start the RabbitMQ client.
  • Start the MQTT broker client.
  • Start the auto-scheduler for devices that enable/disable on a desired schedule.
  • Start the main application loop.
  • Start discovery of MQTT event sources from mDash.
  • Start the asyncio Telegram bot.
  • Start an instance of pylib.threads.thread_nanny, which will notice and report on any thread death and will also switch the application to debug logging during a prolonged shutdown; shutdown delays are normally the result of a failure to properly close all ZeroMQ sockets. A simplified illustration follows this list.
  • Start the Flask web server.
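
By way of illustration only (this is not the pylib.threads.thread_nanny implementation), a watchdog of this kind can be as simple as:

    import logging
    import threading
    import time

    log = logging.getLogger("thread_nanny")

    def thread_nanny(watched, interval=10.0):
        """Periodically report any watched thread that has died."""
        while True:
            for thread in watched:
                if not thread.is_alive():
                    log.warning("thread %s has died", thread.name)
            time.sleep(interval)

    # Example: watch a worker thread from a daemon nanny thread.
    worker = threading.Thread(target=time.sleep, args=(5,), name="worker")
    worker.start()
    threading.Thread(target=thread_nanny, args=([worker],), daemon=True, name="nanny").start()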

(back to top)

License

Distributed under the MIT License. See LICENSE for more information.

(back to top)

Acknowledgments

(back to top)
