- Clone the project:
```shell
git clone https://github.com/CliffLolo/postgres_kafka.git
```
- Go to the project directory:
```shell
cd postgres_kafka
```
- Run the Docker Compose file:
```shell
docker-compose up -d
```
If the command succeeds, each service runs in its own container, and Debezium is ready to ship changes from Postgres into Kafka once you point it at the database in the next step.
- Open a new terminal and run this command to register a Postgres connector with Debezium through the Kafka Connect REST API on port 8083:
```shell
curl -X POST -H 'Accept:application/json' -H 'Content-Type:application/json' localhost:8083/connectors --data '
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgresuser",
    "database.password": "postgrespw",
    "database.dbname": "inventory_db",
    "database.server.name": "postgres"
  }
}'
```
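If you prefer to script the registration, here is a minimal sketch using only Python's standard library. It sends the same JSON to the same Kafka Connect endpoint as the curl command and adds a status check through `GET /connectors/<name>/status`, a standard Kafka Connect REST route. The helper names (`connector_payload`, `build_register_request`, `register_connector`, `connector_status`) are hypothetical, and `CONNECT_URL` assumes port 8083 is published on localhost, as in the curl example.

```python
import json
import urllib.request

# Assumption: Kafka Connect's REST API is reachable on localhost:8083, as in the curl command.
CONNECT_URL = "http://localhost:8083"


def connector_payload(name: str = "inventory-connector") -> dict:
    """Same connector definition as the curl example above."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "plugin.name": "pgoutput",
            "database.hostname": "postgres",
            "database.port": "5432",
            "database.user": "postgresuser",
            "database.password": "postgrespw",
            "database.dbname": "inventory_db",
            "database.server.name": "postgres",
        },
    }


def build_register_request(payload: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build (but do not send) the POST /connectors request."""
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def register_connector(payload: dict, base_url: str = CONNECT_URL) -> dict:
    """Send the registration request and return Kafka Connect's JSON response."""
    with urllib.request.urlopen(build_register_request(payload, base_url)) as resp:
        return json.load(resp)


def connector_status(name: str, base_url: str = CONNECT_URL) -> dict:
    """Fetch GET /connectors/<name>/status, which reports connector and task states."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)
```

With the containers running, call `register_connector(connector_payload())` once, then check `connector_status("inventory-connector")["connector"]["state"]`; Kafka Connect reports `RUNNING` when the connector is healthy and `FAILED` otherwise.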
- Log into the Debezium container and run the following to list the Kafka topics:
```shell
bin/kafka-topics.sh --list --bootstrap-server postgres_kafka_kafka_1:9092
```
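The topic names used in the next steps follow Debezium's naming convention for Postgres, `<server name>.<schema>.<table>`, where the server name is the `database.server.name` value registered above (newer Debezium releases use `topic.prefix` for this). The small sketch below, built around a hypothetical `debezium_topic` helper, shows how `postgres.public.sales_sale` is derived for the tables the Materialize sources read from.

```python
def debezium_topic(server_name: str, schema: str, table: str) -> str:
    """Kafka topic that Debezium's Postgres connector writes a table's change events to."""
    return f"{server_name}.{schema}.{table}"


# The two tables used by the Materialize sources in this guide.
for table in ("sales_sale", "sales_saleitem"):
    print(debezium_topic("postgres", "public", table))
```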
- Launch the Materialize CLI:
```shell
docker-compose run cli
```
- Now that you're in the Materialize CLI, define the Postgres tables from `inventory_db` as Kafka sources:
```sql
CREATE SOURCE sales
FROM KAFKA BROKER 'kafka:9092' TOPIC 'postgres.public.sales_sale'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081'
ENVELOPE DEBEZIUM;

CREATE SOURCE sale_items
FROM KAFKA BROKER 'kafka:9092' TOPIC 'postgres.public.sales_saleitem'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081'
ENVELOPE DEBEZIUM;
```
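`ENVELOPE DEBEZIUM` tells Materialize that each Kafka message is a Debezium change event with `before`, `after`, and `op` fields rather than a plain row. The sketch below is a simplified conceptual model, not Materialize's implementation: it turns each event into row-level retractions (-1) and insertions (+1). It assumes every update carries a complete `before` image, which in Postgres depends on the table's REPLICA IDENTITY setting. The function name and the sample rows are hypothetical.

```python
from typing import Optional

Row = dict


def event_to_diffs(event: dict) -> list[tuple[Row, int]]:
    """Translate one Debezium change event into (row, diff) pairs."""
    op: str = event["op"]
    before: Optional[Row] = event.get("before")
    after: Optional[Row] = event.get("after")
    if op in ("c", "r"):  # insert, or a row read during the initial snapshot
        return [(after, +1)]
    if op == "u":  # update: retract the old row, insert the new one
        return [(before, -1), (after, +1)]
    if op == "d":  # delete: retract the old row
        return [(before, -1)]
    raise ValueError(f"unsupported op: {op!r}")  # e.g. truncate events are not modeled here


update = {
    "op": "u",
    "before": {"id": 1, "quantity": 2},
    "after": {"id": 1, "quantity": 5},
}
print(event_to_diffs(update))
```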
Note that Materialize knows the column types for each attribute because these sources pull the message schemas from the Confluent Schema Registry.
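Schema lookups work because messages written with Confluent's Avro serializer start with a 5-byte header: a zero magic byte followed by a 4-byte big-endian schema ID, which a consumer such as Materialize uses to fetch the matching schema from the registry (`GET /schemas/ids/<id>`). The sketch below only parses that header; decoding the Avro body is out of scope, the helper name is hypothetical, and the sample bytes are made up.

```python
import struct

MAGIC_BYTE = 0


def split_confluent_message(message: bytes) -> tuple[int, bytes]:
    """Return (schema_id, avro_body) for a Confluent-framed Kafka message value."""
    if len(message) < 5 or message[0] != MAGIC_BYTE:
        raise ValueError("not in Confluent Schema Registry wire format")
    (schema_id,) = struct.unpack(">I", message[1:5])  # 4-byte big-endian integer
    return schema_id, message[5:]


# Made-up message: a header for schema ID 7, followed by placeholder body bytes.
sample = b"\x00" + struct.pack(">I", 7) + b"avro-body"
print(split_confluent_message(sample))
```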
- Run `SHOW SOURCES;` in the CLI to see all the sources you created.
- Create a materialized view named `cost_price` that summarizes sales by `product_alias_id`:
```sql
CREATE MATERIALIZED VIEW cost_price AS
SELECT sale_items.product_alias_id AS product_alias_id,
       SUM(sale_items.unit_cost * sale_items.quantity) AS total_selling_price,
       MAX(sale_items.unit_cost) AS unit_selling_price,
       SUM(sale_items.quantity) AS quantity_sold
FROM sale_items
JOIN sales ON sales.id = sale_items.sale_id
GROUP BY sale_items.product_alias_id;
```
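To see what `cost_price` computes, the sketch below runs the same query as an ordinary (non-materialized) SQLite view over a few made-up rows. The table layouts are assumptions cut down to the columns the view uses. Note how the inner join drops an item whose `sale_id` has no matching sale; SQLite recomputes the view on every query, whereas Materialize keeps the results up to date as changes arrive.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical minimal tables: only the columns the view touches.
conn.executescript(
    """
    CREATE TABLE sales (id INTEGER PRIMARY KEY);
    CREATE TABLE sale_items (
        sale_id INTEGER,
        product_alias_id INTEGER,
        unit_cost REAL,
        quantity INTEGER
    );
    INSERT INTO sales (id) VALUES (1), (2);
    INSERT INTO sale_items VALUES
        (1, 10, 2.5, 4),
        (1, 11, 1.0, 3),
        (2, 10, 3.0, 1),
        (3, 11, 9.0, 9);  -- sale 3 does not exist, so the inner join drops this item

    -- Same query as the materialized view, as an ordinary SQLite view.
    CREATE VIEW cost_price AS
    SELECT sale_items.product_alias_id AS product_alias_id,
           SUM(sale_items.unit_cost * sale_items.quantity) AS total_selling_price,
           MAX(sale_items.unit_cost) AS unit_selling_price,
           SUM(sale_items.quantity) AS quantity_sold
    FROM sale_items
    JOIN sales ON sales.id = sale_items.sale_id
    GROUP BY sale_items.product_alias_id;
    """
)

rows = conn.execute("SELECT * FROM cost_price ORDER BY product_alias_id").fetchall()
for row in rows:
    print(row)  # (10, 13.0, 3.0, 5) then (11, 3.0, 1.0, 3)
```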
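- Select from the materialized view to see the current results. Materialize keeps the view up to date as new changes arrive from Postgres, so running the query again returns the latest totals:
```sql
SELECT * FROM cost_price ORDER BY product_alias_id;
```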
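Materialize does not rerun the query from scratch on every `SELECT`; it updates the view's results as change events arrive. As a rough illustration only (Materialize's engine is far more general), the hypothetical class below maintains the three aggregates per `product_alias_id` from +1/-1 row diffs. It keeps a count of each unit cost so that `MAX` stays correct after deletions, and it leaves out the join with `sales` for brevity.

```python
from collections import Counter, defaultdict


class CostPriceView:
    """Incrementally maintained per-product aggregates (join with sales omitted)."""

    def __init__(self) -> None:
        self.total = defaultdict(float)    # SUM(unit_cost * quantity)
        self.quantity = defaultdict(int)   # SUM(quantity)
        self.costs = defaultdict(Counter)  # multiset of unit_cost values, for MAX
        self.rows = defaultdict(int)       # number of live rows per product

    def apply(self, item: dict, diff: int) -> None:
        """Apply one insertion (diff=+1) or retraction (diff=-1) of a sale item."""
        key = item["product_alias_id"]
        self.total[key] += diff * item["unit_cost"] * item["quantity"]
        self.quantity[key] += diff * item["quantity"]
        self.costs[key][item["unit_cost"]] += diff
        self.rows[key] += diff
        if self.rows[key] == 0:  # last row for this product is gone: drop its group
            for table in (self.total, self.quantity, self.costs, self.rows):
                del table[key]

    def result(self) -> dict:
        """Current view contents: product -> (total, max unit cost, quantity)."""
        return {
            key: (
                self.total[key],
                max(cost for cost, n in self.costs[key].items() if n > 0),
                self.quantity[key],
            )
            for key in self.rows
        }


view = CostPriceView()
view.apply({"product_alias_id": 10, "unit_cost": 2.5, "quantity": 4}, +1)
view.apply({"product_alias_id": 10, "unit_cost": 3.0, "quantity": 1}, +1)
view.apply({"product_alias_id": 10, "unit_cost": 3.0, "quantity": 1}, -1)  # item deleted
print(view.result())  # {10: (10.0, 2.5, 4)}
```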
- Run `SHOW VIEWS;` in the CLI to see all the views you created.

We now have a materialized view we can visualize in a BI tool like Metabase. Exit the Materialize CLI with Ctrl + D.
- In a browser, go to localhost:3030.
- Click Let's get started.
- Complete the first set of fields.
- On the Add your data page, fill in the following information:

| Field | Value |
|---|---|
| Database type | Postgres |
| Name | inventory |
| Host | materialized |
| Port | 6875 |
| Database name | materialize |
| Database username | materialize |
| Database password | Leave empty |

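Because Materialize speaks the Postgres wire protocol on port 6875, the same settings can be written as a libpq-style connection URI, which `psql` and most Postgres client libraries accept. A minimal sketch with a hypothetical `materialize_uri` helper follows. The host `materialized` only resolves inside the Docker Compose network; from the host machine you would use `localhost`, assuming the compose file publishes port 6875.

```python
from urllib.parse import quote


def materialize_uri(host: str = "materialized", port: int = 6875,
                    user: str = "materialize", dbname: str = "materialize",
                    password: str = "") -> str:
    """Build a postgresql:// URI from the Metabase connection fields above."""
    auth = quote(user, safe="")
    if password:  # the table above leaves the password empty
        auth += ":" + quote(password, safe="")
    return f"postgresql://{auth}@{host}:{port}/{quote(dbname, safe='')}"


print(materialize_uri())                  # inside the Docker network
print(materialize_uri(host="localhost"))  # from the host, if port 6875 is published
```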
- Proceed past the remaining screens until you reach your primary dashboard.
- Click Ask a question.
- Click Native query.
- From Select a database, select inventory.
- In the query editor, enter:
```sql
SELECT * FROM cost_price;
```
- Save the output and add it to a dashboard. Once you've drafted a dashboard, you can set its refresh rate to 1 second by appending `#refresh=1` to the end of the dashboard URL.
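If you share or bookmark the dashboard, you can add the refresh fragment programmatically. This sketch uses Python's `urllib.parse` to set `refresh=<seconds>` as the URL fragment while keeping the rest of the URL. The helper name is hypothetical, the dashboard URL is a made-up example, and any existing fragment is replaced.

```python
from urllib.parse import urlsplit, urlunsplit


def with_refresh(dashboard_url: str, seconds: int = 1) -> str:
    """Return the dashboard URL with its fragment set to refresh=<seconds>."""
    parts = urlsplit(dashboard_url)
    return urlunsplit(parts._replace(fragment=f"refresh={seconds}"))


print(with_refresh("http://localhost:3030/dashboard/1"))  # http://localhost:3030/dashboard/1#refresh=1
```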