Base64 decoding the received records
sashman commented
Hey, I've noticed that the incoming messages are base64 encoded, which is still usable. However, the Kinesis adapter defines a parser (https://github.com/ex-aws/ex_aws_kinesis/blob/v2.0.1/lib/ex_aws/kinesis.ex#L68), which I assumed would be invoked somewhere?
I'm not 100% familiar with ex_aws conventions, and I could not find where ex_aws uses the parser. But given that it is available at this stage (https://github.com/uberbrodt/kcl_ex/blob/master/lib/kinesis_client/kinesis.ex#L66) in the ExAws.Operation.JSON struct, should the messages be decoded there too?
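To make the question concrete, here is a minimal sketch of what decoding at that stage could look like. It assumes the result has the shape of the Kinesis GetRecords response, where each entry under "Records" carries a base64-encoded "Data" field. The module `RecordDecoder` and its functions are hypothetical names for illustration; they are not part of ex_aws or kcl_ex.

```elixir
defmodule RecordDecoder do
  # Hypothetical helper, not part of ex_aws or kcl_ex.
  # Decodes the base64 "Data" field of every record in a
  # GetRecords-shaped result tuple.
  def decode_records({:ok, %{"Records" => records} = result}) do
    decoded = Enum.map(records, &decode_record/1)
    {:ok, Map.put(result, "Records", decoded)}
  end

  # Errors and unexpected shapes pass through untouched.
  def decode_records(other), do: other

  defp decode_record(%{"Data" => data} = record) when is_binary(data) do
    case Base.decode64(data) do
      {:ok, bytes} -> Map.put(record, "Data", bytes)
      # Leave the record as-is if the payload is not valid base64.
      :error -> record
    end
  end

  defp decode_record(record), do: record
end
```

Passing errors through unchanged keeps the function safe to apply to whatever the request returns, so the caller's existing error handling would not need to change.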
uberbrodt commented
Are you sure it's not decoding? The records should already be decoded when ExAws makes the request.
sashman commented
Screen.Recording.2021-03-10.at.16.29.14.mov
My consumer handler looks like this:
  def handle_message(_processor_name, message, _context) do
    message
    |> Message.update_data(&process_data/1)
  end

  defp process_data(raw_data) do
    IO.inspect(raw_data |> inspect(), label: "Raw:")
    IO.inspect(raw_data |> Base.decode64!() |> inspect(), label: "base64 decoded:")
  end
and my config:

  opts = [
    stream_name: stream_name,
    app_name: app_name,
    shard_consumer: KinesisConsumer.Consumer,
    processors: [
      default: [
        concurrency: 1,
        min_demand: 100,
        max_demand: 1000
      ]
    ],
    batchers: [
      default: [
        batch_timeout: 5_000
      ]
    ]
  ]

  children = [
    {KinesisClient.Stream, opts}
  ]
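Until it is clear where decoding should happen, a consumer-side workaround could look roughly like the sketch below. It decodes the payload inside the handler and falls back to the raw data when the input is not valid base64, so it keeps working if records later arrive already decoded. `PayloadDecoder` is a hypothetical name, not something provided by kcl_ex.

```elixir
defmodule PayloadDecoder do
  # Hypothetical helper; the module name is an assumption.
  # Returns the decoded bytes, or the input unchanged when it is
  # not valid base64.
  def decode(raw) when is_binary(raw) do
    case Base.decode64(raw) do
      {:ok, decoded} -> decoded
      :error -> raw
    end
  end
end
```

In the handler above this would be passed to the existing call, for example `Message.update_data(message, &PayloadDecoder.decode/1)`. One caveat: a plain-text payload that happens to be valid base64 would also be decoded, so this is only a stopgap.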