amazon-archives/logstash-input-dynamodb

Logstash-input-dynamoDB

bretd25 opened this issue · 23 comments

I am having trouble using logstash-input-dynamodb with multiple tables. Here is my configuration file:

input {
  dynamodb {
    endpoint => "dynamodb.us-west-2.amazonaws.com"
    streams_endpoint => "streams.dynamodb.us-west-2.amazonaws.com"
    view_type => "new_and_old_images"
    aws_access_key_id => "myAwsKey"
    aws_secret_access_key => "MyAwsSecretKey"
    perform_scan => "false"
    table_name => "Table1"
  }
  dynamodb {
    endpoint => "dynamodb.us-west-2.amazonaws.com"
    streams_endpoint => "streams.dynamodb.us-west-2.amazonaws.com"
    view_type => "new_and_old_images"
    aws_access_key_id => "SameKeyAsAbove"
    aws_secret_access_key => "SameKeyAsAbove"
    perform_scan => "false"
    table_name => "Table2"
  }
}
output {
  elasticsearch {
    hosts => ["MyElasticSearchHost"]
    ssl => "true"
  }
  stdout { }
}

The problem is that only Table2 is correctly streaming to Elasticsearch; Table1 is not streaming at all. I have changed the order, and it is always the last table that works. I looked in some documentation and it says:

"You can also configure the plugin to index multiple tables by adding additional dynamodb { } sections to the input section."

Any idea why only the last table is working?

Bret

@bretd25 do you need both tables to go to the same elasticsearch index?

I believe the problem is in the output plugin. I've managed to do this, but with a slightly different setup.

Here's my configuration file:

input {
  dynamodb {
    type => "moderation"
    table_name => "backoffice_mim_moderation"
    perform_scan => true
    view_type => "new_image"
    endpoint => "dynamodb.us-east-1.amazonaws.com"
    streams_endpoint => "streams.dynamodb.us-east-1.amazonaws.com"
    aws_access_key_id => "xxxxxxxxxx"
    aws_secret_access_key => "xxxxxxxxxx"
  }
  dynamodb {
    type => "image"
    table_name => "backoffice_mim_image"
    perform_scan => true
    view_type => "new_image"
    endpoint => "dynamodb.us-east-1.amazonaws.com"
    streams_endpoint => "streams.dynamodb.us-east-1.amazonaws.com"
    aws_access_key_id => "xxxxxxxxx"
    aws_secret_access_key => "xxxxxxxx"
  }
}

filter {
  dynamodb {}
  mutate {
    remove_field => ["message"]
  }
}

output {
  if [type] == "moderation" {
      elasticsearch {
        host => "elasticsearch"
        index => "moderation"
        document_id => "%{image_id}"
        manage_template => true
        template => "/config/templates/moderation.json"
        template_name => "moderation"
      }
  }
  if [type] == "image" {
      elasticsearch {
        host => "elasticsearch"
        index => "image"
        document_id => "%{id}"
        manage_template => true
        template => "/config/templates/image.json"
        template_name => "image"
      }
  }
}

The reason I did this is that I needed the tables to go to different elasticsearch indexes with different templates. I'm not sure what happens when you configure the output plugin as you did (basically without any configuration), but the configuration I've provided above definitely works.

Hope it's useful for you

note: the dynamodb-filter plugin is something that we developed ourselves (https://github.com/mantika/logstash-filter-dynamodb) in order to get the documents indexed properly in ES.

@marcosnils Thank you very much for your response. I have changed my configuration file to

input {
  dynamodb {
    endpoint => "dynamodb.us-west-2.amazonaws.com"
    streams_endpoint => "streams.dynamodb.us-west-2.amazonaws.com"
    view_type => "new_image"
    aws_access_key_id => "XXXXXX"
    aws_secret_access_key => "XXXXX"
    perform_scan => "false"
    type => "person"
    table_name => "Person"
  }
  dynamodb {
    endpoint => "dynamodb.us-west-2.amazonaws.com"
    streams_endpoint => "streams.dynamodb.us-west-2.amazonaws.com"
    view_type => "new_image"
    aws_access_key_id => "XXXX"
    aws_secret_access_key => "XXXXX"
    perform_scan => "false"
    type => "test"
    table_name => "Test"
  }
}
output {
  if [type] == "person" {
    elasticsearch {
      hosts => ["XXXX.us-west-2.es.amazonaws.com:443"]
      ssl => "true"
      index => "person"
      document_id => "%{person_id}"
    }
  }
  if [type] == "test" {
    elasticsearch {
      hosts => ["XXXXX.us-west-2.es.amazonaws.com:443"]
      ssl => "true"
      index => "test"
      document_id => "%{test_id}"
    }
  }
  stdout { }
}

If I insert a document into Person, I do not see the insertion in the console window. If I insert a document into Test, the document gets indexed and I can see it in the console window.

@bretd25 which logstash version are you running?

2.2

Hmm that might be the problem. I use 1.5.6...

If you have Docker you can use our image to see if it works:

https://hub.docker.com/r/mantika/logstash-dynamodb-streams/

@marcosnils I do not have Docker installed, but I actually used some of your docker configuration to get this far.

@bretd25 seems like it might be a compatibility issue with the logstash version as it works fine for me with 1.5.6. Unfortunately I can't provide further assistance as I haven't tried with logstash 2.x

@marcosnils In order to get it to work I had to give it a

checkpointer => "logstash_input_dynamodb_cptr_Test"

It seems like the default checkpointer for both tables was not working.
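
For reference, a minimal sketch of how the input section looks with a distinct checkpointer name per table (the "Person" checkpointer name here is illustrative; the remaining options are the same as in the config above):

```
input {
  dynamodb {
    table_name => "Person"
    checkpointer => "logstash_input_dynamodb_cptr_Person"
    # ...same endpoint, credentials, view_type, and perform_scan options as above...
  }
  dynamodb {
    table_name => "Test"
    checkpointer => "logstash_input_dynamodb_cptr_Test"
    # ...same endpoint, credentials, view_type, and perform_scan options as above...
  }
}
```

Giving each input its own checkpointer keeps the two streams from sharing state.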

@marcosnils Thanks for all your help. I greatly appreciate it.

Bret

@bretd25 interesting. With logstash 1.5.6, checkpointer tables were created automatically based on the supplied table name. Weird that it was using the same checkpointer for both tables.

Anyway, amazing that you figured it out!

hi, @marcosnils I use your image. It is working for update and insert, but for remove it is not removing the item from elasticsearch. Is there any additional config I need to set up for that?

@sidaYewno this is because the default action of the logstash ES output plugin is index (https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-action). If you want to remove your items as well, you have to define a delete action for the ES logstash output plugin.
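
In outline, that means routing on the event type in the output section — a sketch assuming the dynamodb filter populates an eventName field from the stream record, as in the configs elsewhere in this thread:

```
output {
  if [eventName] == "REMOVE" {
    elasticsearch {
      action => "delete"
      # ...same host, index, and document_id settings as your existing output...
    }
  } else {
    elasticsearch {
      action => "index"
      # ...same host, index, and document_id settings as your existing output...
    }
  }
}
```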

hi @marcosnils I tried defining a delete action, but it is deleting the item regardless of the event; even when I am creating or updating, it is deleting the item. I added action => "delete" in my command. Am I doing it wrong, or is there other config I need to add?

@sidaYewno have you checked #12?

@marcosnils I tried this
input {
  dynamodb {
    endpoint => "dynamodb.us-west-2.amazonaws.com"
    streams_endpoint => "streams.dynamodb.us-west-2.amazonaws.com"
    view_type => "new_and_old_images"
    aws_access_key_id => "access"
    aws_secret_access_key => "secret"
    table_name => "companies"
  }
}
filter {
  dynamodb { }
}
output {
  if [eventName] == "REMOVE" {
    elasticsearch {
      host => "search-endpoint.es.amazonaws.com"
      document_id => "%{[keys][company_uuid]}"
      protocol => "http"
      port => "80"
      index => "companies"
      action => "delete"
    }
    stdout { }
  } else {
    elasticsearch {
      host => "search-endpoint.es.amazonaws.com"
      document_id => "%{[keys][company_uuid]}"
      protocol => "http"
      port => "80"
      index => "companies"
      action => "index"
    }
    stdout { }
  }
}

It says the config is not valid. Is there anything I am doing wrong?

@sidaYewno if you launch logstash with debug enabled it should say why the config is not valid.

The error is: Expected one of #, input, filter, output at line 37, column 1 (byte 948) after

@marcosnils how do I do that? I run it via your docker image; what command should I use to pass the debug flag?

which docker image are you using? v1 or v2?

@marcosnils I am using this command: docker run mantika/logstash-dynamodb-streams. I'm not sure which image I am using; should I use v2? How do I do that?

@marcosnils so I was able to get the config to run, but now it is not outputting anything for any event. Here is my config:
docker run mantika/logstash-dynamodb-streams -e '
input {
  dynamodb {
    endpoint => "..."
    streams_endpoint => "..."
    view_type => "..."
    table_name => "..."
    perform_scan => false
  }
}
filter {
  dynamodb { }
}
output {
  if [eventName] == "REMOVE" {
    elasticsearch {
      hosts => "..."
      user => "..."
      password => "..."
      index => "..."
      document_id => "..."
      action => "delete"
    }
    stdout { }
  } else {
    elasticsearch {
      hosts => "..."
      user => "..."
      password => "..."
      index => "..."
      document_id => "..."
      action => "index"
    }
    stdout { }
  }
}'

@sidaYewno seems like the config is right. I'd suggest running logstash in debug mode (the --debug flag) and checking if you see something in the logs.
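
As a sketch, assuming the image passes its extra arguments straight through to the logstash binary (the same way it already accepts -e), the invocation would look something like:

```
docker run mantika/logstash-dynamodb-streams --debug -e '
input { dynamodb { ... } }
filter { dynamodb { } }
output { ... }
'
```

With --debug enabled, logstash logs each event it processes, which should show whether the dynamodb input is emitting anything at all.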