Event consumer suddenly stops and events are no longer handled
Problem
My core and messaging versions are both 0.32.0.RELEASE.
I register event consumers using the following structure:
public DomainEventHandlers domainEventHandlers() {
    log.debug("Building Domain Event Handler for Engine Event Consumer");
    String engineEventKafkaTopic = globalSetting.get(AppSettingKey.EVENTUATE.CMD_TOPIC);
    return DomainEventHandlersBuilder
            .forAggregateType(engineEventKafkaTopic)
            .onEvent(CmdUpdatedStatusEvent.class, this::handleCmdUpdatedStatusEvent)
            .build();
}
private void handleCmdUpdatedStatusEvent(DomainEventEnvelope<CmdUpdatedStatusEvent> event) {
    log.info("Received CmdUpdatedStatusEvent: {}", event.getEvent());
    try {
        cmdEventHandler.handleEvent(event.getEvent());
    } catch (Exception exception) {
        log.debug("Error when handle cmd updated status event!", exception);
        saveErrorMessage(event, exception.getMessage());
    }
}
private <T extends DomainEvent> void saveErrorMessage(DomainEventEnvelope<T> event, String messageLog) {
    try {
        Message message = event.getMessage();
        CreateReceivedMessagesErrorCommand createReceivedMessagesErrorCommand = new CreateReceivedMessagesErrorCommand();
        createReceivedMessagesErrorCommand.setMessageId(message.getId());
        createReceivedMessagesErrorCommand.setEventType(event.getEvent().getClass().getSimpleName());
        createReceivedMessagesErrorCommand.setHeader(objectMapper.writeValueAsString(message.getHeaders()));
        createReceivedMessagesErrorCommand.setPayload(message.getPayload());
        createReceivedMessagesErrorCommand.setLog(messageLog);
        createReceivedMessagesErrorCommand.setStatus(ReceivedMessagesErrorStatus.PENDING);
        createReceivedMessagesErrorCommand.setCreationTime(Instant.now());
        saveReceivedMessagesErrorUseCase.create(createReceivedMessagesErrorCommand);
    } catch (Exception e) {
        log.error("An error occurred while trying convert and send command to create received message error", e);
    }
}
Everything worked fine at first, and then event consumption suddenly stopped for no apparent reason.
When I restart the application, it resumes consuming events, but after a few days it stops consuming again on its own.
Below is the Eventuate processing log.
Log
When Eventuate is consuming normally, the unprocessed list in this log line usually contains only one offset. When the problem occurs, every incoming offset accumulates in that list and none of them is committed:
To commit CI_DOP_V3_EVENT_CONSUMER io.eventuate.messaging.kafka.basic.consumer.OffsetTracker@59d137a1[state={CI_DOP_V3_EVENT-0=io.eventuate.messaging.kafka.basic.consumer.TopicPartitionOffsets@1f75b245[unprocessed=[52274]
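To make the symptom concrete, here is a minimal sketch of how a per-partition offset tracker of this kind might behave. It is an assumption about the general technique, not the Eventuate implementation: the class `SimpleOffsetTracker` and all of its method names are hypothetical, and the offsets in the usage note are illustrative. The point it shows is that a single record whose handler never returns keeps every later offset in the unprocessed set, so nothing can be committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for a per-partition offset tracker.
// It is NOT the Eventuate class; it only illustrates the symptom.
class SimpleOffsetTracker {
    private final TreeSet<Long> unprocessed = new TreeSet<>();
    private final TreeSet<Long> processed = new TreeSet<>();

    // Called when a record is handed to the event handler.
    void noteUnprocessed(long offset) {
        unprocessed.add(offset);
    }

    // Called when the event handler returns for that record.
    void noteProcessed(long offset) {
        processed.add(offset);
    }

    // The highest offset that is safe to commit: the end of the contiguous
    // run of processed offsets at the head of the unprocessed set.
    Optional<Long> offsetToCommit() {
        Long result = null;
        for (long offset : unprocessed) {
            if (!processed.contains(offset)) {
                break; // an earlier record is still in flight
            }
            result = offset;
        }
        return Optional.ofNullable(result);
    }

    // Forget everything up to and including the committed offset.
    void noteCommitted(long offset) {
        unprocessed.headSet(offset, true).clear();
        processed.headSet(offset, true).clear();
    }

    // Offsets that are still waiting to be committed, in ascending order.
    List<Long> pending() {
        return new ArrayList<>(unprocessed);
    }
}
```

With this sketch, if offset 100 is noted as unprocessed and its handler never returns, then offsets 101 and 102 stay in `pending()` even after they are marked processed, and `offsetToCommit()` stays empty. That matches the pattern in the log line above, where offsets pile up instead of being committed one at a time.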
I am using MySQL, and I see that the following table is always locked just before event consumption breaks down:
OBJECT_NAME received_messages
LOCK_TYPE TABLE
LOCK_MODE IX
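One way to check whether the handler is blocked on that lock is to run it with a deadline and log when the deadline is missed. The following is a minimal diagnostic sketch, not part of Eventuate: `BoundedHandler` and `runWithTimeout` are hypothetical names. Note the assumption that the task can safely run on another thread; thread-bound state such as a Spring transaction would not carry over, so this is meant for diagnosis rather than as a fix.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Eventuate: runs a task with a deadline.
class BoundedHandler {
    // Daemon threads so a stuck task cannot keep the JVM alive.
    private final ExecutorService executor = Executors.newCachedThreadPool(runnable -> {
        Thread thread = new Thread(runnable, "bounded-handler");
        thread.setDaemon(true);
        return thread;
    });

    // Returns true if the task finished within timeoutMillis, false if it
    // timed out or the calling thread was interrupted while waiting.
    boolean runWithTimeout(Runnable task, long timeoutMillis) {
        Future<?> future = executor.submit(task);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the stuck task
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            future.cancel(true);
            return false;
        } catch (ExecutionException e) {
            // The task itself failed; surface the original cause.
            throw new IllegalStateException("Task failed", e.getCause());
        }
    }
}
```

For example, wrapping the call to `cmdEventHandler.handleEvent(...)` in `runWithTimeout` and logging at error level when it returns `false` would show whether the consumer stalls inside the handler, which would be consistent with a database lock that is never released.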
Do you have any ideas about this issue? Thank you very much!