fluent/fluentd-docs-gitbook

Failure to resend secondary file through "fluent-cat command"

goslim56 opened this issue · 11 comments

I am sending data to Kafka through fluentd, using the config below.

Data that fails to be transmitted is accumulated through secondary_file.

I followed the method in your guide for retransmission, but only the message "Input must be a map (got Array)" is displayed.

Where did I make a mistake?

====How I used fluent-cat====
cat /home/temp/eventqueue.log.0 | fluent-cat --format msgpack --host 127.0.0.1 --port 24224 tag

=====my fluentd config======
[image: screenshot of the fluentd config]

Is /home/temp/eventqueue.log.0 saved in msgpack format?
I guess it may be saved in JSON format instead.

Is it possible to save secondary_file in JSON format?
I tried to save it in JSON format instead of msgpack, but couldn't find a way.
If you know how, please share it.

Thanks,
If /home/temp/eventqueue.log.0 is in msgpack format, that is the expected format,
so this failure may have another cause.

This error is raised from

https://github.com/fluent/fluentd/blob/2cbb650b748c38602898af58c2d0eb251946a230/lib/fluent/command/cat.rb#L157

raise ArgumentError, "Input must be a map (got #{record.class})"

so there is something weird with /home/temp/eventqueue.log.0 🤔

I think it is caused by /home/temp/eventqueue.log.0 passing multiple messages to fluent-cat in the form of an array (because a chunk_id is created per Kafka topic).

Is there a way to retransmit the secondary_file consisting of multiple messages rather than a single message?

Then, it may be better to use https://github.com/fluent/fluent-logger-ruby with Fluent::MessagePackFactory.

require 'fluent-logger'
require 'fluent/msgpack_factory'

host = "localhost"
port = 24224
file = ARGV.first
# Create the logger once and reuse the connection for every record.
logger = Fluent::Logger::FluentLogger.new(nil, :host => host, :port => port)
File.open(file, 'rb') do |f|
  begin
    u = Fluent::MessagePackFactory.msgpack_unpacker(f)
    # Assumes each unpacked object is a map with a 'time' key.
    u.each do |obj|
      time = obj.delete('time')
      logger.post_with_time('tag', obj, time)
    end
  rescue EOFError
    # End of file reached; nothing more to send.
  end
end
logger.close
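
The error message quoted earlier suggests that the unpacked objects can be Arrays rather than maps. A minimal sketch of normalizing either shape before posting follows. It assumes, which this thread does not confirm, that an Array entry is a [time, record] pair. `split_entry` is a hypothetical helper name, not part of fluentd or fluent-logger-ruby.

```ruby
# Hypothetical helper: split an unpacked entry into [time, record].
# Assumption (not confirmed in this thread): an Array entry is a
# [time, record] pair. A Hash entry is assumed to carry its timestamp
# under the 'time' key, as in the script above.
def split_entry(entry)
  case entry
  when Array
    time, record = entry
    unless record.is_a?(Hash)
      raise ArgumentError, "Unexpected entry shape: #{entry.inspect}"
    end
    [time, record]
  when Hash
    record = entry.dup            # avoid mutating the caller's object
    time = record.delete('time')
    [time, record]
  else
    raise ArgumentError, "Input must be a map or array (got #{entry.class})"
  end
end

# Example with made-up data: an Array entry is split into its two parts.
time, record = split_entry([1_600_000_000, { 'message' => 'hello' }])
puts "time=#{time} keys=#{record.keys.join(',')}"
```

Inside the loop above, the returned pair could then be passed on as logger.post_with_time('tag', record, time). Inspecting a few entries of the actual file first would confirm whether the assumed shape holds.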

I used Python's msgpack.Unpacker to decode the file and then retransmitted the data.

Thanks for the kind explanation.

It may be useful if fluent-cat supports these files. 🤔

It should be fixed by fluent/fluentd#3368

fluent/fluentd#3368 has been merged 👍

Thank you so much for taking my feedback into account.
I'm really happy to have contributed to your project.