Metorikku is a library that simplifies writing and executing ETLs on top of Apache Spark.
It is based on simple YAML configuration files and runs on any Spark cluster.
The platform also includes a simple way to write unit and E2E tests.
To run Metorikku you must first define 2 files.
A metric file defines the steps and queries of the ETL as well as where and what to output.
For example, a simple metric configuration in YAML (JSON is also supported) looks like this:
steps:
- dataFrameName: df1
  sql:
    SELECT *
    FROM input_1
    WHERE id > 100
- dataFrameName: df2
  sql:
    SELECT *
    FROM df1
    WHERE id < 1000
output:
- dataFrameName: df2
  outputType: Parquet
  outputOptions:
    saveMode: Overwrite
    path: df2.parquet
For all possible values, see the full sample metric configuration file.
Also see the Spark SQL language manual for the supported query syntax.
The second file is the job file. It includes the input sources, the output destinations and the locations of the metric config files.
For example, a simple job configuration in YAML (JSON is also supported) looks like this:
metrics:
  - /full/path/to/your/metric/file.yaml
inputs:
  input_1: parquet/input_1.parquet
  input_2: parquet/input_2.parquet
output:
  file:
    dir: /path/to/parquet/output
For all possible values, see the full sample job configuration file.
Also make sure to check out all our examples.
Metorikku currently supports the following inputs: CSV, JSON, Parquet, JDBC, Kafka and Cassandra.
It supports the following outputs: CSV, JSON, Parquet, Redshift, Cassandra, Segment, JDBC, Kafka and Elasticsearch.
NOTE: When using Kafka as an input, the only supported outputs are currently Kafka, Parquet, CSV and JSON, and a streaming metric can currently have only one output.
There are currently 3 options to run Metorikku.
To run on a cluster, Metorikku requires Apache Spark v2.2+.
- Download the latest released JAR
- Run the following command:
spark-submit --class com.yotpo.metorikku.Metorikku metorikku.jar -c config.yaml
Metorikku is also released as a standalone JAR that bundles Spark.
- Download the latest released standalone JAR
- Run the following command:
java -Dspark.master=local[*] -cp metorikku-standalone.jar com.yotpo.metorikku.Metorikku -c config.yaml
You can also use Metorikku as a library inside your own software.
The Metorikku library requires Scala 2.11.
To use it, add the following dependency to your build.sbt:
libraryDependencies += "com.yotpo" % "metorikku" % "LATEST VERSION"
To test metrics and fully automate their deployment, Metorikku can run tests against a metric.
A test consists of a test settings file and the mock data it references.
The test settings file defines what to test and where to get the mocked data. For example, a simple test YAML (JSON is also supported) looks like this:
metric: "/path/to/metric"
mocks:
- name: table_1
  path: mocks/table_1.jsonl
tests:
  df2:
  - id: 200
    name: test
  - id: 300
    name: test2
And the corresponding mocks/table_1.jsonl:
{ "id": 200, "name": "test" }
{ "id": 300, "name": "test2" }
{ "id": 1, "name": "test3" }
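To see why these mocks produce the expected df2 rows, here is a minimal Scala sketch that replays the two filters of the example metric above over the mock rows. It assumes the metric under test reads table_1 and applies the same two steps; it uses plain collections instead of Spark DataFrames and is not how MetorikkuTester works internally. Record and ExampleMetricCheck are hypothetical names.

```scala
// Hypothetical stand-in: plain Scala collections instead of Spark DataFrames.
final case class Record(id: Int, name: String)

object ExampleMetricCheck {
  // Step df1: SELECT * FROM <mocked table> WHERE id > 100
  def df1(input: Seq[Record]): Seq[Record] = input.filter(_.id > 100)

  // Step df2: SELECT * FROM df1 WHERE id < 1000
  def df2(df1Rows: Seq[Record]): Seq[Record] = df1Rows.filter(_.id < 1000)

  // Contents of mocks/table_1.jsonl
  val mock: Seq[Record] =
    Seq(Record(200, "test"), Record(300, "test2"), Record(1, "test3"))

  // Rows listed under tests.df2 in the test file
  val expectedDf2: Seq[Record] = Seq(Record(200, "test"), Record(300, "test2"))

  // Order-insensitive comparison (an assumption of this sketch)
  def matches(actual: Seq[Record], expected: Seq[Record]): Boolean =
    actual.sortBy(_.id) == expected.sortBy(_.id)
}
```

The row with id 1 is dropped by the first step, which leaves exactly the two rows listed under tests.df2.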
You can run the Metorikku tester using any of the methods above, just like a normal Metorikku job.
The only difference is that the main class changes from com.yotpo.metorikku.Metorikku to com.yotpo.metorikku.MetorikkuTester.
In Spark, some behaviors differ when writing queries for streaming sources (for example, Kafka).
To make sure the test behaves the same as the real queries, you can configure a mock to behave like a streaming input:
metric: "/path/to/metric"
mocks:
- name: table_1
  path: mocks/table_1.jsonl
  # default is false
  streaming: true
  # default is append output mode
  outputMode: update
tests:
  df2:
  - id: 200
    name: test
  - id: 300
    name: test2
All configuration files support variable interpolation from environment variables and system properties using the following format:
${variable_name}
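As an illustration of what the ${variable_name} format does, here is a minimal Scala sketch of placeholder substitution. It is not Metorikku's implementation: the lookup order (system properties before environment variables) and leaving unresolved placeholders untouched are assumptions, and Interpolation is a hypothetical name.

```scala
import scala.util.matching.Regex

// Hypothetical sketch of ${variable_name} interpolation; not Metorikku's code.
object Interpolation {
  // Matches ${name} and captures the name in group 1
  private val Placeholder: Regex = """\$\{([^}]+)\}""".r

  // Lookup order (system properties, then environment) is an assumption
  def resolve(name: String, props: Map[String, String], env: Map[String, String]): Option[String] =
    props.get(name).orElse(env.get(name))

  // Replaces each ${name}; unresolved placeholders are kept as-is
  def interpolate(text: String,
                  props: Map[String, String] = sys.props.toMap,
                  env: Map[String, String] = sys.env): String =
    Placeholder.replaceAllIn(text, m =>
      Regex.quoteReplacement(resolve(m.group(1), props, env).getOrElse(m.matched)))
}
```

For example, with DATA_DIR set to /data, a job config line such as input_1: ${DATA_DIR}/input_1.parquet would resolve to /data/input_1.parquet.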
When using the JDBC writer or input, you must provide the path to the driver JAR.
For example, to run with spark-submit and a MySQL driver:
spark-submit --driver-class-path mysql-connector-java-5.1.45.jar --jars mysql-connector-java-5.1.45.jar --class com.yotpo.metorikku.Metorikku metorikku.jar -c config.yaml
If you want to run this with the standalone JAR:
java -Dspark.master=local[*] -cp metorikku-standalone.jar:mysql-connector-java-5.1.45.jar com.yotpo.metorikku.Metorikku -c config.yaml
JDBC query output allows running a query for each record in the DataFrame.
- query - defines the SQL query. In the query you can address the columns of the DataFrame by position, using the dollar sign ($) followed by the column index. For example:
INSERT INTO table_name (column1, column2, column3, ...) VALUES ($1, $2, $3, ...);
- maxBatchSize - The maximum number of queries to execute against the DB in one commit.
- minPartitions - Minimum partitions in the DataFrame - may cause repartition.
- maxPartitions - Maximum partitions in the DataFrame - may cause coalesce.
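The following Scala sketch illustrates these options without Spark or a database: it rewrites $N placeholders into JDBC-style ? markers along with the matching 0-based column indices, groups records into commits of at most maxBatchSize, and computes a partition count bounded by minPartitions and maxPartitions. The object and method names are hypothetical, and the exact semantics shown are assumptions for illustration, not Metorikku's code.

```scala
// Hypothetical illustration of the JDBC query output options; not Metorikku's code.
object JdbcQuerySketch {
  // Matches $1, $2, ... and captures the 1-based column index
  private val Positional = """\$(\d+)""".r

  // Rewrites "$N" into "?" and returns the 0-based column index behind each "?"
  def toPrepared(query: String): (String, List[Int]) = {
    val indices = Positional.findAllMatchIn(query).map(_.group(1).toInt - 1).toList
    (Positional.replaceAllIn(query, "?"), indices)
  }

  // Parameter values for one record, in the order the "?" markers appear
  def bind(row: IndexedSeq[Any], indices: List[Int]): List[Any] =
    indices.map(i => row(i))

  // Splits records into commits of at most maxBatchSize queries each
  def batches[A](rows: Seq[A], maxBatchSize: Int): List[Seq[A]] =
    rows.grouped(maxBatchSize).toList

  // Raises the count to minPartitions (repartition) or lowers it to
  // maxPartitions (coalesce); unset bounds leave it unchanged
  def targetPartitions(current: Int, minPartitions: Option[Int], maxPartitions: Option[Int]): Int = {
    val raised = minPartitions.fold(current)(m => math.max(current, m))
    maxPartitions.fold(raised)(m => math.min(raised, m))
  }
}
```

Binding by position rather than by string concatenation keeps record values out of the SQL text, which is the usual reason to prefer prepared statements.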
Kafka output allows writing batch operations to Kafka.
We use spark-sql-kafka-0-10 as a provided JAR, so the spark-submit command should look like this:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 --class com.yotpo.metorikku.Metorikku metorikku.jar -c config.yaml
- topic - defines the Kafka topic the data will be written to. Currently only one topic is supported.
- valueColumn - defines the values that will be written to the Kafka topic, usually a JSON version of the data. For example:
SELECT keyColumn, to_json(struct(*)) AS valueColumn FROM table
- keyColumn - a key that can be used to perform de-duplication when reading
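A minimal Scala sketch of the reading-side de-duplication that keyColumn enables: keep only the latest value per key. It assumes all messages come from a single partition (Kafka offsets are only ordered within a partition) and uses a hypothetical Message type rather than any Kafka client API.

```scala
// Hypothetical sketch of key-based de-duplication on the reading side.
object KeyDedupSketch {
  // A record read back from a single partition: offset, key and value
  final case class Message(offset: Long, key: String, value: String)

  // Keeps the value with the highest offset for each key
  def latestByKey(messages: Seq[Message]): Map[String, String] =
    messages.sortBy(_.offset).foldLeft(Map.empty[String, String]) { (acc, m) =>
      acc.updated(m.key, m.value)
    }
}
```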
Kafka input allows reading messages from topics.
inputs:
  testStream:
    kafka:
      servers:
        - 127.0.0.1:9092
      topic: test
      consumerGroup: testConsumerGroupID
Using Kafka input will convert your application into a streaming application built on top of Spark Structured Streaming.
Please note the following while using streaming applications:
- Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.
- Limit and take first N rows are not supported on streaming Datasets.
- Distinct operations on streaming Datasets are not supported.
- Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.
- Make sure to add the relevant Output Mode to your metric, as seen in the examples.
- Make sure to add the relevant Triggers to your metric if needed, as seen in the examples.
- For more information, see the Spark Structured Streaming documentation.
- To measure your consumer lag, you can use the consumerGroup parameter to track your application's offsets against your Kafka input. This commits the offsets to Kafka as a new dummy consumer group.
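Consumer lag is usually computed per partition as the latest (end) offset minus the group's committed offset. The Scala sketch below shows only that arithmetic on plain maps; fetching the offsets (for example with Kafka's kafka-consumer-groups tool) is out of scope, and treating a partition with no committed offset as starting from 0 is a simplification of this sketch. ConsumerLagSketch is a hypothetical name.

```scala
// Hypothetical sketch of consumer-lag arithmetic; offsets are keyed by partition.
object ConsumerLagSketch {
  // Lag per partition = end offset - committed offset (never negative).
  // A missing committed offset is treated as 0, a simplification.
  def lagPerPartition(endOffsets: Map[Int, Long], committed: Map[Int, Long]): Map[Int, Long] =
    endOffsets.map { case (partition, end) =>
      partition -> math.max(0L, end - committed.getOrElse(partition, 0L))
    }

  // Total lag across all partitions of the topic
  def totalLag(endOffsets: Map[Int, Long], committed: Map[Int, Long]): Long =
    lagPerPartition(endOffsets, committed).values.sum
}
```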
One of the most useful features in Metorikku is its instrumentation capabilities.
By default, instrumentation metrics are written to whatever is configured in spark-metrics.
On top of what Spark already sends, Metorikku automatically sends the following:
- Number of rows written to each output
- Number of successful steps per metric
- Number of failed steps per metric
- In streaming: records per second
- In streaming: number of processed records in each batch
You can also send any information you like to the instrumentation output within a metric.
Check out the example for further details.
You can also send metrics directly to InfluxDB, gaining the ability to use tags and a time field.
Check out the example and also the InfluxDB E2E test for further details.
Elasticsearch output allows bulk writing to Elasticsearch. We use elasticsearch-hadoop as a provided JAR, so the spark-submit command should look like this:
spark-submit --packages org.elasticsearch:elasticsearch-hadoop:6.6.1 --class com.yotpo.metorikku.Metorikku metorikku.jar -c config.yaml
Check out the example and also the Elasticsearch E2E test for further details.
Metorikku is provided as a Docker image.
You can use this image to deploy Metorikku in container-based environments (we're using Nomad by HashiCorp).
Check out this docker-compose for a full example of all the different parameters available and how to set up a cluster.
Currently the image only supports running Metorikku in Spark cluster mode with the standalone scheduler.
The image can also be used to run E2E tests of a Metorikku job. Check out the example of running a Kafka-to-Kafka E2E test with docker-compose.
Metorikku supports adding custom code as a step. This requires creating a JAR with the custom code. Check out the UDF examples directory for a very simple example of such a JAR.
The only requirement for this JAR is that it contains an object with the following method:
object SomeObject {
  def run(ss: org.apache.spark.sql.SparkSession,
          metricName: String,
          dataFrameName: String,
          params: Option[Map[String, String]]): Unit = {
    // Custom logic goes here, for example registering a UDF on ss
  }
}
Inside the run function you can do whatever you like; in the example folder you'll see that we registered a new UDF.
Once you have a proper Scala file and a build.sbt file, run sbt package to create the JAR.
When you have the newly created JAR (it should be in the target folder), copy it to the Spark cluster (you can of course also deploy it to your favorite repo).
You must now include this JAR in your spark-submit command using the --jars flag or, if you're running with java, add it to the -cp flag.
Now all that's left is to add it as a new step in your metric:
- dataFrameName: dataframe
  classpath: com.example.SomeObject
  params:
    param1: value1
This will trigger your run method with the above dataFrameName.
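Inside run, the params from the step definition can be read with ordinary Option and Map operations. A minimal sketch, assuming the params block above arrives as Some(Map("param1" -> "value1")) and as None when the step has no params; ParamsSketch and its helpers are hypothetical.

```scala
// Hypothetical helpers for reading step params inside run(); not part of Metorikku.
object ParamsSketch {
  // Reads a required parameter, failing with a descriptive message if absent
  def required(params: Option[Map[String, String]], key: String): String =
    params.flatMap(_.get(key)).getOrElse(
      throw new IllegalArgumentException(s"Missing required param '$key'"))

  // Reads an optional parameter, falling back to a default
  def optional(params: Option[Map[String, String]], key: String, default: String): String =
    params.flatMap(_.get(key)).getOrElse(default)
}
```

Failing fast on a missing required param surfaces configuration mistakes when the step runs, rather than later as a confusing query error.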
Also check out the built-in code steps.
NOTE: If you added dependencies to your custom JAR's build.sbt, you must either use sbt-assembly to bundle them into the JAR or pass them with --packages when running spark-submit.
Metorikku supports reading and saving tables with the Apache Hive metastore. To enable Hive support via spark-submit (assuming you're using MySQL as Hive's DB, though any backend can work), pass the following configurations:
spark-submit \
--packages mysql:mysql-connector-java:5.1.45 \
--conf spark.sql.catalogImplementation=hive \
--conf spark.hadoop.javax.jdo.option.ConnectionURL="jdbc:mysql://localhost:3306/hive?useSSL=false&createDatabaseIfNotExist=true" \
--conf spark.hadoop.javax.jdo.option.ConnectionDriverName=com.mysql.jdbc.Driver \
--conf spark.hadoop.javax.jdo.option.ConnectionUserName=user \
--conf spark.hadoop.javax.jdo.option.ConnectionPassword=pass \
--conf spark.sql.warehouse.dir=/warehouse ...
NOTE: If you're running the standalone Metorikku JAR, you can use system properties instead (-Dspark.hadoop...), and you must add the MySQL connector JAR to your class path via -cp.
This will enable reading from the metastore.
To write an external table to the metastore you need to add tableName to your output configuration:
...
output:
- dataFrameName: moviesWithRatings
  outputType: Parquet
  outputOptions:
    saveMode: Overwrite
    path: moviesWithRatings.parquet
    tableName: hiveTable
    overwrite: true
Only file formats are supported for table saves (Parquet, CSV, JSON).
To write a managed table (that will reside in the warehouse dir) simply omit the path in the output configuration.
To change the default database you can add the following to the job configuration:
...
catalog:
  database: some_database
...
Check out the examples and the E2E test.
See the LICENSE file for license rights and limitations (MIT).