[BUG] ControllerObjectManager/ControllerStreamManager batch request may use the wrong nodeId
superhx opened this issue · 0 comments
superhx commented
When a node fails over another node, #closeStreams may generate a batch request carrying the failoverNodeId. If the current node is also closing a stream, its CloseStreamsRequest could be merged with that previous request, resulting in a request with an incorrect nodeId.
automq/core/src/main/scala/kafka/log/stream/s3/streams/ControllerStreamManager.java
Lines 295 to 341 in 02e0898
public CompletableFuture<Void> closeStream(long streamId, long epoch, int nodeId, long nodeEpoch) { | |
CloseStreamRequest request = new CloseStreamRequest() | |
.setStreamId(streamId) | |
.setStreamEpoch(epoch); | |
WrapRequest req = new BatchRequest() { | |
@Override | |
public Builder addSubRequest(Builder builder) { | |
CloseStreamsRequest.Builder realBuilder = (CloseStreamsRequest.Builder) builder; | |
realBuilder.addSubRequest(request); | |
return realBuilder; | |
} | |
@Override | |
public ApiKeys apiKey() { | |
return ApiKeys.CLOSE_STREAMS; | |
} | |
@Override | |
public Builder toRequestBuilder() { | |
return new CloseStreamsRequest.Builder( | |
new CloseStreamsRequestData() | |
.setNodeId(nodeId) | |
.setNodeEpoch(nodeEpoch)).addSubRequest(request); | |
} | |
}; | |
CompletableFuture<Void> future = new CompletableFuture<>(); | |
RequestTask<CloseStreamResponse, Void> task = new RequestTask<CloseStreamResponse, Void>(req, future, resp -> { | |
switch (Errors.forCode(resp.errorCode())) { | |
case NONE: | |
return ResponseHandleResult.withSuccess(null); | |
case NODE_EPOCH_EXPIRED: | |
case NODE_EPOCH_NOT_EXIST: | |
LOGGER.error("Node epoch expired or not exist: {}, code: {}", request, Errors.forCode(resp.errorCode())); | |
throw Errors.forCode(resp.errorCode()).exception(); | |
case STREAM_NOT_EXIST: | |
case STREAM_FENCED: | |
case STREAM_INNER_ERROR: | |
LOGGER.error("Unexpected error while closing stream: {}, code: {}", request, Errors.forCode(resp.errorCode())); | |
throw Errors.forCode(resp.errorCode()).exception(); | |
default: | |
LOGGER.warn("Error while closing stream: {}, code: {}, retry later", request, Errors.forCode(resp.errorCode())); | |
return ResponseHandleResult.withRetry(); | |
} | |
}); | |
this.requestSender.send(task); | |
return future; | |
} |