aprimadi/influxdb2

Querying multiple datapoints with the same tag fails

FreyMo opened this issue · 8 comments

Hi,

First of all, thank you for this library; I really appreciate your effort. Since there is no official InfluxDB v2 library for Rust, I have been having trouble finishing a project I'm working on in my free time. This library is a godsend, but it seems that querying multiple datapoints is not possible. I looked at your examples and found that the queries always end with last(). If I remove that and have more than one entry in the database with the same tag, I get the following error:

stack backtrace:
   0: std::panicking::begin_panic
             at /rustc/addacb5878b9970ebc1665768a05cb601e7aea15/library/std/src/panicking.rs:616:12
   1: <testinflux::StockPrice as influxdb2_structmap::FromMap>::from_genericmap
             at ./src/main.rs:14:17
   2: influxdb2::api::query::<impl influxdb2::Client>::query::{{closure}}
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/influxdb2-0.2.7/src/api/query.rs:120:36
   3: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/addacb5878b9970ebc1665768a05cb601e7aea15/library/core/src/future/mod.rs:91:19
   4: testinflux::main::{{closure}}
             at ./src/main.rs:60:48
   5: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/addacb5878b9970ebc1665768a05cb601e7aea15/library/core/src/future/mod.rs:91:19
   6: tokio::park::thread::CachedParkThread::block_on::{{closure}}
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.1/src/park/thread.rs:263:54
   7: tokio::coop::with_budget::{{closure}}
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.1/src/coop.rs:102:9
   8: std::thread::local::LocalKey<T>::try_with
             at /rustc/addacb5878b9970ebc1665768a05cb601e7aea15/library/std/src/thread/local.rs:445:16
   9: std::thread::local::LocalKey<T>::with
             at /rustc/addacb5878b9970ebc1665768a05cb601e7aea15/library/std/src/thread/local.rs:421:9
  10: tokio::coop::with_budget
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.1/src/coop.rs:95:5
  11: tokio::coop::budget
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.1/src/coop.rs:72:5
  12: tokio::park::thread::CachedParkThread::block_on
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.1/src/park/thread.rs:263:31
  13: tokio::runtime::enter::Enter::block_on
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.1/src/runtime/enter.rs:152:13
  14: tokio::runtime::thread_pool::ThreadPool::block_on
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.1/src/runtime/thread_pool/mod.rs:90:9
  15: tokio::runtime::Runtime::block_on
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.1/src/runtime/mod.rs:484:43
  16: testinflux::main
             at ./src/main.rs:63:5
  17: core::ops::function::FnOnce::call_once
             at /rustc/addacb5878b9970ebc1665768a05cb601e7aea15/library/core/src/ops/function.rs:248:5

Are you aware of this? Is this intended? Is there another way to query data other than the way described in the examples? I'd really like to use your library but with this it's currently impossible. I'd also be willing to help out if that is desired.

Anyway, thanks in advance. Your work is much appreciated!

@FreyMo Hello, could you attach some source code to reproduce the issue? For example, the data struct, how the datapoint is written, and the Flux query would all be useful.

Also, which version of the library are you using, and does it work if you downgrade to version 0.1.1?

Hi @aprimadi, thanks for the quick response!

Reproduce error

  • Run the main function. The first time, it works and prints the output.
  • Run the main function a second time. An error occurs because there is now more than one entry for each of the tag values GOOG and AAPL.
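To make the difference between the two runs concrete, here is a minimal sketch in plain Rust, with no InfluxDB involved. `Row` and `last_per_ticker` are hypothetical names made up for illustration, and the grouping is a simplification: it assumes that last() keeps the newest record per series, and it keys only on the ticker tag, whereas the real series key also includes the measurement and field.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory stand-in for one stored record (not an influxdb2 type).
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    pub ticker: String,
    pub time: i64, // write time, e.g. seconds since the epoch
    pub value: f64,
}

/// Keep only the newest row per ticker. This is a simplified model of
/// what `last()` does for each series; it is not the library's code.
pub fn last_per_ticker(rows: &[Row]) -> Vec<Row> {
    let mut newest: BTreeMap<&str, &Row> = BTreeMap::new();
    for row in rows {
        let key = row.ticker.as_str();
        // Replace the stored row only if this one is strictly newer.
        let is_newer = newest.get(key).map_or(true, |seen| row.time > seen.time);
        if is_newer {
            newest.insert(key, row);
        }
    }
    // BTreeMap iterates in key order, so the result is sorted by ticker.
    newest.into_values().cloned().collect()
}
```

After the second run there are four rows (two per ticker). Passing them to `last_per_ticker` yields two rows again, which corresponds to the case that works; without that reduction the query has to return several rows per tag.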

Docker compose file:

services:
  influxdb:
    image: influxdb:2.4-alpine
    environment: # Hardcoded for demonstration purposes
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=influx
      - DOCKER_INFLUXDB_INIT_PASSWORD=influxdbpassword
      - DOCKER_INFLUXDB_INIT_ORG=myorg
      - DOCKER_INFLUXDB_INIT_BUCKET=mybucket
      - DOCKER_INFLUXDB_INIT_RETENTION=1w
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-auth-token
    ports:
      - "8086:8086"
    volumes:
      - ./influx-data:/var/lib/influxdb2
      - ./influx-config:/etc/influxdb2

Cargo.toml

[package]
name = "testinflux"
version = "0.1.0"
edition = "2021"

[dependencies]
chrono = { version = "0.4.22", features = ["serde"] }
influxdb2 = "0.2.7"
influxdb2-structmap = "0.1.5"
influxdb2-structmap-derive = "0.1.5"
futures = "0.3.23"
tokio = { version = "1.20.1", features = ["full"] }

main.rs

use chrono::{DateTime, FixedOffset, TimeZone, Utc};
use futures::stream;
use influxdb2::models::DataPoint;
use influxdb2::models::Query;
use influxdb2::Client;
use influxdb2_structmap::FromMap;

#[derive(Debug, influxdb2_structmap_derive::FromMap)]
pub struct StockPrice {
    ticker: String,
    value: f64,
    open: f64,
    time: DateTime<FixedOffset>,
}

impl Default for StockPrice {
    fn default() -> Self {
        let now = Utc::now().naive_utc();
        Self {
            ticker: "".to_owned(),
            value: 0.0,
            open: 0.0,
            time: FixedOffset::east(7 * 3600).from_utc_datetime(&now),
        }
    }
}

// Hardcoded for demonstration purposes
const HOST: &str = "http://localhost:8086";
const ORG: &str = "myorg";
const TOKEN: &str = "my-super-secret-auth-token";
const BUCKET: &str = "mybucket";

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new(HOST, ORG, TOKEN);

    println!("HealthCheck: {:#?}", client.health().await?);
    let points: Vec<DataPoint> = vec![ 
        DataPoint::builder("bar")
            .tag("ticker", "AAPL")
            .field("value", 123.46)
            .field("open", 200.0)
            .build()?,
        DataPoint::builder("bar")
            .tag("ticker", "GOOG")
            .field("value", 321.09)
            .field("open", 309.2)
            .build()?,
    ];
    client.write(BUCKET, stream::iter(points)).await?;
    let qs = format!("from(bucket: \"{}\") 
      |> range(start: -1w)", BUCKET); // notice the missing last()
    let query = Query::new(qs.to_string());

    println!(
        "Query result was: {:#?}", 
        client.query::<StockPrice>(Some(query)).await?
    );

    Ok(())
}

This is mainly taken from your examples.
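For comparing the two query variants side by side, the query text can be built by a small helper. This is only a sketch: `flux_query` is a hypothetical function and not part of influxdb2. It produces the Flux string with or without the trailing last() stage, using the same bucket and range as above.

```rust
/// Hypothetical helper (not part of influxdb2): builds the Flux query text.
/// With `only_last = true` the query ends with `last()`, as in the library's
/// examples; with `false` it returns every record in the range.
pub fn flux_query(bucket: &str, start: &str, only_last: bool) -> String {
    let mut query = format!(
        "from(bucket: \"{}\")\n  |> range(start: {})",
        bucket, start
    );
    if only_last {
        query.push_str("\n  |> last()");
    }
    query
}
```

`flux_query("mybucket", "-1w", false)` gives the query text that triggers the error on the second run, and passing `true` gives the variant that works. Either string can be handed to `Query::new` as in main.rs.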

Oh, and by the way, yes, this is reproducible with version 0.2.6 as well.

Hello @FreyMo ,

I think this is fixed in the new version, v0.3.0, which I just released. I haven't tested it yet in the project I'm currently working on. There are a few API changes; see the README for a usage description.

Let me know if the new version is working for you so I can close the issue.

Also, thank you for opening the issue. My project still uses the 0.1 version, so I wouldn't have known there was such a breaking change in version 0.2.

Thanks again for responding so quickly. I will try it out in about 10 hours and let you know whether it works. Again, your work is much appreciated.

Kind regards

Works like a charm, thanks! I will make sure to give you credit once I release my project :) InfluxData should employ you, or at least build their Rust client for InfluxDB v2 on your work. Enjoy the rest of the week!