eventuate-tram/eventuate-tram-core

NullPointerException when setting eventuate.outbox.id

amirh715 opened this issue · 3 comments

I'm trying to use Eventuate Tram and Eventuate CDC with Apache Kafka. Everything works fine until I set eventuate.outbox.id = 1 in my application.yml file. I need to have multiple readers/pipelines. I've searched a lot but could not find any answers.
The details are included below:

Stack trace

java.lang.NullPointerException
	at io.eventuate.common.spring.jdbc.EventuateSpringJdbcStatementExecutor.insertAndReturnGeneratedId(EventuateSpringJdbcStatementExecutor.java:38)
	at io.eventuate.common.jdbc.EventuateCommonJdbcOperations.insertIntoMessageTableDatabaseId(EventuateCommonJdbcOperations.java:163)
	at io.eventuate.common.jdbc.EventuateCommonJdbcOperations.insertIntoMessageTable(EventuateCommonJdbcOperations.java:126)
	at io.eventuate.common.jdbc.EventuateCommonJdbcOperations.insertIntoMessageTable(EventuateCommonJdbcOperations.java:106)
	at io.eventuate.tram.messaging.producer.jdbc.MessageProducerJdbcImpl.send(MessageProducerJdbcImpl.java:28)
	at io.eventuate.tram.messaging.producer.common.MessageProducerImpl.send(MessageProducerImpl.java:49)
	at io.eventuate.tram.messaging.producer.common.MessageProducerImpl.lambda$send$2(MessageProducerImpl.java:37)
	at io.eventuate.tram.messaging.producer.common.MessageProducerImplementation.withContext(MessageProducerImplementation.java:14)
	at io.eventuate.tram.messaging.producer.common.MessageProducerImpl.send(MessageProducerImpl.java:37)
	at io.eventuate.tram.events.publisher.DomainEventPublisherImpl.publish(DomainEventPublisherImpl.java:35)
	at io.eventuate.tram.events.publisher.DomainEventPublisherImpl.publish(DomainEventPublisherImpl.java:25)
	at com.easybuy.oneaccess.domain.services.UserCommandService.registerNewUser(UserCommandService.java:40)

application.yml

# Spring application configuration. Values written as ${NAME} are placeholders
# whose definitions are not part of this file.
eventuate:
  outbox:
    id: 1  # causes the error
  database:
    schema: ${DATABASE_NAME}
eventuatelocal:
  kafka:
    bootstrap:
      servers: ${KAFKA_URL}
  zookeeper:
    connection:
      string: ${ZOOKEEPER_URL}
server:
  port: ${APP_PORT}
spring:
  application:
    name: ${APP_NAME}
  jpa:
    generate-ddl: true
  datasource:
    url: ${DATABASE_URL}
    username: ${DATABASE_USER}
    password: ${DATABASE_USER_PASSWORD}
    driver-class-name: com.mysql.cj.jdbc.Driver
management:
  endpoint:
    health:
      enabled: true
    prometheus:
      enabled: true
  endpoints:
    web:
      exposure:
        # Quoted because a bare * would be parsed as a YAML alias indicator.
        include: "*"

pom.xml

<!-- Eventuate Tram modules used by the application. All four share the
     groupId io.eventuate.tram.core and the version 0.30.0.RELEASE. -->
<dependency>
	<groupId>io.eventuate.tram.core</groupId>
	<artifactId>eventuate-tram-spring-messaging</artifactId>
	<version>0.30.0.RELEASE</version>
</dependency>
<dependency>
	<groupId>io.eventuate.tram.core</groupId>
	<artifactId>eventuate-tram-spring-events</artifactId>
	<version>0.30.0.RELEASE</version>
</dependency>
<dependency>
	<groupId>io.eventuate.tram.core</groupId>
	<artifactId>eventuate-tram-spring-commands</artifactId>
	<version>0.30.0.RELEASE</version>
</dependency>
<dependency>
	<groupId>io.eventuate.tram.core</groupId>
	<artifactId>eventuate-tram-spring-jdbc-kafka</artifactId>
	<version>0.30.0.RELEASE</version>
</dependency>

Configuration

import io.eventuate.tram.spring.events.common.TramEventsCommonAutoConfiguration;
import io.eventuate.tram.spring.events.publisher.TramEventsPublisherConfiguration;
import io.eventuate.tram.spring.jdbckafka.TramJdbcKafkaConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

/**
 * Spring configuration class that declares no beans or members of its own.
 *
 * <p>It enables Spring Boot auto-configuration and imports
 * {@link TramEventsPublisherConfiguration}, {@link TramEventsCommonAutoConfiguration}
 * and {@link TramJdbcKafkaConfiguration} into the application context.
 */
@Configuration
@EnableAutoConfiguration
@Import({
        TramEventsPublisherConfiguration.class,
        TramEventsCommonAutoConfiguration.class,
        TramJdbcKafkaConfiguration.class
})
public class EventuateEventsConfiguration {

}

docker-compose.yml

# Compose file defining ZooKeeper, Kafka and the Eventuate CDC service, all
# attached to the app-tier network.
version: '3'
# NOTE(review): the Compose specification restricts project names to lowercase
# letters, digits, dashes and underscores - confirm that `EasyBuy` is accepted
# by the Compose version in use.
name: EasyBuy
networks:
  app-tier:
    name: app-tier
services:
  zookeeper:
    image: bitnami/zookeeper:3.7.1
    networks:
      - app-tier
    ports:
      - 2181:2181
    environment:
      # Boolean-looking values are quoted so the YAML parser hands them to the
      # container as literal strings rather than converting them to booleans.
      ALLOW_ANONYMOUS_LOGIN: "yes"
  kafka:
    image: bitnami/kafka:3.1.1
    networks:
      - app-tier
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
  eventuate-cdc:
    image: eventuateio/eventuate-cdc-service:0.13.0.RELEASE
    networks:
      - app-tier
    ports:
      # Host port 8099 is mapped to container port 8080.
      - 8099:8080
    depends_on:
      # NOTE(review): no `mysql` service is defined in the visible part of this
      # file - presumably it is in the elided section; confirm.
      - mysql
      - zookeeper
      - kafka
    environment:
      EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
      EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181
      EVENTUATE_CDC_SERVICE_DRY_RUN: "false"
      USE_DB_ID: "true"
      # oneaccess
      EVENTUATE_CDC_READER_ONEACCESSREADER_TYPE: polling
      EVENTUATE_CDC_READER_ONEACCESSREADER_DATASOURCEURL: jdbc:mysql://mysql:3306/oneaccess_db?allowPublicKeyRetrieval=true&useSSL=false
      EVENTUATE_CDC_READER_ONEACCESSREADER_DATASOURCEUSERNAME: root
      EVENTUATE_CDC_READER_ONEACCESSREADER_DATASOURCEPASSWORD: password
      EVENTUATE_CDC_READER_ONEACCESSREADER_DATASOURCEDRIVERCLASSNAME: com.mysql.jdbc.Driver
      EVENTUATE_CDC_READER_ONEACCESSREADER_LEADERSHIPLOCKPATH: /eventuatelocal/cdc/leader/eventuatelocal
      # Same value as eventuate.outbox.id in application.yml.
      EVENTUATE_CDC_READER_ONEACCESSREADER_OUTBOXID: 1
      EVENTUATE_CDC_PIPELINE_ONEACCESSPIPELINE_TYPE: eventuate-tram
      EVENTUATE_CDC_PIPELINE_ONEACCESSPIPELINE_READER: oneaccessreader
      EVENTUATE_CDC_PIPELINE_ONEACCESSPIPELINE_EVENTUATEDATABASESCHEMA: oneaccess_db
.
.
.

OK. So this script is not legit? I need Sagas too. Also I'm not allowed to use eventuate-mysql.

-- Create the application database and make it the default schema for every
-- statement that follows.
CREATE DATABASE oneaccess_db;
USE oneaccess_db;

-- Remove any existing copies of these tables before they are created below.
-- NOTE(review): cdc_monitoring is dropped here but is not created anywhere in
-- the visible script.
DROP table IF EXISTS events;
DROP table IF EXISTS entities;
DROP table IF EXISTS snapshots;
DROP table IF EXISTS cdc_monitoring;

-- events: one row per event, keyed by event_id. `published` defaults to 0.
create table events (
  event_id varchar(100) PRIMARY KEY,
  event_type varchar(1000),
  event_data varchar(1000) NOT NULL,
  entity_type VARCHAR(1000) NOT NULL,
  entity_id VARCHAR(1000) NOT NULL,
  triggering_event VARCHAR(1000),
  metadata VARCHAR(1000),
  published TINYINT DEFAULT 0
);

-- Prefix indexes: col(N) indexes only the first N characters of the column.
CREATE INDEX events_idx ON events(entity_type(25), entity_id(25), event_id(25));
CREATE INDEX events_published_idx ON events(published, event_id(50));

-- entities: one row per (entity_type, entity_id) pair holding its
-- entity_version; the primary key uses 50-character prefixes of both columns.
create table entities (
  entity_type VARCHAR(1000),
  entity_id VARCHAR(1000),
  entity_version VARCHAR(1000) NOT NULL,
  PRIMARY KEY(entity_type(50), entity_id(50))
);

-- NOTE(review): despite its name, this index is created on `events`, not on
-- `entities` - confirm that this is intended.
CREATE INDEX entities_idx ON events(entity_type(50), entity_id(50));

-- snapshots: keyed by 25-character prefixes of entity_type, entity_id and
-- entity_version.
create table snapshots (
  entity_type VARCHAR(1000),
  entity_id VARCHAR(1000),
  entity_version VARCHAR(1000),
  snapshot_type VARCHAR(1000) NOT NULL,
  snapshot_json VARCHAR(1000) NOT NULL,
  triggering_events VARCHAR(1000),
  PRIMARY KEY(entity_type(25), entity_id(25), entity_version(25))
);

-- Remove any existing copies of the messaging tables before they are created
-- below.
DROP Table IF Exists message;
DROP Table IF Exists received_messages;
DROP Table IF Exists offset_store;

-- message: keyed by a VARCHAR id; `published` defaults to 0.
-- NOTE(review): this definition has no database-generated (auto-increment)
-- column. The stack trace in this issue fails inside insertAndReturnGeneratedId,
-- and the maintainer reply states that DB-generated IDs additionally require
-- the table definitions from eventuate-common's
-- mysql/4.initialize-database-db-id.sql. Confirm against that script before
-- using this table with eventuate.outbox.id set.
CREATE TABLE message (
  id VARCHAR(767) PRIMARY KEY,
  destination VARCHAR(1000) NOT NULL,
  headers VARCHAR(1000) NOT NULL,
  payload VARCHAR(1000) NOT NULL,
  published SMALLINT DEFAULT 0,
  creation_time BIGINT
);

-- Index on the published flag plus the first 50 characters of id.
CREATE INDEX message_published_idx ON message(published, id(50));

-- received_messages: one row per (consumer_id, message_id) pair; the primary
-- key uses 50-character prefixes of both columns.
CREATE TABLE received_messages (
  consumer_id VARCHAR(767),
  message_id VARCHAR(767),
  PRIMARY KEY(consumer_id(50), message_id(50)),
  creation_time BIGINT
);

-- offset_store: one serialized_offset value per client_name.
CREATE TABLE offset_store(
  client_name VARCHAR(255) NOT NULL PRIMARY KEY,
  serialized_offset VARCHAR(255)
);

-- Remove any existing copies of the saga tables before they are created below.
DROP Table IF Exists saga_instance_participants;
DROP Table IF Exists saga_instance;
DROP Table IF Exists saga_lock_table;
DROP Table IF Exists saga_stash_table;

-- saga_instance_participants: every column is part of the primary key, which
-- uses 25-character prefixes of each.
CREATE TABLE saga_instance_participants (
  saga_type VARCHAR(100) NOT NULL,
  saga_id VARCHAR(100) NOT NULL,
  destination VARCHAR(100) NOT NULL,
  resource VARCHAR(100) NOT NULL,
  PRIMARY KEY(saga_type(25), saga_id(25), destination(25), resource(25))
);


-- saga_instance: one row per (saga_type, saga_id) pair; the primary key uses
-- 50-character prefixes of both columns.
CREATE TABLE saga_instance(
  saga_type VARCHAR(100) NOT NULL,
  saga_id VARCHAR(100) NOT NULL,
  state_name VARCHAR(100) NOT NULL,
  last_request_id VARCHAR(100),
  end_state INT(1),
  compensating INT(1),
  saga_data_type VARCHAR(1000) NOT NULL,
  saga_data_json VARCHAR(1000) NOT NULL,
  PRIMARY KEY(saga_type(50), saga_id(50))
);

-- saga_lock_table: at most one row per target.
create table saga_lock_table(
  target VARCHAR(100) PRIMARY KEY,
  saga_type VARCHAR(100) NOT NULL,
  saga_Id VARCHAR(100) NOT NULL
);

-- saga_stash_table: one row per message_id, storing the message headers and
-- payload together with the target and saga it belongs to.
create table saga_stash_table(
  message_id VARCHAR(100) PRIMARY KEY,
  target VARCHAR(100) NOT NULL,
  saga_type VARCHAR(100) NOT NULL,
  saga_id VARCHAR(100) NOT NULL,
  message_headers VARCHAR(1000) NOT NULL,
  message_payload VARCHAR(1000) NOT NULL
);

cer commented

For DB generated IDs you also need this script: https://github.com/eventuate-foundation/eventuate-common/blob/39f791cd35b5026d387755b865e81a320dee0b74/mysql/4.initialize-database-db-id.sql#L4

Either:

  • Append it to your script
  • Change the table definitions in your script to match those in 4.initialize-database-db-id.sql