cloudtrail-logs-to-AWS-Elasticsearch-Service

Sending CloudTrail logs from S3 to the AWS ElasticSearch Service using Lambda. Primary language: Python.

This will pull objects from S3 as they are delivered and will post them into your ElasticSearch cluster. It gets its privileges from an IAM role (see Set Up IAM permissions below). It gets all its configuration from Lambda environment variables. You should not need to edit the source code to deploy it.

Once you've got CloudTrail reliably inserted into your ElasticSearch cluster, start looking at stuff like Traildash2 or AWS cloudwatch logs subscription consumer. Both are very old, but they contain some useful widgets for visualisations.

CloudTrail Sample Dashboard

Prerequisites

Installation

Once you've got CloudTrail logging to S3 and your ElasticSearch cluster accepting input, you're ready to do this work. The installation sets up an S3 event notification: whenever CloudTrail puts an object in the bucket, this code gets called, unzips the S3 object, and squirts its records into ElasticSearch.
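As a rough illustration of that flow, here is a minimal sketch of what such a handler looks like. The real code is in cloudtrail2ES.py; this sketch omits request signing, URL-decoding of the object key, and error handling, and the exact index/document path is an assumption:

import gzip
import json
import os
from datetime import datetime

import boto3
import requests

s3 = boto3.client("s3")

def lambda_handler(event, context):
    # Build the daily index name from the environment, e.g. cloudtrail-2019-01-19
    es_host = os.environ["ES_HOST"]
    index = "{}-{}".format(os.environ["ES_INDEX"], datetime.utcnow().strftime("%Y-%m-%d"))

    for rec in event["Records"]:
        # Fetch the gzipped CloudTrail object named in the S3 event
        bucket = rec["s3"]["bucket"]["name"]
        key = rec["s3"]["object"]["key"]
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
        logfile = json.loads(gzip.decompress(body))

        # Post each CloudTrail record to ElasticSearch
        for ct_record in logfile.get("Records", []):
            ct_record.pop("apiVersion", None)  # see "My changes" below
            requests.post(
                "https://{}/{}/_doc".format(es_host, index),
                json=ct_record,
                timeout=10,
            )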

Set Up IAM permissions

You need to create a policy and attach it to a role; that role is what gives your Lambda function permission to execute. The easiest way to do this is to create a single IAM policy that grants all the permissions you need, and then attach that one policy to the Lambda role. (You could instead have a few policies, one for ElasticSearch, one for S3, one for CloudWatch Logs, and attach all three to the one role.)

IAM Policy

The IAM policy allows three things: reading CloudTrail objects from your S3 bucket, posting records to your ElasticSearch cluster, and writing errors and other logging to CloudWatch Logs. A sketch of such a policy appears after the steps below.

  1. Edit the lambda-iam-policy.json file
    1. Add in the bucket name for your bucket.
    2. Add in the domain name you assigned to your ElasticSearch domain.
  2. Create an IAM policy, name it something like CloudTrailESAccess, and set its contents to the contents of the lambda-iam-policy.json file.
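For orientation, the policy ends up looking something like this. MYBUCKET and MYDOMAIN are placeholders; lambda-iam-policy.json in the repo is the authoritative version:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::MYBUCKET/*"
    },
    {
      "Effect": "Allow",
      "Action": ["es:ESHttpPost"],
      "Resource": "arn:aws:es:*:*:domain/MYDOMAIN/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    }
  ]
}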

Create a Role For your Lambda Function

Create an AWS IAM role.

  1. Choose Lambda as the service for the role.
  2. Attach the policy you created.
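If you prefer the command line, something along these lines does the same thing. The role name and account ID are placeholders; the trust policy is the standard one that lets Lambda assume the role:

aws iam create-role \
    --role-name CloudTrail2ESLambdaRole \
    --assume-role-policy-document '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole"}]}'

aws iam attach-role-policy \
    --role-name CloudTrail2ESLambdaRole \
    --policy-arn arn:aws:iam::111111111111:policy/CloudTrailESAccess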

Prep the Lambda Function

  1. Clone this repo
  2. In your terminal, install the requests module into the same folder with pip install requests -t .
  3. Make a zip file that includes the python file and the requests library. From within the repo, run zip -r lambda-function.zip cloudtrail2ES.py *

Create the Lambda Function

These are instructions for doing it by hand in the console. There's also a cloudtrail2ES.yaml file that contains some CloudFormation.

  1. Create a new Lambda Function
  2. Choose Python 3.6 as a runtime.
  3. Choose to start with an empty function.
  4. Set the handler for the function to lambda_handler
  5. Set the environment variables:
    • ES_INDEX set to something like cloudtrail. That will result in ElasticSearch indexes like cloudtrail-2019-01-19.
    • ES_HOST set to the name of your ElasticSearch endpoint. You can get that from the console or by running aws es describe-elasticsearch-domain. The value will look something like: search-DOMAIN-abcdefgabcdefg.eu-west-1.es.amazonaws.com. You don't want anything else (no 'https' or / or anything).
  6. Test the Lambda function.
    1. Edit the test-cloudtrail-event.json file and change the bucket name to your CloudTrail bucket. Then find a real key ("file name") of a CloudTrail object in your bucket and put that in the key parameter. (A trimmed sketch of the event shape appears after these steps.)
    2. Set your test event to be this JSON object and invoke the test. Debug any problems.
    3. If the test was successful, you have a few objects in your ElasticSearch cluster now.
    4. If the test was unsuccessful, go into the CloudWatch Logs of your lambda function and look at the error messages.
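For reference, the interesting part of that test event is just an S3 Records list naming a bucket and a key. Trimmed down to the fields this function actually needs (an assumption; the repo's test-cloudtrail-event.json is authoritative), it looks roughly like this, with a made-up key you should replace with a real object from your bucket:

{
  "Records": [
    {
      "s3": {
        "bucket": { "name": "MYCLOUDTRAILBUCKET" },
        "object": { "key": "AWSLogs/111111111111/CloudTrail/eu-west-1/2019/01/19/111111111111_CloudTrail_eu-west-1_20190119T1200Z_example.json.gz" }
      }
    }
  ]
}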

Launch the Lambda from S3 Events

  1. Go to your S3 Bucket in the console.
  2. Click on Properties
  3. Click on Events
  4. Click + Add Notification
  5. Name the event
  6. For Events, select "All object create events"
  7. For Prefix, enter an appropriate prefix. Probably AWSLogs/
  8. For Send to, select the lambda function you created
  9. Click Save.
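If you'd rather script this step, the equivalent CLI call looks roughly like the following (the ARNs are placeholders). Note that, unlike the console, this does not add the permission that lets S3 invoke the function, so you would also need an aws lambda add-permission call:

aws s3api put-bucket-notification-configuration \
    --bucket MYCLOUDTRAILBUCKET \
    --notification-configuration '{
      "LambdaFunctionConfigurations": [{
        "Id": "cloudtrail-to-es",
        "LambdaFunctionArn": "arn:aws:lambda:eu-west-1:111111111111:function:cloudtrail2ES",
        "Events": ["s3:ObjectCreated:*"],
        "Filter": {"Key": {"FilterRules": [{"Name": "prefix", "Value": "AWSLogs/"}]}}
      }]
    }'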

Seeing it Work

It will take a little time: CloudTrail logs are delivered to S3 every few minutes, but once an object arrives, the S3 event fires pretty much immediately and the records are sent to ElasticSearch.

  1. Click on the Kibana link in the ElasticSearch console.
  2. Click on the Management gear icon
  3. Click on + Create Index Pattern
  4. Name the index pattern cloudtrail-* (or whatever you used for the ES_INDEX value). You should see something helpful like Your index pattern matches 2 indices
  5. Choose the @timestamp field for your timestamp
  6. Save the index pattern
  7. Click on the Discover link at the top left. You should see some CloudTrail records.
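You can also check for the new indices from the command line. Like the curl example further down, this assumes your cluster access policy lets you talk to the endpoint directly:

curl "https://MYELASTICSEARCHDOMAIN/_cat/indices/cloudtrail-*?v"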

CloudWatch Logs

If you look at your function in the Lambda console, you'll see a tab labeled Monitoring. There's a link there to the function's logs in CloudWatch Logs, where you can see what the Lambda function is doing.

You will want to click on the log group and set its retention time to something. By default, CloudWatch Logs are set to Never Expire and that will store every log entry forever. I set mine to 3 days. That's probably generous. I really don't need these logs at all.
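You can set that from the CLI as well. The log group name follows the usual /aws/lambda/<function name> pattern; replace cloudtrail2ES with whatever you named your function:

aws logs put-retention-policy \
    --log-group-name /aws/lambda/cloudtrail2ES \
    --retention-in-days 3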

Preloading Events

If, like me, you turned on CloudTrail a long time ago, and now you're just starting to analyse it with ElasticSearch, you might want to import a lot of S3 objects from your CloudTrail bucket. There's a script loadCloudTrail2ES.py that will do that. You invoke it something like this:

python3 loadCloudTrail2ES.py \
    --bucket MYCLOUDTRAILBUCKET \
    --endpoint MYELASTICSEARCHDOMAIN \
    --region eu-west-1 \
    --prefix AWSLogs/111111111111/CloudTrail/

Other Options

Note that it takes a few other optional arguments that you can use to test before you turn it loose:

  • --limit X will stop after processing X S3 objects.
  • --dryrun will cause it to fetch and parse the S3 objects, but it will not POST them to ElasticSearch.
  • --index will name the index something different. By default it names the index cloudtrail-YYYY-MM-DD. If you give it --index foo on the command line, it will use foo-YYYY-MM-DD as the index instead.
  • --profile will use the STS::AssumeRole feature to assume the role for invoking AWS API calls. See the named profiles documentation for more information on how that works.
  • --prefix is optional. If you leave it out, it defaults to AWSLogs/ with the assumption that you probably want to limit yourself to CloudTrail logs, and that's the prefix where they're written by default. If you want no prefix at all, so that the script parses every single object in the bucket, you need to specify --prefix=/.
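For example, a cautious first run that parses only a handful of objects under a named profile, without writing anything to ElasticSearch, might look like this (the profile name is just an example):

python3 loadCloudTrail2ES.py \
    --bucket MYCLOUDTRAILBUCKET \
    --endpoint MYELASTICSEARCHDOMAIN \
    --region eu-west-1 \
    --prefix AWSLogs/111111111111/CloudTrail/ \
    --profile audit \
    --limit 10 \
    --dryrun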

Notes

The loadCloudTrail2ES.py script uses the bulk upload API of ElasticSearch. It does not stop to think about whether that would be a good idea. It batches up all the CloudTrail events in the S3 object and sends them to ElasticSearch in a single request. My S3 objects rarely have more than 100 CloudTrail events in them and this always succeeds for me. But if you have a really active account and you have hundreds or thousands of events in a single S3 object, this might fail.
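For reference, the bulk format is newline-delimited JSON: an action line naming the index, then the document itself, repeated for every record, with a trailing newline (older ElasticSearch versions also want a _type in the action line). A rough sketch of the idea, ignoring request signing and response checking:

import json

import requests

def bulk_post(es_host, index, records):
    # Send all records from one S3 object in a single _bulk request
    lines = []
    for rec in records:
        lines.append(json.dumps({"index": {"_index": index}}))  # action/metadata line
        lines.append(json.dumps(rec))                            # the document itself
    body = "\n".join(lines) + "\n"                               # bulk body must end with a newline
    return requests.post(
        "https://{}/_bulk".format(es_host),
        data=body,
        headers={"Content-Type": "application/x-ndjson"},
        timeout=30,
    )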

If you screw up, remember that curl -X DELETE https://endpoint/cloudtrail-* is your friend. You can delete ElasticSearch indexes fairly easily.

My changes

I updated it to run on Python 3.6 and to draw its configuration from the environment, so the code itself should not need to be modified. I also added a quick check in case the object isn't a CloudTrail record: I enable CloudTrail Digests on my logs, and those end up in somewhat similar paths.

I also ran into trouble with the apiVersion attribute of many CloudTrail records. ElasticSearch wants to treat it as a date, because it often looks like one (e.g., 2012-10-17). I find it causes problems, and there are blog posts about it. Frankly, I never search on apiVersion and I don't care, so my Lambda function removes it. It doesn't store apiVersion in ElasticSearch at all.

Acknowledgements

You might want to look at Fernando's blog.

Thanks to the original author and to pavan3401 for forking and improving it.