debezium-supplier: DebeziumFormat.JSON results in unexpected Debezium Output Message Format: JsonByteArray
Closed this issue · 1 comments
Description
After updating from `cdc-debezium-supplier` to the new `debezium-supplier`, we encountered some problems with the message headers in our application. The header values are now `byte[]` by default, which may result in errors when they are used in a `messageKeyExpression` or in routing decisions. It is quite common to use message headers as Strings for these use cases.
In `cdc-debezium-supplier`, the header values were serialized as `String`.
Reproduction
- Add the following test class to the `databases` test package:
/*
* Copyright 2020-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.source.debezium.databases;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.app.source.debezium.integration.DebeziumTestUtils;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.util.CollectionUtils;
import org.testcontainers.containers.GenericContainer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Reproduces the header serialization format produced by the {@code debeziumSupplier}.
 * A literal {@code app.id} header is added to every change event through the
 * {@code InsertHeader} transformation, and the test expects that header to arrive at the
 * output destination as a {@link String}. It uses the Debezium pre-built Postgres
 * example image and the pre-generated data in it.
 *
 * @author Christian Tzolov
 * @author David Turanski
 * @author Artem Bilan
 */
@Tag("integration")
public class DebeziumHeaderFormatIntegrationTest {

	private static final Log logger = LogFactory.getLog(DebeziumHeaderFormatIntegrationTest.class);

	/**
	 * Expected number of messages; it should correspond to the number of insert statements
	 * in the sample inventory DB:
	 * https://github.com/debezium/container-images/blob/main/examples/postgres/2.3/inventory.sql
	 */
	private static final int EXPECTED_MESSAGE_COUNT = 29;

	/** Name of the header configured for the {@code insertAppIdHeader} transformation. */
	private static final String APP_ID_HEADER = "app.id";

	/**
	 * Expected header value. The strange string value encoding (surrounding quotes) without
	 * schema is an unresolved bug in Debezium: https://issues.redhat.com/browse/DBZ-7171
	 */
	private static final String EXPECTED_APP_ID_VALUE = "\"best-app-ever\"";

	private final SpringApplicationBuilder applicationBuilder = new SpringApplicationBuilder(
			TestChannelBinderConfiguration
					.getCompleteConfiguration(DebeziumHeaderFormatIntegrationTest.TestApplication.class))
			.web(WebApplicationType.NONE)
			.properties(
					"spring.cloud.function.definition=debeziumSupplier",
					// Flattening:
					// https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
					"debezium.properties.transforms=unwrap,insertAppIdHeader",
					"debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState",
					"debezium.properties.transforms.unwrap.drop.tombstones=false",
					"debezium.properties.transforms.unwrap.delete.handling.mode=rewrite",
					"debezium.properties.transforms.unwrap.add.fields=name,db,op,table",
					"debezium.properties.transforms.insertAppIdHeader.type=org.apache.kafka.connect.transforms.InsertHeader",
					"debezium.properties.transforms.insertAppIdHeader.header=app.id",
					"debezium.properties.transforms.insertAppIdHeader.value.literal=best-app-ever",
					"debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory",
					"debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore",
					"debezium.properties.schema=false",
					"debezium.properties.header.converter.schemas.enable=false",
					"debezium.properties.topic.prefix=my-topic",
					"debezium.properties.name=my-connector",
					"debezium.properties.database.server.id=85744");

	/**
	 * Starts the Postgres example container, runs the supplier against it, collects the
	 * emitted messages and verifies that every message carries the {@code app.id} header
	 * as the expected {@link String} value.
	 */
	@Test
	public void postgres() {
		// The container is released by try-with-resources, so no explicit stop() is needed.
		try (GenericContainer<?> postgres = new GenericContainer<>(DebeziumTestUtils.DEBEZIUM_EXAMPLE_POSTGRES_IMAGE)
				.withEnv("POSTGRES_USER", "postgres")
				.withEnv("POSTGRES_PASSWORD", "postgres")
				.withExposedPorts(5432)
				.withStartupTimeout(Duration.ofSeconds(120))
				.withStartupAttempts(3)) {

			postgres.start();

			try (ConfigurableApplicationContext context = applicationBuilder.run(
					"--debezium.properties.connector.class=io.debezium.connector.postgresql.PostgresConnector",
					"--debezium.properties.database.user=postgres",
					"--debezium.properties.database.password=postgres",
					"--debezium.properties.slot.name=debezium",
					"--debezium.properties.database.dbname=postgres",
					"--debezium.properties.database.hostname=localhost",
					"--debezium.properties.database.port=" + postgres.getMappedPort(5432))) {

				OutputDestination outputDestination = context.getBean(OutputDestination.class);
				List<Message<?>> allMessages = new ArrayList<>();

				// Only collect messages while polling. The header assertion is deliberately
				// kept out of the polled condition so that it cannot fail while messages
				// are still arriving.
				Awaitility.await().atMost(Duration.ofMinutes(5)).until(() -> {
					List<Message<?>> messageChunk = DebeziumTestUtils.receiveAll(outputDestination);
					if (!CollectionUtils.isEmpty(messageChunk)) {
						logger.info("Chunk size: " + messageChunk.size());
						allMessages.addAll(messageChunk);
					}
					return allMessages.size() == EXPECTED_MESSAGE_COUNT;
				});

				// Constant-first equals: a missing header or a non-String (e.g. byte[]) value
				// simply does not match, instead of raising a NullPointerException.
				assertThat(allMessages.stream()
						.filter(message -> EXPECTED_APP_ID_VALUE.equals(message.getHeaders().get(APP_ID_HEADER)))
						.toList())
						.hasSize(EXPECTED_MESSAGE_COUNT);
			}
		}
	}

	/**
	 * Minimal Spring Boot configuration for the test; Mongo and DataSource
	 * auto-configurations are excluded.
	 */
	@SpringBootConfiguration
	@EnableAutoConfiguration(exclude = { MongoAutoConfiguration.class, DataSourceAutoConfiguration.class })
	public static class TestApplication {
		/* Remove comment to make test green
		@Bean
		public DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder(
				OffsetCommitPolicy offsetCommitPolicy, DebeziumEngine.CompletionCallback completionCallback,
				DebeziumEngine.ConnectorCallback connectorCallback, DebeziumProperties properties, Clock debeziumClock) {
			return DebeziumEngine
					.create(KeyValueHeaderChangeEventFormat.of(io.debezium.engine.format.JsonByteArray.class, io.debezium.engine.format.JsonByteArray.class, Json.class))
					.using(properties.getDebeziumNativeConfiguration())
					.using(debeziumClock)
					.using(completionCallback)
					.using(connectorCallback)
					.using(offsetCommitPolicy);
		}
		*/
	}
}
Analysis
The header format can also be specified when calling DebeziumEngine#create(). Allowed values are:
Json.class - the header values are encoded as JSON strings
JsonByteArray.class - the header values are formatted as JSON and encoded as UTF-8 byte arrays
- The supplier allows the following values for the payload and header formats:
public enum DebeziumFormat {
/**
* JSON change event format.
*/
JSON("application/json"),
/**
* AVRO change event format.
*/
AVRO("application/avro"),
/**
* ProtoBuf change event format.
*/
PROTOBUF("application/x-protobuf"),;
- These are then mapped to their corresponding Debezium serialization format:
switch (debeziumFormat) {
case JSON:
return io.debezium.engine.format.JsonByteArray.class;
case AVRO:
return io.debezium.engine.format.Avro.class;
case PROTOBUF:
return io.debezium.engine.format.Protobuf.class;
default:
throw new IllegalArgumentException("Unknown debezium format: " + debeziumFormat);
}
Current Mitigation possibilities
- Use a processor in the application to transform the `byte[]` header values to `String`
- Provide your own bean for debeziumEngineBuilder
/**
 * Custom engine builder bean: keys and values keep the {@code JsonByteArray} format,
 * while headers use the {@code Json} format.
 *
 * @param offsetCommitPolicy the offset commit policy handed to the engine builder
 * @param completionCallback the completion callback handed to the engine builder
 * @param connectorCallback the connector callback handed to the engine builder
 * @param properties source of the native Debezium configuration
 * @param debeziumClock the clock handed to the engine builder
 * @return the configured engine builder
 */
@Bean
public DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder(
		OffsetCommitPolicy offsetCommitPolicy, DebeziumEngine.CompletionCallback completionCallback,
		DebeziumEngine.ConnectorCallback connectorCallback, DebeziumProperties properties, Clock debeziumClock) {
	// Key format, value format, header format - only the header format differs.
	var changeEventFormat = KeyValueHeaderChangeEventFormat.of(
			io.debezium.engine.format.JsonByteArray.class,
			io.debezium.engine.format.JsonByteArray.class,
			Json.class);

	DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> engineBuilder = DebeziumEngine.create(changeEventFormat);

	return engineBuilder
			.using(properties.getDebeziumNativeConfiguration())
			.using(debeziumClock)
			.using(completionCallback)
			.using(connectorCallback)
			.using(offsetCommitPolicy);
}
Proposed changes
- Either make clear that `DebeziumFormat.JSON` != `io.debezium.engine.format.Json`, or change the default for header serialization to `io.debezium.engine.format.Json`.
- It would be great to be able to use `io.debezium.engine.format.Json` without providing your own bean, but this would require renaming the existing `DebeziumFormat.JSON` to `DebeziumFormat.JSON_BYTE_ARRAY`.
Transferred this issue to https://github.com/spring-cloud/spring-functions-catalog, since this is where we manage those functions from now on.