JSON rows support for clickhouse-rs

clickhouse-rs

Official pure Rust typed client for ClickHouse DB.

  • Uses serde for encoding/decoding rows.
  • Supports serde attributes: skip_serializing, skip_deserializing, rename.
  • Uses RowBinary encoding over HTTP transport.
    • There are plans to switch to Native over TCP.
  • Supports TLS (see native-tls and rustls-tls features below).
  • Supports compression and decompression (LZ4 and LZ4HC).
  • Provides API for selecting.
  • Provides API for inserting.
  • Provides API for infinite transactional (see below) inserting.
  • Provides API for watching live views.
  • Provides mocks for unit testing.

Note: ch2rs is useful to generate a row type from ClickHouse.

Usage

To use the crate, add this to your Cargo.toml:

[dependencies]
clickhouse = "0.12.2"

[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }

Note about ClickHouse prior to v22.6

ClickHouse servers older than v22.6 (2022-06-16) handle RowBinary incorrectly in some rare cases. To work around this, use version 0.11 of this crate with the wa-37420 feature enabled. Don't enable it for newer server versions.

Create a client

use clickhouse::Client;

let client = Client::default()
    .with_url("http://localhost:8123")
    .with_user("name")
    .with_password("123")
    .with_database("test");
  • Reuse created clients or clone them in order to reuse a connection pool.

Select rows

use serde::Deserialize;
use clickhouse::Row;

#[derive(Row, Deserialize)]
struct MyRow<'a> {
    no: u32,
    name: &'a str,
}

let mut cursor = client
    .query("SELECT ?fields FROM some WHERE no BETWEEN ? AND ?")
    .bind(500)
    .bind(504)
    .fetch::<MyRow<'_>>()?;

while let Some(row) = cursor.next().await? { .. }
  • Placeholder ?fields is replaced with no, name (fields of Row).
  • Placeholder ? is replaced with values in following bind() calls.
  • The convenience methods fetch_one::<Row>() and fetch_all::<Row>() can be used to get the first row or all rows, respectively.
  • sql::Identifier can be used to bind table names.
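
The placeholder rules above can be illustrated with a small standalone sketch. It is not the crate's implementation: `render_query` is a hypothetical helper, it assumes the bound values are already rendered as SQL literals, and it does no escaping or type-aware formatting, which a real client has to do.

```rust
/// Hypothetical helper (not part of the `clickhouse` crate).
/// Expands `?fields` into a comma-separated column list and replaces each
/// remaining `?` with the next bound value, in order.
/// Assumption: `args` are already valid SQL literals; no escaping is done.
fn render_query(template: &str, fields: &[&str], args: &[&str]) -> Result<String, String> {
    let mut out = String::with_capacity(template.len());
    let mut rest = template;
    let mut next_arg = args.iter();

    while let Some(pos) = rest.find('?') {
        // Copy everything before the placeholder, then skip the `?` itself.
        out.push_str(&rest[..pos]);
        rest = &rest[pos + 1..];

        if let Some(tail) = rest.strip_prefix("fields") {
            // `?fields` becomes the list of row field names.
            out.push_str(&fields.join(", "));
            rest = tail;
        } else {
            // A plain `?` consumes the next bound value.
            let arg = next_arg
                .next()
                .ok_or_else(|| "not enough bound values".to_string())?;
            out.push_str(arg);
        }
    }
    out.push_str(rest);

    if next_arg.next().is_some() {
        return Err("too many bound values".to_string());
    }
    Ok(out)
}
```

With the query from the example above, fields `["no", "name"]` and values `["500", "504"]` yield `SELECT no, name FROM some WHERE no BETWEEN 500 AND 504`.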

Note that cursors can return an error even after producing some rows. To avoid this, use client.with_option("wait_end_of_query", "1") in order to enable buffering on the server-side. More details. The buffer_size option can be useful too.

Insert a batch

use serde::Serialize;
use clickhouse::Row;

#[derive(Row, Serialize)]
struct MyRow {
    no: u32,
    name: String,
}

let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
  • If end() isn't called, the INSERT is aborted.
  • Rows are sent progressively to spread network load.
  • ClickHouse inserts batches atomically only if all rows fit in the same partition and their number is less than max_insert_block_size.
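
To make the wire format more concrete, here is a minimal sketch of how a row like `MyRow { no, name }` looks in RowBinary: fixed-width integers are written little-endian, and a String is written as an unsigned LEB128 length followed by the raw bytes. The crate does this through serde; `put_varint` and `encode_row` are hypothetical names used only for illustration.

```rust
/// Appends an unsigned LEB128 varint, the format RowBinary uses for
/// string lengths. Hypothetical helper, for illustration only.
fn put_varint(buf: &mut Vec<u8>, mut value: u64) {
    loop {
        let byte = (value & 0x7f) as u8;
        value >>= 7;
        if value == 0 {
            // Last group: high bit stays clear.
            buf.push(byte);
            return;
        }
        // More groups follow: set the continuation bit.
        buf.push(byte | 0x80);
    }
}

/// Encodes one `(no UInt32, name String)` row in RowBinary layout.
fn encode_row(buf: &mut Vec<u8>, no: u32, name: &str) {
    buf.extend_from_slice(&no.to_le_bytes()); // fixed width, little-endian
    put_varint(buf, name.len() as u64); // byte length prefix
    buf.extend_from_slice(name.as_bytes()); // raw bytes, no terminator
}
```

Rows are simply concatenated, with no separators or column names, which is why the field order and types of the row struct have to match the columns being inserted.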

Infinite inserting

Requires the inserter feature.

use std::time::Duration;

let mut inserter = client.inserter("some")?
    .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
    .with_max_bytes(50_000_000)
    .with_max_rows(750_000)
    .with_period(Some(Duration::from_secs(15)));

inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
    println!(
        "{} bytes, {} rows, {} transactions have been inserted",
        stats.bytes, stats.rows, stats.transactions,
    );
}
  • Inserter ends an active insert in commit() if thresholds (max_bytes, max_rows, period) are reached.
  • The interval between ending active INSERTs can be biased by using with_period_bias to avoid load spikes by parallel inserters.
  • Inserter::time_left() can be used to detect when the current period ends. Call Inserter::commit() again to check limits if your stream emits items rarely.
  • Time thresholds are implemented using the quanta crate to speed the inserter up. quanta is not used if test-util is enabled (thus, time can be managed by tokio::time::advance() in custom tests).
  • All rows between commit() calls are inserted in the same INSERT statement.
  • Do not forget to flush if you want to terminate inserting:
inserter.end().await?;

Perform DDL

client.query("DROP TABLE IF EXISTS some").execute().await?;

Live views

Requires the watch feature.

let mut cursor = client
    .watch("SELECT max(no), argMax(name, no) FROM some")
    .fetch::<Row<'_>>()?;

let (version, row) = cursor.next().await?.unwrap();
println!("live view updated: version={}, row={:?}", version, row);

// Use `only_events()` to iterate over versions only.
let mut cursor = client.watch("some_live_view").limit(20).only_events().fetch()?;
println!("live view updated: version={:?}", cursor.next().await?);
  • Use carefully.
  • This code uses, or creates if it doesn't exist, a temporary live view named lv_{sha1(query)}, so that parallel watchers reuse the same live view.
  • You can specify a name instead of a query.
  • This API uses JSONEachRowWithProgress under the hood because of the issue.
  • Only struct rows can be used. Avoid fetch::<u64>() and other row types without named fields.

See examples.

Feature Flags

  • lz4 (enabled by default) — enables Compression::Lz4 and Compression::Lz4Hc(_) variants. If enabled, Compression::Lz4 is used by default for all queries except for WATCH.
  • native-tls — supports urls with the HTTPS schema via hyper-tls, which links against OpenSSL.
  • rustls-tls — supports urls with the HTTPS schema via hyper-rustls, which does not link against OpenSSL.
  • inserter — enables client.inserter().
  • test-util — adds mocks. See the example. Use it only in dev-dependencies.
  • watch — enables client.watch functionality. See the corresponding section for details.
  • uuid — adds serde::uuid to work with uuid crate.
  • time — adds serde::time to work with time crate.

NOTE: When connecting to ClickHouse via an HTTPS URL, you must enable either the native-tls or the rustls-tls feature. If both are enabled, rustls-tls takes precedence.

Data Types

  • (U)Int(8|16|32|64|128) maps to/from corresponding (u|i)(8|16|32|64|128) types or newtypes around them.

  • (U)Int256 aren't supported directly, but there is a workaround for it.

  • Float(32|64) maps to/from corresponding f(32|64) or newtypes around them.

  • Decimal(32|64|128) maps to/from corresponding i(32|64|128) or newtypes around them. It's more convenient to use fixnum or another implementation of signed fixed-point numbers.

  • Boolean maps to/from bool or newtypes around it.

  • String maps to/from any string or bytes types, e.g. &str, &[u8], String, Vec<u8> or SmartString. Newtypes are also supported. To store bytes, consider using serde_bytes, because it's more efficient.

    Example
    #[derive(Row, Debug, Serialize, Deserialize)]
    struct MyRow<'a> {
        str: &'a str,
        string: String,
        #[serde(with = "serde_bytes")]
        bytes: Vec<u8>,
        #[serde(with = "serde_bytes")]
        byte_slice: &'a [u8],
    }
  • FixedString(N) is supported as an array of bytes, e.g. [u8; N].

    Example
    #[derive(Row, Debug, Serialize, Deserialize)]
    struct MyRow {
        fixed_str: [u8; 16], // FixedString(16)
    }
  • Enum(8|16) are supported using serde_repr.

    Example
    use serde_repr::{Deserialize_repr, Serialize_repr};
    
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        level: Level,
    }
    
    #[derive(Debug, Serialize_repr, Deserialize_repr)]
    #[repr(u8)]
    enum Level {
        Debug = 1,
        Info = 2,
        Warn = 3,
        Error = 4,
    }
  • UUID maps to/from uuid::Uuid by using serde::uuid. Requires the uuid feature.

    Example
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        #[serde(with = "clickhouse::serde::uuid")]
        uuid: uuid::Uuid,
    }
  • IPv6 maps to/from std::net::Ipv6Addr.

  • IPv4 maps to/from std::net::Ipv4Addr by using serde::ipv4.

    Example
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        #[serde(with = "clickhouse::serde::ipv4")]
        ipv4: std::net::Ipv4Addr,
    }
  • Date maps to/from u16 or a newtype around it and represents a number of days elapsed since 1970-01-01. Also, time::Date is supported by using serde::time::date, which requires the time feature.

    Example
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        days: u16,
        #[serde(with = "clickhouse::serde::time::date")]
        date: Date,
    }
  • Date32 maps to/from i32 or a newtype around it and represents a number of days elapsed since 1970-01-01. Also, time::Date is supported by using serde::time::date32, which requires the time feature.

    Example
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        days: i32,
        #[serde(with = "clickhouse::serde::time::date32")]
        date: Date,
    }
  • DateTime maps to/from u32 or a newtype around it and represents a number of seconds elapsed since UNIX epoch. Also, time::OffsetDateTime is supported by using serde::time::datetime, which requires the time feature.

    Example
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        ts: u32,
        #[serde(with = "clickhouse::serde::time::datetime")]
        dt: OffsetDateTime,
    }
  • DateTime64(_) maps to/from i64 or a newtype around it and represents a time elapsed since UNIX epoch, in units that depend on the column's precision. Also, time::OffsetDateTime is supported by using serde::time::datetime64::*, which requires the time feature.

    Example
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        ts: i64, // elapsed s/us/ms/ns depending on `DateTime64(X)`
        #[serde(with = "clickhouse::serde::time::datetime64::secs")]
        dt64s: OffsetDateTime,  // `DateTime64(0)`
        #[serde(with = "clickhouse::serde::time::datetime64::millis")]
        dt64ms: OffsetDateTime, // `DateTime64(3)`
        #[serde(with = "clickhouse::serde::time::datetime64::micros")]
        dt64us: OffsetDateTime, // `DateTime64(6)`
        #[serde(with = "clickhouse::serde::time::datetime64::nanos")]
        dt64ns: OffsetDateTime, // `DateTime64(9)`
    }
  • Tuple(A, B, ...) maps to/from (A, B, ...) or a newtype around it.

  • Array(_) maps to/from any slice, e.g. Vec<_>, &[_]. Newtypes are also supported.

  • Map(K, V) behaves like Array((K, V)).

  • LowCardinality(_) is supported seamlessly.

  • Nullable(_) maps to/from Option<_>. For clickhouse::serde::* helpers add ::option.

    Example
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        #[serde(with = "clickhouse::serde::ipv4::option")]
        ipv4_opt: Option<Ipv4Addr>,
    }
  • Nested is supported by providing multiple arrays with renaming.

    Example
    // CREATE TABLE test(items Nested(name String, count UInt32))
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        #[serde(rename = "items.name")]
        items_name: Vec<String>,
        #[serde(rename = "items.count")]
        items_count: Vec<u32>,
    }
  • Geo types are supported. Point behaves like a tuple (f64, f64), and the rest of the types are just slices of points.

    Example
    type Point = (f64, f64);
    type Ring = Vec<Point>;
    type Polygon = Vec<Ring>;
    type MultiPolygon = Vec<Polygon>;
    type LineString = Vec<Point>;
    type MultiLineString = Vec<LineString>;
    
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        point: Point,
        ring: Ring,
        polygon: Polygon,
        multi_polygon: MultiPolygon,
        line_string: LineString,
        multi_line_string: MultiLineString,
    }
  • JSON, Variant, Dynamic types are not supported for now.
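
Several of the mappings above hand you a raw integer (Date as days, DateTime64 as ticks, Decimal as a scaled integer). The sketch below shows one way to interpret those raw values with the standard library only. The function names are hypothetical and not part of the crate, and the decimal helper handles only values that fit in i64 with a scale of at most 18.

```rust
/// Seconds since the UNIX epoch (midnight UTC) for a `Date` stored as days.
fn date_to_unix_secs(days: u16) -> i64 {
    i64::from(days) * 86_400
}

/// Splits a raw `DateTime64(precision)` value into whole seconds and
/// nanoseconds. `precision` is the number of fractional digits (0..=9).
fn datetime64_to_parts(ticks: i64, precision: u32) -> (i64, u32) {
    assert!(precision <= 9, "DateTime64 precision must be in 0..=9");
    let ticks_per_sec = 10_i64.pow(precision);
    // Euclidean division keeps the fractional part non-negative
    // for timestamps before the epoch.
    let secs = ticks.div_euclid(ticks_per_sec);
    let frac = ticks.rem_euclid(ticks_per_sec);
    let nanos = frac * 10_i64.pow(9 - precision);
    (secs, nanos as u32)
}

/// Renders a raw decimal value (the stored integer is value * 10^scale)
/// as text. Assumption: `scale <= 18`, so the divisor fits in u64.
fn decimal_to_string(raw: i64, scale: u32) -> String {
    if scale == 0 {
        return raw.to_string();
    }
    let divisor = 10_u64.pow(scale);
    let abs = raw.unsigned_abs();
    let sign = if raw < 0 { "-" } else { "" };
    format!(
        "{}{}.{:0width$}",
        sign,
        abs / divisor,
        abs % divisor,
        width = scale as usize
    )
}
```

In practice, the serde::time helpers or a fixed-point crate such as fixnum, both mentioned above, avoid having to write this kind of conversion by hand.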

See also the additional examples.

Mocking

The crate provides utils for mocking CH server and testing DDL, SELECT, INSERT and WATCH queries.

The functionality can be enabled with the test-util feature. Use it only in dev-dependencies.

See the example.