uberbrodt/kcl_ex

Base64 decoding the received records

Opened this issue ยท 2 comments

Hey, I've noticed that the incoming messages are base64 encoded, which is still usable ๐Ÿ‘ , however, the kinesis adapter defines a parser https://github.com/ex-aws/ex_aws_kinesis/blob/v2.0.1/lib/ex_aws/kinesis.ex#L68 which I assumed should be invoked somewhere?

I'm not 100% familiar with ex_aws conventions, and I could not find where ex_aws uses the parser, but given that it is available at the stage here https://github.com/uberbrodt/kcl_ex/blob/master/lib/kinesis_client/kinesis.ex#L66, in the ExAws.Operation.JSON struct, should the messages be decoded there too?

Are you sure it's not decoding? They should be decoded when ExAWS makes the request?

Screen.Recording.2021-03-10.at.16.29.14.mov

My consumer hanlder looks like this:

  def handle_message(_processor_name, message, _context) do
    message
    |> Message.update_data(&process_data/1)
  end

  defp process_data(raw_data) do
    IO.inspect(raw_data |> inspect(), label: "Raw:")

    IO.inspect(raw_data |> Base.decode64!() |> inspect(), label: "base64 decoded:")
  end

and my config

    opts = [
      stream_name: stream_name,
      app_name: app_name,
      shard_consumer: KinesisConsumer.Consumer,
      processors: [
        default: [
          concurrency: 1,
          min_demand: 100,
          max_demand: 1000
        ]
      ],
      batchers: [
        default: [
          batch_timeout: 5_000
        ]
      ]
    ]

    children = [
      {KinesisClient.Stream, opts}
    ]