Aiven-Open/karapace

Unable to create schema with importing custom shared schema

nahidupa opened this issue · 2 comments

What happened?

Karapace fails to create a schema that contains an import statement and uses a type defined in the imported shared custom schema.

curl 'http://localhost:8081/subjects/shared%2Fcustomer%2Fbenefits%2Fdiscounts.proto/versions' \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data-raw '{"schema":"syntax = \"proto3\";\npackage shared.customer.benefits.v1beta1;\n\nmessage DiscountAmount {\n  oneof options {\n    int64 fraction = 2;\n  }\n}\n","schemaType":"PROTOBUF","references":[]}' \
  --compressed

Now create another schema that uses the DiscountAmount type defined by the previous command.

curl 'http://localhost:8081/subjects/karapace%2Fbug%2Fv1%2Fbug.proto/versions' \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data-raw '{"schema":"syntax = \"proto3\";\npackage karapace.bug.v1;\n\nimport \"shared/customer/benefits/discounts.proto\";\n\nmessage ProductDiscount {\n  map<string, shared.customer.benefits.v1beta1.DiscountAmount> discounts = 1;\n}\n","schemaType":"PROTOBUF","references":[]}' \
  --compressed

The second request fails with the following error:

{"error_code":42201,"message":"Invalid PROTOBUF schema. Error: type \"shared.customer.benefits.v1beta1.DiscountAmount\" is not defined"}

What did you expect to happen?

The second schema should be created without any error.

What else do we need to know?

I have tried this with both the Confluent Schema Registry and Karapace. Here are the steps to reproduce with a local Docker setup.

cat docker-compose-confluent.yml

---
version: '2'
services:

  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081


  connect:
    image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.4.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.4.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.4.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.4.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.4.1
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.4.1
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

Start the Confluent Schema Registry stack:

docker-compose -f docker-compose-confluent.yml up

Create both schemas with the curl commands. Both requests succeed against the Confluent Schema Registry.


curl 'http://localhost:8081/subjects/shared%2Fcustomer%2Fbenefits%2Fdiscounts.proto/versions' \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data-raw '{"schema":"syntax = \"proto3\";\npackage shared.customer.benefits.v1beta1;\n\nmessage DiscountAmount {\n  oneof options {\n    int64 fraction = 2;\n  }\n}\n","schemaType":"PROTOBUF","references":[]}' \
  --compressed



curl 'http://localhost:8081/subjects/karapace%2Fbug%2Fv1%2Fbug.proto/versions' \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data-raw '{"schema":"syntax = \"proto3\";\npackage karapace.bug.v1;\n\nimport \"shared/customer/benefits/discounts.proto\";\n\nmessage ProductDiscount {\n  map<string, shared.customer.benefits.v1beta1.DiscountAmount> discounts = 1;\n}\n","schemaType":"PROTOBUF","references":[]}' \
  --compressed

The created schema can also be retrieved:

 
curl 'http://localhost:8081/subjects/shared%2Fcustomer%2Fbenefits%2Fdiscounts.proto/versions/1'
{"id":1,"schema":"syntax = \"proto3\";\npackage shared.customer.benefits.v1beta1;\n\nmessage DiscountAmount {\n  oneof options {\n    int64 fraction = 2;\n  }\n}\n","schemaType":"PROTOBUF","subject":"shared\/customer\/benefits\/discounts.proto","version":1}
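
The escaped JSON response is hard to read in a terminal. As a minimal sketch (the variable names are only illustrative, and the response text is copied from the output above), the body can be decoded to show the stored schema in its original form:

```python
import json

# Response body copied from the GET request above.
RESPONSE = r'''{"id":1,"schema":"syntax = \"proto3\";\npackage shared.customer.benefits.v1beta1;\n\nmessage DiscountAmount {\n  oneof options {\n    int64 fraction = 2;\n  }\n}\n","schemaType":"PROTOBUF","subject":"shared\/customer\/benefits\/discounts.proto","version":1}'''

# json.loads undoes the \" , \n and \/ escapes used in the response.
record = json.loads(RESPONSE)

print("subject:", record["subject"])
print("version:", record["version"])
print(record["schema"])
```

This only decodes the response; it does not contact the registry.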

Now bring down the Confluent stack:

docker-compose -f docker-compose-confluent.yml down
docker network prune

Here is a docker-compose file that runs the Karapace schema registry instead:

cat docker-compose-karapace.yml

---
version: '2'
services:

  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'


  schema-registry:
    image: local/karapace
    hostname: schema-registry
    container_name: schema-registry
    build:
      context: ..
      dockerfile: container/Dockerfile
    entrypoint:
    - /bin/bash
    - /opt/karapace/start.sh
    - registry
    depends_on:
    - broker
    ports:
    - "8081:8081"
    environment:
      KARAPACE_ADVERTISED_HOSTNAME: karapace-registry
      KARAPACE_BOOTSTRAP_URI: broker:29092
      KARAPACE_PORT: 8081
      KARAPACE_HOST: 0.0.0.0
      KARAPACE_CLIENT_ID: karapace
      KARAPACE_GROUP_ID: karapace-registry
      KARAPACE_MASTER_ELIGIBILITY: "true"
      KARAPACE_TOPIC_NAME: _schemas
      KARAPACE_LOG_LEVEL: WARNING
      KARAPACE_COMPATIBILITY: FULL

  karapace-rest:
    image: local/karapace
    build:
      context: ..
      dockerfile: container/Dockerfile
    entrypoint:
    - /bin/bash
    - /opt/karapace/start.sh
    - rest
    depends_on:
    - broker
    - schema-registry
    ports:
    - "8082:8082"
    environment:
      KARAPACE_PORT: 8082
      KARAPACE_HOST: 0.0.0.0
      KARAPACE_ADVERTISED_HOSTNAME: karapace-rest
      KARAPACE_BOOTSTRAP_URI: broker:29092
      KARAPACE_REGISTRY_HOST: schema-registry
      KARAPACE_REGISTRY_PORT: 8081
      KARAPACE_ADMIN_METADATA_MAX_AGE: 0
      KARAPACE_LOG_LEVEL: WARNING

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.4.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.4.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.4.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.4.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.4.1
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081


docker-compose -f docker-compose-karapace.yml up

Run the same two curl commands as before.

curl 'http://localhost:8081/subjects/shared%2Fcustomer%2Fbenefits%2Fdiscounts.proto/versions' \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data-raw '{"schema":"syntax = \"proto3\";\npackage shared.customer.benefits.v1beta1;\n\nmessage DiscountAmount {\n  oneof options {\n    int64 fraction = 2;\n  }\n}\n","schemaType":"PROTOBUF","references":[]}' \
  --compressed



curl 'http://localhost:8081/subjects/karapace%2Fbug%2Fv1%2Fbug.proto/versions' \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data-raw '{"schema":"syntax = \"proto3\";\npackage karapace.bug.v1;\n\nimport \"shared/customer/benefits/discounts.proto\";\n\nmessage ProductDiscount {\n  map<string, shared.customer.benefits.v1beta1.DiscountAmount> discounts = 1;\n}\n","schemaType":"PROTOBUF","references":[]}' \
  --compressed

The second request returns the following error, which is unexpected:

{"error_code":42201,"message":"Invalid PROTOBUF schema. Error: type \"shared.customer.benefits.v1beta1.DiscountAmount\" is not defined"}

Here are the two schemas as plain .proto files:

cat discounts.proto

syntax = "proto3";
package shared.customer.benefits.v1beta1;

message DiscountAmount {
  oneof options {
    int64 fraction = 2;
  }
}

cat bug.proto

syntax = "proto3";
package karapace.bug.v1;
import "shared/customer/benefits/discounts.proto";

message ProductDiscount {
  map<string, shared.customer.benefits.v1beta1.DiscountAmount> discounts = 1;
}
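
For reference, the escaped --data-raw bodies used in the curl commands can be generated from the plain .proto text rather than written by hand. The following is a minimal sketch; the helper names subject_path and registration_body are hypothetical and not part of Karapace or any client library:

```python
import json
from urllib.parse import quote

# Contents of discounts.proto, as shown above.
DISCOUNTS_PROTO = '''syntax = "proto3";
package shared.customer.benefits.v1beta1;

message DiscountAmount {
  oneof options {
    int64 fraction = 2;
  }
}
'''


def subject_path(subject):
    """Return the URL path for registering a version under `subject`.

    Slashes in the subject name are percent-encoded, as in the curl commands.
    """
    return "/subjects/" + quote(subject, safe="") + "/versions"


def registration_body(proto_source, references=()):
    """Return the JSON request body for a PROTOBUF schema registration."""
    return json.dumps(
        {
            "schema": proto_source,
            "schemaType": "PROTOBUF",
            "references": list(references),
        }
    )


path = subject_path("shared/customer/benefits/discounts.proto")
body = registration_body(DISCOUNTS_PROTO)
print(path)
print(body)
```

The printed body can be passed to curl with --data-raw, together with the Content-Type header used above.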

Since Karapace is compatible with the Confluent Schema Registry, we expect the same behaviour.

If I register the second schema with the following curl command, which passes a populated references: [...] array instead of an empty one, it works just fine for me.

curl 'http://localhost:8081/subjects/karapace%2Fbug%2Fv1%2Fbug.proto/versions' -H "Content-Type: application/vnd.schemaregistry.v1+json" --data-raw '{"schema":"syntax = \"proto3\";\npackage karapace.bug.v1;\n\nimport \"shared/customer/benefits/discounts.proto\";\n\nmessage ProductDiscount {\n map<string, shared.customer.benefits.v1beta1.DiscountAmount> discounts = 1;\n}\n","schemaType":"PROTOBUF","references":[{"name": "shared/customer/benefits/v1beta1/DiscountAmount.proto", "subject": "shared/customer/benefits/discounts.proto", "version": -1}]}' --compressed
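
To make the one-line command above easier to inspect, here is a rough sketch of the same request body built in Python. The reference entry is copied from the command; the declared_imports helper and its regular expression are hypothetical, and only illustrate that the reference's subject is the path named in the import statement:

```python
import json
import re

# Contents of bug.proto, as shown earlier in the issue.
BUG_PROTO = '''syntax = "proto3";
package karapace.bug.v1;

import "shared/customer/benefits/discounts.proto";

message ProductDiscount {
  map<string, shared.customer.benefits.v1beta1.DiscountAmount> discounts = 1;
}
'''

# Reference entry copied from the curl command above.
REFERENCE = {
    "name": "shared/customer/benefits/v1beta1/DiscountAmount.proto",
    "subject": "shared/customer/benefits/discounts.proto",
    "version": -1,
}

# Simplified pattern for protobuf import statements (illustrative only;
# it does not handle comments or unusual formatting).
IMPORT_RE = re.compile(
    r'^\s*import\s+(?:public\s+|weak\s+)?"([^"]+)"\s*;', re.MULTILINE
)


def declared_imports(proto_source):
    """Return the import paths declared in a .proto source string."""
    return IMPORT_RE.findall(proto_source)


payload = {
    "schema": BUG_PROTO,
    "schemaType": "PROTOBUF",
    "references": [REFERENCE],
}

print(declared_imports(BUG_PROTO))
print(json.dumps(payload, indent=2))
```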

Does this help you?

@tvainika thanks for the suggestion. The references were not the issue; this is fixed with PR #705.