wmo-im/pywis-pubsub

Adding MQTT Notification for Global Cache

Opened this issue · 0 comments

Adding MQTT Notification for Global Cache

While this is likely possible using the "hook.py" method, I was not able to locate any examples on how to code a correct hook.
The following addition submits a notification message following a successful download. The approach reuses the existing subscribe client connection and saves the overhead of establishing a client session for each notification message - a possible side effect of using the "hook.py" approach.

$ git diff pywis_pubsub/subscribe.py
diff --git a/pywis_pubsub/subscribe.py b/pywis_pubsub/subscribe.py
index 5034e66..e869e5f 100644
--- a/pywis_pubsub/subscribe.py
+++ b/pywis_pubsub/subscribe.py
@@ -23,9 +23,11 @@ import json
import logging
from pathlib import Path
import random
-
import click
+import uuid
+from copy import deepcopy
+from datetime import datetime
from pywis_pubsub import cli_options
from pywis_pubsub import util
from pywis_pubsub.geometry import is_message_within_bbox
@@ -39,7 +41,6 @@ from pywis_pubsub.verification import data_verified
LOGGER = logging.getLogger(__name__)
-
def on_message_handler(client, userdata, msg):
"""message handler"""
@@ -140,6 +141,18 @@ def on_message_handler(client, userdata, msg):
else:
storage_object.save(data, filename)
+ if userdata.get('cache_url') is not None:
+ cachetopic = msg.topic.replace("origin","cache",1)
+ pub_mesg = deepcopy(msg_dict)
+ pub_mesg['id'] = str(uuid.uuid4())
+ pub_mesg['properties']['pubtime'] = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
+ for indx, elem in enumerate(pub_mesg['links']):
+ if elem['rel'] == 'canonical':
+ pub_mesg['links'][indx]['href'] = userdata.get('cache_url') + filename
+ LOGGER.debug(f"Publish Cache Topic: {cachetopic}")
+ LOGGER.debug(f"Publish Cache Message:\n{json.dumps(pub_mesg)}")
+ client.publish(cachetopic, json.dumps(pub_mesg))
+
if userdata.get('hook') is not None:
LOGGER.debug(f"Hook detected: {userdata['hook']}")
try:
@@ -163,7 +176,6 @@ def subscribe(ctx, config, download, bbox=[], verbosity='NOTSET'):
if config is None:
raise click.ClickException('missing --config')
config = util.yaml_load(config)
-
broker = config.get('broker')
default_client_id = f'pywis-pubsub-{random.randint(0, 1000)}'
client_id = config.get('client_id', default_client_id)
@@ -185,6 +197,7 @@ def subscribe(ctx, config, download, bbox=[], verbosity='NOTSET'):
options['verify_data'] = config.get('verify_data', True)
options['validate_message'] = config.get('validate_message', False)
+ options['cache_url'] = config.get('cache_url')
options['hook'] = config.get('hook')
client = MQTTPubSubClient(broker, options)