Connector instantiation requires information that may not be present at the moment
Closed this issue · 0 comments
sujuka99 commented
Example code:
class CustomApp(StreamsApp):
    def inflate(self) -> list[PipelineComponent]:
        return [self, *self.derive_connectors()]

    def derive_connectors(self) -> list[PipelineComponent]:
        connectors = []
        for topic_name in [...]:
            connectors.append(self.to_connector(topic_name))
        return connectors

    def to_connector(self, topic_name) -> KafkaSinkConnector:
        name = f"{topic_name}"
        return KafkaSinkConnector(
            name=name,
            config=self.config,
            handlers=self.handlers,
            app=KafkaConnectorConfig(
                **{
                    "name": "${pipeline.name}-" + name,  # using `self.full_name` results in an error
                    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",  # Cannot be set in `defaults.yaml`
                }
            ),
        )
When the connector is initialized in the example above, `self.full_name` evaluates to `pipeline_name-connector_name`, but `${pipeline.name}-connector_name` is expected. In other words, the placeholder in `self.prefix` has already been substituted at that point.
This can be circumvented by setting the name in `KafkaConnectorConfig` to `KafkaSinkConnector.model_fields["prefix"].default + name`, but that is only a workaround.
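The difference between the two names can be illustrated with a minimal, library-independent sketch. It is not KPOps code: `Component`, `PREFIX_PLACEHOLDER`, and `default_prefix` are hypothetical names, and a plain dataclass stands in for the Pydantic model, so reading the class-level default through `dataclasses.fields` only approximates what `model_fields["prefix"].default` does in the workaround.

```python
from dataclasses import dataclass, fields

# Assumed placeholder form, taken from the example in this issue.
PREFIX_PLACEHOLDER = "${pipeline.name}-"


@dataclass
class Component:
    """Hypothetical stand-in for a pipeline component; not the KPOps class."""

    name: str
    prefix: str = PREFIX_PLACEHOLDER

    @property
    def full_name(self) -> str:
        # Uses the instance value, which may already be substituted.
        return self.prefix + self.name


def default_prefix() -> str:
    """Read the unsubstituted default from the class, not from an instance."""
    return next(f.default for f in fields(Component) if f.name == "prefix")


# Simulate an instance whose prefix was substituted before the connector is built.
resolved = Component(name="my-topic", prefix="my-pipeline-")

early_name = resolved.full_name  # placeholder already replaced
deferred_name = default_prefix() + resolved.name  # placeholder preserved
```

Under these assumptions, `early_name` carries the resolved pipeline name, while `deferred_name` keeps the `${pipeline.name}-` placeholder that a later substitution step would expect to find.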
Closely related to #412