AutoMQ/automq

[BUG] ControllerObjectManager/ControllerStreamManager batch request may use the wrong nodeId

superhx opened this issue · 0 comments

During failover of another node, `closeStream` may create a batch request that carries that node's id (the failoverNodeId). If the current node is closing one of its own streams at the same time, its close request can be merged into that earlier batch, so the resulting CloseStreamsRequest is sent with the wrong nodeId.

/**
 * Submits a close request for a single stream as a batchable {@code CLOSE_STREAMS} request.
 *
 * <p>The per-stream response is mapped as follows: {@code NONE} is a success; node-epoch
 * errors and stream-level errors are logged and rethrown as the matching exception; any
 * other error code is logged and reported as retryable.
 *
 * <p>NOTE(review): {@code nodeId} and {@code nodeEpoch} are only applied when this request
 * creates the batch builder ({@code toRequestBuilder}); {@code addSubRequest} appends the
 * sub-request to an existing builder without them. If the sender merges this sub-request
 * into a builder created for a different node (e.g. during failover), the batch will carry
 * that other node's id. Confirm how the sender groups batch requests.
 *
 * @param streamId  id of the stream to close
 * @param epoch     stream epoch set on the close request
 * @param nodeId    node id set on the batch request data when this request creates the batch
 * @param nodeEpoch node epoch set on the batch request data when this request creates the batch
 * @return the future handed to the request task; presumably completed by the request sender
 *         according to the response handler's result (not visible in this method)
 */
public CompletableFuture<Void> closeStream(long streamId, long epoch, int nodeId, long nodeEpoch) {
    final CloseStreamRequest closeRequest = new CloseStreamRequest()
        .setStreamId(streamId)
        .setStreamEpoch(epoch);

    final WrapRequest batchable = new BatchRequest() {
        @Override
        public ApiKeys apiKey() {
            return ApiKeys.CLOSE_STREAMS;
        }

        @Override
        public Builder toRequestBuilder() {
            // Start a new batch: node identity is fixed here, then the first sub-request is added.
            return new CloseStreamsRequest.Builder(
                new CloseStreamsRequestData()
                    .setNodeId(nodeId)
                    .setNodeEpoch(nodeEpoch)).addSubRequest(closeRequest);
        }

        @Override
        public Builder addSubRequest(Builder builder) {
            // Join an already-created batch; the builder keeps whatever node identity it was created with.
            CloseStreamsRequest.Builder closeStreamsBuilder = (CloseStreamsRequest.Builder) builder;
            closeStreamsBuilder.addSubRequest(closeRequest);
            return closeStreamsBuilder;
        }
    };

    final CompletableFuture<Void> result = new CompletableFuture<>();
    final RequestTask<CloseStreamResponse, Void> closeTask = new RequestTask<CloseStreamResponse, Void>(batchable, result, response -> {
        // Resolve the error code once and reuse it for dispatching, logging and rethrowing.
        final Errors code = Errors.forCode(response.errorCode());
        switch (code) {
            case NONE:
                return ResponseHandleResult.withSuccess(null);
            case NODE_EPOCH_EXPIRED:
            case NODE_EPOCH_NOT_EXIST:
                LOGGER.error("Node epoch expired or not exist: {}, code: {}", closeRequest, code);
                throw code.exception();
            case STREAM_NOT_EXIST:
            case STREAM_FENCED:
            case STREAM_INNER_ERROR:
                LOGGER.error("Unexpected error while closing stream: {}, code: {}", closeRequest, code);
                throw code.exception();
            default:
                LOGGER.warn("Error while closing stream: {}, code: {}, retry later", closeRequest, code);
                return ResponseHandleResult.withRetry();
        }
    });

    this.requestSender.send(closeTask);
    return result;
}