/flow

Monorepo for Flow PHP - Unified data processing library

Flow is a strongly typed, asynchronous ETL (Extract, Transform, Load) data processing library for PHP with constant memory consumption.

Supported PHP versions: PHP 8.1 PHP 8.2

We Stand Against Terror

On Feb. 24, 2022, Russia declared an unprovoked war on Ukraine and launched a full-scale invasion. Russia is currently bombing peaceful Ukrainian cities, including schools and hospitals and attacking civilians who are fleeing conflict zones.

On Oct. 7, 2023, the national holiday of Simchat Torah, Hamas terrorists initiated an attack on Israel in the early hours, targeting civilians. They unleashed violence that resulted in at least 1,400 casualties and abducted at least 200 individuals, not limited to Israelis.


Usage

<?php

declare(strict_types=1);

use Flow\ETL\DSL\Parquet;
use Flow\ETL\Filesystem\SaveMode;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use Flow\ETL\DSL\To;
use Flow\ETL\Flow;
use Flow\ETL\GroupBy\Aggregation;

require __DIR__ . '/vendor/autoload.php';

(new Flow())
    ->read(Parquet::from(__FLOW_DATA__ . '/orders_flow.parquet'))
    ->select('created_at', 'total_price', 'discount')
    ->withEntry('created_at', ref('created_at')->toDate(\DateTime::RFC3339)->dateFormat('Y/m'))
    ->withEntry('revenue', ref('total_price')->minus(ref('discount')))
    ->select('created_at', 'revenue')
    ->groupBy('created_at')
    ->aggregate(Aggregation::sum(ref('revenue')))
    ->sortBy(ref('created_at')->desc())
    ->withEntry('daily_revenue', ref('revenue_sum')->round(lit(2))->numberFormat(lit(2)))
    ->drop('revenue_sum')
    ->write(To::output(truncate: false))
    ->mode(SaveMode::Overwrite)
    ->withEntry('created_at', ref('created_at')->toDate('Y/m'))
    ->write(Parquet::to(__FLOW_OUTPUT__ . '/daily_revenue.parquet'))
    ->run();

$ php daily_revenue.php
+------------+---------------+
| created_at | daily_revenue |
+------------+---------------+
|    2023/10 |    206,669.74 |
|    2023/09 |    227,647.47 |
|    2023/08 |    237,027.31 |
|    2023/07 |    240,111.05 |
|    2023/06 |    225,536.35 |
|    2023/05 |    234,624.74 |
|    2023/04 |    231,472.05 |
|    2023/03 |    231,697.36 |
|    2023/02 |    211,048.97 |
|    2023/01 |    225,539.81 |
+------------+---------------+
10 rows
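
To make the steps of the pipeline above easier to follow, here is a minimal plain-PHP sketch of the same logic: derive a `Y/m` key from `created_at`, subtract `discount` from `total_price`, sum per key, sort descending, and format to two decimals. It does not use Flow at all. The function name `monthlyRevenue` and the sample orders are hypothetical, and it assumes that `numberFormat(lit(2))` behaves like PHP's `number_format($value, 2)`.

```php
<?php

declare(strict_types=1);

/**
 * Plain-PHP illustration of the aggregation performed by the pipeline above.
 * Hypothetical helper, not part of Flow.
 *
 * @param iterable<array{created_at: string, total_price: float, discount: float}> $orders
 * @return array<string, string> "Y/m" key => formatted revenue, newest first
 */
function monthlyRevenue(iterable $orders): array
{
    $sums = [];

    foreach ($orders as $order) {
        // Counterpart of toDate(RFC3339)->dateFormat('Y/m').
        $month = (new \DateTimeImmutable($order['created_at']))->format('Y/m');

        // revenue = total_price - discount, summed per group key.
        $sums[$month] = ($sums[$month] ?? 0.0) + $order['total_price'] - $order['discount'];
    }

    // Counterpart of sortBy(ref('created_at')->desc()).
    krsort($sums);

    // Round to 2 decimals and format with a thousands separator.
    return array_map(
        static fn (float $sum): string => number_format(round($sum, 2), 2),
        $sums
    );
}

// Hypothetical sample data; the real example reads orders_flow.parquet.
$orders = [
    ['created_at' => '2023-09-30T23:59:00+00:00', 'total_price' => 100.50, 'discount' => 0.50],
    ['created_at' => '2023-10-01T08:15:00+00:00', 'total_price' => 1200.00, 'discount' => 200.00],
    ['created_at' => '2023-10-15T12:00:00+00:00', 'total_price' => 250.25, 'discount' => 50.00],
];

foreach (monthlyRevenue($orders) as $month => $revenue) {
    echo $month . ' ' . $revenue . PHP_EOL;
}
```

Unlike this sketch, which keeps every group in a PHP array and expects the caller to supply the orders, the Flow pipeline reads the Parquet file lazily and can also write the result back to Parquet.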

Features

  • low and constant memory consumption
  • asynchronous data processing
  • reading from any data source
  • writing to any data source
  • rich collection of data transformation functions
  • direct access to remote filesystems
  • partitioning
  • grouping & aggregating
  • remote file processing
  • joins
  • sorting
  • displaying datasets as ASCII table
  • validation against the schema
  • window functions
  • caching

📈 Project Roadmap

Installation

This package is a monorepo. Please check the packages below and install only those you are going to use. This keeps unnecessary dependencies (and their maintenance) out of your project.

For example, if you want to work with JSON/CSV files, these are the dependencies you need to install:

composer require flow-php/etl:^0.1 flow-php/etl-adapter-csv:^0.1 flow-php/etl-adapter-json:^0.1

Docker

Since some of the Flow adapters require additional PHP extensions, we have prepared a Docker image with all the necessary dependencies.

$ docker pull ghcr.io/flow-php/flow:latest
$ docker run -v $(pwd):/flow-workspace --rm -it ghcr.io/flow-php/flow:latest
Flow-PHP - Extract Transform Load - Data processing framework 0.4.0-325-g6c3e4404

Usage:
  command [options] [arguments]

Options:
  -h, --help            Display help for the given command. When no command is given display help for the list command
  -q, --quiet           Do not output any message
  -V, --version         Display this application version
      --ansi|--no-ansi  Force (or disable --no-ansi) ANSI output
  -n, --no-interaction  Do not ask any interactive question
  -v|vv|vvv, --verbose  Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Available commands:
  completion             Dump the shell completion script
  help                   Display help for a command
  list                   List commands
  run                    Run ETL pipeline
 parquet
  parquet:read:data      Read data from parquet file
  parquet:read:metadata  Read metadata from parquet file

If you would like to try Flow, clone this repository (or your fork of it), navigate to it in a terminal, and execute the following command:

$ docker run -v $(pwd):/flow-workspace --rm -it ghcr.io/flow-php/flow:latest run /flow-workspace/examples/topics/aggregations/daily_revenue.php

Flow CLI will load the pipeline definition from the examples/topics/aggregations/daily_revenue.php file and execute it.

Usage

To understand how Flow works, please read the documentation.

Building blocks

  • DataFrame - Lazy data processing frame.
  • Rows - Immutable collection of Row objects.
  • Row - Immutable, strongly typed collection of Entry objects.
  • Entry - Immutable, strongly typed object representing a cell in a row.
  • Extractor (Reader) - Memory-safe data source returning a \Generator that yields Rows to the Pipeline.
  • Transformer - Data transformer that receives and returns Rows (in most cases transformed), one Rows instance at a time.
  • Loader (Writer) - Memory-safe representation of a data sink. The Loader is responsible for writing Rows into the destination storage, one at a time.
  • Pipeline - Interface representing the ETL process. Each received Rows instance is passed through all Pipes. The Pipeline is also responsible for error handling.
  • Pipe - Loader or Transformer instance existing in the Pipes collection.
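
The way these building blocks cooperate can be sketched in plain PHP. The example below is a simplified illustration under stated assumptions, not Flow's actual interfaces: all function names (`extractInBatches`, `addRevenue`, `loadInto`, `runPipeline`) are hypothetical, plain arrays stand in for Rows, and the loader is kept separate from the transformer pipes for brevity. It shows why a generator-based extractor keeps memory use bounded: only one batch is held at a time.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-ins for Extractor, Transformer, Loader and Pipeline.

/** Extractor: lazily yields fixed-size batches, so only one batch is in memory. */
function extractInBatches(iterable $source, int $batchSize): \Generator
{
    $batch = [];

    foreach ($source as $row) {
        $batch[] = $row;

        if (count($batch) === $batchSize) {
            yield $batch;
            $batch = [];
        }
    }

    // Yield the final, possibly smaller batch.
    if ($batch !== []) {
        yield $batch;
    }
}

/** Transformer: receives one batch and returns a new one, leaving the input untouched. */
function addRevenue(array $batch): array
{
    return array_map(
        static fn (array $row): array => $row + ['revenue' => $row['total_price'] - $row['discount']],
        $batch
    );
}

/** Loader: writes one batch into the sink (an in-memory array here). */
function loadInto(array &$sink, array $batch): void
{
    foreach ($batch as $row) {
        $sink[] = $row;
    }
}

/** Pipeline: passes every batch through all pipes, then hands it to the loader. */
function runPipeline(\Generator $extractor, array $pipes, array &$sink): int
{
    $batches = 0;

    foreach ($extractor as $batch) {
        foreach ($pipes as $pipe) {
            $batch = $pipe($batch);
        }

        loadInto($sink, $batch);
        $batches++;
    }

    return $batches;
}

$source = [
    ['total_price' => 10, 'discount' => 1],
    ['total_price' => 20, 'discount' => 2],
    ['total_price' => 30, 'discount' => 3],
];

$sink = [];
$batches = runPipeline(extractInBatches($source, 2), [addRevenue(...)], $sink);

echo $batches . ' batches, ' . count($sink) . ' rows' . PHP_EOL;
```

The first-class callable syntax `addRevenue(...)` requires PHP 8.1, which matches the supported versions listed above.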

Asynchronous Processing

Sponsors

Flow PHP is sponsored by:

  • Blackfire - the best PHP profiling and monitoring tool!