Why are the events received from Kafka Source not in CloudEvents format?
Closed this issue · 0 comments
liuyuxuan0723 commented
I followed the documentation [here](https://knative.dev/docs/eventing/sources/kafka-source/#create-a-service) and successfully deployed the KafkaSource. However, the events I am receiving are in the raw Kafka message format, not the standard CloudEvents format as described in the documentation, which states that the events should be sent in CloudEvents format. Am I missing some configuration?
Configuration:
-
KafkaSource
apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: creationTimestamp: "2024-11-01T08:36:54Z" finalizers: - kafkasources.sources.knative.dev generation: 1 labels: env: prod eventTrigger: fnt-22471-3e3b121ee7b73079 svc: event-trigger-consumer name: fnt-22471-3e3b121ee7b73079 namespace: faasprod-prod resourceVersion: "2447964835" uid: 1caf2488-d808-4b7e-8e69-479421d55daa spec: bootstrapServers: - xx - xx - xx consumers: 1 initialOffset: latest net: sasl: password: {} type: {} user: {} tls: caCert: {} cert: {} key: {} ordering: ordered sink: uri: http://kafka-liuyuxuan-test.faasprod-prod.svc.cluster.local/ topics: - yanbo-test1 status: conditions: - lastTransitionTime: "2024-11-01T08:36:55Z" status: "True" type: ConsumerGroup - lastTransitionTime: "2024-11-01T08:36:55Z" status: "True" type: Ready - lastTransitionTime: "2024-11-01T08:36:55Z" status: "True" type: SinkProvided consumers: 1 observedGeneration: 1 placements: - podName: kafka-source-dispatcher-0 vreplicas: 1 selector: eventing.knative.dev/source=kafka-source-controller,eventing.knative.dev/sourceName=fnt-22471-3e3b121ee7b73079 sinkUri: http://kafka-liuyuxuan-test.faasprod-prod.svc.cluster.local/
-
My Event Display Sink
package main import ( "fmt" "io" "github.com/gin-gonic/gin" ) func main() { r := gin.Default() r.POST("/", func(c *gin.Context) { fmt.Println("Header:", c.Request.Header) data, _ := io.ReadAll(c.Request.Body) fmt.Println("Body:", string(data)) c.JSON(200, gin.H{ "message": "hello world", }) }) r.GET("/", func(c *gin.Context) { c.JSON(200, gin.H{ "message": "hello world", }) }) r.Run(":8080") }
Expected Behavior:
I expect to receive events in the standard CloudEvents format, with the raw Kafka message being placed in the data
field of the CloudEvent.
Environment:
eventing-kafka-controller
: v1.11.2eventing-kafka-source
: v1.11.2