/flume-plugin-amqp

A plugin for Flume that allows you to use an AMQP broker as a source.

Primary LanguageJava

flume-plugin-amqp

The flume-amqp-plugin allows you to use an AMQP broker as a Flume Source.

Installation

  1. Run mvn package

  2. Copy the resulting flume-plugin-amqp-${VERSION}-jar-with-dependencies.jar to your Flume lib directory (Default: /usr/lib/flume/lib)

  3. Modify the Flume site config (Default: /etc/flume/conf/flume-site.xml)

<property>
  <name>flume.plugin.classes</name>
  <value>amqp.AmqpEventSource</value>
</property>

Usage

The AMQP Event Source will take message from an AMQP broker and create Flume events from the message's body.

The only required argument is the exchangeName parameter. Note that each parameter needs to be enclosed in quotes because the plugin uses named parameters for setting up the source.

amqp("exchangeName=exchangeName",
     "[,host=host] "
     "[,port=port] "
     "[,virtualHost=virtualHost] "
     "[,userName=user] "
     "[,password=password] "
     "[,exchangeType=direct] "
     "[,durableExchange=false] "
     "[,queueName=queueName] "
     "[,durableQueue=false] "
     "[,exclusiveQueue=false] "
     "[,autoDeleteQueue=false] "
     "[,bindings=binding1,binding2,bindingN]
     "[,useMessageTimestamp=false]
     "[,keystoreFile=/path/to/file.jks]
     "[,keystorePassword=password]
     "[,truststoreFile=/path/to/file.jks]
     "[,truststorePassword=password]
     "[,ciphers=TLS_RSA_WITH_AES_256_CBC_SHA,TLS_DHE_RSA_WITH_AES_256_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA])"
  • exchangeName - this is the name of the AMQP exchange we are getting messages from.
  • host - the host name or IP address of the broker. Default to localhost when not specified.
  • port - the port on which the broker is accepting connections. Defaults to 5672 when not specified.
  • virtualHost - the virtual host to use when connecting to the broker. Default to "/" when not specified.
  • userName - the AMQP user name to use when connecting to the broker. Defaults to "guest" when not specified.
  • password - the password to use when connecting to the broker. Defaults to "guest" when not specified.
  • exchangeType - the type exchange. Valid types are direct, fanout, topic, and header. Defaults to direct when not specified.
  • durableExchange - true if we are declaring a durable exchange (the exchange will survive a server restart). Defaults to false.
  • queueName - if left unspecified, the server chooses a name and provides this to the client. Generally, when applications share a message queue they agree on a message queue name beforehand, and when an application needs a message queue for its own purposes, it lets the server provide a name.
  • durableQueue - if true, the message queue remains present and active when the server restarts. It may lose transient messages if the server restarts.
  • exclusiveQueue - if true, the queue belongs to the current connection only, and is deleted when the connection closes.
  • autoDeleteQueue - true if we are declaring an autodelete queue (server will delete it when no longer in use).
  • bindings - comma separated list of strings that will be used to bind the queue to the exchange. This is not required for certain exchange types.
  • useMessageTimestamp - if true, the timestamp for the Flume event will be based on the timestamp from the AMQP message.
  • keystoreFile - HDFS path to a JKS formatted keystore
  • keystorePassword - Password to load the keystore
  • truststoreFile = HDFS path to a JKS formatted keystore
  • truststorePassword - Password to load the keystore
  • ciphers - Comma separated list of ciphers to use for this connection

Examples

  1. Broker is on same node as flume agent, direct exchange type and two bindings

    amqp("exchangeName=alertMessages", "bindings=system, application")

  2. Broker is on different machine, direct exchange type, shared queue

    amqp("exchangeName=stockTrades", "host=10.23.224.2", "bindings=IBM,GOOG,MSFT")