spring-cloud/spring-functions-catalog

debezium-supplier: DebeziumFormat.JSON results in unexpected Debezium Output Message Format: JsonByteArray

Closed this issue · 1 comment

Description

After upgrading from cdc-debezium-supplier to the new debezium-supplier, we ran into problems with the message headers in our application. Header values are now byte[] by default, which can cause errors when they are used in a messageKeyExpression or in routing decisions. It is common to consume message headers as Strings for these use cases.
In cdc-debezium-supplier, header values were serialized as Strings.

Reproduction

/*
 * Copyright 2020-2023 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.app.source.debezium.databases;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.app.source.debezium.integration.DebeziumTestUtils;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.util.CollectionUtils;
import org.testcontainers.containers.GenericContainer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Verifies the format of message headers emitted by the Debezium supplier. A literal {@code app.id} header is added to
 * every change event through the Kafka Connect {@code InsertHeader} transformation, and the test expects that header to
 * arrive as a {@link String} (not a {@code byte[]}). It uses the Debezium pre-built example image for Postgres and the
 * pre-generated inventory data it ships with.
 * <p>
 * With the supplier's default header format the header values are {@code byte[]}, so this test fails. The commented-out
 * {@code debeziumEngineBuilder} bean in {@link TestApplication} switches the header format to
 * {@code io.debezium.engine.format.Json}. Enabling it also requires adding its imports.
 *
 * @author Christian Tzolov
 * @author David Turanski
 * @author Artem Bilan
 */
@Tag("integration")
public class DebeziumHeaderFormatIntegrationTest {

	private static final Log logger = LogFactory.getLog(DebeziumHeaderFormatIntegrationTest.class);

	/**
	 * Number of change events expected from the sample inventory DB: one per insert statement in
	 * https://github.com/debezium/container-images/blob/main/examples/postgres/2.3/inventory.sql.
	 */
	private static final int EXPECTED_MESSAGE_COUNT = 29;

	/** Name of the header inserted by the {@code insertAppIdHeader} transformation configured below. */
	private static final String APP_ID_HEADER = "app.id";

	/**
	 * Expected String value of the {@code app.id} header. The surrounding quotes are part of the value: without a schema,
	 * Debezium JSON-encodes string header values (unresolved Debezium bug https://issues.redhat.com/browse/DBZ-7171).
	 */
	private static final String EXPECTED_APP_ID_HEADER_VALUE = "\"best-app-ever\"";

	private final SpringApplicationBuilder applicationBuilder = new SpringApplicationBuilder(
			TestChannelBinderConfiguration
					.getCompleteConfiguration(DebeziumHeaderFormatIntegrationTest.TestApplication.class))
							.web(WebApplicationType.NONE)
							.properties(
									"spring.cloud.function.definition=debeziumSupplier",
									// Flattening:
									// https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
									"debezium.properties.transforms=unwrap,insertAppIdHeader",
									"debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState",
									"debezium.properties.transforms.unwrap.drop.tombstones=false",
									"debezium.properties.transforms.unwrap.delete.handling.mode=rewrite",
									"debezium.properties.transforms.unwrap.add.fields=name,db,op,table",
									"debezium.properties.transforms.insertAppIdHeader.type=org.apache.kafka.connect.transforms.InsertHeader",
									"debezium.properties.transforms.insertAppIdHeader.header=app.id",
									"debezium.properties.transforms.insertAppIdHeader.value.literal=best-app-ever",


									"debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory",
									"debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore",

									"debezium.properties.schema=false",
									"debezium.properties.header.converter.schemas.enable=false",

									"debezium.properties.topic.prefix=my-topic",
									"debezium.properties.name=my-connector",
									"debezium.properties.database.server.id=85744");

	/**
	 * Streams the Postgres inventory DB through the supplier. It then checks that every emitted message carries the
	 * {@code app.id} header with the expected String value.
	 */
	@Test
	public void postgres() {
		// try-with-resources closes (and thereby stops) the container, so no explicit stop() call is needed.
		try (GenericContainer<?> postgres = new GenericContainer<>(DebeziumTestUtils.DEBEZIUM_EXAMPLE_POSTGRES_IMAGE)
				.withEnv("POSTGRES_USER", "postgres")
				.withEnv("POSTGRES_PASSWORD", "postgres")
				.withExposedPorts(5432)
				.withStartupTimeout(Duration.ofSeconds(120))
				.withStartupAttempts(3)) {

			postgres.start();

			try (ConfigurableApplicationContext context = applicationBuilder.run(
					"--debezium.properties.connector.class=io.debezium.connector.postgresql.PostgresConnector",
					"--debezium.properties.database.user=postgres",
					"--debezium.properties.database.password=postgres",
					"--debezium.properties.slot.name=debezium",
					"--debezium.properties.database.dbname=postgres",
					"--debezium.properties.database.hostname=localhost",
					"--debezium.properties.database.port=" + postgres.getMappedPort(5432))) {

				OutputDestination outputDestination = context.getBean(OutputDestination.class);

				List<Message<?>> allMessages = new ArrayList<>();
				// Only collect messages inside the polling condition. An assertion thrown from until(Callable) is not
				// treated as "not yet satisfied": it aborts the await on the first partial chunk.
				Awaitility.await().atMost(Duration.ofMinutes(5)).until(() -> {
					List<Message<?>> messageChunk = DebeziumTestUtils.receiveAll(outputDestination);
					if (!CollectionUtils.isEmpty(messageChunk)) {
						logger.info("Chunk size: " + messageChunk.size());
						allMessages.addAll(messageChunk);
					}
					// Stop polling once at least the expected count has arrived. An overshoot is then reported by the
					// size assertion below instead of surfacing as a 5-minute timeout.
					return allMessages.size() >= EXPECTED_MESSAGE_COUNT;
				});

				// The header must be a String equal to the expected value. If the supplier emits byte[] header
				// values, allSatisfy reports the offending value. The comparison also fails cleanly (no NPE) when the
				// header is missing.
				assertThat(allMessages)
						.hasSize(EXPECTED_MESSAGE_COUNT)
						.allSatisfy(message -> assertThat(message.getHeaders().get(APP_ID_HEADER))
								.isEqualTo(EXPECTED_APP_ID_HEADER_VALUE));
			}
		}
	}


	@SpringBootConfiguration
	@EnableAutoConfiguration(exclude = { MongoAutoConfiguration.class, DataSourceAutoConfiguration.class })
	public static class TestApplication {
/*		Remove comment to make test green
		@Bean
		public DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder(
				OffsetCommitPolicy offsetCommitPolicy, DebeziumEngine.CompletionCallback completionCallback,
				DebeziumEngine.ConnectorCallback connectorCallback, DebeziumProperties properties, Clock debeziumClock) {

			return DebeziumEngine
					.create(KeyValueHeaderChangeEventFormat.of(io.debezium.engine.format.JsonByteArray.class, io.debezium.engine.format.JsonByteArray.class, Json.class))
					.using(properties.getDebeziumNativeConfiguration())
					.using(debeziumClock)
					.using(completionCallback)
					.using(connectorCallback)
					.using(offsetCommitPolicy);
		}
		*/
	}

}

Analysis

The header format can also be specified when calling DebeziumEngine#create(). Allowed values are:

Json.class - the header values are encoded as JSON strings

JsonByteArray.class - the header values are formatted as JSON and encoded as UTF-8 byte arrays

  • The supplier allows these values for the payload and header formats:
public enum DebeziumFormat {
		/**
		 * JSON change event format.
		 */
		JSON("application/json"),
		/**
		 * AVRO change event format.
		 */
		AVRO("application/avro"),
		/**
		 * ProtoBuf change event format.
		 */
		PROTOBUF("application/x-protobuf"),;
  • These are then mapped to their corresponding Debezium serialization formats:
		switch (debeziumFormat) {
		case JSON:
			return io.debezium.engine.format.JsonByteArray.class;
		case AVRO:
			return io.debezium.engine.format.Avro.class;
		case PROTOBUF:
			return io.debezium.engine.format.Protobuf.class;
		default:
			throw new IllegalArgumentException("Unknown debezium format: " + debeziumFormat);
		}

Current Mitigation possibilities

  • Use a processor in the application to convert the byte[] header values to String
  • Provide your own bean for debeziumEngineBuilder
		/**
		 * Replacement for the supplier's {@code debeziumEngineBuilder} bean. Change-event keys and values are still
		 * serialized with {@code JsonByteArray} (JSON encoded as UTF-8 byte arrays). Header values use Debezium's
		 * {@code Json} format, so they are delivered as JSON strings instead of {@code byte[]}.
		 *
		 * @param offsetCommitPolicy offset commit policy passed to the engine builder
		 * @param completionCallback engine completion callback passed to the engine builder
		 * @param connectorCallback connector lifecycle callback passed to the engine builder
		 * @param properties supplier properties; only their native Debezium configuration is used here
		 * @param debeziumClock clock passed to the engine builder
		 * @return a Debezium engine builder producing {@code ChangeEvent<byte[], byte[]>} events with String headers
		 */
		@Bean
		public DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder(
				OffsetCommitPolicy offsetCommitPolicy, DebeziumEngine.CompletionCallback completionCallback,
				DebeziumEngine.ConnectorCallback connectorCallback, DebeziumProperties properties, Clock debeziumClock) {

			return DebeziumEngine
					// Key and value formats stay JsonByteArray; only the third (header) format is switched to Json.
					.create(KeyValueHeaderChangeEventFormat.of(io.debezium.engine.format.JsonByteArray.class, io.debezium.engine.format.JsonByteArray.class, Json.class))
					.using(properties.getDebeziumNativeConfiguration())
					.using(debeziumClock)
					.using(completionCallback)
					.using(connectorCallback)
					.using(offsetCommitPolicy);
		}

Proposed changes

  • Either document clearly that DebeziumFormat.JSON maps to io.debezium.engine.format.JsonByteArray (not io.debezium.engine.format.Json), or change the default header serialization to io.debezium.engine.format.Json
  • It would be great to be able to use io.debezium.engine.format.Json without providing your own bean, but this would require renaming the existing DebeziumFormat.JSON to DebeziumFormat.JSON_BYTE_ARRAY

Transferred this issue to https://github.com/spring-cloud/spring-functions-catalog, since that is where these functions are managed from now on.