fails to publish if client is cloned
Closed this issue · 4 comments
Observed behavior
I have a function like so
pub async fn get_jetstream() -> Result<(Client, Stream)> {
let client = connect_with_options(
env::var("NATS_URL").unwrap_or("localhost:30042".to_string()),
ConnectOptions::new().name(env::var("SERVICE_NAME").unwrap_or("worker".to_string())),
)
.await?;
client.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
let ctx = jetstream::new(client.clone());
let cfg = Config {
name: "tasks".to_string(),
subjects: vec!["tasks.>".to_string()],
retention: RetentionPolicy::WorkQueue,
..Default::default()
};
Ok((client, ctx.get_or_create_stream(cfg).await?))
}
the example publish line here works as expected
Expected behavior
I'm initializing jetstream with a clone of this client object...
jetstream behaves as expected...
but then, when I try publishing with the returned client
#[traced_test]
#[tokio::test]
async fn test_produce() -> Result<()>{
let (nc, _) = get_jetstream().await?;
nc.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
nc.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
nc.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
Ok(())
}
I get no errors, but no message gets published..
Here's a screenshots illustrating nats s subjects
and advisory events
Server and client version
server: 2.9.20
client: async-nats = "0.37.0"
Host environment
nats running as docker containers
Steps to reproduce
util
pub async fn get_jetstream() -> Result<(Client, Stream)> {
let client = connect_with_options(
env::var("NATS_URL").unwrap_or("localhost:30042".to_string()),
ConnectOptions::new().name(env::var("SERVICE_NAME").unwrap_or("worker".to_string())),
)
.await?;
client.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
let ctx = jetstream::new(client.clone());
let cfg = Config {
name: "tasks".to_string(),
subjects: vec!["tasks.>".to_string()],
retention: RetentionPolicy::WorkQueue,
..Default::default()
};
Ok((client, ctx.get_or_create_stream(cfg).await?))
}
test
#[traced_test]
#[tokio::test]
async fn test_produce() -> Result<()>{
let (nc, _) = get_jetstream().await?;
nc.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
nc.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
nc.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
Ok(())
}
note: there's probably something I'm doing incorrectly...
same behavior when I create a second instance as well. nothing to do with clone
pub async fn get_jetstream() -> Result<(Client, Stream)> {
let client = connect_with_options(
env::var("NATS_URL").unwrap_or("localhost:30042".to_string()),
ConnectOptions::new().name(env::var("SERVICE_NAME").unwrap_or("worker".to_string())),
)
.await?;
client.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
let ctx = jetstream::new(client);
let cfg = Config {
name: "tasks".to_string(),
subjects: vec!["tasks.>".to_string()],
retention: RetentionPolicy::WorkQueue,
..Default::default()
};
let nc = connect_with_options(
env::var("NATS_URL").unwrap_or("localhost:30042".to_string()),
ConnectOptions::new().name(env::var("SERVICE_NAME").unwrap_or("worker".to_string())),
)
.await?; //because of nats-rs bug: https://github.com/nats-io/nats.rs/issues/1336
Ok((nc, ctx.get_or_create_stream(cfg).await?))
}
Updated the test to directly create a client
#[traced_test]
#[tokio::test]
async fn test_produce() -> Result<()> {
//let (nc, _) = get_jetstream().await?;
let nc = connect_with_options(
settings.nats_url.clone(),
ConnectOptions::new().name(settings.service_name.clone()),
)
.await?;
nc.publish(
"tasks.one",
serde_json::to_vec(&json!({"msg": "hey"}))?.into(),
)
.await?;
nc.publish(
"tasks.one",
serde_json::to_vec(&json!({"msg": "hey"}))?.into(),
)
.await?;
nc.publish(
"tasks.one",
serde_json::to_vec(&json!({"msg": "hey"}))?.into(),
)
.await?;
Ok(())
}
still not producing properly, can someone please point out what could be amiss?
You need to flush()
before the end of the test/app to ensure that everything in the buffer is sent through the socket before everything is dropped.
yep, thanks
adding nc.flush().await?
in the end worked