/dbt-graph-theory

A dbt package designed to help SQL based analysis of graphs

Primary LanguageShellApache License 2.0Apache-2.0

dbt-graph-theory

Tests

A DBT package designed to help SQL based analysis of graphs.

Supported adapters are:

  • dbt-snowflake
  • dbt-postgres (note postgres version >= 10 is required)
  • dbt-bigquery (see important note below!!!)
  • dbt-duckdb (note duckdb version >= 0.8.1 is required)

Adapter contributions are welcome! Generally new adapters require additions to the macros/utils folder, assuming the given database / engine supports recursive CTEs elegantly. In some cases (namely bigquery), specific array handling was required.

It's recommended to use the unit test suit to develop new adapters. Please get in touch if you need assistance!

IMPORTANT NOTE: BigQuery is untested in the wild, and is quite brittle regarding the recursive keyword. Ensure you only macros without CTE nesting - for example, to use largest_connected_subgraphs, write SQL like:

-- model.sql
{{
  largest_connected_subgraphs(...)
}}

rather than

-- model.sql
with recursive foo as (
  {{
    largest_connected_subgraphs(...)
  }}
)
...

This is to ensure that the recursive handling works. A feature request to improve this behaviour has been sent to google - please upvote: https://issuetracker.google.com/u/1/issues/263510050


Install

dbt-graph-theory currently supports dbt 1.0.0 or higher.

Check dbt package hub for the latest installation instructions, or read the docs for more information on installing packages.

Include in packages.yml

packages:
  - package: jpmmcneill/dbt_graph_theory
    version: [">=0.1.0", "<0.2.0"]
    # <see https://github.com/jpmmcneill/dbt-graph-theory/releases/latest> for the latest version tag

For latest release, see https://github.com/jpmmcneill/dbt-graph-theory/releases


Introduction

A graph is a structure defined by a set of vertices and edges.

flowchart
  A---B
  A---C
  B---C
  C---D
  B---E
  E---F
  A---F
Loading

The above is a graph with vertices {A, B, C, D, E, F}, and edges described by the lines between vertices. In the context of this package, this graph is represented by the SQL table:

edge_id vertex_1 vertex_2
1 A B
2 A C
3 B C
4 C D
5 B E
6 E F
7 A F

In table representation, null vertices represent vertices that are not connected to any other vertices.

The following tables:

edge_id vertex_1 vertex_2
1 A
2 B C
edge_id vertex_1 vertex_2
1 A
2 B C

are equivalent to:

flowchart
  A
  B---C
Loading

In this package, all rows are considered - meaning that the following tables are equivalent:

edge_id vertex_1 vertex_2
1 A
2 A
3
4 A B
edge_id vertex_1 vertex_2
1 A B

This package also supports multiple graphs being represented in the same table:

graph_id edge_id vertex_1 vertex_2
1 1 A B
1 2 A C
1 3 B C
1 4 C D
2 1 A' B'
2 2 B' C'
2 3 C' D'
flowchart
  subgraph 1
  A---B
  A---C
  B---C
  C---D
  end
  subgraph 2
  A'---B'
  B'---C'
  C'---D'
  end
Loading

While in the example above no vertex labels were shared between different graph_ids, this is not a strict requirement. When a vertex is shared between two graph_ids, the vertices are considered seperate (ie. all algorithms are performed on the graph_id level).

edge_id should be unique over the table (when graph_id is not defined) or at a graph_id level when graph_id is defined.

Types of Graph

This package currently two types of graphs: Ordered Graphs and Non Ordered Graphs. There currently no in data difference between these graphs (ie. a given graph table can correspond to both types), but some macros are designed to be implemented on specific types of graph. For example, connect_ordered_graph is designed to only be used with ordered graphs.

Ordered Graphs

Ordered graphs have a natural order to their nodes. In this package, the ordering is typically implemented via edges - meaning that this package treats ordered graphs as an extension of induced ordered graphs. For more detail, see wikipedia. Ordered graphs are typically interacted with via an ordering field - this corresponds to the order of the edges / nodes. Supported Ordering types are numeric and date(time).

An example of an ordered graph might be a rename path, or a customer's subscription history. These would both be ordered via a timestamp at which the rename happened / the customer swapped subscription type (respectively).

Non Ordered Graphs

Non ordered graphs are simply graphs that do not fall into the above definition.


Variables

This package currently has no variables that need to be configured.


Integration Tests (Developers Only)

This section assumes development on a mac, where python3 & postgresql are installed.

At the moment, postgres and duckdb integration tests are implemented.

Setting up python environment

Integration tests for this repository are managed via the integration_tests folder.

To set up a python3 virtual environment, run the following in order from the integration_tests folder.

python3 -m venv ci_venv
source ci_venv/bin/activate
pip install -r requirements/requirements_<adapter>.txt

ie. to use postgres, run `pip install -r requirements/requirements_postgres.txt

To exit the virtual environment, simply run:

deactivate

By default, dbt runs against postgres on setup. You can use any of the adapters listed (the best way of seeing these is by the requirements files). Each adapter has a specific target in the ci_profiles.yml. You can specifc these with the --target flag in dbt - ie. dbt run --profiles-dir ci_profiles --target ci_duckdb will run dbt with duckdb.

The easiest way to follow along with this is with the CI file specified in .github/workflows/ci.yml

Setting up postgres server

Postgres integration tests run on a local postgres server. The below assumes postgres has been installed via homebrew.

A postgres server can be spun up with the following:

bash bin/setup-postgres    --sets up a postgres server for running integration tests locally

Note that both of these silence stdout (ie error / success messages) so you may experience unexpected behaviour. If so - please raise an issue on GitHub.

Once, a postgres server has been setup, it can be started and stopped with the following:

bash bin/start-postgres    --starts the postgres server for running integration tests locally
bash bin/stop-postgres    --stops the postgres server for running integration tests locally

These can be useful when you want to persist data from previous runs of integration tests but not constantly run a postgres server.

Finally, the postgres server can be destroyed via:

bash bin/destroy-postgres    --destroys the postgres server for running integration tests locally

Running dbt

To run dbt, simply run dbt commands as usual, specifying the CI profile & selecting the integration tests:

dbt clean
dbt deps
dbt seed --profiles-dir ci_profiles
dbt build -s dbt_graph_theory_integration_tests --profiles-dir ci_profiles

It is often easier to set a DBT_PROFILES_DIR environment variable to remove the need for the --profiles-dir ci_profiles part of the above:

export DBT_PROFILES_DIR=ci_profiles

Remember to undo this when you are finished with:

unset DBT_PROFILES_DIR

The style of integration test is raw data that is seeded and validated against by running a model and running tests that should pass when the expected result is present.

Viewing local data

To view data generated in the integration tests locally, simply connect to the database and query the given table.

The details change between adapters. For postgres:

psql ci_db
select * from <table>;
...
quit -- to end the server connection

All CI models are required to run and pass tests for a merge to be allowed.


Contents

Generic tests

Macros

Helper Macros

Generic Tests

Arguments:

  • edge_id [text]: the name of the field for the edge_id column in the given table graph representation.
  • vertex_1 [text]: the name of the field for the vertex_1 column in the given table graph representation.
  • vertex_2 [text]: the name of the field for the vertex_2 column in the given table graph representation.
  • graph_id [Optional, text]: the name of the field for the graph_id column in the given table graph representation.

Usage:

models:
  - name: <model_name>
    tests:
      - dbt_graph_theory.graph_is_connected:
          graph_id: ...
          edge_id: ...
          vertex_1: ...
          vertex_2: ...

Tests whether the given model (a table representation of a graph) is connected. A connected graph is defined as one where a path exists between any two nodes. As an example, the below graph is not connected:

flowchart
  A---B
  B---C
  D---E
  E---F
  D---F
  E---G
Loading

Macros

Readme is TODO

Arguments:

  • input: the input node (inputted as ref(...) or source(...)) or CTE (inputted as a string)
  • edge_id [text]: the name of the field for the edge_id column in the given table graph representation.
  • vertex_1 [Any]: the name of the field for the vertex_1 column in the given table graph representation.
  • vertex_2 [Any]: the name of the field for the vertex_2 column in the given table graph representation.
  • ordering [Dict[text,text]]: the field (and data type) corresponding to the ordering of the given edges. For example, {'timestamp_field': 'timestamp'} corresponds to a field named timestamp_field of type timestamp being the ordering field. Data types must be one of: 'timestamp', 'date', 'numeric'.
  • graph_id [Optional, text]: the name of the field for the graph_id column in the given table graph representation.

Usage:

with connect_subgraphs as (
    {{ dbt_graph_theory.connect_ordered_graph(
        input=ref('example_model'),
        edge_id='edge_id_field_name',
        vertex_1='vertex_1_field_name',
        vertex_2='vertex_2_field_name',
        ordering={'ordering_field': 'numeric'},
        graph_id='graph_id_field_name'
    ) }}
)
...
...
with connect_subgraphs as (
    {{ dbt_graph_theory.connect_ordered_graph(
        input=ref('example_model'),
        edge_id='edge_id_field_name',
        vertex_1='vertex_1_field_name',
        vertex_2='vertex_2_field_name',
        ordering={'different_ordering_field': 'timestamp'}
    }}
)
...

This connects disconnected subgraphs (at the graph_id level) based on a ranking of orderings. This can only be applied on ordered graphs, meaning that an ordering must be provided via the ordering kwarg.

In the below graph (the text on arrows is the ordering - in this case, numeric):

flowchart
  A--->|1|B
  B--->|2|C
  D--->|3|E
  E--->|4|F
Loading

The following graph (in table representation) is returned:

flowchart
  A--->|1|B
  B--->|2|C
  C--->|2.5|D
  D--->|3|E
  E--->|4|F
Loading

The table in this case will have a new row in the output:

graph_id edge_id vertex_2 vertex_1 ordering_field
1 inserted_edge_1 C D 2.5

The orderings of inserted edges are dependent on the data type in question.

Timestamps will be 1 second behind the later edge that is being connected to. For example:

flowchart
  A--->|2021-01-01 10:25:15|B
  B--->|2021-01-04 17:43:45|C
  D--->|2021-01-05 14:02:05|E
Loading

would become

flowchart
  A--->|2021-01-01 10:25:15|B
  B--->|2021-01-04 17:43:45|C
  C--->|2021-01-05 14:02:04|D
  D--->|2021-01-05 14:02:05|E
Loading

Dates will be 1 day behind the later edge (if this does not conflict with the earlier edge, in which case the date is left equal):

flowchart
  A--->|2021-01-01|B
  B--->|2021-01-04|C
  D--->|2021-01-05|E
Loading

would become

flowchart
  A--->|2021-01-01|B
  B--->|2021-01-04|C
  C--->|2021-01-05|D
  D--->|2021-01-05|E
Loading

and

flowchart
  A--->|2021-01-01|B
  B--->|2021-01-04|C
  D--->|2021-01-07|E
Loading

would become

flowchart
  A--->|2021-01-01|B
  B--->|2021-01-04|C
  C--->|2021-01-06|D
  D--->|2021-01-07|E
Loading

Numerics go exactly in between the two nodes being connected, as demonstrated in the above example.

Arguments:

  • input: the input node (inputted as ref(...) or source(...)) or CTE (inputted as a string)
  • edge_id [text]: the name of the field for the edge_id column in the given table graph representation.
  • vertex_1 [Any]: the name of the field for the vertex_1 column in the given table graph representation.
  • vertex_2 [Any]: the name of the field for the vertex_2 column in the given table graph representation.
  • graph_id [Optional, text]: the name of the field for the graph_id column in the given table graph representation.

Usage:

with subgraphs as (
    {{ dbt_graph_theory.largest_connected_subgraphs(
        input=ref('example_model'),
        edge_id='edge_id_field_name',
        vertex_1='vertex_1_field_name',
        vertex_2='vertex_2_field_name',
        graph_id='graph_id_field_name'
    ) }}
)
...
...
subgraphs as (
    {{ dbt_graph_theory.largest_connected_subgraphs(
        input='example_cte',
        edge_id='different_edge_id_field_name',
        vertex_1='different_vertex_1_field_name',
        vertex_2='different_vertex_2_field_name'
    ) }}
)
...

This macro groups vertices into the largest connected subgraph that they are a member of.

In the below graph:

flowchart
  A---B
  B---C
  D---E
  E---F
  D---F
  E---G
Loading

The following table is returned:

vertex subgraph_id subgraph_members
A 1 ['A', 'B', 'C']
B 1 ['A', 'B', 'C']
C 2 ['A', 'B', 'C']
D 2 ['D', 'E', 'F', 'G']
E 2 ['D', 'E', 'F', 'G']
F 2 ['D', 'E', 'F', 'G']
G 2 ['D', 'E', 'F', 'G']

subgraph_id is designed to be unique to both the graph and subgraph level. When graph_id is defined, the output is also at a graph_id level.

Helper Macros

Note that the below are designed for internal (ie. dbt-graph-theory) use only. Use them at your own risk!

Arguments:

  • field [text]: the field to be aggregated into an array (inputted as 'field_name').
  • distinct [optional, bool]: whether the aggregation should only include distinct values (inputted as true/false). Defaults to false.
  • order_field [optional, text]: the field that the array elements should be ordered by (inputted as 'field_name'). Defaults to none.
  • order [optional, text]: the ordering that the order_field should be ordered by (inputted as 'field_name'). Defaults to none.

This is an adapter specific macro for aggregating a column into an array.

This macro excludes nulls.

Usage:

select
    date_month,
    {{ dbt_graph_theory.array_agg(field='customer_id') }} as customer_array
from {{ ref('model') }}
group by date_month
select
    date_month,
    {{ dbt_graph_theory.array_agg(field='customer_id', distinct=true, order_field='num_orders') }} as customer_array
from {{ ref('model') }}
group by date_month

Arguments:

  • array [text]: the array field where new values should be appended into (inputted as 'field_name').
  • new_value [text]: the field (or value) to be appended to the existing array(inputted as 'field_name' or 'value').

This is an adapter specific macro for appending a new value into an array.

Usage:

select
    {{ dbt_graph_theory.array_append('existing_array', 'new_field_to_append') }} as updated_existing_array
from {{ ref('model') }}
select
    {{ dbt_graph_theory.array_append('existing_array', "'a_hardcoded_string'") }} as updated_existing_array
from {{ ref('model') }}

Arguments:

  • components [list[text]]: the jinja list which will be used (in order) for the array's construction.

This is an adapter specific macro for constructuring an array from a list of values.

Usage:

{% set list = ['field_one', 'field_two', "'hardcoded_string'"] %}
select
    {{ dbt_graph_theory.array_construct(components=list) }} as constructed_array
from {{ ref('model') }}
select
    {{ dbt_graph_theory.array_construct(components=['1', 'a_field']) }} as constructed_array
from {{ ref('model') }}

Arguments:

  • array [text]: the array field which will be checked for inclusion of the given value.
  • value [text]: the field (or hardcoded data) which is checked for in the given array.

This is an adapter specific macro to test whether a value is contained within an array.

Usage:

select
    {{ dbt_graph_theory.array_contains(array='array_field', value='other_value') }} as value_contained__bool
from {{ ref('model') }}

Note: README structure inspired by dbt-lab's dbt-utils and calogica's dbt-date and dbt-expectations.

Exceptions

These macros are for internal package use only. They should help with raising consistend error messages.

Arguments: none

Raise an exception to highlight that the given adapter is not currently supported.

Usage:

...
{{ dbt_graph_theory.adapter_missing_exception() }}
...