odpi/egeria-docs

[BUG] local.server.id configuration in event bus is not used in access services configs

juergenhemelt opened this issue

Is there an existing issue for this?

  • I have searched the existing issues

Current Behavior

When I configure local.server.id in the default event bus, this configuration has no effect on the access services created for the metadata server:

curl -f -k --verbose --basic --user admin:admin \
  --header "Content-Type: application/json" \
  "${EGERIA_ENDPOINT}/open-metadata/admin-services/users/${EGERIA_USER}/servers/${EGERIA_SERVER}/event-bus?topicURLRoot=itg.egeria" \
  --data @- <<EOF
  {
    "local.server.id": "itg.${EGERIA_SERVER}",
    "producer": {
      "bootstrap.servers": "${KAFKA_ENDPOINT}",
      "acks": "all",
      "security.protocol": "SSL",
      "ssl.keystore.location": "${SSL_KEYSTORE_LOCATION}",
      "ssl.keystore.password": "${SSL_KEYSTORE_PASSWORD}",
      "ssl.truststore.location": "${SSL_TRUSTSTORE_LOCATION}",
      "ssl.truststore.password": "${SSL_TRUSTSTORE_PASSWORD}"
    },
    "consumer": {
      "bootstrap.servers": "${KAFKA_ENDPOINT}",
      "group.id": "${KAFKA_CONSUMER_GROUP_ID}",
      "security.protocol": "SSL",
      "ssl.keystore.location": "${SSL_KEYSTORE_LOCATION}",
      "ssl.keystore.password": "${SSL_KEYSTORE_PASSWORD}",
      "ssl.truststore.location": "${SSL_TRUSTSTORE_LOCATION}",
      "ssl.truststore.password": "${SSL_TRUSTSTORE_PASSWORD}"
    }
  }
EOF

The resulting configuration for an access service (e.g. Data Manager) is:

        {
            "class": "AccessServiceConfig",
            "accessServiceId": 210,
            "accessServiceDevelopmentStatus": "TECHNICAL_PREVIEW",
            "accessServiceAdminClass": "org.odpi.openmetadata.accessservices.datamanager.admin.DataManagerAdmin",
            "accessServiceName": "Data Manager",
            "accessServiceFullName": "Data Manager OMAS",
            "accessServiceURLMarker": "data-manager",
            "accessServiceDescription": "Capture changes to the data stores and data set managed by a data manager such as a database server, content manager or file system.",
            "accessServiceWiki": "https://egeria-project.org/services/omas/data-manager/overview/",
            "accessServiceOperationalStatus": "ENABLED",
            "accessServiceInTopic": {
                "class": "Connection",
                "headerVersion": 0,
                "connectorType": {
                    "class": "ConnectorType",
                    "headerVersion": 0,
                    "type": {
                        "class": "ElementType",
                        "headerVersion": 0,
                        "elementOrigin": "LOCAL_COHORT",
                        "elementVersion": 0,
                        "elementTypeId": "954421eb-33a6-462d-a8ca-b5709a1bd0d4",
                        "elementTypeName": "ConnectorType",
                        "elementTypeVersion": 1,
                        "elementTypeDescription": "A set of properties describing a type of connector."
                    },
                    "guid": "3851e8d0-e343-400c-82cb-3918fed81da6",
                    "qualifiedName": "Egeria:OpenMetadataTopicConnector:Kafka",
                    "displayName": "Apache Kafka Open Metadata Topic Connector",
                    "description": "Apache Kafka Open Metadata Topic Connector supports string based events over an Apache Kafka event bus.",
                    "supportedAssetTypeName": "KafkaTopic",
                    "expectedDataFormat": "PLAINTEXT",
                    "connectorProviderClassName": "org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicProvider",
                    "connectorInterfaces": [
                        "org.odpi.openmetadata.frameworks.connectors.Connector",
                        "org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopic",
                        "org.odpi.openmetadata.frameworks.auditlog.AuditLoggingComponent"
                    ],
                    "recognizedConfigurationProperties": [
                        "producer",
                        "consumer",
                        "local.server.id",
                        "sleepTime"
                    ]
                },
                "endpoint": {
                    "class": "Endpoint",
                    "headerVersion": 0,
                    "address": "itg.egeria.server.mds1.omas.datamanager.inTopic"
                },
                "configurationProperties": {
                    "producer": {
                        "bootstrap.servers": "repnest-kafka-kafka-bootstrap.repnest-etu.svc.cluster.local:9093",
                        "acks": "all",
                        "security.protocol": "SSL",
                        "ssl.keystore.location": "/jks/keystore.jks",
                        "ssl.keystore.password": "***",
                        "ssl.truststore.location": "/jks/truststore.jks",
                        "ssl.truststore.password": "***"
                    },
                    "local.server.id": "040e0598-ff99-46a0-8e8f-c515df3f3688",
                    "consumer": {
                        "bootstrap.servers": "repnest-kafka-kafka-bootstrap.repnest-etu.svc.cluster.local:9093",
                        "group.id": "itg.egeria",
                        "security.protocol": "SSL",
                        "ssl.keystore.location": "/jks/keystore.jks",
                        "ssl.keystore.password": "***",
                        "ssl.truststore.location": "/jks/truststore.jks",
                        "ssl.truststore.password": "***"
                    }
                }
            },
            "accessServiceOutTopic": {
                "class": "Connection",
                "headerVersion": 0,
                "connectorType": {
                    "class": "ConnectorType",
                    "headerVersion": 0,
                    "type": {
                        "class": "ElementType",
                        "headerVersion": 0,
                        "elementOrigin": "LOCAL_COHORT",
                        "elementVersion": 0,
                        "elementTypeId": "954421eb-33a6-462d-a8ca-b5709a1bd0d4",
                        "elementTypeName": "ConnectorType",
                        "elementTypeVersion": 1,
                        "elementTypeDescription": "A set of properties describing a type of connector."
                    },
                    "guid": "3851e8d0-e343-400c-82cb-3918fed81da6",
                    "qualifiedName": "Egeria:OpenMetadataTopicConnector:Kafka",
                    "displayName": "Apache Kafka Open Metadata Topic Connector",
                    "description": "Apache Kafka Open Metadata Topic Connector supports string based events over an Apache Kafka event bus.",
                    "supportedAssetTypeName": "KafkaTopic",
                    "expectedDataFormat": "PLAINTEXT",
                    "connectorProviderClassName": "org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicProvider",
                    "connectorInterfaces": [
                        "org.odpi.openmetadata.frameworks.connectors.Connector",
                        "org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopic",
                        "org.odpi.openmetadata.frameworks.auditlog.AuditLoggingComponent"
                    ],
                    "recognizedConfigurationProperties": [
                        "producer",
                        "consumer",
                        "local.server.id",
                        "sleepTime"
                    ]
                },
                "endpoint": {
                    "class": "Endpoint",
                    "headerVersion": 0,
                    "address": "itg.egeria.server.mds1.omas.datamanager.outTopic"
                },
                "configurationProperties": {
                    "producer": {
                        "bootstrap.servers": "repnest-kafka-kafka-bootstrap.repnest-etu.svc.cluster.local:9093",
                        "acks": "all",
                        "security.protocol": "SSL",
                        "ssl.keystore.location": "/jks/keystore.jks",
                        "ssl.keystore.password": "***",
                        "ssl.truststore.location": "/jks/truststore.jks",
                        "ssl.truststore.password": "***"
                    },
                    "local.server.id": "040e0598-ff99-46a0-8e8f-c515df3f3688",
                    "consumer": {
                        "bootstrap.servers": "repnest-kafka-kafka-bootstrap.repnest-etu.svc.cluster.local:9093",
                        "group.id": "itg.egeria",
                        "security.protocol": "SSL",
                        "ssl.keystore.location": "/jks/keystore.jks",
                        "ssl.keystore.password": "***",
                        "ssl.truststore.location": "/jks/truststore.jks",
                        "ssl.truststore.password": "***"
                    }
                }
            }
        }

Expected Behavior

The local.server.id of all topics for all access services should be the same as the one configured for the event bus. This matters because the Kafka consumer and producer use it as the key of the Kafka messages and as the consumer group ID of the consumer. The consumer group ID must be configurable to allow authorization based on consumer groups, see https://kafka.apache.org/documentation/#security_authz

Steps To Reproduce

Apply the configuration above and inspect the logs of the Egeria platform. If your Kafka user is only authorized for a limited set of consumer groups, you get log messages like this:

Wed Jun 29 06:33:26 GMT 2022 mds1 Error OCF-KAFKA-TOPIC-CONNECTOR-0008 The connector listening on topic itg.egeria.server.mds1.omas.datamanager.outTopic received an unexpected exception org.apache.kafka.common.errors.GroupAuthorizationException from Apache Kafka.  The message in the exception was Not authorized to access group: 040e0598-ff99-46a0-8e8f-c515df3f3688

Environment

- Egeria: 3.9.
- OS: Using helm chart egeria-base
- Java: 11.0.15

Any Further Information?

No response

Thanks @juergenhemelt

You need to set a specific consumer group ID for your Egeria servers (corporate naming policy).

It's possible to set 'local.server.id' in the default event bus config, and it looks as if the intent was for it to act as the consumer group ID. However, when the connection configurations are set up from this default, the value is not carried over: it is overridden with the server ID. I was able to reproduce this on master.

I also tried setting group.id within the consumer properties. This is the standard approach for Kafka, and our docs link to the Kafka docs for general configuration advice, so a user might reasonably expect this to work. It does not, because the value is overwritten in the code.

The local.server.id can be set for each service independently, but this is much more effort.

My summary is

  • it’s important to be able to set the consumer group
  • Defaults should be applied when setting up connections
  • Using a non-standard approach is more confusing

My thought is to propose:

  • review the code to confirm my understanding is correct
  • fix the current behaviour, and ensure local.server.id is propagated when connections are set up
  • do not override group.id with this value if it is explicitly set in the consumer properties, perhaps...
  • update the docs to clarify the behaviour

In ConnectorConfigurationFactory.java we have a call

        configurationProperties.put("local.server.id", serverId);

This overwrites any previously set value every time. If no value is set, the server ID is an appropriate default; if a value is already set, the call should be skipped (in addition to honouring overrides in the consumer properties).
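A minimal sketch of that config-time change, assuming the event bus defaults are available as a plain Map<String, Object>. The class and method names (LocalServerIdDefaults, buildTopicConfigurationProperties) are hypothetical and are not Egeria's actual ConnectorConfigurationFactory API; the point is only that putIfAbsent keeps a configured local.server.id and falls back to the server ID otherwise.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: names are illustrative, not Egeria's actual
// ConnectorConfigurationFactory API.
class LocalServerIdDefaults {

    static final String LOCAL_SERVER_ID = "local.server.id";

    /**
     * Copies the event bus defaults into a fresh configuration-properties map,
     * then applies the server ID only when no local.server.id was configured.
     * Note: the copy is shallow, so nested "producer"/"consumer" maps are shared.
     */
    static Map<String, Object> buildTopicConfigurationProperties(
            Map<String, Object> eventBusDefaults, String serverId) {
        Map<String, Object> configurationProperties = new HashMap<>();
        if (eventBusDefaults != null) {
            configurationProperties.putAll(eventBusDefaults);
        }
        // Unlike an unconditional put(...), putIfAbsent keeps a value that came
        // from the event bus defaults and only fills in the server ID otherwise.
        configurationProperties.putIfAbsent(LOCAL_SERVER_ID, serverId);
        return configurationProperties;
    }

    public static void main(String[] args) {
        Map<String, Object> eventBusDefaults = new HashMap<>();
        eventBusDefaults.put(LOCAL_SERVER_ID, "itg.mds1");

        // Configured value is kept.
        System.out.println(buildTopicConfigurationProperties(eventBusDefaults, "server-guid")
                .get(LOCAL_SERVER_ID)); // itg.mds1
        // Nothing configured: falls back to the server ID.
        System.out.println(buildTopicConfigurationProperties(new HashMap<>(), "server-guid")
                .get(LOCAL_SERVER_ID)); // server-guid
    }
}
```

The same fallback applies when no event bus defaults exist at all, which keeps the previous behaviour for servers that never set local.server.id.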

In KafkaOpenMetadataTopicConnector.java we have

 consumerProperties.put("group.id", serverId);

This then overrides any group.id set in the consumer properties. Again, this should only be done if group.id is not already set.

These two changes should preserve the existing default behaviour, while respecting an override via local.server.id (as originally intended) and, with higher priority, an explicit group.id (the standard Kafka setting).
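The precedence described above could be sketched roughly as follows. This is an illustration under assumptions, not the code in KafkaOpenMetadataTopicConnector: the class and method names are hypothetical, and it assumes the message key falls back to the server ID whenever local.server.id is missing, so that setting only group.id does not leave the key unset.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch of the precedence; not the actual KafkaOpenMetadataTopicConnector code.
class KafkaIdentityResolution {

    /** Message key: local.server.id when configured, otherwise the server's own ID. */
    static String resolveMessageKey(Map<String, Object> configurationProperties, String serverId) {
        Object localServerId = configurationProperties.get("local.server.id");
        return (localServerId != null) ? localServerId.toString() : serverId;
    }

    /**
     * Defaults group.id only when the consumer properties do not set it, so an
     * explicit group.id (standard Kafka) takes priority. Properties is a Hashtable,
     * which rejects null values, so the default must be non-null.
     */
    static void applyConsumerGroupDefault(Properties consumerProperties, String defaultGroupId) {
        consumerProperties.putIfAbsent("group.id", defaultGroupId);
    }

    public static void main(String[] args) {
        Map<String, Object> configurationProperties = new HashMap<>(); // no local.server.id set
        String messageKey = resolveMessageKey(configurationProperties, "server-guid");

        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("group.id", "itg.set.gi"); // explicit consumer group
        applyConsumerGroupDefault(consumerProperties, messageKey);

        System.out.println(messageKey);                                 // server-guid
        System.out.println(consumerProperties.getProperty("group.id")); // itg.set.gi
    }
}
```

Keeping the key and the group ID as two separate resolutions means an explicit group.id changes only the consumer group, never the message key.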

On connect, we currently record this audit log entry:

    SERVICE_INITIALIZING("OCF-KAFKA-TOPIC-CONNECTOR-0001",
              OMRSAuditLogRecordSeverity.STARTUP,
              "Connecting to Apache Kafka Topic {0} with a server identifier of {1}",
              "The local server has started up the Apache Kafka connector.",
              "No action is required.  This is part of the normal operation of the server."),

The intent of this could be read as logging the consumer group ID, so this should also change.

^^ UPDATE: I later realised this was incorrect, so I have reverted that change. The entry reports the server ID, which is used as the message key.

I've done some testing and have a PR which should implement the above. For example I can check the consumer groups on my topics and see:

GROUP           TOPIC                                                                              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                 HOST            CLIENT-ID
itg.set.gi      egeria.omag.openmetadata.repositoryservices.cohort.myCohort.OMRSTopic.instances    0          0               0               0               consumer-itg.set.gi-5-f4922482-4a6e-4ff9-b739-128b8faf1771  /127.0.0.1      consumer-itg.set.gi-5
itg.set.gi      egeria.omag.server.mds1.omas.assetowner.outTopic                                   0          0               0               0               consumer-itg.set.gi-23-92b35315-d7f7-463d-afd4-7a406014871a /127.0.0.1      consumer-itg.set.gi-23
itg.set.gi      egeria.omag.server.mds1.omas.securitymanager.inTopic                               0          0               0               0               consumer-itg.set.gi-39-6ea33a09-5b98-4008-9ed4-58952b1db04c /127.0.0.1      consumer-itg.set.gi-39
itg.set.gi      egeria.omag.server.mds1.omas.communityprofile.outTopic                             0          0               0               0               consumer-itg.set.gi-17-fe9cabde-da1f-48ff-bd0d-06394c3f0de0 /127.0.0.1      consumer-itg.set.gi-17
itg.set.gi      egeria.omag.server.mds1.omas.subjectarea.outTopic                                  0          0               0               0               consumer-itg.set.gi-9-df7f86e1-ab50-428d-bb18-e2ac544bf75b  /127.0.0.1      consumer-itg.set.gi-9
itg.set.gi      egeria.omag.server.mds1.omas.assetlineage.outTopic                                 0          0               0               0               consumer-itg.set.gi-27-2a12c680-e0c7-411f-8d22-eb155e13a578 /127.0.0.1      consumer-itg.set.gi-27
itg.set.gi      egeria.omag.server.mds1.omas.assetmanager.outTopic                                 0          10              10              0               consumer-itg.set.gi-11-2aa702a8-ff7c-4a85-9ea1-8509402f3a91 /127.0.0.1      consumer-itg.set.gi-11
itg.set.gi      egeria.omag.server.mds1.omas.assetconsumer.outTopic                                0          0               0               0               consumer-itg.set.gi-33-855c7611-803c-43d8-87e7-68dc0342a74c /127.0.0.1      consumer-itg.set.gi-33
itg.set.gi      egeria.omag.server.mds1.omas.governanceengine.outTopic                             0          0               0               0               consumer-itg.s

In this case 'itg.set.gi' is the consumer group I specified in the consumer properties.

I'll open a draft PR and check the other cases (i.e. no override, and a local.server.id override).

It may be worth stating that this must not change local.server.id, because that property is also used as the key of the Kafka messages.

You should now be able to just set group.id in the consumer properties. Would that be sufficient? This is also the standard Kafka approach.
I did change the way local.server.id is set up in embedded connectors, but only so that a value provided for local.server.id is not overwritten.

There's a container image built from my PR on docker.io at planetf1/egeria:latest

You can try by setting:

 image:
   egeria:
     name: egeria
     namespace: planetf1
     registry: docker.io
     tag: latest

I've still got some testing to do - and will check the key as you mentioned

  • Setting 'local.server.id' affects the key used for sent messages. Previously the key was always the server GUID; now it is picked up from the event bus defaults, and falls back to the server GUID only if it is not set there. It also sets the consumer group, unless group.id is set.
  • Setting 'producer properties/group.id' only affects the group ID. Again, it is picked up from the event bus defaults, and falls back to the above if not set. (This is what you need.)
  • This is resolved at configuration time, and can be changed by setting it explicitly on each connection if needed.
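To illustrate what "resolved at configuration time" implies, here is a hypothetical sketch that layers the server ID fallback, the event bus defaults and any per-connection values into one snapshot per connection. The layering order, the class name and the "itg.mds1.datamanager" value are assumptions for illustration, not Egeria's code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the layering order and names are assumptions, not Egeria's code.
class ConfigTimeLayering {

    /**
     * Resolves one connection's properties once, at configuration time:
     * server ID fallback < event bus defaults < values set explicitly on this connection.
     * putAll is shallow: overriding "consumer" replaces the whole nested map.
     */
    static Map<String, Object> resolve(Map<String, Object> eventBusDefaults,
                                       Map<String, Object> connectionOverrides,
                                       String serverId) {
        Map<String, Object> resolved = new HashMap<>(eventBusDefaults);
        resolved.putAll(connectionOverrides);              // explicit per-connection values win
        resolved.putIfAbsent("local.server.id", serverId); // last-resort fallback
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, Object> eventBusDefaults = new HashMap<>();
        eventBusDefaults.put("local.server.id", "itg.mds1");

        Map<String, Object> dataManagerOverrides = new HashMap<>();
        dataManagerOverrides.put("local.server.id", "itg.mds1.datamanager");

        Map<String, Object> dataManager = resolve(eventBusDefaults, dataManagerOverrides, "server-guid");
        Map<String, Object> other = resolve(eventBusDefaults, new HashMap<>(), "server-guid");

        // The resolved maps are snapshots: later changes to the defaults do not reach them.
        eventBusDefaults.put("local.server.id", "changed-later");

        System.out.println(dataManager.get("local.server.id")); // itg.mds1.datamanager
        System.out.println(other.get("local.server.id"));       // itg.mds1
    }
}
```

Because each connection holds its own snapshot, changing the event bus defaults afterwards only affects connections that are configured again.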
Regarding "Setting 'producer properties/group.id' will only affect the group id": I suppose you mean the consumer properties and not the producer properties.

Regarding "This is resolved at config time, and can be updated by setting on each connection explicitly if needed": be aware that if you change the group ID at runtime, the offset is reset and the server will process all existing events again. I don't know whether this has any side effects.

I tested the fix. When I set local.server.id, everything works as expected: the Kafka messages have the local.server.id as key, and the consumer group ID is also set to the local.server.id.

BUT if I only set group.id in the consumer properties, the Kafka messages have an empty key. Furthermore, I get a NullPointerException in the log:

Thu Jun 30 06:28:23 GMT 2022 mds1 Error OCF-KAFKA-TOPIC-CONNECTOR-0008 The connector listening on topic itg.egeria.openmetadata.repositoryservices.cohort.mds.OMRSTopic.registration received an unexpected exception java.lang.NullPointerException from Apache Kafka.  The message in the exception was null

@juergenhemelt thanks for the review. I pushed a new image to docker.io. Make sure you also set

pullPolicy: Always

because I reused the same tag, and otherwise your runtime may use a cached copy of the previous version.

[INFO] 3.11-SNAPSHOT: digest: sha256:67f2bc173a39f3c85eb1385daca965ab1ca8cd959a2527bd21ead30de0c9c94d size: 904

Tests have been successful, issue can be closed. Thanks for your support @planetf1 !

Great to hear. Thanks. I'll do some doc updates in the coming days

Transferring to docs repo for completion