/iotdb-client-rs

Rust client for apache iotdb.

Primary LanguageRustApache License 2.0Apache-2.0

Apache IoTDB

Main Mac and Linux Main Win coveralls GitHub release License IoTDB Website

Apache IoTDB (Database for Internet of Things) is an IoT native database with high performance for data management and analysis, deployable on the edge and the cloud. Due to its light-weight architecture, high performance and rich feature set together with its deep integration with Apache Hadoop, Spark and Flink, Apache IoTDB can meet the requirements of massive data storage, high-speed data ingestion and complex data analysis in the IoT industrial fields.

Apache IoTDB Client for Rust

Overview

This is the Rust client of Apache IoTDB.

Apache IoTDB website: https://iotdb.apache.org Apache IoTDB Github: https://github.com/apache/iotdb

Prerequisites

apache-iotdb 0.12.0 and newer.
rust 1.56.0 and newer.

How to Use the Client (Quick Start)

Usage

Put this in your Cargo.toml:

[dependencies]
iotdb-client-rs="^0.3.12"

Example

Put this in your example's Cargo.toml:

[dependencies]
iotdb-client-rs="^0.3.12"
chrono="0.4.19"
prettytable-rs="0.8.0"
structopt = "0.3.25"
use std::{collections::BTreeMap, vec};

use chrono::Local;
use iotdb::client::remote::{Config, RpcSession};
use iotdb::client::{MeasurementSchema, Result, RowRecord, Session, Tablet, Value};
use iotdb::protocal::{TSCompressionType, TSDataType, TSEncoding};
use prettytable::{cell, Row, Table};
use structopt::StructOpt;

fn main() {
    run().expect("failed to run session_example.");
}

fn run() -> Result<()> {
    #[derive(StructOpt)]
    #[structopt(name = "session_example")]
    struct Opt {
        #[structopt(short = "h", long, default_value = "127.0.0.1")]
        host: String,

        #[structopt(short = "P", long, default_value = "6667")]
        port: i32,

        #[structopt(short = "u", long, default_value = "root")]
        user: String,

        #[structopt(short = "p", long, default_value = "root")]
        password: String,

        #[structopt(short = "c", long)]
        clean: bool,
    }

    let opt = Opt::from_args();
    // let config = Config {
    //     host: opt.host,
    //     port: opt.port,
    //     username: opt.user,
    //     password: opt.password,
    //     ..Default::default()
    // };

    // Create config object with builder
    let config = Config::builder()
        .host(opt.host)
        .port(opt.port)
        .username(opt.user)
        .password(opt.password)
        .build();

    let mut session = RpcSession::new(config)?;
    session.open()?;

    //time_zone
    let tz = session.get_time_zone()?;
    if tz != "Asia/Shanghai" {
        session.set_time_zone("Asia/Shanghai")?;
    }

    //set_storage_group
    session.set_storage_group("root.ln1")?;
    session.delete_storage_group("root.ln1")?;

    //delete_storage_groups
    session.set_storage_group("root.ln1")?;
    session.set_storage_group("root.ln2")?;
    session.delete_storage_groups(vec!["root.ln1", "root.ln2"])?;

    //if storage group 'root.sg_rs' exist, remove it.
    if opt.clean {
        session
            .delete_storage_group("root.sg_rs")
            .unwrap_or_default();
    }

    //create_timeseries
    {
        session.create_timeseries(
            "root.sg_rs.dev2.status",
            TSDataType::Float,
            TSEncoding::Plain,
            TSCompressionType::SNAPPY,
            Some(BTreeMap::from([
                ("prop1".to_owned(), "1".to_owned()),
                ("prop2".to_owned(), "2".to_owned()),
            ])),
            Some(BTreeMap::from([
                ("attr1".to_owned(), "1".to_owned()),
                ("attr2".to_owned(), "2".to_owned()),
            ])),
            Some(BTreeMap::from([
                ("tag1".to_owned(), "1".to_owned()),
                ("tag2".to_owned(), "2".to_owned()),
            ])),
            Some("stats".to_string()),
        )?;
        session.delete_timeseries(vec!["root.sg_rs.dev2.status"])?;
    }

    //create_multi_timeseries
    {
        session.create_multi_timeseries(
            vec!["root.sg3.dev1.temperature", "root.sg3.dev1.desc"],
            vec![TSDataType::Float, TSDataType::Text],
            vec![TSEncoding::Plain, TSEncoding::Plain],
            vec![TSCompressionType::SNAPPY, TSCompressionType::SNAPPY],
            None,
            None,
            None,
            None,
        )?;
        session.delete_timeseries(vec!["root.sg3.dev1.temperature", "root.sg3.dev1.desc"])?;
    }

    //insert_record
    {
        session.insert_record(
            "root.sg_rs.dev5",
            vec!["online", "desc"],
            vec![Value::Bool(false), Value::Text("F4145".to_string())],
            Local::now().timestamp_millis(),
            false,
        )?;
        session.delete_timeseries(vec!["root.sg_rs.dev5.online", "root.sg_rs.dev5.desc"])?;
    }

    //insert_string_record
    {
        session.insert_string_record(
            "root.sg_rs.wf02.wt02",
            vec!["id", "location"],
            vec!["SN:001", "BeiJing"],
            Local::now().timestamp_millis(),
            false,
        )?;
        session.delete_timeseries(vec![
            "root.sg_rs.wf02.wt02.id",
            "root.sg_rs.wf02.wt02.location",
        ])?;
    }

    //insert_records
    {
        session.insert_records(
            vec!["root.sg_rs.dev1"],
            vec![vec![
                "restart_count",
                "tick_count",
                "price",
                "temperature",
                "description",
                "status",
            ]],
            vec![vec![
                Value::Int32(i32::MAX),
                Value::Int64(1639704010752),
                Value::Double(1988.1),
                Value::Float(36.8),
                Value::Text("Test Device 1".to_string()),
                Value::Bool(false),
            ]],
            vec![Local::now().timestamp_millis()],
        )?;
        session.delete_timeseries(vec![
            "root.sg_rs.dev1.restart_count",
            "root.sg_rs.dev1.tick_count",
            "root.sg_rs.dev1.price",
            "root.sg_rs.dev1.temperature",
            "root.sg_rs.dev1.description",
            "root.sg_rs.dev1.status",
        ])?;
    }

    //insert_records_of_one_device
    {
        session.insert_records_of_one_device(
            "root.sg_rs.dev0",
            vec![
                Local::now().timestamp_millis(),
                Local::now().timestamp_millis() - 1,
            ],
            vec![
                vec!["restart_count", "tick_count", "price"],
                vec!["temperature", "description", "status"],
            ],
            vec![
                vec![Value::Int32(1), Value::Int64(2018), Value::Double(1988.1)],
                vec![
                    Value::Float(36.8),
                    Value::Text("thermograph".to_string()),
                    Value::Bool(false),
                ],
            ],
            false,
        )?;
    }

    //tablet
    let mut ts = Local::now().timestamp_millis();
    let mut tablet1 = create_tablet(5, ts);

    ts += 5;
    let mut tablet2 = create_tablet(10, ts);

    ts += 10;
    let mut tablet3 = create_tablet(2, ts);

    //insert_tablet
    tablet1.sort();
    session.insert_tablet(&tablet1)?;

    //insert_tablets
    {
        tablet2.sort();
        tablet3.sort();
        session.insert_tablets(vec![&tablet2, &tablet3])?;
    }

    //delete_data
    session.delete_data(vec!["root.sg_rs.dev1.status"], 1, 16)?;

    //execute_query_statement
    {
        let dataset = session.execute_query_statement("select * from root.sg_rs.device2", None)?;
        // Get columns, column types and values from the dataset
        // For example:
        let width = 18;
        let column_count = dataset.get_column_names().len();
        let print_line_sep =
            || println!("{:=<width$}", '=', width = (width + 1) * column_count + 1);

        print_line_sep();
        dataset
            .get_column_names()
            .iter()
            .for_each(|c| print!("|{:>width$}", c.split('.').last().unwrap(), width = width));
        println!("|");
        print_line_sep();
        dataset.get_data_types().iter().for_each(|t| {
            let type_name = format!("{:?}", t);
            print!("|{:>width$}", type_name, width = width)
        });
        println!("|");
        print_line_sep();
        dataset.for_each(|r| {
            r.values.iter().for_each(|v| match v {
                Value::Bool(v) => print!("|{:>width$}", v, width = width),
                Value::Int32(v) => print!("|{:>width$}", v, width = width),
                Value::Int64(v) => print!("|{:>width$}", v, width = width),
                Value::Float(v) => print!("|{:>width$}", v, width = width),
                Value::Double(v) => print!("|{:>width$}", v, width = width),
                Value::Text(v) => print!("|{:>width$}", v, width = width),
                Value::Null => print!("|{:>width$}", "null", width = width),
            });
            println!("|");
        });
        print_line_sep();
    }

    //execute_statement
    {
        let dataset = session.execute_statement("show timeseries", None)?;
        let mut table = Table::new();
        table.set_titles(Row::new(
            dataset
                .get_column_names()
                .iter()
                .map(|c| cell!(c))
                .collect(),
        ));
        dataset.for_each(|r: RowRecord| {
            table.add_row(Row::new(
                r.values.iter().map(|v: &Value| cell!(v)).collect(),
            ));
        });
        table.printstd();
    }

    //execute_batch_statement
    {
        session.execute_batch_statement(vec![
            "insert into root.sg_rs.dev6(time,s5) values(1,true)",
            "insert into root.sg_rs.dev6(time,s5) values(2,true)",
            "insert into root.sg_rs.dev6(time,s5) values(3,true)",
        ])?;
        session.delete_timeseries(vec!["root.sg_rs.dev6.s5"])?;
    }
    //execute_raw_data_query
    {
        let dataset = session.execute_raw_data_query(
            vec![
                "root.sg_rs.device2.restart_count",
                "root.sg_rs.device2.tick_count",
                "root.sg_rs.device2.description",
            ],
            0,
            i64::MAX,
        )?;
        let mut table = Table::new();
        table.set_titles(Row::new(
            dataset
                .get_column_names()
                .iter()
                .map(|c| cell!(c))
                .collect(),
        ));
        dataset.for_each(|r: RowRecord| {
            table.add_row(Row::new(
                r.values.iter().map(|v: &Value| cell!(v)).collect(),
            ));
        });
        table.printstd();
    }

    //execute_update_statement
    {
        if let Some(dataset) =
            session.execute_update_statement("delete timeseries root.sg_rs.dev0.*")?
        {
            dataset.for_each(|r| println!("timestamp: {} {:?}", r.timestamp, r.values));
        }
    }

    session.close()?;
    Ok(())
}

fn create_tablet(row_count: i32, start_timestamp: i64) -> Tablet {
    let mut tablet = Tablet::new(
        "root.sg_rs.device2",
        vec![
            MeasurementSchema::new(
                String::from("status"),
                TSDataType::Boolean,
                TSEncoding::Plain,
                TSCompressionType::SNAPPY,
                None,
            ),
            MeasurementSchema::new(
                String::from("restart_count"),
                TSDataType::Int32,
                TSEncoding::RLE,
                TSCompressionType::SNAPPY,
                None,
            ),
            MeasurementSchema::new(
                String::from("tick_count"),
                TSDataType::Int64,
                TSEncoding::RLE,
                TSCompressionType::SNAPPY,
                None,
            ),
            MeasurementSchema::new(
                String::from("temperature"),
                TSDataType::Float,
                TSEncoding::Plain,
                TSCompressionType::SNAPPY,
                None,
            ),
            MeasurementSchema::new(
                String::from("price"),
                TSDataType::Double,
                TSEncoding::Gorilla,
                TSCompressionType::SNAPPY,
                None,
            ),
            MeasurementSchema::new(
                String::from("description"),
                TSDataType::Text,
                TSEncoding::Plain,
                TSCompressionType::SNAPPY,
                None,
            ),
        ],
    );
    (0..row_count).for_each(|row| {
        let ts = start_timestamp + row as i64;
        tablet
            .add_row(
                vec![
                    Value::Bool(ts % 2 == 0),
                    Value::Int32(row),
                    Value::Int64(row as i64),
                    Value::Float(row as f32 + 0.1),
                    Value::Double(row as f64 + 0.2),
                    Value::Text(format!("ts: {}", ts)),
                ],
                ts,
            )
            .unwrap_or_else(|err| eprintln!("Add row failed, reason '{}'", err));
    });
    tablet
}