knative-extensions/eventing-kafka-broker

Why are the events received from Kafka Source not in CloudEvents format?

Closed this issue · 0 comments

I followed the documentation [here](https://knative.dev/docs/eventing/sources/kafka-source/#create-a-service) and successfully deployed the KafkaSource. However, the events I am receiving are in the raw Kafka message format, not the standard CloudEvents format as described in the documentation, which states that the events should be sent in CloudEvents format. Am I missing some configuration?

Configuration:

  • KafkaSource

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      creationTimestamp: "2024-11-01T08:36:54Z"
      finalizers:
      - kafkasources.sources.knative.dev
      generation: 1
      labels:
        env: prod
        eventTrigger: fnt-22471-3e3b121ee7b73079
        svc: event-trigger-consumer
      name: fnt-22471-3e3b121ee7b73079
      namespace: faasprod-prod
      resourceVersion: "2447964835"
      uid: 1caf2488-d808-4b7e-8e69-479421d55daa
    spec:
      bootstrapServers:
      - xx
      - xx
      - xx
      consumers: 1
      initialOffset: latest
      net:
        sasl:
          password: {}
          type: {}
          user: {}
        tls:
          caCert: {}
          cert: {}
          key: {}
      ordering: ordered
      sink:
        uri: http://kafka-liuyuxuan-test.faasprod-prod.svc.cluster.local/
      topics:
      - yanbo-test1
    status:
      conditions:
      - lastTransitionTime: "2024-11-01T08:36:55Z"
        status: "True"
        type: ConsumerGroup
      - lastTransitionTime: "2024-11-01T08:36:55Z"
        status: "True"
        type: Ready
      - lastTransitionTime: "2024-11-01T08:36:55Z"
        status: "True"
        type: SinkProvided
      consumers: 1
      observedGeneration: 1
      placements:
      - podName: kafka-source-dispatcher-0
        vreplicas: 1
      selector: eventing.knative.dev/source=kafka-source-controller,eventing.knative.dev/sourceName=fnt-22471-3e3b121ee7b73079
      sinkUri: http://kafka-liuyuxuan-test.faasprod-prod.svc.cluster.local/
    
  • My Event Display Sink

    package main
    
    import (
    	"fmt"
    	"io"
    
    	"github.com/gin-gonic/gin"
    )
    
    func main() {
    	r := gin.Default()
    
    	r.POST("/", func(c *gin.Context) {
    		fmt.Println("Header:", c.Request.Header)
    		data, _ := io.ReadAll(c.Request.Body)
    		fmt.Println("Body:", string(data))
    		c.JSON(200, gin.H{
    			"message": "hello world",
    		})
    	})
    
    	r.GET("/", func(c *gin.Context) {
    		c.JSON(200, gin.H{
    			"message": "hello world",
    		})
    	})
    	r.Run(":8080")
    }

image

Expected Behavior:

I expect to receive events in the standard CloudEvents format, with the raw Kafka message being placed in the data field of the CloudEvent.

Environment:

  • eventing-kafka-controller: v1.11.2
  • eventing-kafka-source: v1.11.2