senzing-garage/stream-producer

Thread Process Dies if message size is greater than SQS Size limit (256KB)

Closed this issue · 4 comments

Describe the bug
Broker - SQS
Stream Producer starts the worker with multiple process threads. Whenever the SQS size-limit exception is raised, the affected thread stops processing and dies. Once all the threads have died from this exception, the stream producer stops pushing messages to the broker and becomes idle.

To Reproduce
Steps to reproduce the behavior:

  1. Create a file with a record larger than 256KB
  2. Use the SQS broker and consume this file using stream-producer
  3. You will see the following exception

[2020-09-30T17:53:59+05:30] (senzing_staging_stream_producer_definition/default/33aef3fc-2ff6-46d0-bc23-1c474255bef8) Exception in thread Process-0-FilterQueueDictToJsonSqsThread-3:
[2020-09-30T17:53:59+05:30] (senzing_staging_stream_producer_definition/default/33aef3fc-2ff6-46d0-bc23-1c474255bef8) Traceback (most recent call last):
[2020-09-30T17:53:59+05:30] (senzing_staging_stream_producer_definition/default/33aef3fc-2ff6-46d0-bc23-1c474255bef8) File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
[2020-09-30T17:53:59+05:30] (senzing_staging_stream_producer_definition/default/33aef3fc-2ff6-46d0-bc23-1c474255bef8) self.run()
[2020-09-30T17:53:59+05:30] (senzing_staging_stream_producer_definition/default/33aef3fc-2ff6-46d0-bc23-1c474255bef8) File "/app/stream-producer.py", line 1511, in run
[2020-09-30T17:53:59+05:30] (senzing_staging_stream_producer_definition/default/33aef3fc-2ff6-46d0-bc23-1c474255bef8) self.print(self.evaluate(message))
[2020-09-30T17:53:59+05:30] (senzing_staging_stream_producer_definition/default/33aef3fc-2ff6-46d0-bc23-1c474255bef8) File "/app/stream-producer.py", line 1394, in print
[2020-09-30T17:53:59+05:30] (senzing_staging_stream_producer_definition/default/33aef3fc-2ff6-46d0-bc23-1c474255bef8) MessageBody=(message),
[2020-09-30T17:53:59+05:30] (senzing_staging_stream_producer_definition/default/33aef3fc-2ff6-46d0-bc23-1c474255bef8) File "/usr/local/lib/python3.7/dist-packages/botocore/client.py", line 316, in _api_call
[2020-09-30T17:53:59+05:30] (senzing_staging_stream_producer_definition/default/33aef3fc-2ff6-46d0-bc23-1c474255bef8) return self._make_api_call(operation_name, kwargs)
[2020-09-30T17:53:59+05:30] (senzing_staging_stream_producer_definition/default/33aef3fc-2ff6-46d0-bc23-1c474255bef8) File "/usr/local/lib/python3.7/dist-packages/botocore/client.py", line 635, in _make_api_call
[2020-09-30T17:53:59+05:30] (senzing_staging_stream_producer_definition/default/33aef3fc-2ff6-46d0-bc23-1c474255bef8) raise error_class(parsed_response, operation_name)
[2020-09-30T17:53:59+05:30] (senzing_staging_stream_producer_definition/default/33aef3fc-2ff6-46d0-bc23-1c474255bef8) botocore.exceptions.ClientError: An error occurred (InvalidParameterValue) when calling the SendMessage operation: One or more parameters are invalid. Reason: Message must be shorter than 262144 bytes.
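
For step 1 of the reproduction, a test file with one oversized record could be generated with a short script. This is only a sketch: the `DATA_SOURCE` and `PADDING` fields, the helper names, and the `oversized.json` file name are assumptions made for illustration. Only `RECORD_ID` and the 262144-byte limit come from this issue.

```python
import json

# Limit quoted in the SQS exception above.
SQS_MAX_BYTES = 262144


def make_oversized_record(record_id="1001", extra_bytes=1024):
    """Return one JSON line whose UTF-8 size is SQS_MAX_BYTES + extra_bytes."""
    # DATA_SOURCE and PADDING are hypothetical fields used only to build a test record.
    record = {"RECORD_ID": record_id, "DATA_SOURCE": "TEST", "PADDING": ""}
    base_size = len(json.dumps(record).encode("utf-8"))
    # Pad with ASCII characters so each character adds exactly one byte.
    record["PADDING"] = "x" * (SQS_MAX_BYTES - base_size + extra_bytes)
    return json.dumps(record)


def write_test_file(path="oversized.json"):
    """Write the oversized record as a single JSON line and return its size in bytes."""
    line = make_oversized_record()
    with open(path, "w", encoding="utf-8") as handle:
        handle.write(line + "\n")
    return len(line.encode("utf-8"))
```

Calling `write_test_file()` creates a one-line JSON file that can then be fed to stream-producer with the SQS broker configured.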

Expected behavior
Add support for a "SENZING_MESSAGE_SIZE_MAX" parameter. If a message is larger than the max size, just log it as a warning and continue processing. It would also be helpful if the warning included the RECORD_ID of the skipped record.

Machine (please complete the following information):

  • senzing/stream-loader docker image

Additional context

  1. For more information please check the support case - 3012

Added the following environment variables:

  1. SENZING_RECORD_SIZE_MAX
  2. SENZING_RECORD_IDENTIFIER

Note: This differs from the original request of SENZING_MESSAGE_SIZE_MAX because SENZING_RECORD_* was an existing prefix to be built upon. So the request for SENZING_MESSAGE_SIZE_MAX is actually SENZING_RECORD_SIZE_MAX.

The logging of records that exceed SENZING_RECORD_SIZE_MAX looks like this:

2020-10-09 17:49:18,534 senzing-50140310W Did not send record identified by RECORD_ID: 386820964.  Exceeds SENZING_RECORD_SIZE_MAX by 678 bytes.
2020-10-09 17:49:18,534 senzing-50140310W Did not send record identified by RECORD_ID: 181734352.  Exceeds SENZING_RECORD_SIZE_MAX by 80 bytes.
2020-10-09 17:49:18,534 senzing-50140310W Did not send record identified by RECORD_ID: 314249610.  Exceeds SENZING_RECORD_SIZE_MAX by 2600 bytes.
2020-10-09 17:49:18,535 senzing-50140310W Did not send record identified by RECORD_ID: 399059018.  Exceeds SENZING_RECORD_SIZE_MAX by 723 bytes.
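
For readers who want to see the idea in code, here is a minimal sketch of a size check that skips oversized records and logs a warning like the ones above. It is not the code from the pull request. The function name `send_if_within_limit`, the `send` callable, and the logger name are hypothetical, and the sketch assumes each message is a JSON object encoded as UTF-8.

```python
import json
import logging

logger = logging.getLogger("size-check-sketch")  # hypothetical logger name


def send_if_within_limit(message, send, size_max, identifier_key="RECORD_ID"):
    """Send `message` via `send` unless its UTF-8 size exceeds `size_max` bytes.

    Returns True if the message was sent, False if it was skipped.
    `send` stands in for whatever actually publishes to the broker.
    """
    size = len(message.encode("utf-8"))
    if size > size_max:
        # Assumes the message is a JSON object; look up the identifying field.
        record_id = json.loads(message).get(identifier_key)
        logger.warning(
            "Did not send record identified by %s: %s.  Exceeds SENZING_RECORD_SIZE_MAX by %d bytes.",
            identifier_key,
            record_id,
            size - size_max,
        )
        return False
    send(message)
    return True
```

Because the check runs before the send call, an oversized record never reaches the broker client, so the exception that killed the threads is not raised for it. Note that the SQS error above says the message must be shorter than 262144 bytes, so the value chosen for `size_max` should sit below that limit.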

Pull Requests:

  1. #37

@Sanyambansal76 Does this satisfy your request? If so, please close this issue. If not, let us know what else is requested.

Thanks @docktermj for the support. I have tested it and it is working fine.

[2020-10-13T18:28:21+05:30] (senzing_staging_stream_producer_definition/default/***) 2020-10-13 12:58:21,514 senzing-50140310W Did not send record identified by RECORD_ID: 436457673. Exceeds SENZING_RECORD_SIZE_MAX by 39973 bytes.