wandenberg/nginx-push-stream-module

Publish from other nginx modules

Is it possible to publish events into channels from other nginx modules?

Not yet. For now it works only if the module can send a POST to the 'publish' location.

I use curl_easy in my module to send a POST to the publish location, but that is a very poor solution.

I want to call an internal function to publish, without going through a location at all.

Feel free to expose the function that adds a message to a channel and call it from the other module. Pull requests are more than welcome. ;)

Unfortunately, I don't understand how I can

expose the function that adds a message to a channel

After all the validation and parsing that happens when a message is posted, the module uses the function ngx_http_push_stream_add_msg_to_channel to actually publish the message.
One way would be for your other module to prepare all the parameters this function requires and call it.
Then it would be possible to 'publish' a message from another module without having to POST to nginx.
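
A minimal sketch of that idea, written as a standalone C model rather than real module code. Every name in it (str_t, channel_t, main_conf_t, find_channel, add_msg_to_channel, publish_to_channel) is a hypothetical stand-in, not the module's API. It only illustrates the shape of the call: resolve the channel from its id, then hand the prepared parameters to the publish function.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins: the real types come from nginx and the
 * push stream module headers and carry many more fields. */
typedef struct { size_t len; const char *data; } str_t;

typedef struct channel_s {
    str_t             id;
    unsigned          stored_messages;
    struct channel_s *next;
} channel_t;

typedef struct {
    channel_t *channels;   /* all channels known to this configuration */
} main_conf_t;

/* Stand-in for the channel lookup: returns NULL when the id is unknown. */
static channel_t *
find_channel(main_conf_t *mcf, const str_t *id)
{
    channel_t *ch;

    for (ch = mcf->channels; ch != NULL; ch = ch->next) {
        if (ch->id.len == id->len
            && memcmp(ch->id.data, id->data, id->len) == 0)
        {
            return ch;
        }
    }
    return NULL;
}

/* Stand-in for the publish function: here it only counts the message. */
static int
add_msg_to_channel(main_conf_t *mcf, channel_t *ch, const char *text, size_t len)
{
    (void) mcf;
    if (text == NULL || len == 0) {
        return -1;
    }
    ch->stored_messages++;
    return 0;
}

/* What another module would call: 0 on success, -1 if the channel is
 * missing or the publish step fails. */
int
publish_to_channel(main_conf_t *mcf, const str_t *id, const str_t *text)
{
    channel_t *ch = find_channel(mcf, id);

    if (ch == NULL) {
        return -1;
    }
    return add_msg_to_channel(mcf, ch, text->data, text->len);
}
```

In the real module the caller would also have to supply a log, the optional event id and event type, the store_messages flag, and a temporary pool, since ngx_http_push_stream_add_msg_to_channel takes all of those.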

I have already seen the function ngx_http_push_stream_add_msg_to_channel.
Is it really possible to prepare all its parameters from another module?

I did it! Thank you very much!

diff --git a/include/ngx_http_push_stream_module_utils.h b/include/ngx_http_push_stream_module_utils.h
index 8c99cf1..8d29fd8 100644
--- a/include/ngx_http_push_stream_module_utils.h
+++ b/include/ngx_http_push_stream_module_utils.h
@@ -263,6 +263,7 @@ static void                 ngx_http_push_stream_complex_value(ngx_http_request_
 
 
 ngx_int_t                   ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool);
+void                        ngx_http_push_stream_add_msg_to_channel_my(ngx_log_t *log, ngx_str_t *id, ngx_str_t *text, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool);
 ngx_int_t                   ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, ngx_str_t *event_id, ngx_pool_t *temp_pool, ngx_http_request_t *r);
 
 static void                 ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev);
diff --git a/src/ngx_http_push_stream_module_utils.c b/src/ngx_http_push_stream_module_utils.c
index 6f35a59..e47ac50 100644
--- a/src/ngx_http_push_stream_module_utils.c
+++ b/src/ngx_http_push_stream_module_utils.c
@@ -371,6 +371,27 @@ ngx_http_push_stream_convert_char_to_msg_on_shared(ngx_http_push_stream_main_con
     return msg;
 }
 
+void
+ngx_http_push_stream_add_msg_to_channel_my(ngx_log_t *log, ngx_str_t *id, ngx_str_t *text, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool)
+{
+    ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
+    for (ngx_queue_t *data_q = ngx_queue_head(&global_data->shm_datas_queue); data_q != ngx_queue_sentinel(&global_data->shm_datas_queue); data_q = ngx_queue_next(data_q)) {
+        ngx_http_push_stream_shm_data_t *data = ngx_queue_data(data_q, ngx_http_push_stream_shm_data_t, shm_data_queue);
+        ngx_http_push_stream_main_conf_t *mcf = data->mcf;
+        ngx_http_push_stream_channel_t *channel = ngx_http_push_stream_find_channel(id, log, mcf);
+        if (channel != NULL) {
+            if (ngx_http_push_stream_add_msg_to_channel(mcf, log, channel, text->data, text->len, event_id, event_type, store_messages, temp_pool) != NGX_OK) {
+                ngx_log_error(NGX_LOG_ERR, log, 0, "ngx_http_push_stream_add_msg_to_channel != NGX_OK");
+            }
+        }
+    }
+}
 
 ngx_int_t
 ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool)