This will pull objects from S3 as they are delivered and post them into your ElasticSearch cluster. It gets its privileges from an IAM role (see the IAM policy and role steps below). It gets all its configuration from Lambda environment variables. You should not need to edit the source code to deploy it.
Once you've got CloudTrail reliably inserted into your ElasticSearch cluster, start looking at things like Traildash2 or the AWS CloudWatch Logs subscription consumer. Both are very old, but they contain some useful widgets for visualisations.
- Access to AWS Lambda
- CloudTrail already set to store logs in an S3 bucket. Don't have that? Follow this documentation.
- An ElasticSearch cluster. Either build it yourself, or follow the tutorial in the documentation.
Once you've got CloudTrail logging to S3 and your ElasticSearch cluster accepting input, you're ready to do this work. This will set up an S3 event listener: when an object is put in the bucket, the Lambda function gets called, unzips the S3 object, and squirts the records into ElasticSearch.
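To give a feel for that flow, here is a minimal sketch of a handler that does it. This is an illustration only, not the shipped `cloudtrail2ES.py`; the names are simplified and it posts unsigned requests, whereas the real function deals with whatever authentication your ElasticSearch domain's access policy requires.

```python
import gzip
import json
import os
from datetime import datetime

import boto3
import requests

s3 = boto3.client("s3")


def lambda_handler(event, context):
    # Configuration comes from Lambda environment variables (described below).
    es_host = os.environ["ES_HOST"]
    es_index = os.environ.get("ES_INDEX", "cloudtrail")

    for rec in event["Records"]:
        bucket = rec["s3"]["bucket"]["name"]
        key = rec["s3"]["object"]["key"]

        # CloudTrail delivers gzipped JSON objects; fetch and decompress.
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
        trail = json.loads(gzip.decompress(body))

        # Post each CloudTrail event into a daily index, e.g. cloudtrail-2019-01-19.
        index = "%s-%s" % (es_index, datetime.utcnow().strftime("%Y-%m-%d"))
        for ct_event in trail.get("Records", []):
            requests.post(
                "https://%s/%s/_doc" % (es_host, index),
                json=ct_event,
                timeout=10,
            )
```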
You need to create a policy and attach it to a role. That will give your Lambda function the ability to execute. The easiest way to do this is to create a single IAM policy that grants all the permissions you need, and then attach that one policy to the Lambda role. (You could instead split it into a few policies, one for ElasticSearch, one for S3, one for CloudWatch Logs, and attach all three to the one role.)
The IAM policy allows three things: reading your S3 bucket to fetch the CloudTrail objects, posting records to your ElasticSearch cluster, and writing errors and diagnostic logging to CloudWatch Logs.
- Edit the `lambda-iam-policy.json` file
- Add in the bucket name for your bucket.
- Add in the domain name you assigned to your ElasticSearch domain.
- Create an IAM policy, name it something like `CloudTrailESAccess`, and set its contents to be the `lambda-iam-policy.json` file.
Create an AWS IAM role.
- Choose Lambda as the service for the role.
- Attach the policy you created.
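If you'd rather script the IAM setup than do it by hand, here's a hedged boto3 sketch. The statements mirror the three permissions described above; treat `lambda-iam-policy.json` in the repo as the authoritative version, and note that the bucket, account, region, domain, and role names below are placeholders.

```python
import json
import boto3

iam = boto3.client("iam")

# Placeholders: substitute your own bucket name, account id, region and domain.
policy_doc = {
    "Version": "2012-10-17",
    "Statement": [
        {   # Read CloudTrail objects from the S3 bucket.
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::MYCLOUDTRAILBUCKET/*",
        },
        {   # Post records to the ElasticSearch domain.
            "Effect": "Allow",
            "Action": ["es:ESHttpPost"],
            "Resource": "arn:aws:es:eu-west-1:111111111111:domain/MYDOMAIN/*",
        },
        {   # Write the function's own logs to CloudWatch Logs.
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
            ],
            "Resource": "arn:aws:logs:*:*:*",
        },
    ],
}

policy = iam.create_policy(
    PolicyName="CloudTrailESAccess",
    PolicyDocument=json.dumps(policy_doc),
)

# A role that Lambda can assume, with the policy attached.
trust = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}
iam.create_role(
    RoleName="CloudTrailESRole",  # placeholder name
    AssumeRolePolicyDocument=json.dumps(trust),
)
iam.attach_role_policy(
    RoleName="CloudTrailESRole",
    PolicyArn=policy["Policy"]["Arn"],
)
```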
- Clone this repo
- In your terminal, install the requests module into the same folder with `pip install requests -t .`
- Make a zip file that includes the Python file and the requests library. From within the repo, run `zip -r lambda-function.zip cloudtrail2ES.py *`
These are instructions for doing it by hand in the console. There's also a `cloudtrail2ES.yaml` file that contains some CloudFormation.
- Create a new Lambda Function
- Choose Python 3.6 as a runtime.
- Choose to start with an empty function.
- Set the handler for the function to `lambda_handler`
- Set the environment variables:
  - `ES_INDEX` set to something like `cloudtrail`. That will result in ElasticSearch indexes like `cloudtrail-2019-01-19`.
  - `ES_HOST` set to the name of your ElasticSearch endpoint. You can get that from the console or by running `aws es describe-elasticsearch-domain`. The value will look something like `search-DOMAIN-abcdefgabcdefg.eu-west-1.es.amazonaws.com`. You don't want anything else (no 'https' or / or anything).
- Test the Lambda function.
- Edit the `test-cloudtrail-event.json` file and change the bucket name to be your CloudTrail bucket. Go find a real key ("file name") of a CloudTrail object in your bucket and put that in the `key` parameter.
- Invoke the test and set your test event to be this JSON object. Debug any problems.
- If the test was successful, you have a few objects in your ElasticSearch cluster now.
- If the test was unsuccessful, go into the CloudWatch Logs of your lambda function and look at the error messages.
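You can also fire the test from your own machine instead of the console. A hedged boto3 sketch, assuming a function name of `cloudtrail2ES` (a placeholder; use whatever you called yours):

```python
import boto3

lam = boto3.client("lambda")

# Send the edited test event as the invocation payload.
with open("test-cloudtrail-event.json", "rb") as f:
    response = lam.invoke(
        FunctionName="cloudtrail2ES",  # placeholder: your function's name
        Payload=f.read(),
    )

print(response["StatusCode"])
print(response["Payload"].read().decode())
```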
Now set up the S3 bucket event notification that will trigger the Lambda function:
- Go to your S3 Bucket in the console.
- Click on Properties
- Click on Events
- Click + Add Notification
- Name the event
- For Events, select "All object create events"
- For Prefix, enter an appropriate prefix. Probably `AWSLogs/`
- For Send to, select the lambda function you created
- Click Save.
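If you prefer to script this instead of clicking through the console, here's a hedged boto3 sketch. The bucket name and function ARN are placeholders, and note that S3 also needs permission to invoke the function, which the console normally adds for you.

```python
import boto3

s3 = boto3.client("s3")
lam = boto3.client("lambda")

FUNCTION_ARN = "arn:aws:lambda:eu-west-1:111111111111:function:cloudtrail2ES"  # placeholder

# Allow S3 to invoke the Lambda function (the console does this behind the scenes).
lam.add_permission(
    FunctionName=FUNCTION_ARN,
    StatementId="s3-cloudtrail-invoke",
    Action="lambda:InvokeFunction",
    Principal="s3.amazonaws.com",
    SourceArn="arn:aws:s3:::MYCLOUDTRAILBUCKET",
)

# Fire the function for every object created under the AWSLogs/ prefix.
s3.put_bucket_notification_configuration(
    Bucket="MYCLOUDTRAILBUCKET",
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [{
            "LambdaFunctionArn": FUNCTION_ARN,
            "Events": ["s3:ObjectCreated:*"],
            "Filter": {"Key": {"FilterRules": [
                {"Name": "prefix", "Value": "AWSLogs/"},
            ]}},
        }]
    },
)
```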
It will take a little time: CloudTrail logs are delivered every few minutes, the S3 events fire pretty much immediately, and then the records are sent to ElasticSearch.
- Click on the Kibana link in the ElasticSearch console.
- Click on the Management gear icon
- Click on + Create Index Pattern
- Name the index pattern `cloudtrail-*` (or whatever you used for the `ES_INDEX` value). You should see something helpful like "Your index pattern matches 2 indices".
- Choose the `@timestamp` field for your timestamp
- Save the index pattern
- Click on the Discover link at the top left. You should see some CloudTrail records.
If you look at your function in the Lambda console, you'll see a tab labeled Monitoring. There's a link to its logs in CloudWatch Logs, where you can see what the Lambda function is doing.
You will want to click on the log group and set its retention time to something finite. By default, CloudWatch Logs are set to Never Expire, and that will store every log entry forever. I set mine to 3 days. That's probably generous; I really don't need these logs at all.
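You can do that in the console, or with a one-off boto3 call like the sketch below. The log group name follows the standard `/aws/lambda/<function name>` pattern; the function name here is a placeholder.

```python
import boto3

logs = boto3.client("logs")

# Keep the Lambda function's own logs for 3 days instead of forever.
logs.put_retention_policy(
    logGroupName="/aws/lambda/cloudtrail2ES",  # placeholder function name
    retentionInDays=3,
)
```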
If, like me, you turned on CloudTrail a long time ago, and now you're just starting to analyse it with ElasticSearch, you might want to import a lot of S3 objects from your CloudTrail bucket. There's a script `loadCloudTrail2ES.py` that will do that. You invoke it something like this:
```
python3 loadCloudTrail2ES.py \
    --bucket MYCLOUDTRAILBUCKET \
    --endpoint MYELASTICSEARCHDOMAIN \
    --region eu-west-1 \
    --prefix AWSLogs/111111111111/CloudTrail/
```
Note that it takes a few other optional arguments that you can use to test before you turn it loose:
- `--limit X` will stop after processing X S3 objects.
- `--dryrun` will cause it to fetch and parse the S3 objects, but it will not `POST` them to ElasticSearch.
- `--index` will name the index something different. By default it names the index `cloudtrail-YYYY-MM-DD`. If you give it `--index foo` on the command line, it will use `foo-YYYY-MM-DD` as the index instead.
- `--profile` will use the `STS::AssumeRole` feature to assume the role for invoking AWS API calls. See the named profiles documentation for more information on how that works.
- `--prefix` is optional. If you leave it out, it defaults to `AWSLogs/` with the assumption that you probably want to limit yourself to CloudTrail logs, and that's the prefix where they're written by default. If you want no prefix at all, so that the script parses every single object in the bucket, you need to specify `--prefix=/`.
The `loadCloudTrail2ES.py` script uses the bulk upload API of ElasticSearch. It does not stop to think about whether that would be a good idea. It batches up all the CloudTrail events in the S3 object and sends them to ElasticSearch in a single request. My S3 objects rarely have more than 100 CloudTrail events in them, and this always succeeds for me. But if you have a really active account and you have hundreds or thousands of events in a single S3 object, this might fail.
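For reference, the bulk API takes newline-delimited JSON: an action line naming the index, followed by the document itself, for every event in the batch. A hedged sketch of a single bulk request with requests (not the script's exact code; depending on your ElasticSearch version you may not need the `_type` field):

```python
import json
import requests


def bulk_post(es_host, index, events):
    # One action line plus one document line per event, newline-delimited.
    lines = []
    for event in events:
        lines.append(json.dumps({"index": {"_index": index, "_type": "_doc"}}))
        lines.append(json.dumps(event))
    body = "\n".join(lines) + "\n"  # the bulk API requires a trailing newline

    resp = requests.post(
        "https://%s/_bulk" % es_host,
        data=body,
        headers={"Content-Type": "application/x-ndjson"},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()
```

If your S3 objects really do carry thousands of events, chunking that list into several smaller bulk requests would be the obvious fix.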
If you screw up, remember that `curl -X DELETE https://endpoint/cloudtrail-*` is your friend. You can delete ElasticSearch indexes fairly easily.
I updated it to run on Python 3.6 and to draw its configuration from the environment. You should not need to modify the code. I also added a quick check in case the object isn't a CloudTrail record (sketched below): I enable CloudTrail Digests on my logs, and those end up in somewhat similar paths.
I also ran into trouble with the `apiVersion` attribute of many CloudTrail records. ElasticSearch wants to treat it as a date, because it often looks like one (e.g., `2012-10-17`). I find it causes problems, and there are blog posts about it. Frankly, I never search on `apiVersion` and I don't care, so my Lambda function removes it. It doesn't store `apiVersion` in ElasticSearch at all.
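Both checks amount to a couple of lines. Here's a hedged sketch of the idea (the shipped code may differ in detail), assuming the decompressed S3 object has been parsed into a dict called `trail` and `key` is the object's S3 key:

```python
def clean_events(key, trail):
    """Return the CloudTrail events worth indexing from one S3 object."""
    # CloudTrail digests are delivered under a similar AWSLogs/ prefix
    # (their keys contain 'CloudTrail-Digest'); skip anything that isn't
    # a plain log object with a Records list.
    if "CloudTrail-Digest" in key or "Records" not in trail:
        return []

    events = trail["Records"]
    for event in events:
        # ElasticSearch wants to map apiVersion as a date, so drop it.
        event.pop("apiVersion", None)
    return events
```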
You might want to look at Fernando's blog.
Thanks to the original and to pavan3401 for forking and improving some.