NullPointerException when setting eventuate.outbox.id
amirh715 opened this issue · 3 comments
I'm trying to use Eventuate Tram and Eventuate CDC with Apache Kafka. Everything works fine until I set eventuate.outbox.id = 1
on my application.yml
file. I need to have multiple readers/pipelines. I've searched a lot but could not find any answers.
The details are included below:
Stack trace
java.lang.NullPointerException
at io.eventuate.common.spring.jdbc.EventuateSpringJdbcStatementExecutor.insertAndReturnGeneratedId(EventuateSpringJdbcStatementExecutor.java:38)
at io.eventuate.common.jdbc.EventuateCommonJdbcOperations.insertIntoMessageTableDatabaseId(EventuateCommonJdbcOperations.java:163)
at io.eventuate.common.jdbc.EventuateCommonJdbcOperations.insertIntoMessageTable(EventuateCommonJdbcOperations.java:126)
at io.eventuate.common.jdbc.EventuateCommonJdbcOperations.insertIntoMessageTable(EventuateCommonJdbcOperations.java:106)
at io.eventuate.tram.messaging.producer.jdbc.MessageProducerJdbcImpl.send(MessageProducerJdbcImpl.java:28)
at io.eventuate.tram.messaging.producer.common.MessageProducerImpl.send(MessageProducerImpl.java:49)
at io.eventuate.tram.messaging.producer.common.MessageProducerImpl.lambda$send$2(MessageProducerImpl.java:37)
at io.eventuate.tram.messaging.producer.common.MessageProducerImplementation.withContext(MessageProducerImplementation.java:14)
at io.eventuate.tram.messaging.producer.common.MessageProducerImpl.send(MessageProducerImpl.java:37)
at io.eventuate.tram.events.publisher.DomainEventPublisherImpl.publish(DomainEventPublisherImpl.java:35)
at io.eventuate.tram.events.publisher.DomainEventPublisherImpl.publish(DomainEventPublisherImpl.java:25)
at com.easybuy.oneaccess.domain.services.UserCommandService.registerNewUser(UserCommandService.java:40)
application.yml
eventuate:
outbox:
id: 1 --> causes the error
database:
schema: ${DATABASE_NAME}
eventuatelocal:
kafka:
bootstrap:
servers: ${KAFKA_URL}
zookeeper:
connection:
string: ${ZOOKEEPER_URL}
server:
port: ${APP_PORT}
spring:
application:
name: ${APP_NAME}
jpa:
generate-ddl: true
datasource:
url: ${DATABASE_URL}
username: ${DATABASE_USER}
password: ${DATABASE_USER_PASSWORD}
driver-class-name: com.mysql.cj.jdbc.Driver
management:
endpoint:
health:
enabled: true
prometheus:
enabled: true
endpoints:
web:
exposure:
include: "*"
pom.xml
<dependency>
<groupId>io.eventuate.tram.core</groupId>
<artifactId>eventuate-tram-spring-messaging</artifactId>
<version>0.30.0.RELEASE</version>
</dependency>
<dependency>
<groupId>io.eventuate.tram.core</groupId>
<artifactId>eventuate-tram-spring-events</artifactId>
<version>0.30.0.RELEASE</version>
</dependency>
<dependency>
<groupId>io.eventuate.tram.core</groupId>
<artifactId>eventuate-tram-spring-commands</artifactId>
<version>0.30.0.RELEASE</version>
</dependency>
<dependency>
<groupId>io.eventuate.tram.core</groupId>
<artifactId>eventuate-tram-spring-jdbc-kafka</artifactId>
<version>0.30.0.RELEASE</version>
</dependency>
Configuration
import io.eventuate.tram.spring.events.common.TramEventsCommonAutoConfiguration;
import io.eventuate.tram.spring.events.publisher.TramEventsPublisherConfiguration;
import io.eventuate.tram.spring.jdbckafka.TramJdbcKafkaConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@Configuration
@EnableAutoConfiguration
@Import({
TramEventsPublisherConfiguration.class,
TramEventsCommonAutoConfiguration.class,
TramJdbcKafkaConfiguration.class
})
public class EventuateEventsConfiguration {
}
docker-compose.yml
version: '3'
name: EasyBuy
networks:
app-tier:
name: app-tier
services:
zookeeper:
image: bitnami/zookeeper:3.7.1
networks:
- app-tier
ports:
- 2181:2181
environment:
ALLOW_ANONYMOUS_LOGIN: yes
kafka:
image: bitnami/kafka:3.1.1
networks:
- app-tier
ports:
- 9092:9092
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
ALLOW_PLAINTEXT_LISTENER: yes
eventuate-cdc:
image: eventuateio/eventuate-cdc-service:0.13.0.RELEASE
networks:
- app-tier
ports:
- 8099:8080
depends_on:
- mysql
- zookeeper
- kafka
environment:
EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181
EVENTUATE_CDC_SERVICE_DRY_RUN: false
USE_DB_ID: true
# oneaccess
EVENTUATE_CDC_READER_ONEACCESSREADER_TYPE: polling
EVENTUATE_CDC_READER_ONEACCESSREADER_DATASOURCEURL: jdbc:mysql://mysql:3306/oneaccess_db?allowPublicKeyRetrieval=true&useSSL=false
EVENTUATE_CDC_READER_ONEACCESSREADER_DATASOURCEUSERNAME: root
EVENTUATE_CDC_READER_ONEACCESSREADER_DATASOURCEPASSWORD: password
EVENTUATE_CDC_READER_ONEACCESSREADER_DATASOURCEDRIVERCLASSNAME: com.mysql.jdbc.Driver
EVENTUATE_CDC_READER_ONEACCESSREADER_LEADERSHIPLOCKPATH: /eventuatelocal/cdc/leader/eventuatelocal
EVENTUATE_CDC_READER_ONEACCESSREADER_OUTBOXID: 1
EVENTUATE_CDC_PIPELINE_ONEACCESSPIPELINE_TYPE: eventuate-tram
EVENTUATE_CDC_PIPELINE_ONEACCESSPIPELINE_READER: oneaccessreader
EVENTUATE_CDC_PIPELINE_ONEACCESSPIPELINE_EVENTUATEDATABASESCHEMA: oneaccess_db
.
.
.
The problem is mostly likely the database schema: the message table does not have an auto-incrementing primary key. Hence a NPE on this line:
First, this is the base schema: https://github.com/eventuate-foundation/eventuate-common/blob/e66da82f5901370414e249e98d61d3d2e3edc472/mysql/2.initialize-database.sql#L6
Second, the DB generated IDs are enabled by starting the Eventuate MySQL with USE_DB_ID set:
Third, this "activates", in other words, migrates the schema to https://github.com/eventuate-foundation/eventuate-common/blob/39f791cd35b5026d387755b865e81a320dee0b74/mysql/4.initialize-database-db-id.sql#L4
OK. So this script is not legit? I need Sagas too. Also I'm not allowed to use eventuate-mysql.
CREATE DATABASE oneaccess_db;
USE oneaccess_db;
DROP table IF EXISTS events;
DROP table IF EXISTS entities;
DROP table IF EXISTS snapshots;
DROP table IF EXISTS cdc_monitoring;
create table events (
event_id varchar(100) PRIMARY KEY,
event_type varchar(1000),
event_data varchar(1000) NOT NULL,
entity_type VARCHAR(1000) NOT NULL,
entity_id VARCHAR(1000) NOT NULL,
triggering_event VARCHAR(1000),
metadata VARCHAR(1000),
published TINYINT DEFAULT 0
);
CREATE INDEX events_idx ON events(entity_type(25), entity_id(25), event_id(25));
CREATE INDEX events_published_idx ON events(published, event_id(50));
create table entities (
entity_type VARCHAR(1000),
entity_id VARCHAR(1000),
entity_version VARCHAR(1000) NOT NULL,
PRIMARY KEY(entity_type(50), entity_id(50))
);
CREATE INDEX entities_idx ON events(entity_type(50), entity_id(50));
create table snapshots (
entity_type VARCHAR(1000),
entity_id VARCHAR(1000),
entity_version VARCHAR(1000),
snapshot_type VARCHAR(1000) NOT NULL,
snapshot_json VARCHAR(1000) NOT NULL,
triggering_events VARCHAR(1000),
PRIMARY KEY(entity_type(25), entity_id(25), entity_version(25))
);
DROP Table IF Exists message;
DROP Table IF Exists received_messages;
DROP Table IF Exists offset_store;
CREATE TABLE message (
id VARCHAR(767) PRIMARY KEY,
destination VARCHAR(1000) NOT NULL,
headers VARCHAR(1000) NOT NULL,
payload VARCHAR(1000) NOT NULL,
published SMALLINT DEFAULT 0,
creation_time BIGINT
);
CREATE INDEX message_published_idx ON message(published, id(50));
CREATE TABLE received_messages (
consumer_id VARCHAR(767),
message_id VARCHAR(767),
PRIMARY KEY(consumer_id(50), message_id(50)),
creation_time BIGINT
);
CREATE TABLE offset_store(
client_name VARCHAR(255) NOT NULL PRIMARY KEY,
serialized_offset VARCHAR(255)
);
DROP Table IF Exists saga_instance_participants;
DROP Table IF Exists saga_instance;
DROP Table IF Exists saga_lock_table;
DROP Table IF Exists saga_stash_table;
CREATE TABLE saga_instance_participants (
saga_type VARCHAR(100) NOT NULL,
saga_id VARCHAR(100) NOT NULL,
destination VARCHAR(100) NOT NULL,
resource VARCHAR(100) NOT NULL,
PRIMARY KEY(saga_type(25), saga_id(25), destination(25), resource(25))
);
CREATE TABLE saga_instance(
saga_type VARCHAR(100) NOT NULL,
saga_id VARCHAR(100) NOT NULL,
state_name VARCHAR(100) NOT NULL,
last_request_id VARCHAR(100),
end_state INT(1),
compensating INT(1),
saga_data_type VARCHAR(1000) NOT NULL,
saga_data_json VARCHAR(1000) NOT NULL,
PRIMARY KEY(saga_type(50), saga_id(50))
);
create table saga_lock_table(
target VARCHAR(100) PRIMARY KEY,
saga_type VARCHAR(100) NOT NULL,
saga_Id VARCHAR(100) NOT NULL
);
create table saga_stash_table(
message_id VARCHAR(100) PRIMARY KEY,
target VARCHAR(100) NOT NULL,
saga_type VARCHAR(100) NOT NULL,
saga_id VARCHAR(100) NOT NULL,
message_headers VARCHAR(1000) NOT NULL,
message_payload VARCHAR(1000) NOT NULL
);
For DB generated IDs you also need this script: https://github.com/eventuate-foundation/eventuate-common/blob/39f791cd35b5026d387755b865e81a320dee0b74/mysql/4.initialize-database-db-id.sql#L4
Either:
- Append it to your script
- Change the table definitions in your script to match those in
4.initialize-database-db-id.sql