This repository is a Gundi Action Runner implementation for integration with Savannah Tracking.
- Fork this repo
- Implement your own actions in actions/handlers.py
- Define configurations needed for your actions in actions/configurations.py
- Or implement a webhook handler in webhooks/handlers.py
- and define configurations needed for your webhooks in webhooks/configurations.py
- Optionally, add the @activity_logger() decorator in actions to log common events which you can later see in the portal:
  - Action execution started
  - Action execution complete
  - Error occurred during action execution
- Optionally, add the @webhook_activity_logger() decorator in the webhook handler to log common events which you can later see in the portal:
  - Webhook execution started
  - Webhook execution complete
  - Error occurred during webhook execution
- Optionally, use log_action_activity() or log_webhook_activity() to log custom messages which you can later see in the portal
# actions/configurations.py
from .core import PullActionConfiguration


class PullObservationsConfiguration(PullActionConfiguration):
    lookback_days: int = 10
# actions/handlers.py
from app.services.activity_logger import activity_logger, log_activity
from app.services.gundi import send_observations_to_gundi
from gundi_core.events import LogLevel
from .configurations import PullObservationsConfiguration


@activity_logger()
async def action_pull_observations(integration, action_config: PullObservationsConfiguration):

    # Add your business logic to extract data here...

    # Optionally, log custom messages to be shown in the portal
    await log_activity(
        integration_id=integration.id,
        action_id="pull_observations",
        level=LogLevel.INFO,
        title="Extracting observations with filter...",
        data={"start_date": "2024-01-01", "end_date": "2024-01-31"},
        config_data=action_config.dict()
    )

    # Normalize the extracted data into a list of observations following the Gundi schema:
    observations = [
        {
            "source": "collar-xy123",
            "type": "tracking-device",
            "subject_type": "puma",
            "recorded_at": "2024-01-24 09:03:00-0300",
            "location": {
                "lat": -51.748,
                "lon": -72.720
            },
            "additional": {
                "speed_kmph": 10
            }
        }
    ]

    # Send the extracted data to Gundi
    await send_observations_to_gundi(observations=observations, integration_id=integration.id)

    # The result will be recorded in the portal if using the activity_logger decorator
    return {"observations_extracted": 10}
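The sample handler logs a fixed date range and does not use the lookback_days setting from PullObservationsConfiguration. A minimal sketch of how that setting could be turned into a query window, assuming the external API accepts start and end dates. The helper name build_time_window is hypothetical and not part of the framework:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def build_time_window(lookback_days: int, now: Optional[datetime] = None) -> dict:
    """Return ISO dates covering the last `lookback_days` days.

    Hypothetical helper for illustration; not part of the Gundi framework.
    """
    end = now or datetime.now(timezone.utc)
    start = end - timedelta(days=lookback_days)
    return {
        "start_date": start.date().isoformat(),
        "end_date": end.date().isoformat(),
    }


# A fixed "now" keeps the example reproducible
window = build_time_window(10, now=datetime(2024, 1, 31, tzinfo=timezone.utc))
print(window)  # {'start_date': '2024-01-21', 'end_date': '2024-01-31'}
```

Inside an action handler, the same dictionary could be passed as the data argument of the log call and used to filter the extraction request.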
This framework provides a way to handle incoming webhooks from external services. You can define a handler function in webhooks/handlers.py and define the expected payload schema and configurations in webhooks/configurations.py. Several base classes are provided in webhooks/core.py to help you define the expected schema and configurations.
If you expect to receive data with a fixed schema, you can define a Pydantic model for the payload and configurations. These models will be used for validating and parsing the incoming data.
# webhooks/configurations.py
import pydantic
from .core import WebhookPayload, WebhookConfiguration


class MyWebhookPayload(WebhookPayload):
    device_id: str
    timestamp: str
    lat: float
    lon: float
    speed_kmph: float


class MyWebhookConfig(WebhookConfiguration):
    custom_setting: str
    another_custom_setting: bool
Your webhook handler function must be named webhook_handler and it must accept the payload and config as arguments. Both are validated and parsed using their annotated Pydantic models. You can then implement your business logic to extract the data and send it to Gundi.
# webhooks/handlers.py
from app.services.activity_logger import webhook_activity_logger
from app.services.gundi import send_observations_to_gundi
from .configurations import MyWebhookPayload, MyWebhookConfig


@webhook_activity_logger()
async def webhook_handler(payload: MyWebhookPayload, integration=None, webhook_config: MyWebhookConfig = None):
    # Implement your custom logic to process the payload here...

    # If the request is related to an integration, you can use the integration object to access the integration's data

    # Normalize the extracted data into a list of observations following the Gundi schema:
    transformed_data = [
        {
            "source": payload.device_id,
            "type": "tracking-device",
            "recorded_at": payload.timestamp,
            "location": {
                "lat": payload.lat,
                "lon": payload.lon
            },
            "additional": {
                "speed_kmph": payload.speed_kmph
            }
        }
    ]
    await send_observations_to_gundi(
        observations=transformed_data,
        integration_id=integration.id
    )

    return {"observations_extracted": 1}
If you expect to receive data with different schemas, you can define a schema per integration using JSON schema. To do that, annotate the payload arg with the GenericJsonPayload model, and annotate the webhook_config arg with the DynamicSchemaConfig model or a subclass. Then you can define the schema in the Gundi portal, and the framework will build the Pydantic model at runtime based on that schema, to validate and parse the incoming data.
# webhooks/configurations.py
import pydantic
from .core import DynamicSchemaConfig


class MyWebhookConfig(DynamicSchemaConfig):
    custom_setting: str
    another_custom_setting: bool

# webhooks/handlers.py
from app.services.activity_logger import webhook_activity_logger
from .core import GenericJsonPayload
from .configurations import MyWebhookConfig


@webhook_activity_logger()
async def webhook_handler(payload: GenericJsonPayload, integration=None, webhook_config: MyWebhookConfig = None):
    # Implement your custom logic to process the payload here...
    return {"observations_extracted": 1}
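To give a feel for what schema-driven validation means, here is a minimal standard-library sketch that checks a payload against a small JSON schema. It is only a stand-in for illustration: the framework builds a Pydantic model from the schema, and the function name validate_payload and the sample schema are assumptions, not part of the framework.

```python
# Maps JSON schema type names to Python types (only a subset is covered here)
JSON_TYPES = {
    "string": str,
    "number": (int, float),
    "integer": int,
    "boolean": bool,
    "object": dict,
    "array": list,
}


def _matches(value, json_type: str) -> bool:
    """Check a value against a JSON schema type name."""
    if json_type not in JSON_TYPES:
        return True  # unknown types are not checked in this sketch
    if json_type in ("number", "integer") and isinstance(value, bool):
        return False  # bool is a subclass of int in Python
    return isinstance(value, JSON_TYPES[json_type])


def validate_payload(payload: dict, schema: dict) -> list:
    """Return a list of validation errors (empty when the payload is valid)."""
    errors = []
    for name in schema.get("required", []):
        if name not in payload:
            errors.append(f"missing required field: {name}")
    for name, spec in schema.get("properties", {}).items():
        if name in payload and not _matches(payload[name], spec.get("type", "")):
            errors.append(f"field {name} should be of type {spec['type']}")
    return errors


# Hypothetical schema, similar to what could be defined per integration
schema = {
    "type": "object",
    "properties": {
        "device_id": {"type": "string"},
        "lat": {"type": "number"},
        "lon": {"type": "number"},
    },
    "required": ["device_id", "lat", "lon"],
}

print(validate_payload({"device_id": "collar-xy123", "lat": -51.748, "lon": -72.720}, schema))
# []
print(validate_payload({"device_id": 123, "lat": -51.748}, schema))
# ['missing required field: lon', 'field device_id should be of type string']
```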
For simple JSON to JSON transformations, you can use the JQ language to transform the incoming data. To do that, annotate the webhook_config arg with the GenericJsonTransformConfig model or a subclass. Then you can specify the jq_filter and the output_type (ev for event or obv for observation) in Gundi.
# webhooks/configurations.py
import pydantic
from .core import WebhookPayload, GenericJsonTransformConfig


class MyWebhookPayload(WebhookPayload):
    device_id: str
    timestamp: str
    lat: float
    lon: float
    speed_kmph: float


class MyWebhookConfig(GenericJsonTransformConfig):
    custom_setting: str
    another_custom_setting: bool

# webhooks/handlers.py
import json
import pyjq
from app.services.activity_logger import webhook_activity_logger
from app.services.gundi import send_observations_to_gundi
from .configurations import MyWebhookPayload, MyWebhookConfig


@webhook_activity_logger()
async def webhook_handler(payload: MyWebhookPayload, integration=None, webhook_config: MyWebhookConfig = None):
    # Sample implementation using the JQ language to transform the incoming data
    input_data = json.loads(payload.json())
    transformation_rules = webhook_config.jq_filter
    transformed_data = pyjq.all(transformation_rules, input_data)
    print(f"Transformed Data:\n{transformed_data}")
    # This sample handles the case webhook_config.output_type == "obv" (observations)
    response = await send_observations_to_gundi(
        observations=transformed_data,
        integration_id=integration.id
    )
    data_points_qty = len(transformed_data) if isinstance(transformed_data, list) else 1
    print(f"{data_points_qty} data point(s) sent to Gundi.")
    return {"data_points_qty": data_points_qty}
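As an illustration of what a jq_filter could look like for the MyWebhookPayload fields above, the sketch below pairs an example filter with a plain-Python function that produces the same result. The filter text and the function name transform are assumptions made for this example. pyjq is not called here, so the snippet runs with the standard library only.

```python
# Example jq filter, as it might be entered in the jq_filter setting (illustrative)
JQ_FILTER = """
{
  source: .device_id,
  type: "tracking-device",
  recorded_at: .timestamp,
  location: {lat: .lat, lon: .lon},
  additional: {speed_kmph: .speed_kmph}
}
"""


def transform(payload: dict) -> list:
    """Plain-Python equivalent of JQ_FILTER.

    Returns a list, mirroring pyjq.all(), which collects every output of the filter.
    """
    return [
        {
            "source": payload["device_id"],
            "type": "tracking-device",
            "recorded_at": payload["timestamp"],
            "location": {"lat": payload["lat"], "lon": payload["lon"]},
            "additional": {"speed_kmph": payload["speed_kmph"]},
        }
    ]


sample = {
    "device_id": "collar-xy123",
    "timestamp": "2024-01-24 09:03:00-0300",
    "lat": -51.748,
    "lon": -72.720,
    "speed_kmph": 10,
}
observations = transform(sample)
print(observations[0]["source"], observations[0]["location"])
```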
You can combine the dynamic schema and JSON transformations by annotating the payload arg with the GenericJsonPayload model and the webhook_config arg with the GenericJsonTransformConfig model, or their subclasses. Then you can define the schema and the JQ filter in the Gundi portal. The framework will build the Pydantic model at runtime based on that schema to validate and parse the incoming data, and the JQ filter is then applied to transform the data.
# webhooks/handlers.py
import json
import pyjq
from app.services.activity_logger import webhook_activity_logger
from app.services.gundi import send_observations_to_gundi
from .core import GenericJsonPayload, GenericJsonTransformConfig


@webhook_activity_logger()
async def webhook_handler(payload: GenericJsonPayload, integration=None, webhook_config: GenericJsonTransformConfig = None):
    # Sample implementation using the JQ language to transform the incoming data
    input_data = json.loads(payload.json())
    # Strip newlines and spaces from the filter entered in the portal
    filter_expression = webhook_config.jq_filter.replace("\n", "").replace(" ", "")
    transformed_data = pyjq.all(filter_expression, input_data)
    print(f"Transformed Data:\n{transformed_data}")
    # This sample handles the case webhook_config.output_type == "obv" (observations)
    response = await send_observations_to_gundi(
        observations=transformed_data,
        integration_id=integration.id
    )
    data_points_qty = len(transformed_data) if isinstance(transformed_data, list) else 1
    print(f"{data_points_qty} data point(s) sent to Gundi.")
    return {"data_points_qty": data_points_qty}
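One detail of the sample above is worth keeping in mind: removing every space from the filter also changes string literals inside it. The short sketch below shows the effect on a made-up filter and one possible alternative that only collapses runs of whitespace. Whether any normalization is needed at all depends on how the filter is stored, which is not covered here.

```python
# Made-up filter containing a string literal with a space
jq_filter = """
{
  source: .device_id,
  type: "tracking device"
}
"""

# Approach from the sample handler: drop newlines and all spaces
stripped = jq_filter.replace("\n", "").replace(" ", "")
print(stripped)   # {source:.device_id,type:"trackingdevice"}

# Alternative: collapse whitespace runs into single spaces,
# which keeps single spaces inside string literals intact
collapsed = " ".join(jq_filter.split())
print(collapsed)  # { source: .device_id, type: "tracking device" }
```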
If you expect to receive payloads containing binary data encoded as hex strings, you can use StructHexString, HexStringPayload and HexStringConfig, which facilitate validation and parsing of hex strings. The user defines, in Gundi, the name of the field containing the hex string and the structure of the data in the hex string. The fields are defined in the hex_format attribute of the configuration, following the format string syntax of the Python struct module. The fields are extracted from the hex string in the order they are defined in the hex_format attribute, and made available as sub-fields in the data field of the payload.
# webhooks/configurations.py
from app.services.utils import StructHexString
# HexStringPayload and WebhookPayload are assumed to live in webhooks/core.py with the other base classes
from .core import HexStringConfig, HexStringPayload, WebhookConfiguration, WebhookPayload


# Expected data: {"device": "BF170A", "data": "6881631900003c20020000c3", "time": "1638201313", "type": "bove"}
class MyWebhookPayload(HexStringPayload, WebhookPayload):
    device: str
    time: str
    type: str
    data: StructHexString


class MyWebhookConfig(HexStringConfig, WebhookConfiguration):
    custom_setting: str
    another_custom_setting: bool
"""
Sample configuration in Gundi:
{
"hex_data_field": "data",
"hex_format": {
"byte_order": ">",
"fields": [
{
"name": "start_bit",
"format": "B",
"output_type": "int"
},
{
"name": "v",
"format": "I"
},
{
"name": "interval",
"format": "H",
"output_type": "int"
},
{
"name": "meter_state_1",
"format": "B"
},
{
"name": "meter_state_2",
"format": "B",
"bit_fields": [
{
"name": "meter_batter_alarm",
"end_bit": 0,
"start_bit": 0,
"output_type": "bool"
},
{
"name": "empty_pipe_alarm",
"end_bit": 1,
"start_bit": 1,
"output_type": "bool"
},
{
"name": "reverse_flow_alarm",
"end_bit": 2,
"start_bit": 2,
"output_type": "bool"
},
{
"name": "over_range_alarm",
"end_bit": 3,
"start_bit": 3,
"output_type": "bool"
},
{
"name": "temp_alarm",
"end_bit": 4,
"start_bit": 4,
"output_type": "bool"
},
{
"name": "ee_error",
"end_bit": 5,
"start_bit": 5,
"output_type": "bool"
},
{
"name": "transduce_in_error",
"end_bit": 6,
"start_bit": 6,
"output_type": "bool"
},
{
"name": "transduce_out_error",
"end_bit": 7,
"start_bit": 7,
"output_type": "bool"
}
]
},
{
"name": "r1",
"format": "B",
"output_type": "int"
},
{
"name": "r2",
"format": "B",
"output_type": "int"
},
{
"name": "crc",
"format": "B"
}
]
}
}
"""
# The data extracted from the hex string will be made available as new sub-fields as follows:
"""
{
"device": "BF170A",
"time": "1638201313",
"type": "bove",
"data": {
"value": "6881631900003c20020000c3",
"format_spec": ">BIHBBBBB",
"unpacked_data": {
"start_bit": 104,
"v": 1663873,
"interval": 15360,
"meter_state_1": 32,
"meter_state_2": 2,
"r1": 0,
"r2": 0,
"crc": 195,
"meter_batter_alarm": True,
"empty_pipe_alarm": True,
"reverse_flow_alarm": False,
"over_range_alarm": False,
"temp_alarm": False,
"ee_error": False,
"transduce_in_error": False,
"transduce_out_error": False
}
}
}
"""
Notice: This can also be combined with Dynamic Schema and JSON Transformations. In that case the hex string will be parsed first, and then the JQ filter can be applied to the extracted data.
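To make the struct format string syntax more concrete, the following sketch decodes the sample hex string with Python's standard struct module. It is not the framework's implementation. The helper names unpack_hex and extract_bits are hypothetical, and the bit numbering (bit 0 as the least significant bit) is an assumption.

```python
import struct

HEX_VALUE = "6881631900003c20020000c3"
FIELD_NAMES = ["start_bit", "v", "interval", "meter_state_1", "meter_state_2", "r1", "r2", "crc"]


def unpack_hex(hex_value: str, format_spec: str, names: list) -> dict:
    """Decode a hex string with a struct format string and label the values."""
    values = struct.unpack(format_spec, bytes.fromhex(hex_value))
    return dict(zip(names, values))


def extract_bits(value: int, start_bit: int, end_bit: int) -> int:
    """Return bits start_bit..end_bit of value (assumes bit 0 is the least significant)."""
    width = end_bit - start_bit + 1
    return (value >> start_bit) & ((1 << width) - 1)


# B = 1 byte, I = 4 bytes, H = 2 bytes; the first character sets the byte order
big = unpack_hex(HEX_VALUE, ">BIHBBBBB", FIELD_NAMES)     # big-endian
little = unpack_hex(HEX_VALUE, "<BIHBBBBB", FIELD_NAMES)  # little-endian

print(big["start_bit"], big["crc"])       # 104 195
print(big["v"], big["interval"])          # 2170755328 60
print(little["v"], little["interval"])    # 1663873 15360

# Bit 1 of meter_state_2 (value 2) under the bit numbering assumed above
print(bool(extract_bits(big["meter_state_2"], 1, 1)))  # True
```

Byte order only affects the multi-byte fields (I and H here); single-byte fields decode to the same values either way. With the standard library, 1663873 and 15360 are the little-endian readings of this hex string.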