not consuming existing flowlogs
rojomisin opened this issue · 10 comments
I have about 2 weeks of flow logs that I've collected into a log group, and I am able to spin this stack up successfully (thank you, great work on this!).
But it will only index and visualize data that came into the log group after I spun up the stack. How do I get this system to grab everything in the log group for X number of days backwards?
Unfortunately this solution will only help you ingest log data from the time you spin up the stack. This is using a feature in CloudWatch Logs (CWL) called Subscriptions, which is a mechanism to stream real-time logs from CWL to another destination while the logs are getting ingested into CWL.
CWL has another feature to export archived data to S3, and in theory you could use this feature to populate your Elasticsearch cluster with the historical data in your log group. However, as far as I know, there aren't any tools to facilitate that setup at the moment. I think the easiest way to set this up would be to write something that consumes the exported data from S3 and inserts it into the Kinesis stream (using the PutRecord API) in the same format that CWL uses. (The format is documented in the developer guide; see the Examples section.) Then this stack would consume those records and insert them into your Elasticsearch cluster normally.
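For what it's worth, here's a rough sketch of what the S3 side of that could look like. This is only an illustration in Python with boto3; the bucket name and export prefix are placeholders, and it just walks the exported gzipped objects and yields the raw flow log lines (the Kinesis formatting is covered further down this thread):

import gzip
import boto3

s3 = boto3.client("s3")

def exported_log_lines(bucket, prefix):
    """Yield individual flow log lines from a CWL export task's gzipped S3 objects."""
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            if not obj["Key"].endswith(".gz"):
                continue
            body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
            for line in gzip.decompress(body).decode("utf-8").splitlines():
                if line:
                    yield line

# Placeholder bucket/prefix for the export task
for line in exported_log_lines("my-flowlog-export-bucket", "my-export-task-id/"):
    print(line)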
Hi @dvassallo, OK, understood. I've exported the data to S3 and sync'd it locally. I'm going to use the AWS CLI and gzcat to process these and put-record into Kinesis. I'm kind of stuck on what the partition-key would be; how do I find this, or is it arbitrary?
aws kinesis put-record --stream-name CWL-Elasticsearch-KinesisSubscriptionStream-1O1NMVORNI953 --partition-key 0 --data '2016-02-16T21:01:11.000Z 2 0123456789012 eni-abcd1234 5.6.7.8 1.2.3.4 35434 443 6 5 825 1455656471 1455656531 ACCEPT OK' --profile stg
I set it to 0, for better or worse, for the moment.
This works, though. I'll dump in the whole set of .gz files and report back.
Your choice of partition key value is not important for this to function correctly, but if you have more than one Kinesis shard you may want to use a random value (ideally) for each PutRecord API call. Data sent with a particular partition key will always go to a single Kinesis shard, and since Kinesis shards have limited bandwidth you may exceed that shard's capacity, especially if you start replaying data very quickly (which would result in a ProvisionedThroughputExceededException on PutRecord calls).
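To illustrate the random-partition-key idea, here is a minimal sketch in Python with boto3; `payload` stands for whatever bytes you end up sending, and the stream name is the one from the earlier comment. It spreads records across shards and backs off when a shard is throttled:

import time
import uuid
import boto3

kinesis = boto3.client("kinesis")
STREAM = "CWL-Elasticsearch-KinesisSubscriptionStream-1O1NMVORNI953"

def put_with_backoff(payload, max_attempts=5):
    """PutRecord with a random partition key, retrying on throttling."""
    for attempt in range(max_attempts):
        try:
            return kinesis.put_record(
                StreamName=STREAM,
                Data=payload,                    # the record payload (bytes)
                PartitionKey=str(uuid.uuid4()),  # random key spreads load across shards
            )
        except kinesis.exceptions.ProvisionedThroughputExceededException:
            time.sleep((2 ** attempt) * 0.1)     # simple exponential backoff
    raise RuntimeError("gave up after repeated throttling")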
BTW - If you think that whatever you end up scripting may benefit other users of this project, we would welcome a pull request for it! :) I think something like what you're doing with the AWS CLI could go in a "scripts" directory in this repo. No obligation obviously.
It's interesting you mention the throughput on PutRecord calls; my one-liner worked, but it was really slow, so I killed it. I was trying to do this quickly via...
Dump the records:
gzcat -r ./data/*.gz >file.txt
See how many records there are:
ruby -e 'arr1=ARGF.read;p arr1.split("\n").length' file.txt
845798
Process them:
ruby -e 'arr1=ARGF.read;arr1.split("\n").each {|line| cmd=`aws kinesis put-record --stream-name #{streamname} --partition-key 0 --data "#{line}" --profile stg`;puts cmd }' file.txt
I might have to look at the put-records command to do this in batch.
This is very slow; I'll reply back or create a pull request for a decent script should it work :)
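If it helps, here's a rough sketch of the PutRecords batch idea (Python with boto3, not the one-liner above; `payloads` is assumed to be a list of already-serialized record payloads). PutRecords takes up to 500 records per call and reports failures per entry, so only the throttled records need to be retried:

import time
import uuid
import boto3

kinesis = boto3.client("kinesis")
STREAM = "CWL-Elasticsearch-KinesisSubscriptionStream-1O1NMVORNI953"

def put_batch(payloads):
    """Send payloads in batches of up to 500 records via PutRecords."""
    for i in range(0, len(payloads), 500):
        batch = [{"Data": p, "PartitionKey": str(uuid.uuid4())} for p in payloads[i:i + 500]]
        while batch:
            resp = kinesis.put_records(StreamName=STREAM, Records=batch)
            # Retry only the entries that failed (e.g. throttled ones)
            batch = [entry for entry, result in zip(batch, resp["Records"])
                     if "ErrorCode" in result]
            if batch:
                time.sleep(0.2)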
Note that you can put more than one log event in a single Kinesis.PutRecord (singular) call. The "data" payload is already expected to contain an array of log events. Here's an example of how 3 log events would get serialized into a single Kinesis record (posted via the data attribute of a single PutRecord call):
{
    "owner": "123456789012",
    "logGroup": "VPCFlowLogs",
    "logStream": "eni-12345678",
    "subscriptionFilters": [
        "FlowLogs"
    ],
    "messageType": "DATA_MESSAGE",
    "logEvents": [
        {
            "id": "31953106606966983378809025079804211143289615424298221568",
            "timestamp": 1432826855000,
            "message": "record 1"
        },
        {
            "id": "31953106606966983378809025079804211143289615424298221569",
            "timestamp": 1432826855000,
            "message": "record 2"
        },
        {
            "id": "31953106606966983378809025079804211143289615424298221570",
            "timestamp": 1432826855000,
            "message": "record 3"
        }
    ]
}
You may want to check out Step 8 of Example 1 in the developer guide for an example of how the data should look in Kinesis. That example shows you how to use the AWS CLI to deserialize the Kinesis record into the above JSON structure with basic Unix tools. (Note that the data attribute is expected to be gzipped.) This project expects the Kinesis data to be in that exact format for it to be able to read from Kinesis and send to Elasticsearch. This is where the parsing happens: CloudWatchLogsSubscriptionTransformer.java L50-L101
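To make that concrete, here is a minimal sketch (plain Python standard library; the helper name and field values are just placeholders) of producing the gzipped payload that would go into the PutRecord data attribute, following the structure shown above:

import gzip
import json

def to_kinesis_payload(log_events, owner, log_group, log_stream):
    """Serialize log events into the CWL subscription envelope and gzip it."""
    envelope = {
        "owner": owner,
        "logGroup": log_group,
        "logStream": log_stream,
        "subscriptionFilters": ["FlowLogs"],
        "messageType": "DATA_MESSAGE",
        "logEvents": log_events,  # list of {"id", "timestamp", "message"} dicts
    }
    return gzip.compress(json.dumps(envelope).encode("utf-8"))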
Thanks @dvassallo, that's good information; I have it bookmarked. We don't know how often we'll have to do this, so my evolving quick-and-dirty solution is now to pipe the output to GNU parallel.
Is put-records not a decent solution? I assume the AWS CLI is doing that behind the scenes anyway for the raw API calls, right?
gzcat -r ./export1/74dad32f-97af-4987-8df8-20ee2ea4fb10/eni-abcd1234-all/*.gz | parallel -j 10 aws kinesis put-record --stream-name CWL-Elasticsearch-KinesisSubscriptionStream-1O1NMVORNI953 --partition-key 0 --data "{}" --profile stg
parallel, while not elegant, now gets me over 1,000 put requests per period using 10 concurrent jobs.
Yes, PutRecords would work as well.
Your one-liner solution is quite impressive actually!
... though I just realized from re-reading your second comment that you initially attempted to pass a sample log event directly to the Kinesis.PutRecord "data" attribute:
--data '2016-02-16T21:01:11.000Z 2 0123456789012 eni-abcd1234 5.6.7.8 1.2.3.4 35434 443 6 5 825 1455656471 1455656531 ACCEPT OK'
But what the "data" attribute should have contained is the following JSON blob, all gzipped:
{
    "owner": "<Your AWS Account Id>",
    "logGroup": "<Your Log Group Name>",
    "logStream": "<Your Log Stream Name>",
    "subscriptionFilters": [
        "<Not Important In This Case>"
    ],
    "messageType": "DATA_MESSAGE",
    "logEvents": [
        {
            "id": "1000000",
            "timestamp": 1455656471000,
            "message": "2016-02-16T21:01:11.000Z 2 0123456789012 eni-abcd1234 5.6.7.8 1.2.3.4 35434 443 6 5 825 1455656471 1455656531 ACCEPT OK"
        }
    ]
}
The "id" attribute for log events is just a unique ID that Elasticsearch would use to de-duplicate. You shouldn't use the same id string for different events.
Ah yes, I'm passing garbage as --data then; that makes sense why I'm not seeing events show up in the Kibana dashboard. I will have to script this a bit to generate the id and timestamp and format the message properly.
One note on the shard iterator and validating test data: on a Mac, base64 uses -D to decode, as -d is debug. But this was very helpful in pointing me in the right direction:
echo -n "H4sIAAAAAAAAADWOwQqCQBRFf2WYdUSRiriLMBdZQgYtImLSlz7SGZk3FiH+e6PW8nAv956O10AkCjh9GuAB3ySH0zGJb/swTddRyGdcvSXoIalUm7+FycpYFWSDShWRVm1js4lSo0HUE1J7p0xjY1DJLVYGNPHgch174QukGbDjmE91g1bDiNqOLR3X9RzPcxYr35/99QaBc8x+euynF7BNCdkTZcFKEJUpmXqw3C6hFMMz26EEQmI0qs15f+2/+3ON6vIAAAA=" | \
base64 -D | zcat
{"messageType":"CONTROL_MESSAGE","owner":"CloudwatchLogs","logGroup":"","logStream":"","subscriptionFilters":[],"logEvents":[{"id":"","timestamp":1455646640388,"message":"CWL CONTROL MESSAGE: Checking health of destination Kinesis stream."}]}
I'll reply back later when I have something working.