Failed to fetch message after resetting fetch_offset when hitting OffsetOutOfRange error
Closed this issue · 3 comments
bash-4.4# /opt/kafka/bin/kafka-consumer-groups.sh --describe --group "Email Dispatch Consumer" --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'Email Dispatch Consumer' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
bp.crinoid.v1.emailRule 0 4 14 10 - - -
The Kafka topic dump below shows that a message with offset 13 does exist :-)
```
bash-4.4# kafka_dump_topic bp.crinoid.v1.emailRule --property print.offset=true
Offset (9) {"header": {"roleIds": [], "upstreamId": "c6763161-8ee8-40d2-99c9-f1e942b7439a", "traceId": "388586e5-77a3-4db0-8c83-3e2df04915c7", "timestamp": "2018-12-13T16:25:07.090725Z", "envelopeId": "d1b4f522-cd4d-4193-bcb9-e3e6c7a7ccb3"}, "version": 1, "event": {"_type": "bp.crinoid.v1.FiredNotification", "object_data": {"message": {"header": {"envelopeId": "0ab5bc5e-5ec7-48b6-a88c-d16472d80e89", "upstreamId": "4194c1c3-82c9-4477-be48-3ff4a331e16a", "traceId": "a034e966-a456-444c-b8df-624e22230a20", "timestamp": "2018-12-13T16:25:05.428370Z"}, "version": 1, "event": {"_type": "bp.v1.ObjectChanged", "object_type": "/tron/api/v1/users/1249967f-0c79-4019-b598-d4dc4b964d64", "upstream_id": "00a743dc-296c-4a80-ab97-a38d9979e01b", "object_id": "", "trace_id": "a034e966-a456-444c-b8df-624e22230a20", "hop_id": "4194c1c3-82c9-4477-be48-3ff4a331e16a", "object_data": {"confirmation_link": "https://10.180.71.18/platform-ui/#/security/reset-password?token=IjEyNDk5NjdmLTBjNzktNDAxOS1iNTk4LWQ0ZGM0Yjk2NGQ2NCI:1gXTnN:Al5DS9_uIqSbGsP9mNLLbbqSe8A", "email": "dzheng@ciean.com", "name": "Du"}, "op": "update_user_reset_password"}}, "rule": {"remaining_invocations": null, "_type": "bp.crinoid.v1.emailRule", "extra_compare": {"op": "update_user_reset_password"}, "name": "Update_User_Reset_Password", "trigger_type": "bp.v1.ObjectChanged", "topic": "bp.tron.v1.changes", "notification_params": {"to": "{{ get('message.event.object_data.email') }}", "subject": "Blue Planet account password expired"}, "template": "4d338952-7783-44cf-b40d-1fb4e68564b9", "notification_type": "email", "id": 24, "uuid": "68e502b8-8286-47b2-a15c-7f4d00824247"}}}}
Offset (10) {"header": {"roleIds": [], "upstreamId": "a5c630c3-7eaf-4442-8db2-652dcdd04b1e", "traceId": "ed9656b1-d364-4b00-ab73-e453e3915b05", "timestamp": "2018-12-13T16:40:46.198530Z", "envelopeId": "08f9ca93-64ca-4b2b-8f80-bd89cf27ab84"}, "version": 1, "event": {"_type": "bp.crinoid.v1.FiredNotification", "object_data": {"message": {"header": {"envelopeId": "ee4c0637-88a1-4d46-b49f-eec5a2ec7c52", "upstreamId": "0b39d9e1-45df-43f2-b740-6afa8388d25c", "traceId": "ed9656b1-d364-4b00-ab73-e453e3915b05", "timestamp": "2018-12-13T16:40:42.893394Z"}, "version": 1, "event": {"_type": "bp.v1.ObjectChanged", "object_type": "/tron/api/v1/users/1249967f-0c79-4019-b598-d4dc4b964d64/reset_password", "upstream_id": "cec5f81d-5fe8-4f2d-9614-600cc61791f1", "object_id": "1249967f-0c79-4019-b598-d4dc4b964d64", "trace_id": "ed9656b1-d364-4b00-ab73-e453e3915b05", "hop_id": "0b39d9e1-45df-43f2-b740-6afa8388d25c", "object_data": {"email": "dzheng@ciean.com", "name": "Du"}, "op": "email_reset_password"}}, "rule": {"remaining_invocations": null, "_type": "bp.crinoid.v1.emailRule", "extra_compare": {"op": "email_reset_password"}, "name": "Admin_Password_Reset", "trigger_type": "bp.v1.ObjectChanged", "topic": "bp.tron.v1.changes", "notification_params": {"to": "{{ get('message.event.object_data.email') }}", "subject": "Blue Planet account password reset"}, "template": "bf89ff20-5cc9-4325-b392-98a3670e8f93", "notification_type": "email", "id": 6, "uuid": "3541f8ca-d162-4480-afb0-e3d312d3e221"}}}}
Offset (11) {"header": {"roleIds": [], "upstreamId": "82d29005-f22f-4ab1-9970-807888ffcb07", "traceId": "256b58e4-7f89-4c4d-9dc3-cc52260f77ad", "timestamp": "2018-12-13T16:45:20.707517Z", "envelopeId": "19157fff-e097-4015-9503-c59314ab1e8c"}, "version": 1, "event": {"_type": "bp.crinoid.v1.FiredNotification", "object_data": {"message": {"header": {"envelopeId": "a86e1467-9933-437c-9423-29dd7343851a", "upstreamId": "887b944a-077a-48bb-b4f1-ef6acf9f78ea", "traceId": "256b58e4-7f89-4c4d-9dc3-cc52260f77ad", "timestamp": "2018-12-13T16:45:13.762890Z"}, "version": 1, "event": {"_type": "bp.v1.ObjectChanged", "object_type": "/tron/api/v1/users/1249967f-0c79-4019-b598-d4dc4b964d64", "upstream_id": "3a73daf6-969b-43e3-83b8-669661e8f878", "object_id": "", "trace_id": "256b58e4-7f89-4c4d-9dc3-cc52260f77ad", "hop_id": "887b944a-077a-48bb-b4f1-ef6acf9f78ea", "object_data": {"confirmation_link": "https://10.180.71.18/platform-ui/#/security/reset-password?token=IjEyNDk5NjdmLTBjNzktNDAxOS1iNTk4LWQ0ZGM0Yjk2NGQ2NCI:1gXU6r:LpXhAB8Pyws4_qchzX9fx6C0lz0", "email": "dzheng@ciean.com", "name": "Du"}, "op": "update_user_reset_password"}}, "rule": {"remaining_invocations": null, "_type": "bp.crinoid.v1.emailRule", "extra_compare": {"op": "update_user_reset_password"}, "name": "Update_User_Reset_Password", "trigger_type": "bp.v1.ObjectChanged", "topic": "bp.tron.v1.changes", "notification_params": {"to": "{{ get('message.event.object_data.email') }}", "subject": "Blue Planet account password expired"}, "template": "4d338952-7783-44cf-b40d-1fb4e68564b9", "notification_type": "email", "id": 24, "uuid": "68e502b8-8286-47b2-a15c-7f4d00824247"}}}}
Offset (12) {"header": {"roleIds": [], "upstreamId": "6ba43bdc-d66a-45c4-a6e3-de424b4fd598", "traceId": "f5ad34b0-3db1-4e00-9c10-3ad15abdfdbb", "timestamp": "2018-12-13T16:45:51.962691Z", "envelopeId": "8339e2a5-c35c-45d3-83dc-8577ad95a57c"}, "version": 1, "event": {"_type": "bp.crinoid.v1.FiredNotification", "object_data": {"message": {"header": {"envelopeId": "32a7f929-01f5-4b26-a5e9-5aaa4a27b3b9", "upstreamId": "9a34be1d-cf58-4f2d-a379-8a800518b279", "traceId": "f5ad34b0-3db1-4e00-9c10-3ad15abdfdbb", "timestamp": "2018-12-13T16:45:48.068736Z"}, "version": 1, "event": {"_type": "bp.v1.ObjectChanged", "object_type": "/tron/api/v1/users/1249967f-0c79-4019-b598-d4dc4b964d64/reset_password", "upstream_id": "92ab38c8-4206-4eea-b3f7-0175ff079b20", "object_id": "1249967f-0c79-4019-b598-d4dc4b964d64", "trace_id": "f5ad34b0-3db1-4e00-9c10-3ad15abdfdbb", "hop_id": "9a34be1d-cf58-4f2d-a379-8a800518b279", "object_data": {"email": "dzheng@ciean.com", "name": "Du"}, "op": "email_reset_password"}}, "rule": {"remaining_invocations": null, "_type": "bp.crinoid.v1.emailRule", "extra_compare": {"op": "email_reset_password"}, "name": "Admin_Password_Reset", "trigger_type": "bp.v1.ObjectChanged", "topic": "bp.tron.v1.changes", "notification_params": {"to": "{{ get('message.event.object_data.email') }}", "subject": "Blue Planet account password reset"}, "template": "bf89ff20-5cc9-4325-b392-98a3670e8f93", "notification_type": "email", "id": 6, "uuid": "3541f8ca-d162-4480-afb0-e3d312d3e221"}}}}
Offset (13) {"header": {"roleIds": [], "upstreamId": "b3bdb785-e2c3-4221-a061-62fa1ec377f7", "traceId": "93e015b8-a93f-4c50-9dca-6544c83fe89b", "timestamp": "2018-12-13T17:12:56.515914Z", "envelopeId": "597367d8-d7eb-425b-8156-53933e375820"}, "version": 1, "event": {"_type": "bp.crinoid.v1.FiredNotification", "object_data": {"message": {"header": {"envelopeId": "da8eca48-364c-42a2-b19b-8839125886b7", "upstreamId": "ca6be94a-a24e-4638-aec1-6ec22937f58c", "traceId": "93e015b8-a93f-4c50-9dca-6544c83fe89b", "timestamp": "2018-12-13T17:12:51.277018Z"}, "version": 1, "event": {"_type": "bp.v1.ObjectChanged", "object_type": "/tron/api/v1/users/1249967f-0c79-4019-b598-d4dc4b964d64", "upstream_id": "4ab1feb7-9f67-448f-b63a-be570258971e", "object_id": "", "trace_id": "93e015b8-a93f-4c50-9dca-6544c83fe89b", "hop_id": "ca6be94a-a24e-4638-aec1-6ec22937f58c", "object_data": {"confirmation_link": "https://10.180.71.18/platform-ui/#/security/reset-password?token=IjEyNDk5NjdmLTBjNzktNDAxOS1iNTk4LWQ0ZGM0Yjk2NGQ2NCI:1gXUXb:wx7lSZThjXEVdtn8YjOikPpomkA", "email": "dzheng@ciean.com", "name": "Du"}, "op": "update_user_reset_password"}}, "rule": {"remaining_invocations": null, "_type": "bp.crinoid.v1.emailRule", "extra_compare": {"op": "update_user_reset_password"}, "name": "Update_User_Reset_Password", "trigger_type": "bp.v1.ObjectChanged", "topic": "bp.tron.v1.changes", "notification_params": {"to": "{{ get('message.event.object_data.email') }}", "subject": "Blue Planet account password expired"}, "template": "4d338952-7783-44cf-b40d-1fb4e68564b9", "notification_type": "email", "id": 24, "uuid": "68e502b8-8286-47b2-a15c-7f4d00824247"}}}}```
I added the following log messages to the copy of afkak installed with ntf-dispatcher to track down why the consumer stops consuming messages after its offset is reset to latest upon hitting the OffsetOutOfRange error.
2018-12-13T16:52:05.128724+00:00 onxv1259 dispatcher-email_0: {"msg": "client._handle_responses: [FetchResponse(topic=u'bp.crinoid.v1.emailRule', partition=0, error=1, highwaterMark=-1, messages=<generator object _decode_message_set_iter at 0x7f3a04d67b40>)]", "namespace": "afkak.client", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/client.py", "code_line": 566, "code_func": "_handle_responses", "timestamp": "2018-12-13T16:52:05.127854Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.129702+00:00 onxv1259 dispatcher-email_0: {"msg": "client._handle_responses _check_error: FetchResponse(topic=u'bp.crinoid.v1.emailRule', partition=0, error=1, highwaterMark=-1, messages=<generator object _decode_message_set_iter at 0x7f3a04d67b40>)", "namespace": "afkak.client", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/client.py", "code_line": 569, "code_func": "_handle_responses", "timestamp": "2018-12-13T16:52:05.128474Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.130384+00:00 onxv1259 dispatcher-email_0: {"msg": "isinstance(responseOrErrcode, int): False", "namespace": "afkak.common", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/common.py", "code_line": 763, "code_func": "_check_error", "timestamp": "2018-12-13T16:52:05.128818Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.130722+00:00 onxv1259 dispatcher-email_0: {"msg": "errno: 1", "namespace": "afkak.common", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/common.py", "code_line": 768, "code_func": "_check_error", "timestamp": "2018-12-13T16:52:05.129217Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.131078+00:00 onxv1259 dispatcher-email_0: {"msg": "cls: <class 'afkak.common.OffsetOutOfRangeError'>", "namespace": "afkak.common", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/common.py", "code_line": 774, "code_func": "_check_error", "timestamp": "2018-12-13T16:52:05.129565Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.131441+00:00 onxv1259 dispatcher-email_0: {"msg": "Reset offset to -1", "namespace": "afkak.consumer", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/consumer.py", "code_line": 725, "code_func": "_handle_fetch_error", "timestamp": "2018-12-13T16:52:05.130108Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.138426+00:00 onxv1259 dispatcher-email_0: {"msg": "send_offset_request [OffsetResponse(topic=u'bp.crinoid.v1.emailRule', partition=0, error=0, offsets=(13,))]", "namespace": "afkak.client", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/client.py", "code_line": 508, "code_func": "send_offset_request", "timestamp": "2018-12-13T16:52:05.133847Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.138767+00:00 onxv1259 dispatcher-email_0: {"msg": "client._handle_responses: [OffsetResponse(topic=u'bp.crinoid.v1.emailRule', partition=0, error=0, offsets=(13,))]", "namespace": "afkak.client", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/client.py", "code_line": 566, "code_func": "_handle_responses", "timestamp": "2018-12-13T16:52:05.134468Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.139132+00:00 onxv1259 dispatcher-email_0: {"msg": "client._handle_responses _check_error: OffsetResponse(topic=u'bp.crinoid.v1.emailRule', partition=0, error=0, offsets=(13,))", "namespace": "afkak.client", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/client.py", "code_line": 569, "code_func": "_handle_responses", "timestamp": "2018-12-13T16:52:05.134844Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.139456+00:00 onxv1259 dispatcher-email_0: {"msg": "isinstance(responseOrErrcode, int): False", "namespace": "afkak.common", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/common.py", "code_line": 763, "code_func": "_check_error", "timestamp": "2018-12-13T16:52:05.135194Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.139756+00:00 onxv1259 dispatcher-email_0: {"msg": "errno: 0", "namespace": "afkak.common", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/common.py", "code_line": 768, "code_func": "_check_error", "timestamp": "2018-12-13T16:52:05.135484Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.140090+00:00 onxv1259 dispatcher-email_0: {"msg": "client._handle_responses out: [OffsetResponse(topic=u'bp.crinoid.v1.emailRule', partition=0, error=0, offsets=(13,))]", "namespace": "afkak.client", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/client.py", "code_line": 589, "code_func": "_handle_responses", "timestamp": "2018-12-13T16:52:05.135788Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.140398+00:00 onxv1259 dispatcher-email_0: {"msg": "consumer._handle_fetch_response: [OffsetResponse(topic=u'bp.crinoid.v1.emailRule', partition=0, error=0, offsets=(13,))]", "namespace": "afkak.consumer", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/consumer.py", "code_line": 775, "code_func": "_handle_fetch_response", "timestamp": "2018-12-13T16:52:05.136192Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.140711+00:00 onxv1259 dispatcher-email_0: {"msg": "self._msg_block_d: None", "namespace": "afkak.consumer", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/consumer.py", "code_line": 788, "code_func": "_handle_fetch_response", "timestamp": "2018-12-13T16:52:05.136506Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.141153+00:00 onxv1259 dispatcher-email_0: {"msg": "messages to deliver: []", "namespace": "afkak.consumer", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/consumer.py", "code_line": 853, "code_func": "_handle_fetch_response", "timestamp": "2018-12-13T16:52:05.136835Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
@rthille Could you please provide any insight? Your help would be appreciated.
This doesn't make sense: 2018-12-13T16:52:05.138426+00:00 onxv1259 dispatcher-email_0: {"msg": "send_offset_request [OffsetResponse(topic=u'bp.crinoid.v1.emailRule', partition=0, error=0, offsets=(13,))]"
Why is the send_offset_request sending an OffsetResponse? Or is the send logging what it got back? That's unclear.
Then later: 2018-12-13T16:52:05.140398+00:00 onxv1259 dispatcher-email_0: {"msg": "consumer._handle_fetch_response: [OffsetResponse(topic=u'bp.crinoid.v1.emailRule', partition=0, error=0, offsets=(13,))]"
The consumer handling the fetch response shouldn't be handling an OffsetResponse.
Yah, thx for the response!
The two lines at https://github.com/ciena/afkak/blame/master/afkak/consumer.py#L735-L736 should use `_handle_offset_response` and `_handle_offset_error`.