O.G.R.E TOOLBOX =============== General ----------------------------------------------------------------------- Ogre is a suite of tools to facilitate for Extract, Load and Transform data in different formats in a AWS S3 based data Lake. Ogre is open sourced by Widespace AB under Apache License, Version 2.0: https://opensource.org/licenses/Apache-2.0 From Widespaces point of view the AWS S3 data lake is the company wide repository where different applications publish their data and making it available to others. The data lake is hosted in S3 buckets and there are some basic rules to follow in order to publish data there: 1. The data should be packed as records in AVRO container files. 2. Time series data records should contain an id and timestamp in UTC. 3. All records within a data file must belong to the same hour. 4. Data files are uploaded and published into the data lake using following S3 URL pattern: s3://<bucket>/<format>/<component>/<source>/<type>/d=<yyyy-MM-dd>/h=<hour>/<filename> Where: - <bucket> The Data Lake bucket - <format> Format of the files under the component (e.g. avro, json) - <component> The application/component producing data (e.g. engine, rs, apls, inscreenservice) - <source> The name of log/data (e.g. rl2, accesslogs, perflogs, inscreenlogs) - <type> The log record type (e.g delivery, impression, click, currentposition) - <yyyy-MM-dd> Origination date of the file. The year using 4 digits (e.g. 2015) The month in year using 2 digits (i.e. 01, 02, ... , 11, 12) The day of month using 2 digits (i.e. 01, 02, ... , 30, 31) - <hour> The hour of day using 2 digits (e.g. 00, 01, ..., 22, 23) - <filename> The data filename having the ".avro" extension. As a guideline also start names with <minute><second> so they are listed in chronological order for readability. Also add a random number to make sure to avoid name conflicts if more than one nodes in a cluster uploading data files. 5. Upload data file using appropriate storage plan to save costs. The S3 Infrequent Access plan is the one to use in 99% of the cases. Many options exists to join and process the data in the Data Lake: - Redshift (Ogre can load/sync data from lake into a custom Redshift) - Athena (Athena can operate directly on the datalake avro files) - EMR (can work with avro files in S3 natively) - Spark (can work with avro files in S3 natively) - Kinesis (Ogre can load/sync data from lake into custom Kinesis streams) The Ogre tools are implemented as simple CLI tools. Following tools exist to help making the data in lake available in Redshift, Kinesis etc: 1. avro2json: Converts avro files to json. Nested lists and maps are also explodes into own separate two dimensional json files. Exploded maps and lists will have foreign keys to main data file for joining. This tool is mainly used to convert avro files to json for speedier imports into Redshift. The tool supports bulk converts for a period or continuously converting new data as it is published in the lake. 2. data2redshift: Imports data into a Redshift. Handles automatic AVRO/DDL schema changes and data partitioning seamlessly. The tool tool supports bulk import for a period or continuously import of new data as it is published in the lake. 3. avro2kinesis: Loads data into a custom Kinesis Stream. The tool supports bulk loading for a period or continuously loading new data into the stream as it is published in the lake. 4. avro2s3: Copies data files from a S3 data lake location into another. This to be able to copy data from the old unstructured path into new structured data lake path. Also, for any path, we can copy data from 1 location to another location (e.g. bucket). 5. db2avro: Runs arbitrary SQL scripts in a relational database (like MySQL or Redshift), the result is dynamically converted to AVRO and uploaded to S3. Support for dependency check to make sure required data is in place before script is executed. This tool is mainly used to dump dimension tables to the data lake as well as running ETL jobs aggregating data in e.g. a Redshift and dump the result in datalake. An ETL chain can be build using this tool. 6. data2rds: Imports data into a RDS DB. Handles automatic AVRO/DDL schema changes and data partitioning seamlessly. The tool tool supports bulk import for a period or continuously import of new data as it is published in the lake. Installation ----------------------------------------------------------------------- - Import the Athena and Redshift drivers into your local maven repository: mvn install:install-file -Dfile=lib/RedshiftJDBC41.jar -DgroupId=com.amazon.redshift -DartifactId=aws-redshift-jdbc41 -Dversion=1.1.13.1013 -Dpackaging=jar mvn install:install-file -Dfile=lib/AthenaJDBC41_2.0.9.jar -DgroupId=com.amazonaws.athena.jdbc -DartifactId=AthenaJDBC41 -Dversion=2.0.9 -Dpackaging=jar - Build the ogre.jar and the ogre-x.x.x-bin.tar.gz package (output dir: target folder) mvn install - Extract the ogre-x.x.x-bin.tar.gz file on the machine to use to install it avro2json - - - - - - - - - - - - - - - - - - Objective: To convert and explode AVRO data files preparing for import into Redshift. Prerequisites: - IAM user(s) with access to S3 where to read the avro files and where to write the converted to json files. Command: > avro2json sync <int> <lookback> -threads <x> -types <x,y,z> -config <x> > avro2json load <from> <to> -threads <x> -types <x,y,z> -config <x> -replace Command args: - <int> The interval in seconds to poll for new Avro files to convert. - <lookback> The time in hours to search backwards for new files to sync - <from> The start of period to convert Avro files from in format yyyy-MM-dd:HH - <end> The end of period to convert Avro files from in format yyyy-MM-dd:HH Command options: - threads The number of threads to use for converting and upload the the files. Default: 1. - replace Will replace already converted JSON files. Default: Files are not replaced - types Comma separated list of types to convert. Default: All types defined in config file - config The configuration file to use. Default: avro2json.conf Configuration file: log4j.configuration = log4j.xml types = delivery, impression, click type.*.include = $.deliveryId,$.timestamp src.s3.accessKeyId = xxxx src.s3.secretKey = xxxx src.s3.rootPath = s3:// dst.s3.accessKeyId = xxxx dst.s3.secretKey = xxxx dst.s3.rootPath = s3:// dst.s3.storageClass = STANDARD_IA Configuration parameters: - <log4j.configuration> (optional) To point out an external log4j.xml file to use - <types> (mandatory) To define the types to convert. This can be overriden by the -types CLI arg. - <type.[type].include> (optional) Comma separated list of json paths to Avro fields which to include in extracted arrays/maps. The '[type]' should be replaced with the type/name of array/map where to include fields into. '[type]' can include the glob wildcards * and ?. This is useful for adding important fields such as RedShift dist and sort keys. - <src.s3.accessKeyId> (mandatory) AWS key with permissions to read avro files in <src.s3.rootPath> - <src.s3.secretKey> (mandatory) AWS secret key - <src.s3.rootPath> (mandatory) The S3 url to Avro root folder. I.e. the folder containing '<type>' folders. - <dst.s3.accessKeyId> (mandatory) AWS key with permissions to read/write JSON files in <dst.s3.rootPath> - <dst.s3.secretKey> (mandatory) AWS secret key - <dst.s3.rootPath> (mandatory) The S3 url to JSON root folder. I.e. the folder containing '<type>' folders. - <dst.s3.storageClass> (optional) The S3 storage class to use for uploaded json data files. Available: STANDARD, STANDARD_IA, REDUCED_REDUNDANCY, GLACIER. Default: STANDARD_IA Examples: > avro2json sync 60 1 -threads 20 This will continuously scan S3 every minute for new Avro files. Ogre will look back one hour and new files are converted and shipped to S3 in parallel by 20 threads. > avro2json load 2015-10-01:00 2015-10-21:23 -threads 20 -replace This will bulk convert Avro files from 2015-10-01 00:00:00 to 2015-10-21 23:59:59 to JSON. Existing JSON files for period will be replaced. data2redshift - - - - - - - - - - - - - - - - - - Ogre needs to bootstrap Redshift with some Ogre tables keeping track of import state before any files can be imported. This is done using the "data2redshift init" command. Multiple Ogre pipelines importing data from different data sources into same Redshift cluster is a valid use case when to join different types of data together and doing analytics. +--------+ | S3 | Ogre X | Data X | --------+ | JSON | | +----------+ +--------+ +------> | | | Redshift | <----- Analytic queries... +--------+ +------> | | | S3 | | +----------+ | Data Y | --------+ | JSON | Ogre Y +--------+ E.g. Data X resp Y could be the RL2 resp Inscreen Logs to join and doing analytics on. NOTE: Each Ogre process MUST upload its data into a dedicated Redshift schema else there will be conflicts and deadlocks. I.e. Ogre processes CANNOT share the same Redshift schema. Each Ogre process should have a dedicated src.s3.ddldir and s3.tempdir while sharing the same s3.avroroot and s3.jsonroot is perfectly fine. Objective: To import data into a Redshift. Prerequisites: - IAM user(s) with access to S3 - Redshift cluster - S3 folder with the DDLs to Ogre into Redshift - S3 folder for temporary files Command: > data2redshift -init -config <x> > data2redshift -sync <int> <lookback> -types <x,y,z> -config <x> > data2redshift -load <from> <to> -replace -types <x,y,z> -config> <x> Command args: - <int> The interval in seconds to poll for new data files to import. - <lookback> The time in hours to search backwards for new files to import - <from> The start of period to import data files from (in format yyyy-MM-dd:HH) - <end> The end of period to import data files from (in format yyyy-MM-dd:HH) Command options: - replace Will replace already imported data files. Default: Data is not replaced - types Comma separated list of types to import. Default: All types defined in config file - config The configuration file to use. Default: data2redshift.conf Configuration file: log4j.configuration = log4j.xml src.s3.accessKeyId = xxx src.s3.secretKey = src.s3.rootdir = s3://xxx src.s3.tmpdir = s3://xxx src.s3.ddldir = s3://xxx dst.redshift.host = xxx dst.redshift.database = xxx dst.redshift.schema = xxx dst.redshift.user = xxx dst.redshift.password = xxx partitioning.xxx = public.xxx:daily:30 Configuration parameters: - log4j.configuration (optional) To point out an external log4j.xml file to use - <src.s3.accessKeyId> (mandatory) AWS key with permissions to read avro files in <src.s3.rootPath> - <src.s3.secretKey> (mandatory) AWS secret key - <src.s3.rootdir> (mandatory) The S3 url to data root folder. I.e. the folder containing '<type>' folders. - <src.s3.tmpdir> (mandatory) An temp dir on S3 for Ogre to write temporary files - <src.s3.ddldir> (mandatory) The S3 folder containing the Redshift DDL files where to import data in - <dst.redshift.host> (mandatory) The Redshift host name - <dst.redshift.database> (mandatory) The Redshift database to import data into - <dst.redshift.schema> (mandatory) The schema where to import data. Every Ogre import must have a dedicated schema - <dst.redshift.user> (mandatory) The user name - <dst.redshift.password> (mandatory) The password - <partitioning.<type>> (optional) If to partition a type. Format <view name>:<partition type>:<nbr of partitions> The <view name> is the union all view consolidating all partition tables, <partition type> is one of hourly, daily, weekly, monthly. Examples: > data2redshift sync 60 1 This will continuously scan S3 every minute for new json data files. Ogre will look back one hour and new files are imported into redshift. > data2redshift load 2015-10-01:00 2015-10-21:23 -replace This will bulk import json data files from 2015-10-01 00:00:00 to 2015-10-21 23:59:59 into Redshift. Existing data for period will be replaced. Ogre instance type considerations ----------------------------------------------------------------------- The EC2 instance type to chose for Ogre depends on the task. For Ogre to convert AVRO to Gzipped JSON requires an instance with relatively high network capacity as well as CPU. For long batches of jsonload c4.2xlarge is recommended. For jsonsync a suitable instance type would be a m3.medium. For dbload and dbsync a t2.medium is fine. Surveillance and alarming ----------------------------------------------------------------------- Setup Cloudwatch alarms for Ogre instance health. Setup Ogre application SNS alarming by editing the log4j.xml file and enable the SnsAsyncAppender: <appender name="sns" class="com.ws.ogre.utils.SnsAsyncAppender"> <param name="threshold" value="WARN"/> <param name="topicArn" value="arn:aws:sns:eu-west-1:xxx"/> <param name="awsKeyId" value="xxx"/> <param name="awsSecretKey" value="xxx"/> <layout class="org.apache.log4j.PatternLayout"> <param name="ConversionPattern" value="%p %t %c - %m%n"/> </layout> </appender> <root> <appender-ref ref="stdout"/> <appender-ref ref="sns"/> </root> This will send application alarms as SNS notifications.