fluent/fluent-plugin-sql

undefined method `key' for #<Fluent::Plugin::Buffer::MemoryChunk:...>

toastyblast opened this issue · 3 comments

My FluentD throws the error in the title with the fluent.conf I've added to this issue. It only happens if I add any additional tables apart from the default table, as detailed further down.

I have tried changing the match patterns for the additional tables, altering the messages, etc., but none of it helps. From what I can see, the error is caused solely by adding additional tables to the PSQL output. Why this happens I don't know, but it is an issue either way - correct me if I'm wrong and I've made a mistake elsewhere, but why else would it work fine when I only have the default table section?

Relevant FluentD Docker container output:

...
fluentd-host   | 2020-04-16 14:50:53 +0000 [warn]: #0 got unrecoverable error in primary and no secondary error_class=NoMethodError error="undefined method `key' for #<Fluent::Plugin::Buffer::MemoryChunk:0x0000557f15b530f8>"
fluentd-host   |   2020-04-16 14:50:53 +0000 [warn]: #0 suppressed same stacktrace
fluentd-host   | 2020-04-16 14:50:53 +0000 [warn]: #0 bad chunk is moved to /tmp/fluent/backup/worker0/object_2abf89f9725c/5a3698f1e8b21a4f90d6595343765202.log
fluentd-host   | 2020-04-16 14:50:53.343591600 +0000 fluent.warn: {"error":"#<NoMethodError: undefined method `key' for #<Fluent::Plugin::Buffer::MemoryChunk:0x0000557f15b530f8>>","message":"got unrecoverable error in primary and no secondary error_class=NoMethodError error=\"undefined method `key' for #<Fluent::Plugin::Buffer::MemoryChunk:0x0000557f15b530f8>\""}
fluentd-host   | 2020-04-16 14:50:53.344299800 +0000 fluent.warn: {"message":"bad chunk is moved to /tmp/fluent/backup/worker0/object_2abf89f9725c/5a3698f1e8b21a4f90d6595343765202.log"}

The weirdest thing is that my FluentD works fine if I only have a default table defined. As soon as I define additional tables - even when I know for sure that the tables and column mappings match the message contents - it all fails with the error shown above. Further down I've added my fluent.conf, with a comment next to the table that breaks everything.

I'm running FluentD through the official Docker image fluent/fluentd:1.10.2-1.0 and PostgreSQL through its official Docker image at version 11.

Non-working fluent.conf:

## FluentD itself logs information that is useful but not necessary to include in our console outputs at all times.
<label @FLUENT_LOG>
  ## Error logs made by FluentD should always be printed on the console and backed up to file.
  <match fluent.{warn,error,fatal}.**>
    @type copy
    <store>
      @type relabel
      @label @CONSOLE
    </store>
    <store>
      @type relabel
      @label @BACKUP
    </store>
  </match>
  ## All other logs by FluentD (trace, debug & info) should only go to backup files.
  <match fluent.**>
    @type relabel
    @label @BACKUP
  </match>
</label>

## Source of our input; this is where we really start. This input refers to the FluentD container in the docker-compose.
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

## Catch the newly tagged messages that "rewrite_tag_filter" re-emits. We only want to do something special with
##  those carrying the "action" prefix tag. In essence, this is how you implement an if/else in FluentD (although a little hack-y).
<match {action,passed}.**>
  @type route
  <route action.**>
    copy
    @label @ACTION_PARSE
  </route>
  <route passed.**>
    remove_tag_prefix passed
    @label @OUT
  </route>
</match>

## Messages coming from Core or Echo running locally have their field named "msg" instead of "log" - rename it.
<filter **.local.**>
  @type record_modifier
  <record>
    log ${record["msg"]}
  </record>
  remove_keys msg
</filter>

## All other messages (so those we log) should be valid JSON strings, which we parse to JSON correctly.
## HOWEVER, this does NOT parse any fields that are themselves JSON within the log! Those you have to parse manually,
##  as we do further down for the "action_data" field.
<filter **>
  @type parser
  key_name log
  reserve_data true
  remove_key_name_field true
  <parse>
    @type json
    time_format %Y-%m-%dT%H:%M:%S.%L%z
  </parse>
</filter>

<match {echo,core}.**>
  @type rewrite_tag_filter
  ## If there is an "action_data" field, there'll be nested JSON fields that FluentD doesn't parse but that we need to split out.
  ## Give it a tag so we can split it from the rest of the config when it's re-emitted.
  <rule>
    key action_data
    pattern /^(.+)$/
    tag action.${tag}
  </rule>
  ## Catch-all for every other echo or core message we may receive (since not all have "action_data" and FluentD
  ##  doesn't support conditional checks on whether fields exist), as otherwise this match would consume them.
  ## You can comment this out if you don't want any messages other than those of broadcasts going to the outputs.
  <rule>
    key message
    pattern /^(.+)$/
    tag passed.${tag}
  </rule>
</match>

## Fallback for all other logs from tags we do not recognise or want to do anything specific with.
## This should currently never be reached; it is only here to catch messages from applications not yet implemented, for instance.
<match **>
  @type relabel
  @label @OUT
</match>

<label @ACTION_PARSE>
  ## Action data is always JSON, which FluentD won't have parsed automatically the first time round, so we need to parse it manually.
  <filter **>
    @type parser
    key_name action_data
    reserve_data true
    remove_key_name_field true
    hash_value_field action_data_json
    <parse>
      @type json
    </parse>
  </filter>

  ## If there is action data, we know for sure the message field is also a JSON object.
  <filter **>
    @type parser
    key_name message
    reserve_data true
    remove_key_name_field false
    hash_value_field message_json
    <parse>
      @type json
    </parse>
  </filter>

  ## Finally output these messages to our different output channels.
  <match **>
    @type relabel
    @label @ACTION_OUT
  </match>
</label>

## Route action messages to different paths than the rest.
<label @ACTION_OUT>
  <match **>
    @type copy
    ## Output these messages to our PSQL server, which we shouldn't do with any of the others.
    <store>
      @type relabel
      @label @PSQL
    </store>
    ## Also send them to all the other outputs just like any other message, but remove the "action" tag prefix.
    <store>
      @type route
      <route **>
        remove_tag_prefix action
        @label @OUT
      </route>
    </store>
  </match>
</label>

## Route all parsed messages to our outputs.
<label @OUT>
  <match **>
    @type copy
    <store>
      @type relabel
      @label @BACKUP
    </store>
    <store>
      @type relabel
      @label @CONSOLE
    </store>
  </match>
</label>

## Print to the console of the FluentD container, but only at level DEBUG or above.
<label @CONSOLE>
  <match **>
    @type stdout
    @log_level debug
  </match>
</label>

## Output the logs to the PSQL database.
<label @PSQL>
  <match **>
    @type sql
    host PSQL_HOST
    username PSQL_USERNAME
    password PSQL_PASSWORD
    port PSQL_PORT
    database PSQL_DB
    adapter postgresql

    <table>
      table simple_test
      column_mapping 'time:created_at,message'
    </table>

    ## If this table section is commented out, everything works fine... But I need this table!!!
    <table action.**>
      table simple_action
      column_mapping 'time:created_at,message,message_id,gatling_run_id'
    </table>

    flush_interval 1s
  </match>
</label>

## Output to files at the given path, split into folders by date and hour, with further subfolders by tag (separate applications).
<label @BACKUP>
  <match **>
    @type file
    path /fluentd/log/%Y-%m-%d@%H/${tag}/logs
    <buffer tag,time>
      timekey 1h
      timekey_wait 10m
      timekey_use_utc true
      flush_at_shutdown true
    </buffer>
  </match>
</label>

SQL table definitions:

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE simple_test (
    base_id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
    created_at TIMESTAMPTZ NOT NULL,
    message TEXT NOT NULL
);

CREATE TABLE simple_action (
    base_id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
    created_at TIMESTAMPTZ NOT NULL,
    -- Even when I change these two UUID columns below, FluentD gives the same 'key' error.
    gatling_run_id uuid NOT NULL,
    message_id uuid NOT NULL,
    message TEXT NOT NULL
);

Example "action.**" message inside FluentD before reaching the @psql label:

{"level":"DEBUG","gatling_run_id":"f67a3c48-c940-4c02-a995-bb06660b2308","logger":"class com.sqills.s3m.commons.messagebroker.MessageBrokerHandler [S3M-ECHO]","message_id":"62e2932c-7c03-4400-a38a-beb8204ddd79","thread":"DefaultDispatcher-worker-1","message":"{\"type\":\"DELETED\",\"call_id\":\"6e9f61e3-1903-45de-a96f-02e6cf78f939_call-99\",\"object\":\"BOOKING\",\"content\":{\"agent_id\":13,\"booking_id\":\"a9b337f8-3694-4229-8dc0-cea1a6a0c0cf\"}}","action_data_json":{"flag":"RECEIVE","application":"S3M-ECHO","topic":"s3m.booking_topic"},"message_json":{"type":"DELETED","call_id":"6e9f61e3-1903-45de-a96f-02e6cf78f939_call-99","object":"BOOKING","content":{"agent_id":13,"booking_id":"a9b337f8-3694-4229-8dc0-cea1a6a0c0cf"}}}

Maybe the key method is from the old v0.12 style API.
I will check it and write a patch.
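
Roughly, the difference would look like this - an illustrative Ruby sketch only, not the plugin's real internals (tables, pattern and import are assumed names):

# Illustrative sketch; tables, pattern and import are assumptions, not
# fluent-plugin-sql's actual code.

# Old v0.12-style output API: a buffer chunk exposed its tag via chunk.key,
# which would raise exactly the NoMethodError above on a v1 MemoryChunk.
def write_v012_style(chunk, tables, default_table)
  table = tables.find { |t| t.pattern.match?(chunk.key) } || default_table
  table.import(chunk)
end

# v1 output API (Fluentd >= 0.14): chunks have no key method; the tag lives
# on the chunk metadata instead (which requires chunking by tag, i.e.
# <buffer tag>).
def write_v1_style(chunk, tables, default_table)
  tag = chunk.metadata.tag
  table = tables.find { |t| t.pattern.match?(tag) } || default_table
  table.import(chunk)
end

That would also explain why the default-table-only config works: with no extra <table> sections there is nothing to match against the chunk's tag, so the missing key method is never called.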

Ah, alright, @repeatedly! What is the alternative for > 1.0.0, then? I can't find any alternative in the fluent-plugin-sql docs.

As far as I'm aware, I shouldn't have anything to do with FluentD 0.12 - I'm using the fluent/fluentd:1.10.2-1.0 container (currently the latest) with fluent-plugin-sql 1.1.1 (also currently the latest), if that's of any help to you :)

Released v2.0.0. This should fix this issue.
Note that v2.0.0 uses a different buffer format, so v2 can't reuse v1's buffers - flush or discard any pending v1 buffer chunks before upgrading.