Passo 1. Criar os tópicos
docker run --rm --network=kafka-ksqldb-sales_network confluentinc/cp-kafka:7.7.1 kafka-topics --create --topic sales --bootstrap-server kafka:9093 --partitions 1 --replication-factor 1
docker run --rm --network=kafka-ksqldb-sales_network confluentinc/cp-kafka:7.7.1 kafka-topics --create --topic products --bootstrap-server kafka:9093 --partitions 1 --replication-factor 1
Passo 2. Criar as estruturas no ksqlDB
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
CREATE STREAM sales_stream (
product_id STRING,
quantity INT,
price DOUBLE,
sale_date STRING, -- Alterado para STRING
payment_method STRING,
customer_id STRING,
region STRING,
discount DOUBLE,
total_weight DOUBLE,
delivery_date STRING -- Alterado para STRING
) WITH (
KAFKA_TOPIC='sales',
VALUE_FORMAT='JSON'
);
CREATE TABLE total_sales_by_product AS
SELECT product_id,
SUM(quantity * price) AS total_sales,
SUM(quantity) AS total_quantity,
AVG(price) AS avg_price,
MIN(sale_date) AS first_sale_date,
MAX(sale_date) AS last_sale_date
FROM sales_stream
GROUP BY product_id;
Passo 3. Popular Produtos
docker-compose exec kafka kafka-console-producer --bootstrap-server localhost:9092 --topic sales
{"product_id": "101", "quantity": 2, "price": 10.0}
{"product_id": "102", "quantity": 1, "price": 15.0}
{"product_id": "103", "quantity": 5, "price": 8.0}
{"product_id": "104", "quantity": 2, "price": 12.0}
{"product_id": "105", "quantity": 4, "price": 20.0}
{"product_id": "106", "quantity": 1, "price": 25.0}
{"product_id": "107", "quantity": 3, "price": 18.0}
{"product_id": "108", "quantity": 6, "price": 30.0}
{"product_id": "109", "quantity": 2, "price": 22.0}
{"product_id": "110", "quantity": 1, "price": 35.0}
{"product_id": "101", "quantity": 3, "price": 10.0}
{"product_id": "102", "quantity": 2, "price": 15.0}
{"product_id": "103", "quantity": 4, "price": 8.0}
{"product_id": "104", "quantity": 5, "price": 12.0}
{"product_id": "105", "quantity": 1, "price": 20.0}
{"product_id": "106", "quantity": 2, "price": 25.0}
{"product_id": "107", "quantity": 6, "price": 18.0}
{"product_id": "108", "quantity": 3, "price": 30.0}
{"product_id": "109", "quantity": 1, "price": 22.0}
{"product_id": "110", "quantity": 4, "price": 35.0}
docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic products --from-beginning
docker-compose exec kafka kafka-console-producer --bootstrap-server localhost:9092 --topic products
{"name": "Widget A", "category": "Widgets"}
{"name": "Widget B", "category": "Widgets"}
{"name": "Gadget X", "category": "Gadgets"}
{"name": "Gadget Y", "category": "Gadgets"}
{"name": "Tool Z", "category": "Tools"}
{"name": "Tool W", "category": "Tools"}
{"name": "Device Q", "category": "Devices"}
{"name": "Device R", "category": "Devices"}
{"name": "Instrument S", "category": "Instruments"}
{"name": "Instrument T", "category": "Instruments"}
docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic sales --from-beginning