/dbt-greenplum

Adaptation postgres adapter for Greenplum

Primary LanguagePython

dbt logo

Unit Tests Badge Integration Tests Badge

dbt enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.

dbt is the T in ELT. Organize, cleanse, denormalize, filter, rename, and pre-aggregate the raw data in your warehouse so that it's ready for analysis.

dbt-greenplum

The dbt-greenplum package contains the code enabling dbt to work with Greenplum. This adapter based on postgres-adapter with a bit difference for a greenplum specific features

Installation

Easiest way to start use dbt-greenplum is to install it using pip pip install dbt-greenplum==<version>

Where <version> is same as your dbt version

Available versions:

  • 0.19.2
  • 1.0.4
  • 1.2.0
  • 1.4.0
  • 1.5.0

Supported Features

You can specify following settings:

  • Storage type
    • heap
    • appendoptimized
  • Distribution
    • distributed randomly by defaut
    • distributed by (column, [ ... ] ) by setting up distributed_by parameter in the model config
    • distributed replicated by setting up distributed_replicated=true parameter in the model config
  • Table orientation
    • orientation=colum by default
    • orientation=row by setting up orientation parameter in row in the model config
  • Compress type, level and blocksize with default values
     compresstype=ZLIB,
     compresslevel=1,
     blocksize=32768
    You can also specify blocksize, compresstype, compresslevel in the model config
  • appendoptimized preference by default is true, also you can override it by setting up appendoptimized field in the model config
  • Partitions (see "Partition" chapter below)
  • Additional incremental strategy
    • truncate-insert by setting up incremental_strategy="truncate+insert" parameter in the model config or +incremental_strategy: truncate_insert in the dbt_project.yml

Heap table example

To create heap table set appendoptimized parameter value to false

{{
   config(
      ...
      materialized='table',
      appendoptimized=false
      ...
   )
}}

select 1 as "id"

will produce following SQL code

create  table "<db_name>"."<schema_name>"."<table_name>"
with (
   appendoptimized=false
) as (
   select 1 as "id"
)
DISTRIBUTED RANDOMLY;

Appendoptimized table example

You can use appendopimized or appendonly(legacy) to create appendoptimized table

Such model definition

{{
    config(
        materialized='table',
        distributed_by='id',
        appendoptimized=true,
        orientation='column',
        compresstype='ZLIB',
        compresslevel=1,
        blocksize=32768
    )
}}

with source_data as (

    select 1 as id
    union all
    select null as id

)

select *
from source_data

will produce following sql code

create  table "dvault"."dv"."my_first_dbt_model__dbt_tmp"
with (
    appendoptimized=true,
    blocksize=32768,
    orientation=column,
    compresstype=ZLIB,
    compresslevel=1
)
as (
  with source_data as (
      select 1 as id
      union all
      select null as id
    )
  select *
  from source_data
)  
distributed by (id);

  
alter table "dvault"."dv"."my_first_dbt_model__dbt_tmp" rename to "my_first_dbt_model";

Partitions

Greenplum does not support partitions with create table as construction, so you need to build model in two steps

  • create table schema
  • insert data

To implement partitions into you dbt-model you need to specify on of the following config parameters:

  • fields_string - definition of columns name, type and constraints
  • one of following way to configure partitions
    • raw_partition by default
    • partition_type, partition_column, partition_spec
    • partition_type, partition_column, partition_start, partition_end, partition_every
    • partition_type, partition_column, partition_values
  • default_partition_name - name of default partition 'other' by default

Let consider examples of definition model with partitions

  • using raw_partition parameter
    {% set fields_string %}
         id int4 null,
         incomingdate timestamp NULL
    {% endset %}
    
    
    {% set raw_partition %}
        PARTITION BY RANGE (incomingdate)
        (
            START ('2021-01-01'::timestamp) INCLUSIVE
            END ('2023-01-01'::timestamp) EXCLUSIVE
            EVERY (INTERVAL '1 day'),
            DEFAULT PARTITION extra
        );
    {% endset %}
    
    {{
        config(
            materialized='table',
            distributed_by='id',
            appendoptimized=true,
            orientation='column',
            compresstype='ZLIB',
            compresslevel=1,
            blocksize=32768,
            fields_string=fields_string,
            raw_partition=raw_partition,
            default_partition_name='other_data'
        )
    }}
    
    with source_data as (
    
        select
            1 as id,
            '2022-02-22'::timestamp as incomingdate
        union all
        select
            null as id,
            '2022-02-25'::timestamp as incomingdate
    )
    select *
    from source_data
    will produce following sql code
    create table if not exists "database"."schema"."my_first_dbt_model__dbt_tmp" (
        id int4 null,
        incomingdate timestamp NULL
    )
    with (
        appendoptimized=true,
        blocksize=32768,
        orientation=column,
        compresstype=ZLIB,
        compresslevel=1
    )
    DISTRIBUTED BY (id)
    PARTITION BY RANGE (incomingdate)
    (
        START ('2021-01-01'::timestamp) INCLUSIVE
        END ('2023-01-01'::timestamp) EXCLUSIVE
        EVERY (INTERVAL '1 day'),
        DEFAULT PARTITION extra
    );
    
    insert into "database"."schema"."my_first_dbt_model__dbt_tmp" (
        with source_data as (
    
            select
                1 as id,
                '2022-02-22'::timestamp as incomingdate
            union all
            select
                null as id,
                '2022-02-25'::timestamp as incomingdate
        )
        select *
        from source_data
    );
    alter table "dvault"."dv"."my_first_dbt_model" rename to "my_first_dbt_model__dbt_backup";
    drop table if exists "dvault"."dv"."my_first_dbt_model__dbt_backup" cascade;
    alter table "database"."schema"."my_first_dbt_model__dbt_tmp" rename to "my_first_dbt_model";
  • Same result you can get using partition_type, partition_column, partition_spec parameters
    {% set fields_string %}
        id int4 null,
        incomingdate timestamp NULL
    {% endset %}
    
    {%- set partition_type = 'RANGE' -%}
    {%- set partition_column = 'incomingdate' -%}
    {% set partition_spec %}
        START ('2021-01-01'::timestamp) INCLUSIVE
        END ('2023-01-01'::timestamp) EXCLUSIVE
        EVERY (INTERVAL '1 day'),
        DEFAULT PARTITION extra
    {% endset %}
    
    {{
        config(
            materialized='table',
            distributed_by='id',
            appendoptimized=true,
            orientation='column',
            compresstype='ZLIB',
            compresslevel=1,
            blocksize=32768,
            fields_string=fields_string,
            partition_type=partition_type,
            partition_column=partition_column,
            partition_spec=partition_spec,
            default_partition_name='other_data'
        )
    }}
    
    with source_data as (
    
        select
            1 as id,
            '2022-02-22'::timestamp as incomingdate
        union all
        select
            null as id,
            '2022-02-25'::timestamp as incomingdate
    )
    select *
    from source_data
  • also, you can use third way
    {% set fields_string %}
        id int4 null,
        incomingdate timestamp NULL
    {% endset %}
    
    
    {%- set partition_type = 'RANGE' -%}
    {%- set partition_column = 'incomingdate' -%}
    {%- set partition_start = "'2021-01-01'::timestamp" -%}
    {%- set partition_end = "'2022-01-01'::timestamp" -%}
    {%- set partition_every = '1 day' -%}
    
    
    {{
        config(
            materialized='table',
            distributed_by='id',
            appendoptimized=true,
            orientation='column',
            compresstype='ZLIB',
            compresslevel=1,
            blocksize=32768,
            fields_string=fields_string,
            partition_type=partition_type,
            partition_column=partition_column,
            partition_start=partition_start,
            partition_end=partition_end,
            partition_every=partition_every,
            default_partition_name='other_data'
        )
    }}
    
    with source_data as (
    
        select
            1 as id,
            '2022-02-22'::timestamp as incomingdate
        union all
        select
            null as id,
            '2022-02-25'::timestamp as incomingdate
    )
    select *
    from source_data
  • example of partition_type LIST is coming soon

Table partition hints

Too check generate sql script use -d option: dbt -d run <...> -m <models>

If you want implement complex partition logic with subpartition or something else use raw_partition parameter

Truncate+insert incremental strategy

You can use this incremental strategy to safely reload models without cascade dropping dependent objects due to Greenplum's transactional TRUNCATE operation realization

Model definition example:

{{
   config(
      ...
      materialized='incremental',
      incremental_strategy='truncate+insert',
      ...
   )
}}

select *
from source_data

Getting started

Join the dbt Community

Reporting bugs and contributing code

Code of Conduct

Everyone interacting in the dbt project's codebases, issue trackers, chat rooms, and mailing lists is expected to follow the dbt Code of Conduct.