R2DBC PostgreSQL Implementation

Reactive Relational Database Connectivity PostgreSQL Implementation

This project contains the PostgreSQL implementation of the R2DBC SPI. This implementation is not intended to be used directly, but rather to be used as the backing implementation for a humane client library to delegate to.

This driver provides the following features:

  • Login with username/password (MD5, SASL/SCRAM) or implicit trust
  • SCRAM authentication
  • Unix Domain Socket transport
  • TLS
  • Explicit transactions
  • Notifications
  • Logical Decode
  • Binary data transfer
  • Execution of prepared statements with bindings
  • Execution of batch statements without bindings
  • Read and write support for all data types except LOB types (e.g. BLOB, CLOB)
  • Fetching of REFCURSOR using io.r2dbc.postgresql.api.RefCursor
  • Extension points to register Codecs to handle additional PostgreSQL data types

Next steps:

  • Multi-dimensional arrays

Code of Conduct

This project is governed by the Spring Code of Conduct. By participating, you are expected to uphold this code of conduct. Please report unacceptable behavior to spring-code-of-conduct@pivotal.io.

Getting Started

Here is a quick teaser of how to use R2DBC PostgreSQL in Java:

URL Connection Factory Discovery

ConnectionFactory connectionFactory = ConnectionFactories.get("r2dbc:postgresql://<host>:5432/<database>");

Publisher<? extends Connection> connectionPublisher = connectionFactory.create();
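
To make the URL layout more concrete, the following sketch splits an R2DBC URL into the parts that discovery relies on (driver, host, port, database). It uses only java.net.URI and is not how the SPI parses URLs internally; the class name R2dbcUrlSketch, the host localhost and the database mydb are illustrative assumptions.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

final class R2dbcUrlSketch {

    // Splits "r2dbc:<driver>://<host>:<port>/<database>" into its components.
    static Map<String, String> parse(String url) {
        if (!url.startsWith("r2dbc:")) {
            throw new IllegalArgumentException("Not an R2DBC URL: " + url);
        }
        // Drop the "r2dbc:" prefix so the remainder is a regular hierarchical URI.
        URI uri = URI.create(url.substring("r2dbc:".length()));

        Map<String, String> parts = new LinkedHashMap<>();
        parts.put("driver", uri.getScheme());
        parts.put("host", uri.getHost());
        // URI reports -1 when no port is given; fall back to the documented default.
        parts.put("port", String.valueOf(uri.getPort() == -1 ? 5432 : uri.getPort()));
        String path = uri.getPath();
        if (path != null && path.length() > 1) {
            parts.put("database", path.substring(1)); // strip the leading '/'
        }
        return parts;
    }

    public static void main(String[] args) {
        // Hypothetical host and database names.
        System.out.println(parse("r2dbc:postgresql://localhost:5432/mydb"));
        // {driver=postgresql, host=localhost, port=5432, database=mydb}
    }
}
```

The driver segment is what selects this implementation during discovery, which is why it has to match the driver option described below.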

Programmatic Connection Factory Discovery

Map<String, String> options = new HashMap<>();
options.put("lock_timeout", "10s");
options.put("statement_timeout", "5m");

ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
   .option(DRIVER, "postgresql")
   .option(HOST, "...")
   .option(PORT, 5432)  // optional, defaults to 5432
   .option(USER, "...")
   .option(PASSWORD, "...")
   .option(DATABASE, "...")  // optional
   .option(OPTIONS, options) // optional
   .build());

Publisher<? extends Connection> connectionPublisher = connectionFactory.create();

// Alternative: Creating a Mono using Project Reactor
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

Supported ConnectionFactory Discovery Options

| Option | Description |
| --- | --- |
| ssl | Enables SSL usage (SSLMode.VERIFY_FULL) |
| driver | Must be postgresql. |
| host | Server hostname to connect to |
| port | Server port to connect to. Defaults to 5432. (Optional) |
| socket | Unix Domain Socket path to connect to as alternative to TCP. (Optional) |
| username | Login username |
| password | Login password (Optional when using TLS Certificate authentication) |
| database | Database to select. (Optional) |
| applicationName | The name of the application connecting to the database. Defaults to r2dbc-postgresql. (Optional) |
| autodetectExtensions | Whether to auto-detect and register Extensions from the class path. Defaults to true. (Optional) |
| fetchSize | The default number of rows to return when fetching results. Defaults to 0 for unlimited. (Optional) |
| forceBinary | Whether to force binary transfer. Defaults to false. (Optional) |
| preparedStatementCacheQueries | Determine the number of queries that are cached in each connection. The default is -1, meaning there's no limit. The value of 0 disables the cache. Any other value specifies the cache size. |
| options | A Map<String, String> of connection parameters. These are applied to each database connection created by the ConnectionFactory. Useful for setting generic PostgreSQL connection parameters. (Optional) |
| schema | The schema to set. (Optional) |
| sslMode | SSL mode to use, see SSLMode enum. Supported values: DISABLE, ALLOW, PREFER, REQUIRE, VERIFY_CA, VERIFY_FULL. (Optional) |
| sslRootCert | Path to SSL CA certificate in PEM format. (Optional) |
| sslKey | Path to SSL key for TLS authentication in PEM format. (Optional) |
| sslCert | Path to SSL certificate for TLS authentication in PEM format. (Optional) |
| sslPassword | Key password to decrypt SSL key. (Optional) |
| sslHostnameVerifier | javax.net.ssl.HostnameVerifier implementation. (Optional) |
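
The three value ranges of preparedStatementCacheQueries (-1, 0, and any positive size) can be illustrated with a small stand-alone cache. This is only a sketch of the documented semantics, not the driver's implementation: the class StatementCacheSketch, the generated statement names, and the least-recently-used eviction policy are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class StatementCacheSketch {

    private final int limit;
    private final Map<String, String> names;
    private int counter;

    // limit: -1 = unlimited, 0 = cache disabled, n > 0 = keep at most n entries.
    StatementCacheSketch(int limit) {
        this.limit = limit;
        // Access-ordered map; evicting the least recently used entry is an assumption.
        this.names = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return limit > 0 && size() > limit;
            }
        };
    }

    // Returns the cached (hypothetical) statement name, or null when caching is disabled.
    String nameFor(String sql) {
        if (limit == 0) {
            return null;
        }
        return names.computeIfAbsent(sql, key -> "S_" + counter++);
    }

    int size() {
        return names.size();
    }

    public static void main(String[] args) {
        StatementCacheSketch bounded = new StatementCacheSketch(2);
        bounded.nameFor("SELECT 1");
        bounded.nameFor("SELECT 2");
        bounded.nameFor("SELECT 3");
        System.out.println(bounded.size()); // 2
    }
}
```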

Programmatic Configuration

Map<String, String> options = new HashMap<>();
options.put("lock_timeout", "10s");

PostgresqlConnectionFactory connectionFactory = new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
    .host("...")
    .port(5432)  // optional, defaults to 5432
    .username("...")
    .password("...")
    .database("...")  // optional
    .options(options) // optional
    .build());

Mono<Connection> mono = connectionFactory.create();

PostgreSQL uses indexed parameters that are prefixed with $. The following SQL statement makes use of parameters:

INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3)

Parameters are referenced using the same identifiers when binding these:

mono.flatMapMany(connection -> connection
                .createStatement("INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3)")
                .bind("$1", 1)
                .bind("$2", "Walter")
                .bind("$3", "White")
                .execute());

Binding also allows zero-based positional index references. The parameter index is derived from the order in which parameters are discovered when the query is parsed.
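
As a rough illustration of discovery order, the sketch below lists the $n placeholders of a statement in the order they first appear and derives a zero-based index from that order. It is a simplification that ignores string literals and comments, and the class ParameterIndexSketch is hypothetical rather than part of the driver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ParameterIndexSketch {

    private static final Pattern PARAMETER = Pattern.compile("\\$\\d+");

    // Lists the distinct $n placeholders in the order they first appear in the SQL text.
    static List<String> discover(String sql) {
        List<String> found = new ArrayList<>();
        Matcher matcher = PARAMETER.matcher(sql);
        while (matcher.find()) {
            if (!found.contains(matcher.group())) {
                found.add(matcher.group());
            }
        }
        return found;
    }

    // Zero-based positional index of an identifier such as "$2", or -1 if it is absent.
    static int indexOf(String sql, String identifier) {
        return discover(sql).indexOf(identifier);
    }

    public static void main(String[] args) {
        String sql = "INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3)";
        System.out.println(discover(sql));      // [$1, $2, $3]
        System.out.println(indexOf(sql, "$2")); // 1
    }
}
```

Under this reading, binding by index 0 and binding by the identifier $1 would address the same parameter of the INSERT statement above.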

Maven configuration

Artifacts can be found on Maven Central.

<dependency>
  <groupId>io.r2dbc</groupId>
  <artifactId>r2dbc-postgresql</artifactId>
  <version>${version}</version>
</dependency>

If you would rather use the latest snapshots of the upcoming major version, use our Maven snapshot repository and declare the appropriate dependency version.

<dependency>
  <groupId>io.r2dbc</groupId>
  <artifactId>r2dbc-postgresql</artifactId>
  <version>${version}.BUILD-SNAPSHOT</version>
</dependency>

<repository>
  <id>spring-libs-snapshot</id>
  <name>Spring Snapshot Repository</name>
  <url>https://repo.spring.io/libs-snapshot</url>
</repository>

Listen/Notify

Listen and Notify provide a simple signaling and inter-process communication mechanism for processes accessing the same PostgreSQL database. Two actors are involved: the sender (notify) and the receiver (listen). The following example uses two connections to illustrate how they work together:

PostgresqlConnection sender = …;
PostgresqlConnection receiver = …;

Flux<Notification> listen = receiver.createStatement("LISTEN mymessage")
                                .execute()
                                .flatMap(PostgresqlResult::getRowsUpdated)
                                .thenMany(receiver.getNotifications());

Mono<Void> notify = sender.createStatement("NOTIFY mymessage, 'Hello World'")
                            .execute()
                            .flatMap(PostgresqlResult::getRowsUpdated)
                            .then();

Upon subscription, the first connection enters listen mode and publishes incoming Notifications as a Flux. The second connection broadcasts a notification to the mymessage channel upon subscription.
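
PostgreSQL's NOTIFY command takes its payload as a string literal, so the payload in the example above is written inline rather than bound as a parameter. A minimal sketch of building such a statement safely follows. The helper NotifySqlSketch is hypothetical, the channel-name check is deliberately conservative, and the quoting assumes the server default of standard_conforming_strings = on. PostgreSQL also documents the pg_notify(channel, payload) function for non-constant values.

```java
import java.util.regex.Pattern;

final class NotifySqlSketch {

    // Conservative check: plain lower-case identifiers only (an assumption of this sketch).
    private static final Pattern CHANNEL = Pattern.compile("[a-z_][a-z0-9_]*");

    static String notifySql(String channel, String payload) {
        if (!CHANNEL.matcher(channel).matches()) {
            throw new IllegalArgumentException("Unsupported channel name: " + channel);
        }
        // Double embedded single quotes so the payload stays a single SQL string literal.
        return "NOTIFY " + channel + ", '" + payload.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        System.out.println(notifySql("mymessage", "Hello World")); // NOTIFY mymessage, 'Hello World'
        System.out.println(notifySql("mymessage", "it's done"));   // NOTIFY mymessage, 'it''s done'
    }
}
```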

JSON/JSONB support

PostgreSQL supports JSON by storing values in JSON/JSONB columns. These values can be consumed and written using the regular R2DBC SPI and by using driver-specific extensions with the io.r2dbc.postgresql.codec.Json type.

You can choose from two approaches:

  • Native JSONB encoding using the Json wrapper type.
  • Using scalar types.

The difference between the Json type and scalar types is that Json values are written to the database encoded as JSONB. byte[] and String values are sent as BYTEA and VARCHAR respectively, and therefore require casting ($1::JSON) when used with parameterized statements.

The following code shows INSERT and SELECT cases for JSON interaction:

CREATE TABLE my_table (my_json JSON);

Write JSON

connection.createStatement("INSERT INTO my_table (my_json) VALUES($1)")
            .bind("$1", Json.of("{\"hello\": \"world\"}")).execute();

Consume JSON

connection.createStatement("SELECT my_json FROM my_table")
            .execute()
            .flatMap(it -> it.map((row, rowMetadata) -> row.get("my_json", Json.class)))
            .map(Json::asString);

Write JSON using casting

connection.createStatement("INSERT INTO my_table (my_json) VALUES($1::JSON)")
    .bind("$1", "{\"hello\": \"world\"}").execute();

Consume JSON as scalar type

connection.createStatement("SELECT my_json FROM my_table")
    .execute()
    .flatMap(it -> it.map((row, rowMetadata) -> row.get("my_json", String.class)));

The following types are supported for JSON exchange:

  • io.r2dbc.postgresql.codec.Json
  • ByteBuf (must be released after usage to avoid memory leaks)
  • ByteBuffer
  • byte[]
  • String
  • InputStream (must be closed after usage to avoid memory leaks)
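
The note about closing InputStream values can be handled with try-with-resources. The sketch below reads a stream to the end and closes it in all cases. A ByteArrayInputStream stands in for a stream obtained from a JSON column, the class JsonStreamSketch is hypothetical, and UTF-8 content is assumed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class JsonStreamSketch {

    // Reads the whole stream as UTF-8 text and always closes it, even if reading fails.
    static String readAndClose(InputStream stream) {
        try (InputStream in = stream) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for a stream read from a JSON column.
        InputStream json = new ByteArrayInputStream(
                "{\"hello\": \"world\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(readAndClose(json)); // {"hello": "world"}
    }
}
```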

Cursors

The driver can consume cursors that were created by PL/pgSQL as refcursor. Cursors are represented as RefCursor objects. Cursors obtained from Result can be used to fetch the cursor directly. Since cursors are stateful, they must be closed once they are no longer in use.

connection.createStatement("SELECT show_cities_multiple()").execute()
    .flatMap(result -> result.map((row, rowMetadata) -> row.get(0, RefCursor.class)))
    .flatMap(cursor -> {
        // Fetch and process the cursor contents, then close the cursor.
        Mono<Void> done = cursor.fetch()
            .flatMap(…)
            .then(cursor.close());
        return done;
    });

Logical Decode

PostgreSQL allows replication streaming and decoding persistent changes to a database's tables into useful chunks of data. In PostgreSQL, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements.

Consuming the replication stream is a four-step process:

  1. Obtain a replication connection via PostgresqlConnectionFactory.replication().
  2. Create a replication slot (physical/logical).
  3. Initiate replication using the replication slot.
  4. Once the replication stream is set up, you can consume and map the binary data using ReplicationStream.map(…).

On application shutdown, close() the ReplicationStream.

Note that a connection is busy once the replication is active and a connection can have at most one active replication stream.

Mono<PostgresqlReplicationConnection> replicationMono = connectionFactory.replication();

// later:
ReplicationSlotRequest request = ReplicationSlotRequest.logical()
                                        .slotName("my_slot")
                                        .outputPlugin("test_decoding")
                                        .temporary()
                                        .build();
Mono<ReplicationSlot> createSlot = replicationConnection.createSlot(request);

ReplicationRequest replicationRequest = ReplicationRequest.logical()
                                        .slotName("my_slot")
                                        .startPosition(LogSequenceNumber.valueOf(0))
                                        .slotOption("skip-empty-xacts", true)
                                        .slotOption("include-xids", false)
                                        .build();

Flux<T> replicationStream = replicationConnection.startReplication(replicationRequest).flatMapMany(it -> {
    return it.map(byteBuf -> {…})
        .doOnError(t -> it.close().subscribe());
});
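
The example above passes a numeric start position. PostgreSQL itself prints log sequence numbers as two hexadecimal numbers separated by a slash (for example 16/B374D848), representing the upper and lower 32 bits of a 64-bit position. The following sketch converts between the two forms. LsnSketch is a hypothetical helper and makes no claim about how the driver's LogSequenceNumber type is implemented.

```java
final class LsnSketch {

    // Parses the textual form "hi/lo" (two hexadecimal numbers) into a 64-bit position.
    static long parse(String lsn) {
        int slash = lsn.indexOf('/');
        if (slash < 0) {
            throw new IllegalArgumentException("Expected hi/lo, got: " + lsn);
        }
        long hi = Long.parseLong(lsn.substring(0, slash), 16);
        long lo = Long.parseLong(lsn.substring(slash + 1), 16);
        return (hi << 32) | lo;
    }

    // Formats a 64-bit position back into the "hi/lo" text form.
    static String format(long position) {
        String hi = Long.toHexString(position >>> 32);
        String lo = Long.toHexString(position & 0xFFFFFFFFL);
        return (hi + "/" + lo).toUpperCase();
    }

    public static void main(String[] args) {
        System.out.println(parse("0/0"));                 // 0
        System.out.println(format(parse("16/B374D848"))); // 16/B374D848
    }
}
```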

Data Type Mapping

This reference table shows the type mapping between PostgreSQL and Java data types:

| PostgreSQL Type | Supported Data Type |
| --- | --- |
| bigint | Long, Boolean, Byte, Short, Integer, BigDecimal, BigInteger |
| bit | Not yet supported. |
| bit varying | Not yet supported. |
| boolean or bool | Boolean |
| box | Not yet supported. |
| bytea | ByteBuffer, byte[], Blob |
| character | String |
| character varying | String |
| cidr | Not yet supported. |
| circle | Not yet supported. |
| date | LocalDate |
| double precision | Double, Float, Boolean, Byte, Short, Integer, Long, BigDecimal, BigInteger |
| inet | InetAddress |
| integer | Integer, Boolean, Byte, Short, Long, BigDecimal, BigInteger |
| interval | Not yet supported. |
| json | Json, String. Reading: ByteBuf, byte[], ByteBuffer, String, InputStream |
| jsonb | Json, String. Reading: ByteBuf, byte[], ByteBuffer, String, InputStream |
| line | Not yet supported. |
| lseg | Not yet supported. |
| macaddr | Not yet supported. |
| macaddr8 | Not yet supported. |
| money | Not yet supported. |
| numeric | BigDecimal, Boolean, Byte, Short, Integer, Long, BigInteger |
| oid | Integer, Boolean, Byte, Short, Long, BigDecimal, BigInteger |
| path | Not yet supported. |
| pg_lsn | Not yet supported. |
| point | Not yet supported. |
| polygon | Not yet supported. |
| real | Float, Double, Boolean, Byte, Short, Integer, Long, BigDecimal, BigInteger |
| smallint | Short, Boolean, Byte, Integer, Long, BigDecimal, BigInteger |
| smallserial | Integer, Boolean, Byte, Short, Long, BigDecimal, BigInteger |
| serial | Long, Boolean, Byte, Short, Integer, BigDecimal, BigInteger |
| text | String, Clob |
| time [without time zone] | LocalTime |
| time [with time zone] | Not yet supported. |
| timestamp [without time zone] | LocalDateTime, LocalTime, LocalDate, java.util.Date |
| timestamp [with time zone] | OffsetDateTime, ZonedDateTime, Instant |
| tsquery | Not yet supported. |
| tsvector | Not yet supported. |
| txid_snapshot | Not yet supported. |
| uuid | UUID, String |
| xml | Not yet supported. |

The first type listed in each row is the native (default) Java type.

Support for the following single-dimensional arrays (read and write):

| PostgreSQL Type | Supported Data Type |
| --- | --- |
| text[] | String[] |
| integer[] or int[] | Integer[], Long[], Short[] |

Extension mechanism

This driver accepts the following extensions:

  • CodecRegistrar to contribute Codecs for PostgreSQL ObjectIDs.

Extensions can be registered programmatically using PostgresqlConnectionConfiguration or discovered using Java's ServiceLoader mechanism (from META-INF/services/io.r2dbc.postgresql.extension.Extension).

Logging

If SLF4J is on the classpath, it will be used. Otherwise, there are two possible fallbacks: Console or java.util.logging.Logger. By default, the Console fallback is used. To use the JDK loggers, set the reactor.logging.fallback System property to JDK.

Logging facilities:

  • Driver Logging (io.r2dbc.postgresql)
  • Query Logging (io.r2dbc.postgresql.QUERY on DEBUG level)
  • Transport Logging (io.r2dbc.postgresql.client)
    • DEBUG enables Message exchange logging
    • TRACE enables traffic logging
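
As a sketch of the JDK fallback, the snippet below sets the reactor.logging.fallback property and lowers the level of the query logger category. It assumes that DEBUG corresponds to Level.FINE under the JDK fallback and that the property is set before any logger is first resolved. The class JdkLoggingSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;

final class JdkLoggingSketch {

    // java.util.logging keeps only weak references, so hold on to configured loggers.
    private static final List<Logger> CONFIGURED = new ArrayList<>();

    static void enableQueryLogging() {
        // Select the JDK fallback; assumed to be read when logging is first initialized.
        System.setProperty("reactor.logging.fallback", "JDK");

        Logger queryLogger = Logger.getLogger("io.r2dbc.postgresql.QUERY");
        // Assumption: DEBUG maps to Level.FINE in the JDK fallback.
        queryLogger.setLevel(Level.FINE);

        // The default console handler only prints INFO and above, so attach a finer one.
        ConsoleHandler handler = new ConsoleHandler();
        handler.setLevel(Level.FINE);
        queryLogger.addHandler(handler);
        queryLogger.setUseParentHandlers(false);

        CONFIGURED.add(queryLogger);
    }

    public static void main(String[] args) {
        enableQueryLogging();
        System.out.println(System.getProperty("reactor.logging.fallback")); // JDK
    }
}
```

The same approach would apply to the io.r2dbc.postgresql.client category for transport logging, with a finer level for traffic logging.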

Getting Help

Having trouble with R2DBC? We'd love to help!

Reporting Issues

R2DBC uses GitHub as its issue tracking system to record bugs and feature requests. If you want to raise an issue, please follow the recommendations below:

  • Before you log a bug, please search the issue tracker to see if someone has already reported the problem.
  • If the issue doesn't already exist, create a new issue.
  • Please provide as much information as possible with the issue report. We like to know the version of R2DBC PostgreSQL and the JVM version that you are using.
  • If you need to paste code or include a stack trace, use Markdown ``` escapes before and after your text.
  • If possible, try to create a test case or project that replicates the issue. Attach a link to your code or a compressed file containing your code.

Building from Source

You don't need to build from source to use R2DBC PostgreSQL (binaries are available in Maven Central), but if you want to try out the latest and greatest, R2DBC PostgreSQL can be easily built with the Maven wrapper. You also need JDK 1.8 and Docker to run integration tests.

 $ ./mvnw clean install

If you want to build with the regular mvn command, you will need Maven v3.5.0 or above.

Also see CONTRIBUTING.adoc if you wish to submit pull requests, and in particular please sign the Contributor's Agreement before your first change, however trivial.

Running JMH Benchmarks

Running the JMH benchmarks builds and runs the benchmarks without running tests.

 $ ./mvnw clean install -Pjmh

License

This project is released under version 2.0 of the Apache License.