DataSQRL/sqrl

Vertx Kafka Configuration Bug

In 0.5, we use a default configuration for Vertx to read from and write to Kafka, derived from the Flink configuration:

    vertxConfig.put(BOOTSTRAP_SERVERS_CONFIG, (String)map.get("properties.bootstrap.servers"));
    vertxConfig.put(GROUP_ID_CONFIG, (String)map.get("properties.group.id"));
    vertxConfig.put(KEY_DESERIALIZER_CLASS_CONFIG, "com.datasqrl.graphql.kafka.JsonDeserializer");
    vertxConfig.put(VALUE_DESERIALIZER_CLASS_CONFIG, "com.datasqrl.graphql.kafka.JsonDeserializer");
    vertxConfig.put(AUTO_OFFSET_RESET_CONFIG, "earliest");

This has the following issues:

  • The auto offset reset should be set to "latest" so that a new server starts reading at the newest messages and does not replay the entire Kafka message history, which would surface outdated messages.
  • Each Vertx server must have a unique group id. Otherwise Kafka splits the topic partitions among the servers that share a group, and each server receives only a subset of the messages.

This applies to both mutations and subscriptions.
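
A minimal sketch of how the two fixes could look, assuming the same Flink property map as in the snippet above. The class and method names (`VertxKafkaConfigSketch`, `buildConsumerConfig`) and the UUID suffix are hypothetical and not taken from the sqrl codebase; the string keys are the standard Kafka consumer property names behind the `*_CONFIG` constants.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper for illustration only; not part of the sqrl codebase.
class VertxKafkaConfigSketch {

  static Map<String, String> buildConsumerConfig(Map<String, Object> flinkProps) {
    Map<String, String> vertxConfig = new HashMap<>();
    vertxConfig.put("bootstrap.servers",
        (String) flinkProps.get("properties.bootstrap.servers"));

    // Assumption: a random suffix is an acceptable way to make the group id
    // unique per server instance, so no two servers share partitions.
    String baseGroupId = (String) flinkProps.get("properties.group.id");
    vertxConfig.put("group.id", baseGroupId + "-" + UUID.randomUUID());

    vertxConfig.put("key.deserializer", "com.datasqrl.graphql.kafka.JsonDeserializer");
    vertxConfig.put("value.deserializer", "com.datasqrl.graphql.kafka.JsonDeserializer");

    // Start at the newest messages instead of replaying the topic history.
    vertxConfig.put("auto.offset.reset", "latest");
    return vertxConfig;
  }

  public static void main(String[] args) {
    Map<String, Object> flinkProps = new HashMap<>();
    flinkProps.put("properties.bootstrap.servers", "localhost:9092");
    flinkProps.put("properties.group.id", "sqrl");
    System.out.println(buildConsumerConfig(flinkProps));
  }
}
```

Because a fresh group id has no committed offsets, the "latest" reset policy takes effect on every server start. One trade-off of the random suffix is that each restart leaves an unused consumer group behind on the broker; a stable per-instance identifier would avoid that if one is available.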

In general, we should move to a template approach for Vertx properties, similar to what we do in Flink. I have already added the IConnectorFactoryContext to the Log object so that the same variables can be retrieved.
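
As a rough illustration of what a template approach could mean here, the sketch below resolves placeholders in a property template against a map of variables. Everything in it is an assumption: the `${name}` placeholder syntax, the class `PropertyTemplateSketch`, and the variable names are hypothetical, and it does not use IConnectorFactoryContext, whose API is not shown in this issue.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of property templating; not the sqrl implementation.
class PropertyTemplateSketch {

  // Assumed placeholder syntax: ${variable-name}
  private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

  // Replaces every placeholder in one template value with its variable.
  static String resolve(String template, Map<String, String> variables) {
    Matcher matcher = PLACEHOLDER.matcher(template);
    StringBuffer out = new StringBuffer();
    while (matcher.find()) {
      String name = matcher.group(1);
      String value = variables.get(name);
      if (value == null) {
        throw new IllegalArgumentException("Unresolved variable: " + name);
      }
      matcher.appendReplacement(out, Matcher.quoteReplacement(value));
    }
    matcher.appendTail(out);
    return out.toString();
  }

  // Resolves all values of a property template, keeping the keys unchanged.
  static Map<String, String> resolveAll(Map<String, String> template,
                                        Map<String, String> variables) {
    Map<String, String> resolved = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : template.entrySet()) {
      resolved.put(entry.getKey(), resolve(entry.getValue(), variables));
    }
    return resolved;
  }

  public static void main(String[] args) {
    Map<String, String> template = new LinkedHashMap<>();
    template.put("group.id", "sqrl-${server-id}");
    template.put("auto.offset.reset", "latest");

    Map<String, String> variables = new HashMap<>();
    variables.put("server-id", "a1");

    System.out.println(resolveAll(template, variables));
  }
}
```

Failing on an unresolved variable, rather than leaving the placeholder in place, keeps a misconfigured template from silently reaching Kafka as a literal value.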