/datasaurus

Data Engineering framework written in Python based in Polars.

Primary LanguagePython

Datasaurus is a Data Engineering framework written in Python 3.8, 3.9, 3.10 and 3.11

It is based in Polars and heavily influenced by Django.

Datasaurus offers an opinionated, feature-rich and powerful framework to help you write data pipelines, ETLs or data manipulation programs.

Documentation (TODO)

It supports:

  • โœ… Fully support read/write operations.
  • โญ• Not yet but will be implemented.
  • ๐Ÿ’€ Won't be implemented in the near future.

Storages:

  • Sqlite โœ…
  • PostgresSQL โœ…
  • MySQL โœ…
  • Mariadb โœ…
  • Local Storage โœ…
  • Azure blob storage โญ•
  • AWS S3 โญ•

Formats:

  • CSV โœ…
  • JSON โœ…
  • PARQUET โœ…
  • EXCEL โœ…
  • AVRO โœ…
  • TSV โญ•
  • SQL โญ• (Like sql inserts)

Features:

  • Delta Tables โญ•
  • Field validations โญ•

Simple example

# settings.py 
from datasaurus.core.storage import PostgresStorage, StorageGroup, SqliteStorage
from datasaurus.core.models import StringColumn, IntegerColumn

# We set the environment that will be used.
os.environ['DATASAURUS_ENVIRONMENT'] = 'dev'

class ProfilesData(StorageGroup):
    dev = SqliteStorage(path='/data/data.sqlite')
    live = PostgresStorage(username='user', password='user', host='localhost', database='postgres')

    
# models.py
from datasaurus.core.models import Model, StringColumn, IntegerColumn

class ProfileModel(Model):
    id = IntegerColumn()
    username = StringColumn()
    mail = StringColumn()
    sex = StringColumn()

    class Meta:
        storage = ProfilesData
        table_name = 'PROFILE'

We can access the raw Polars dataframe with 'Model.df', it's lazy, meaning it will only load the data if we access the attribute.

>>> ProfileModel.df
shape: (100, 4)
โ”Œโ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”
โ”‚ id  โ”† username           โ”† mail                     โ”† sex โ”‚
โ”‚ --- โ”† ---                โ”† ---                      โ”† --- โ”‚
โ”‚ i64 โ”† str                โ”† str                      โ”† str โ”‚
โ•žโ•โ•โ•โ•โ•โ•ชโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•ชโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•ชโ•โ•โ•โ•โ•โ•ก
โ”‚ 1   โ”† ehayes             โ”† colleen63@hotmail.com    โ”† F   โ”‚
โ”‚ 2   โ”† thompsondeborah    โ”† judyortega@hotmail.com   โ”† F   โ”‚
โ”‚ 3   โ”† orivera            โ”† iperkins@hotmail.com     โ”† F   โ”‚
โ”‚ 4   โ”† ychase             โ”† sophia92@hotmail.com     โ”† F   โ”‚
โ”‚ โ€ฆ   โ”† โ€ฆ                  โ”† โ€ฆ                        โ”† โ€ฆ   โ”‚
โ”‚ 97  โ”† mary38             โ”† sylvia80@yahoo.com       โ”† F   โ”‚
โ”‚ 98  โ”† charlessteven      โ”† usmith@gmail.com         โ”† F   โ”‚
โ”‚ 99  โ”† plee               โ”† powens@hotmail.com       โ”† F   โ”‚
โ”‚ 100 โ”† elliottchristopher โ”† wilsonbenjamin@yahoo.com โ”† M   โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”˜

We could now create a new model whose data is created from ProfileModel

class FemaleProfiles(Model):
    id = IntegerField()
    profile_id = IntegerField()
    mail = StringField()

    def calculate_data(self):
        return (
            ProfileModel.df
            .filter(ProfileModel.sex == 'F')
            .with_row_count('new_id')
            .with_columns(
                pl.col('new_id')
            )
            .with_columns(
                pl.col('id').alias('profile_id')
            )
        )

    class Meta:
        recalculate = 'if_no_data_in_storage'
        storage = ProfilesData
        table_name = 'PROFILE_FEMALES'

Et voilรก! the columns will be auto selected from the column definitions (id, profile_id and email).

If we now call:

FemaleProfiles.df

It will check if the dataframe exists in the storage and if it does not, it will 'calculate' it again from calculate_data and save it to the Storage, this parameter can also be set to 'always'.

You can also move data to different environments or storages, making it easy to change formats or move data around:

FemaleProfiles.save(to=ProfilesData.live)

Effectively moving data from SQLITE (dev) to PostgreSQL (live),

# Can also change formats
FemaleProfiles.save(to=ProfilesData.otherenvironment, format=LocalFormat.JSON)
FemaleProfiles.save(to=ProfilesData.otherenvironment, format=LocalFormat.CSV)
FemaleProfiles.save(to=ProfilesData.otherenvironment, format=LocalFormat.PARQUET)