repeatedly/fluent-plugin-netflow

Warning: Skip unsupported field, even when field is defined in Netflow definitions yaml

Dieff opened this issue · 8 comments

Dieff commented

I'm trying to use this plugin to process Netflow v9 data from a Ubiquiti EdgeRouter Pro. When I start the plugin, I continually receive the error "[warn]: #0 Skip unsupported field type=201 length=4". This continues no matter how long I run fFuentd, and no netflow records are outputted.

I'm using the latest Fluentd Docker image, and the Dockerfile command:

RUN apk add --no-cache --update --virtual .build-deps build-base ruby-dev \
  && gem install fluent-plugin-elasticsearch \
  && gem install fluent-plugin-netflow \
  && gem sources --clear-all \
  && apk del .build-deps \
  && rm -rf /tmp/* /var/tmp/* /usr/lib/ruby/gems/*/cache/*.gem

to install this plugin.

When I saw the error, I added the "201" field to a Netflow custom definitions file, like so:

  201:
  - 4
  - :mpls_label_stack_octets

Even after restarting fluentd (many times), I still recieve the same error. It's almost as if the plugin is ignoring the field definitions. I have also tried using :skip on the field with no luck. I have confirmed that the config file is being read, as putting invalid syntax in the fields file will crash the plugin.

Here is my fluentd config

<source>
  @type netflow
  tag netflow
  bind 0.0.0.0
  port 2055
  versions [9]
  definitions /fluentd/etc/netflow_fields.yaml
  switched_times_from_uptime yes
</source>

<match netflow>
  @type elasticsearch
  host elasticsearch
  port 9200
  index_name netflow
  user XXXXXXXXXX
  password XXXXXXXXX
  include_timestamp true
</match>

And here is my netflow_fields.yaml

---
option:
  1:
  - 4
  - :in_bytes
  2:
  - 4
  - :in_pkts
  3:
  - 4
  - :flows
  4:
  - :uint8
  - :protocol
  5:
  - :uint8
  - :src_tos
  6:
  - :uint8
  - :tcp_flags
  7:
  - :uint16
  - :l4_src_port
  8:
  - :ip4_addr
  - :ipv4_src_addr
  9:
  - :uint8
  - :src_mask
  10:
  - 2
  - :input_snmp
  11:
  - :uint16
  - :l4_dst_port
  12:
  - :ip4_addr
  - :ipv4_dst_addr
  13:
  - :uint8
  - :dst_mask
  14:
  - 2
  - :output_snmp
  15:
  - :ip4_addr
  - :ipv4_next_hop
  16:
  - 2
  - :src_as
  17:
  - 2
  - :dst_as
  18:
  - :ip4_addr
  - :bgp_ipv4_next_hop
  19:
  - 4
  - :mul_dst_pkts
  20:
  - 4
  - :mul_dst_bytes
  21:
  - :uint32
  - :last_switched
  22:
  - :uint32
  - :first_switched
  23:
  - 4
  - :out_bytes
  24:
  - 4
  - :out_pkts
  25:
  - :uint16
  - :min_pkt_length
  26:
  - :uint16
  - :max_pkt_length
  27:
  - :ip6_addr
  - :ipv6_src_addr
  28:
  - :ip6_addr
  - :ipv6_dst_addr
  29:
  - :uint8
  - :ipv6_src_mask
  30:
  - :uint8
  - :ipv6_dst_mask
  31:
  - 3
  - :ipv6_flow_label
  32:
  - :uint16
  - :icmp_type
  33:
  - :uint8
  - :mul_igmp_type
  34:
  - :uint32
  - :sampling_interval
  35:
  - :uint8
  - :sampling_algorithm
  36:
  - :uint16
  - :flow_active_timeout
  37:
  - :uint16
  - :flow_inactive_timeout
  38:
  - :uint8
  - :engine_type
  39:
  - :uint8
  - :engine_id
  40:
  - 4
  - :total_bytes_exp
  41:
  - 4
  - :total_pkts_exp
  42:
  - 4
  - :total_flows_exp
  43:
  - :skip
  44:
  - :ip4_addr
  - :ipv4_src_prefix
  45:
  - :ip4_addr
  - :ipv4_dst_prefix
  46:
  - :uint8
  - :mpls_top_label_type
  47:
  - :uint32
  - :mpls_top_label_ip_addr
  48:
  - 1
  - :flow_sampler_id
  49:
  - :uint8
  - :flow_sampler_mode
  50:
  - :uint32
  - :flow_sampler_random_interval
  51:
  - :skip
  52:
  - :uint8
  - :min_ttl
  53:
  - :uint8
  - :max_ttl
  54:
  - :uint16
  - :ipv4_ident
  55:
  - :uint8
  - :dst_tos
  56:
  - :mac_addr
  - :in_src_mac
  57:
  - :mac_addr
  - :out_dst_mac
  58:
  - :uint16
  - :src_vlan
  59:
  - :uint16
  - :dst_vlan
  60:
  - :uint8
  - :ip_protocol_version
  61:
  - :uint8
  - :direction
  62:
  - :ip6_addr
  - :ipv6_next_hop
  63:
  - :ip6_addr
  - :bgp_ipv6_next_hop
  64:
  - :uint32
  - :ipv6_option_headers
  65:
  - :skip
  66:
  - :skip
  67:
  - :skip
  68:
  - :skip
  69:
  - :skip
  70:
  - :mpls_label
  - :mpls_label_1
  71:
  - :mpls_label
  - :mpls_label_2
  72:
  - :mpls_label
  - :mpls_label_3
  73:
  - :mpls_label
  - :mpls_label_4
  74:
  - :mpls_label
  - :mpls_label_5
  75:
  - :mpls_label
  - :mpls_label_6
  76:
  - :mpls_label
  - :mpls_label_7
  77:
  - :mpls_label
  - :mpls_label_8
  78:
  - :mpls_label
  - :mpls_label_9
  79:
  - :mpls_label
  - :mpls_label_10
  80:
  - :mac_addr
  - :in_dst_mac
  81:
  - :mac_addr
  - :out_src_mac
  82:
  - :string
  - :if_name
  83:
  - :string
  - :if_desc
  84:
  - :string
  - :sampler_name
  89:
  - :uint8
  - :forwarding_status
  91:
  - :uint8
  - :mpls_prefix_len
  95:
  - 4
  - :app_id
  150:
  - :uint32
  - :flowStartSeconds
  151:
  - :uint32
  - :flowEndSeconds
  152:
  - :uint64
  - :flowStartMilliseconds
  153:
  - :uint64
  - :flowEndMilliseconds
  154:
  - :uint64
  - :flowStartMicroseconds
  155:
  - :uint64
  - :flowEndMicroseconds
  156:
  - :uint64
  - :flowStartNanoseconds
  157:
  - :uint64
  - :flowEndNanoseconds
  201:
  - 4
  - :mpls_label_stack_octets
  234:
  - :uint32
  - :ingress_vrf_id
  235:
  - :uint32
  - :egress_vrf_id
  236:
  - :string
  - :vrf_name

scope:
  1:
  - :ip4_addr
  - :system
  2:
  - :skip
  3:
  - :skip
  4:
  - :skip
  5:
  - :skip

A capture in Wireshark of incoming netflow packets. Note the type 201 field.
netflow

Hmm... that's weird. Do you have an example code or something to reproduce the problem?
I don't have EdgeRouter Pro so I can't test this issue.

Dieff commented

I have a packet capture from the incoming packets if that would be helpful. It was used in the screenshot above.

You could replay the packets with tcpreplay -i lo netflow.pcap. I think you could then set up networking to direct the packets at fluentd.

Here is a link
netflow.pcap.gz
Here is an uncompressed link, will expire after 1 week
https://send.firefox.com/download/951d7b8c21c5281d/#ca2diSKPguLN_1t2u9dHDg

I do have the field of type 201 correctly defined in the netflow_fields.yaml file? Let me know if other info would be more helpful.

I dumped the content of @definitions and it contains 201 value correctly so the mismatch is type or something.
I tried tcpreplay on my Mac but it didn't work yet. Do I need to update pcap's source or destination for localhost testing?

Hi, I am experiencing the same issue. I copy pasted netflow yaml file with paloalto definitions and fluentd netflow keeps skipping the custom fields

ttyv commented

@repeatedly
i fixed problem with

➤ git diff
diff --git a/lib/fluent/plugin/parser_netflow.rb b/lib/fluent/plugin/parser_netflow.rb
index b71495b..1d54ea2 100644
--- a/lib/fluent/plugin/parser_netflow.rb
+++ b/lib/fluent/plugin/parser_netflow.rb
@@ -40,7 +40,7 @@ module Fluent
         if @definitions
           raise Fluent::ConfigError, "definitions file #{@definitions} doesn't exist" unless File.exist?(@definitions)
           begin
-            @template_fields['option'].merge!(YAML.load_file(@definitions))
+            @template_fields['option'].merge!(YAML.load_file(@definitions)['option'])
           rescue => e
             raise Fluent::ConfigError, "Bad syntax in definitions file #{@definitions}, error_class = #{e.class.name}, error = #{e.message}"
           end

configs:

# cat fluent.conf 
<source>
  @type netflow
  tag netflow.test


  # optional parameters
  bind 10.0.10.40
#  port 2055
#  cache_ttl 6000
  versions [5, 9]
  definitions /etc/fluent/mikrotik_fields.yaml
</source>


<match **>
  @type stdout
</match>
# cat mikrotik_fields.yaml 
---
option:
  225:
  - :ip4_addr
  - :postNATSourceIPv4Address
  226:
  - :ip4_addr
  - :postNATDestinationIPv4Address
  227:
  - :uint16
  - :postNAPTSourceTransportPort
  228:
  - :uint16
  - :postNAPTDestinationTransportPort

before fix you get option dict inside option dict (@template_fields dump)

{"option"=>{1=>[4, :in_bytes], 2=>[4, :in_pkts], 3=>[4, :flows], 4=>[:uint8, :protocol], 5=>[:uint8, :src_tos], 6=>[:uint8, :tcp_flags], 7=>[:uint16, :l4_src_port], 8=>[:ip4_addr, :ipv4_src_addr], 9=>[:uint8, :src_mask], 10=>[2, :input_snmp], 11=>[:uint16, :l4_dst_port], 12=>[:ip4_addr, :ipv4_dst_addr], 13=>[:uint8, :dst_mask], 14=>[2, :output_snmp], 15=>[:ip4_addr, :ipv4_next_hop], 16=>[2, :src_as], 17=>[2, :dst_as], 18=>[:ip4_addr, :bgp_ipv4_next_hop], 19=>[4, :mul_dst_pkts], 20=>[4, :mul_dst_bytes], 21=>[:uint32, :last_switched], 22=>[:uint32, :first_switched], 23=>[4, :out_bytes], 24=>[4, :out_pkts], 25=>[:uint16, :min_pkt_length], 26=>[:uint16, :max_pkt_length], 27=>[:ip6_addr, :ipv6_src_addr], 28=>[:ip6_addr, :ipv6_dst_addr], 29=>[:uint8, :ipv6_src_mask], 30=>[:uint8, :ipv6_dst_mask], 31=>[3, :ipv6_flow_label], 32=>[:uint16, :icmp_type], 33=>[:uint8, :mul_igmp_type], 34=>[:uint32, :sampling_interval], 35=>[:uint8, :sampling_algorithm], 36=>[:uint16, :flow_active_timeout], 37=>[:uint16, :flow_inactive_timeout], 38=>[:uint8, :engine_type], 39=>[:uint8, :engine_id], 40=>[4, :total_bytes_exp], 41=>[4, :total_pkts_exp], 42=>[4, :total_flows_exp], 43=>[:skip], 44=>[:ip4_addr, :ipv4_src_prefix], 45=>[:ip4_addr, :ipv4_dst_prefix], 46=>[:uint8, :mpls_top_label_type], 47=>[:uint32, :mpls_top_label_ip_addr], 48=>[1, :flow_sampler_id], 49=>[:uint8, :flow_sampler_mode], 50=>[:uint32, :flow_sampler_random_interval], 51=>[:skip], 52=>[:uint8, :min_ttl], 53=>[:uint8, :max_ttl], 54=>[:uint16, :ipv4_ident], 55=>[:uint8, :dst_tos], 56=>[:mac_addr, :in_src_mac], 57=>[:mac_addr, :out_dst_mac], 58=>[:uint16, :src_vlan], 59=>[:uint16, :dst_vlan], 60=>[:uint8, :ip_protocol_version], 61=>[:uint8, :direction], 62=>[:ip6_addr, :ipv6_next_hop], 63=>[:ip6_addr, :bgp_ipv6_next_hop], 64=>[:uint32, :ipv6_option_headers], 65=>[:skip], 66=>[:skip], 67=>[:skip], 68=>[:skip], 69=>[:skip], 70=>[:mpls_label, :mpls_label_1], 71=>[:mpls_label, :mpls_label_2], 72=>[:mpls_label, :mpls_label_3], 73=>[:mpls_label, :mpls_label_4], 74=>[:mpls_label, :mpls_label_5], 75=>[:mpls_label, :mpls_label_6], 76=>[:mpls_label, :mpls_label_7], 77=>[:mpls_label, :mpls_label_8], 78=>[:mpls_label, :mpls_label_9], 79=>[:mpls_label, :mpls_label_10], 80=>[:mac_addr, :in_dst_mac], 81=>[:mac_addr, :out_src_mac], 82=>[:string, :if_name], 83=>[:string, :if_desc], 84=>[:string, :sampler_name], 89=>[:uint8, :forwarding_status], 91=>[:uint8, :mpls_prefix_len], 95=>[4, :app_id], 150=>[:uint32, :flowStartSeconds], 151=>[:uint32, :flowEndSeconds], 152=>[:uint64, :flowStartMilliseconds], 153=>[:uint64, :flowEndMilliseconds], 154=>[:uint64, :flowStartMicroseconds], 155=>[:uint64, :flowEndMicroseconds], 156=>[:uint64, :flowStartNanoseconds], 157=>[:uint64, :flowEndNanoseconds], 234=>[:uint32, :ingress_vrf_id], 235=>[:uint32, :egress_vrf_id], 236=>[:string, :vrf_name], "option"=>{225=>[:ip4_addr, :postNATSourceIPv4Address], 226=>[:ip4_addr, :postNATDestinationIPv4Address], 227=>[:uint16, :postNAPTSourceTransportPort], 228=>[:uint16, :postNAPTDestinationTransportPort]}}, "scope"=>{1=>[:ip4_addr, :system], 2=>[:skip], 3=>[:skip], 4=>[:skip], 5=>[:skip]}}

this>

236=>[:string, :vrf_name], "option"=>{225=>[:ip4_addr, :postNATSourceIPv4Address], 226=>[:ip4_addr, :postNATDestinationIPv4Address], 227=>[:uint16, :postNAPTSourceTransportPort], 228=>[:uint16, :postNAPTDestinationTransportPort]}},

and after fix it merged correctly:

{"option"=>{1=>[4, :in_bytes], 2=>[4, :in_pkts], 3=>[4, :flows], 4=>[:uint8, :protocol], 5=>[:uint8, :src_tos], 6=>[:uint8, :tcp_flags], 7=>[:uint16, :l4_src_port], 8=>[:ip4_addr, :ipv4_src_addr], 9=>[:uint8, :src_mask], 10=>[2, :input_snmp], 11=>[:uint16, :l4_dst_port], 12=>[:ip4_addr, :ipv4_dst_addr], 13=>[:uint8, :dst_mask], 14=>[2, :output_snmp], 15=>[:ip4_addr, :ipv4_next_hop], 16=>[2, :src_as], 17=>[2, :dst_as], 18=>[:ip4_addr, :bgp_ipv4_next_hop], 19=>[4, :mul_dst_pkts], 20=>[4, :mul_dst_bytes], 21=>[:uint32, :last_switched], 22=>[:uint32, :first_switched], 23=>[4, :out_bytes], 24=>[4, :out_pkts], 25=>[:uint16, :min_pkt_length], 26=>[:uint16, :max_pkt_length], 27=>[:ip6_addr, :ipv6_src_addr], 28=>[:ip6_addr, :ipv6_dst_addr], 29=>[:uint8, :ipv6_src_mask], 30=>[:uint8, :ipv6_dst_mask], 31=>[3, :ipv6_flow_label], 32=>[:uint16, :icmp_type], 33=>[:uint8, :mul_igmp_type], 34=>[:uint32, :sampling_interval], 35=>[:uint8, :sampling_algorithm], 36=>[:uint16, :flow_active_timeout], 37=>[:uint16, :flow_inactive_timeout], 38=>[:uint8, :engine_type], 39=>[:uint8, :engine_id], 40=>[4, :total_bytes_exp], 41=>[4, :total_pkts_exp], 42=>[4, :total_flows_exp], 43=>[:skip], 44=>[:ip4_addr, :ipv4_src_prefix], 45=>[:ip4_addr, :ipv4_dst_prefix], 46=>[:uint8, :mpls_top_label_type], 47=>[:uint32, :mpls_top_label_ip_addr], 48=>[1, :flow_sampler_id], 49=>[:uint8, :flow_sampler_mode], 50=>[:uint32, :flow_sampler_random_interval], 51=>[:skip], 52=>[:uint8, :min_ttl], 53=>[:uint8, :max_ttl], 54=>[:uint16, :ipv4_ident], 55=>[:uint8, :dst_tos], 56=>[:mac_addr, :in_src_mac], 57=>[:mac_addr, :out_dst_mac], 58=>[:uint16, :src_vlan], 59=>[:uint16, :dst_vlan], 60=>[:uint8, :ip_protocol_version], 61=>[:uint8, :direction], 62=>[:ip6_addr, :ipv6_next_hop], 63=>[:ip6_addr, :bgp_ipv6_next_hop], 64=>[:uint32, :ipv6_option_headers], 65=>[:skip], 66=>[:skip], 67=>[:skip], 68=>[:skip], 69=>[:skip], 70=>[:mpls_label, :mpls_label_1], 71=>[:mpls_label, :mpls_label_2], 72=>[:mpls_label, :mpls_label_3], 73=>[:mpls_label, :mpls_label_4], 74=>[:mpls_label, :mpls_label_5], 75=>[:mpls_label, :mpls_label_6], 76=>[:mpls_label, :mpls_label_7], 77=>[:mpls_label, :mpls_label_8], 78=>[:mpls_label, :mpls_label_9], 79=>[:mpls_label, :mpls_label_10], 80=>[:mac_addr, :in_dst_mac], 81=>[:mac_addr, :out_src_mac], 82=>[:string, :if_name], 83=>[:string, :if_desc], 84=>[:string, :sampler_name], 89=>[:uint8, :forwarding_status], 91=>[:uint8, :mpls_prefix_len], 95=>[4, :app_id], 150=>[:uint32, :flowStartSeconds], 151=>[:uint32, :flowEndSeconds], 152=>[:uint64, :flowStartMilliseconds], 153=>[:uint64, :flowEndMilliseconds], 154=>[:uint64, :flowStartMicroseconds], 155=>[:uint64, :flowEndMicroseconds], 156=>[:uint64, :flowStartNanoseconds], 157=>[:uint64, :flowEndNanoseconds], 234=>[:uint32, :ingress_vrf_id], 235=>[:uint32, :egress_vrf_id], 236=>[:string, :vrf_name], 225=>[:ip4_addr, :postNATSourceIPv4Address], 226=>[:ip4_addr, :postNATDestinationIPv4Address], 227=>[:uint16, :postNAPTSourceTransportPort], 228=>[:uint16, :postNAPTDestinationTransportPort]}, "scope"=>{1=>[:ip4_addr, :system], 2=>[:skip], 3=>[:skip], 4=>[:skip], 5=>[:skip]}}

this>

236=>[:string, :vrf_name], 225=>[:ip4_addr, :postNATSourceIPv4Address], 226=>[:ip4_addr, :postNATDestinationIPv4Address], 227=>[:uint16, :postNAPTSourceTransportPort], 228=>[:uint16, :postNAPTDestinationTransportPort]},

Bumping this because I have also just run into the same issue

@ttyv Could you send a patch?

ttyv commented

@repeatedly i did it #45 sorry im not familiar with github so i cant figure it out how to link pr to issue correctly ) also i dont write ruby so i could write it not best practice way