/datagen

Datagenerator for CDP Platform services

Primary LanguageJavaApache License 2.0Apache-2.0

Datagen - Data Generation

Goal of this project is to generate meaningful Data into various Big Data Services and File systems.

Built originally for Cloudera Data Platform, it works with S3, ADLS, GCS, local file systems, and open-source services (such as kafka, hive etc…​).

Public Documentation

Public Documentation is available here https://frischhwc.github.io/datagen

Dependencies

Datagen is made with:

  • Java 11

  • Maven 2.5

It is built for:

  • CDP 7.1.7, 7.1.8, 7.1.9

Datagen is made as a Spring Boot project with dependencies on SDK to interact with various services.

How does it work ?

Datagen is launching a Web Server with a sets of API to interact with. It comes also with a handy swagger to easily interact with APIs.

These APIS lets you:

  • Ask for Data Generation to specified services

  • Schedule Data Generation to specified services

  • Test Data Generation

  • Read Data and create a model to generate similar data

  • Get Metrics and health status of the web server

Parameters have all default values and can be empty (they will be defaulted), but they all also can be overriden, letting clients customize fully the data generation.

By default, TLS is activated and a user/password is required (admin/admin by default).

Deployment

Datagen can be run in two ways:

  • As a standalone web server, anywhere, but you’ll need to provide configurations manually

  • Fully integrated into CDP using a parcel, a CSD and available as service through Cloudera Manager (configuration is automatic)

How to run it locally or on any remote machine ?

First, go to pom.xml and change cdp version to yours, change also if required, individual versions of each component.
Then Package the program:

mvn clean package
java -Dnashorn.args=--no-deprecation-warning --add-opens java.base/jdk.internal.ref=ALL-UNNAMED -jar random-datagen.jar

How to deploy it on CDP ?

This project aims to be bundled as a CSD and a Parcel, so it can be managed easily Cloudera Manager.

Repository containing all Datagen Versions of CSD and parcels: https://datagen-repo.s3.eu-west-3.amazonaws.com/index.html

To build & Deploy from scratch the project and deploy it to a cluster (especially useful for testing development), refer to following documentation dev-support/deployment/README.adoc

Deploying it to CDP enables auto-configuration from Cloudera Manager, hence datagen is already configured to interact with services from CDP. It also ensures that rights are well set and default directories are created.

Moreover, the full datagen service can be managed through Cloudera Manager (start/stop/configuration).

It also provides already set up commands directly available from Cloudera Manager, under Datagen > Actions:

Datagen Actions in CM

Use script src/main/resources/launchToPlatform.sh by setting your hostname, ssh user and key

It will create a directory on the platform and send all needed files there.
Then script src/main/resources/scripts/launch.sh will be launched on platform.

This script add required jars to classpath and launch jar.

Data

Where data will be generated ?

There are various possible services where data can be generated, these are called connectors and here is the full possible list:

  • HDFS-AVRO : generates an AVRO file sent to HDFS

  • HDFS-CSV : generates a CSV file sent to HDFS

  • HDFS-JSON : generates a JSON file sent to HDFS

  • HDFS-ORC : generates an ORC file sent to HDFS

  • HDFS-PARQUET : generates a PARQUET file sent to HDFS

  • HBASE

  • KAFKA : Generates an avro or json or csv message and send it to Kafka

  • KUDU

  • OZONE-AVRO : generates an AVRO file sent to OZONE

  • OZONE-JSON : generates a JSON file sent to OZONE

  • OZONE-CSV : generates a CSV file sent to OZONE

  • OZONE-ORC : generates an ORC file sent to OZONE

  • OZONE-PARQUET : generates a PARQUET file sent to OZONE

  • SOLR

  • HIVE : generate tables (External or Managed or Iceberg)

  • PARQUET: Generates a parquet file locally

  • ORC: Generates an ORC file locally

  • CSV: Generate a CSV file locally

  • AVRO: Generates an Avro file locally

  • JSON: Generates a JSON file locally

  • S3-PARQUET: Generates a parquet file to S3

  • S3-ORC: Generates an ORC file to S3

  • S3-CSV: Generate a CSV file to S3

  • S3-AVRO: Generates an Avro file to S3

  • S3-JSON: Generates a JSON file to S3

  • ADLS-PARQUET: Generates a parquet file on ADLS

  • ADLS-ORC: Generates an ORC file on ADLS

  • ADLS-CSV: Generate a CSV file on ADLS

  • ADLS-AVRO: Generates an Avro file on ADLS

  • ADLS-JSON: Generates a JSON file on ADLS

  • GCS-PARQUET: Generates a parquet file on GCS

  • GCS-ORC: Generates an ORC file on GCS

  • GCS-CSV: Generate a CSV file on GCS

  • GCS-AVRO: Generates an Avro file on GCS

  • GCS-JSON: Generates a JSON file on GCS

N.B: It is possible to output same data into various connectors

Data generated

Data is generated according to a model passed in first argument of the launch.

Exmaples of models could be found under src/main/resources/models/

This model is divided into 4 sections:

Fields:

This is an array describing all fields by at least a name and a type, length could be precised but is optional.
All available types are:

  • STRING is an alphaNumeric string (length represents length of string, by default 20 if not set)

  • STRINGAZ is an alpha non-numeric string (length represents length of string, by default 20 if not set)

  • STRING_REGEX is a string whose value is defined by a regular expression

  • INTEGER (with length representing maximum value, by default Integer.MAX_VALUE)

  • INCREMENT INTEGER An integer increment for each row

  • INCREMENT LONG A long incremented for each row

  • BOOLEAN

  • FLOAT

  • LONG

  • TIMESTAMP (is timestamp of data generation)

  • BYTES (length represents length of byte array, by default 20)

  • HASHMD5 is the hash of a random string (length represents size of byte array, by default 32)

  • BLOB is a byte array of default 1MB (length represents length of byte array) (Use it carefully)

  • DATE is a date represented by the default UTC in format '2011-12-03T10:15:30Z' or as a timestamp

  • DATE_AS_STRING is date that is converted into a string and represented by the pattern user defined

  • BIRTHDATE is a date between 1910 & 2020 (but you can set your own limits)

  • NAME is a first name taken from a dictionary of over 20,000+ names (can be filtered by country)

  • COUNTRY is a country name taken from a dictionary

  • PHONE NUMBER A 10 digits with international indicator in front (can be filtered by country)

  • EMAIL is string as in form of (<name>.<name>|<AZ09><name>)@(gaagle.com|yahaa.com|uutlook.com|email.fr)

  • IP is a string representing an IP in form of Ipv4: 0-255.0-255.0-255.0-255

  • UUID is an unique universal identifier: xxxx-xxxx-xxxx-xxxx

  • CITY is an object representing an existing city (name, lat, long, country) made from a dictionary of over 10,000+ cities, only the name is taken for this field (can be filtered by country)

  • CSV is an object taken from a given CSV file

  • LINK is a string whose values is derived from another field, currently from a CITY or CSV field

Fields values could be also more "deterministic" by providing manually values, or providing values and give them a weight to choose repartition, or even create conditions based on other columns values.

Possible values

Each field could have defined a set of "possible_values" that will limit values to be exactly these.

Possible values weighted

A weight could also be defined to make it less random and make each value having a percentage of appearance. (Only String, Boolean, Integer and Long supports weight). Sum of weights is made internally and probability of appearance of a value will be its weight divided by sum of weights.

Minimum & Maximum

It is possible for INTEGER and LONG type to define a minimum and a maximum.

Regular Expression for String

Using STRING_REGEX, it is possible to define a pattern that the string generated must respect.+ This is done with parameter named regex.

All characters are accepted and will be printed out, to insert a regex, it must be between [], and followed with a number between {} to determine the repetition of this expression.

Inside the [], all values are accepted (including special characters), and must be separated by a ','.
Example: [B, dba, %, zz] will output either: B or dba or % or zz.

To make a range, 3 types of range are available: A-Z for upper, a-z for lower and 0-9 for numbers.
Example: [A-Z] will output an uppercase case letter between A & Z (included). [b-g] will output a lowercase case letter between b & g (included). [1-6] will output a number between 1 & 6 (included).

Finally, few examples:

D-[A-V]{1}[1-7]{1}_C outputs D-F6_C ; D-K1_C ; D-C1_C [A-G]{2}-[0-9]{3}-[A-Z]{2} outputs EF-305-PK ; FA-839-RT ; AG-589-YR

Complete example:

    {
        "name": "department_code",
        "type": "STRING_REGEX",
        "regex": "[A-G]{4}-[b-l]{1}-[3-7]{2}_[A,1,Z,%]{1}-text-not interpreted here"
    }

Outputs: CCBE-c-77_Z-text-not interpreted here GFEB-k-44_%-text-not interpreted here GACD-b-33_1-text-not interpreted here EEBD-l-34_A-text-not interpreted here

N.B: special characters [, ], {, } can be set but must be escaped to not be interpreted

Date Pattern

Using DATE_AS_STRING type, it is possible to set a pattern using pattern parameter, so a date will be generated according to that pattern and output as a string.

The pattern is the standard one from Java, and all specifications can be found here: https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html

N.B: It is also possible to setup a min and max value to generate only between two dates. But, note that min and max must be in the pattern of the output.

Complete example:

    {
    "name": "date_string",
    "type": "DATE_AS_STRING",
    "pattern": "yyyy_MM_dd-K a mm:ss.SSS ZZZ",
    "min": "2024_02_14-7 am 31:53.000 +0000",
    "max": "2024_03_14-7 pm 31:53.050 +0330"
    },

Outputs:

2024_02_20-2 pm 19:48.000 +0000 2024_02_25-1 pm 56:22.000 +0000 2024_02_25-0 am 25:49.000 +0000

Conditionals

Conditions must be make on previous defined columns. Two types of condition:

  1. Formula, possible for Float/Integer/Long Types must be compatible (int can be converted to long but not the invert). It is a simple expression evaluated with operators: * , + , - , / Output column must be of type STRING, Input columns (used to compute) must be INTEGER or LONG or FLOAT

Example:

        "conditionals": {
            "always": "2 * $very_low_int + 56 - $low_int"
        }

Be careful of letting space in your expression to be parsable and evaluated.

  1. Value depend on other column’s value, possible for Integer/Long/Float/String/Boolean (using these types) Support for && (= AND) and || (= OR). Conditions must be equals (=) or unequals (!=) or superior (>) or inferior (<). Multiple conditions is working on same line. Conditions are evaluated one by one like a "if …​ else if …​", first one returning true is picked. Output column must be of type STRING, columns of input must be STRING or LONG or INTEGER or FLOAT

Example:

        "conditionals": {
            "$country_of_provenance=FRANCE" : "Paris",
            "$country_of_provenance=GERMANY | $increment_int<40" : "Berlin",
            "$country_of_provenance=GERMANY & $increment_int>40" : "Berlin_40"
        }

N.B.: Multiple conditions are evaluated using precedence of AND over OR, meaning: A & B | C will in fact be evaluated like (A & B) | C

It is possible to define CITY for a field as its type, this is what happens under the hood:

  • A dictionary of 41,000 cities all around the world is loaded into memory

  • A filter could be applied to take only some cities from one or multiple countries

  • When a row is required, a small city object is constructed, taken randomly from in-memory loaded data, it consists of name, lattitude, longitude and country

It is possible to define a filter based on country for this field, by adding "filters": ["France", "Spain"] in the definition of the field.
With this, only cities whose country is France or Spain will be loaded.

The field CITY will ONLY have the city name written as a value for the row.

It is possible to define LINK for a field as its type, it will be "linked" to a CITY field by defining conditionals on it.

This field will be a string type and will have its value taken from the previous city object created, by either being latitude, longitude or country.

The relationship between this field and the CITY field is defined like this:

    "conditionals": {
        "link": "$city.country"
    }

where city here is the name of another field whose type is CITY.

It is possible to take data from a CSV file with a header, ";" as a separator and a line separator between each line. File path must be specified using: "file": "/home/my_csv.csv"

This file is loaded into memory and filtered (if some filters are specified like this "filters": ["country=France"]).

All fields from teh CSV will be treated as STRING types and a field name must be specified (like this "field": "name" ) to know which one should be set for this field.

Then, a LINK can be made from other fields to this one and hence get linked values.

Examples

A simple definition of a field looks like this:

    {
      "name": "name",
      "type": "NAME"
    }

A definition with restricted values:

    {
      "name": "credits_used",
      "type": "INTEGER",
      "possible_values": [0, 1, -1]
    }

A definition with weighted values to not generate even spread data:

    {
      "name": "country_of_provenance",
      "type": "STRING",
      "possible_values_weighted": {
        "France": 40,
        "Germany": 60
      }
    }

A definition with minimum and maximum:

    {
      "name": "percentage",
      "type": "INTEGER",
      "min": 0,
      "max": 100
    }

A definition with a formula to evaluate value of the column:

    {
      "name": "percentage",
      "type": "INTEGER",
      "conditionals": {
            "formula": "2 * $very_low_int + 56 - $low_int"
        }
    }

The formula is in fact evaluated by a Java Script Engine, hence many functions are available and it is even possible to make if else statements for example

A definition with some conditions (equalities and inequalities) to evaluate its value:

    {
      "name": "percentage",
      "type": "INTEGER",
      "conditionals": {
            "$country_of_provenance=FRANCE" : "Paris",
            "$country_of_provenance=GERMANY | $increment_int<40" : "Berlin",
            "$country_of_provenance=GERMANY & $increment_int>40" : "Berlin_40"
        }
    }

A definition with one field which represent a CITY (filtered on either France or Spain) and other fields for its longitude, latitude and country:

    {
      "name": "city",
      "type": "CITY",
      "possible_values": ["France", "Spain"]
    },
    {
      "name": "city_lat",
      "type": "LINK",
      "conditionals": {
        "link": "$city.lat"
      }
    },
    {
      "name": "city_long",
      "type": "LINK",
      "conditionals": {
        "link": "$city.long"
      }
    },
    {
      "name": "city_country",
      "type": "LINK",
      "conditionals": {
        "link": "$city.country"
      }
    }

A definition with two fields taken from a given CSV file, this file is filtered on a column, and another field is taken as a linked to the first one:

    {
      "name": "person",
      "type": "CSV",
      "filters": ["country=France"],
      "file": "/root/dictionnaries/person_test.csv",
      "field": "name"
    },
    {
      "name": "person_department",
      "type": "LINK",
      "conditionals": {
        "link": "$person.department"
      }
    }

The CSV file looks like this:

name;department;country
francois;PS;France
kamel;SE;France
thomas;RH;Germany
sebastian;PS;Spain

Table Names:

An array of following properties self-describing:

  • HDFS_FILE_PATH

  • HDFS_FILE_NAME

  • HBASE_TABLE_NAME

  • HBASE_NAMESPACE

  • KAFKA_TOPIC

  • OZONE_VOLUME

  • OZONE_BUCKET

  • OZONE_KEY_NAME

  • OZONE_LOCAL_FILE_PATH

  • SOLR_COLLECTION

  • HIVE_DATABASE

  • HIVE_HDFS_FILE_PATH

  • HIVE_TABLE_NAME

  • HIVE_TEMPORARY_TABLE_NAME

  • KUDU_TABLE_NAME

  • LOCAL_FILE_PATH

  • LOCAL_FILE_NAME

  • AVRO_NAME

  • S3_BUCKET

  • S3_DIRECTORY

  • S3_KEY_NAME

  • S3_LOCAL_FILE_PATH

  • ADLS_CONTAINER

  • ADLS_DIRECTORY

  • ADLS_FILE_NAME

  • ADLS_LOCAL_FILE_PATH

  • GCS_BUCKET

  • GCS_DIRECTORY

  • GCS_OBJECT_NAME

  • GCS_LOCAL_FILE_PATH

Primary Keys:

An array of following properties, each of it associated with a value that is corresponding to the name of field (multiple fields could be provided separated by a comma):

  • KAFKA_MSG_KEY

  • HBASE_PRIMARY_KEY

  • KUDU_PRIMARY_KEYS

  • KUDU_HASH_KEYS

  • KUDU_RANGE_KEYS

Options:

An array of other options to configure basic settings for some connectors:

  • HBASE_COLUMN_FAMILIES_MAPPING
    This mapping must be in the form : "CF:col1,col2;CF2:col5"

  • SOLR_SHARDS

  • SOLR_REPLICAS

  • KUDU_REPLICAS

  • ONE_FILE_PER_ITERATION

  • KAFKA_MESSAGE_TYPE

  • KAFKA_JAAS_FILE_PATH

  • SOLR_JAAS_FILE_PATH

  • HIVE_THREAD_NUMBER

  • HIVE_ON_HDFS

  • HIVE_TABLE_TYPE

  • HIVE_TABLE_FORMAT

  • HIVE_TEZ_QUEUE_NAME

  • HIVE_TABLE_PARTITIONS_COLS
    This is a just one string with a comma separated list of cols: "col1,col2"

  • HIVE_TABLE_BUCKETS_COLS
    This is a just one string with a comma separated list of cols: "col1,col2"

  • HIVE_TABLE_BUCKETS_NUMBER

  • CSV_HEADER

  • DELETE_PREVIOUS

  • PARQUET_PAGE_SIZE

  • PARQUET_ROW_GROUP_SIZE

  • PARQUET_DICTIONARY_PAGE_SIZE

  • PARQUET_DICTIONARY_ENCODING

  • KAFKA_ACKS_CONFIG

  • KAFKA_RETRIES_CONFIG

  • KUDU_BUCKETS

  • KUDU_BUFFER

  • KUDU_FLUSH

  • OZONE_REPLICATION_FACTOR

  • HDFS_REPLICATION_FACTOR

  • ADLS_MAX_CONCURRENCY

  • ADLS_MAX_UPLOAD_SIZE

  • ADLS_BLOCK_SIZE

Note that all not required settings could be safely removed with no errors.

Parallel Launch

Note that to make it more efficient and faster, this program can be launched in parallel, and especially on yarn thanks to this project: https://github.infra.cloudera.com/frisch/yarnsubmit.
This project has intent to launch java programs on YARN containers, with as many instances as desired by the user, which is perfectly suited for this project.

The command used to launch the application with yarn-submit project was the following:

        ./yarn-submit.sh
                --app-name=random
                --container-number=10
                --kerberos-user=frisch/admin@FRISCH.COM
                --keytab=/home/frisch/frisch.keytab
                --app-files=/home/frisch/random-datagen/model.json,/home/frisch/random-datagen/config.properties,/home/frisch/random-datagen/log4j.properties
                /home/frisch/random-datagen/random-datagen.jar model.json 1000 100 hbase

Code Architecture

How does it work ?

This is a Spring Boot server, that will listen to requests to generate data.

API Call to the DataGenerationController will lead to a call to CommandRunnerService that will create an object Command. This object contains all properties required to run a data generation: Basic properties like number of batches, threads, rows etc…​ Properties regarding the connector: connector, database name, paths, truststore etc…​ And the model file parsed and set as an object.

This command is queued to be launched and an UUID is returned by the controller. (This ID can be used to track progress of the command)

Generation are made one after another one by the processCommands() scheduled function running every seconds to un-queue a command and run it. It contains the logic of setting up batches, creates threads and send rows to desired connectors.

When a command is scheduled, it is simply added to a List of scheduled commands, which are then periodically checked and added to the queue list of processing if needed.

Other controllers allows to check status of the server, get metrics, check status of commands, get all commands, remove schedule commands. There is also a controller to just run a test on a model, returning a row as a JSON directly.

How to add a connector ?

  • Create a Conncetor under connector package and sub-package that fits, then extends ConnectorInterface

  • Implements required functions (to send one and multiple rows to the output system) and all other needed function in this class

  • Add the connector in the function "stringToConnector" of ConnectorParser under config package

  • Add the connector initialization under the function "connectorsInit" of ConnectorSender under connector package

  • Add a function to create required object for insertion under Field abstract class

  • If needed, add a specific function for some or all Fields extended class

  • Add a function to create required object combining all Fields functions under Row class

  • If needed, under Model class, create a function to create initial queries required

  • Add required properties under config.properties file

How to add a type of field ?

  • Create an extended class of field under package model.type

  • Create a builder in previous class, implement generateRandomValue() function

  • If needed, override Fields function specific to some or all connectors available

  • In Field, instantiateField() function, add in the switch case statement, the new type of field

  • In Model, modify functions on table creation to be able to integrate the new type of field