How to use tracing with coerce-rs?
kesavkolla opened this issue · 4 comments
kesavkolla commented
How can we set up the tracing crate with the actors? Can we get proper propagation of spans to actors?
cameronbraid commented
I need the same functionality and I believe that #17 fixes it
cameronbraid commented
Here's a demo (assuming you are using PR #17):
use async_trait::async_trait;
use coerce::actor::{
context::ActorContext,
message::{Handler, Message},
system::ActorSystem,
Actor, LocalActorRef,
};
use color_eyre::Result;
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
use tracing_subscriber::Layer;
use opentelemetry::{
global::shutdown_tracer_provider,
sdk::{
trace::{self, Sampler},
Resource,
},
};
use opentelemetry::KeyValue;
use opentelemetry_otlp::{Protocol, WithExportConfig};
use tokio::time::sleep;
use tracing::{instrument, span, Instrument};
use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};
use std::time::Duration;
/// Demo entry point: connects `tracing` to an OTLP exporter, starts an
/// `AddActor` and a `PrintActor`, and sends one `PrintMessage` from inside an
/// `info` span named `send_message_to_actor`.
///
/// # Errors
///
/// Returns an error if the OTLP pipeline cannot be installed, if the global
/// `tracing` subscriber cannot be set, or if either actor fails to start.
///
/// # Panics
///
/// Panics if sending the `PrintMessage` to the print actor fails.
#[tokio::main(flavor = "multi_thread", worker_threads = 5)]
pub async fn main() -> Result<()> {
    // Install a new OpenTelemetry trace pipeline.
    // Alternative to `install_batch` below: `.install_simple()?`.
    let tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(
            opentelemetry_otlp::new_exporter()
                .tonic()
                .with_endpoint("http://localhost:4317")
                .with_protocol(Protocol::Grpc)
                .with_timeout(Duration::from_secs(5)),
        )
        .with_trace_config(
            trace::config()
                .with_sampler(Sampler::AlwaysOn)
                .with_max_events_per_span(2000)
                .with_max_attributes_per_span(100)
                .with_resource(Resource::new(vec![KeyValue::new(
                    "service.name",
                    "tracing-simple",
                )])),
        )
        .install_batch(opentelemetry::runtime::Tokio)?;

    // Layer that forwards spans to the OpenTelemetry tracer built above.
    let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);

    // Pretty-printed console output; the environment filter is attached to
    // this layer only, not to the telemetry layer.
    let layer = tracing_subscriber::fmt::layer()
        .pretty()
        .with_span_events(FmtSpan::NONE)
        .with_filter(EnvFilter::from_default_env());

    let subscriber = tracing_subscriber::registry().with(layer).with(telemetry);
    // `main` already returns `Result`, so report a failure to install the
    // subscriber as an error instead of panicking.
    tracing::subscriber::set_global_default(subscriber)?;

    let system = ActorSystem::builder().system_name("tracing-simple").build();
    let a = system.new_tracked_actor(AddActor {}).await?;
    let p = system
        .new_tracked_actor(PrintActor { add_actor: a })
        .await?;

    // Send the message from inside a span so the actor handlers' spans have a
    // current span to attach to at the call site.
    async {
        p.send(PrintMessage {
            message: "Hi".into(),
        })
        .await
        .unwrap();
    }
    .instrument(tracing::info_span!("send_message_to_actor"))
    .await;

    // NOTE(review): presumably this pause gives the batch exporter time to
    // ship the spans before shutdown — confirm whether it is still needed.
    println!("sleeping");
    sleep(Duration::from_millis(1000)).await;

    println!("Shutting down tracing provider");
    shutdown_tracer_provider();
    println!("done");

    Ok(())
}
/// Actor that holds a local reference to an [`AddActor`].
struct PrintActor {
    /// Handle used to send messages to the add actor.
    add_actor: LocalActorRef<AddActor>,
}

// No `Actor` trait items are overridden.
impl Actor for PrintActor {}
#[async_trait]
impl Handler<PrintMessage> for PrintActor {
#[instrument(name = "PrintActor::PrintMessage", skip_all)]
async fn handle(&mut self, msg: PrintMessage, _ctx: &mut ActorContext) {
tracing::info!("print message: {}", msg.message);
println!("print message: {}", msg.message);
sleep(Duration::from_millis(100)).await;
let sum = self.add_actor.send(AddMessage { a: 1, b: 2 }).await.unwrap();
tracing::info!("sum: {}", sum);
sleep(Duration::from_millis(100)).await;
let sum = self.add_actor.send(AddMessage { a: 2, b: 3 }).await.unwrap();
tracing::info!("sum2: {}", sum);
sleep(Duration::from_millis(100)).await;
}
}
/// Message carrying a piece of text.
struct PrintMessage {
    /// The text payload.
    message: String,
}

impl Message for PrintMessage {
    /// Handling this message produces no value.
    type Result = ();
}
/// Actor with no fields.
struct AddActor {}

// No `Actor` trait items are overridden.
impl Actor for AddActor {}
#[async_trait]
impl Handler<AddMessage> for AddActor {
#[instrument(name = "AddActor::AddMessage", skip_all)]
async fn handle(&mut self, msg: AddMessage, _ctx: &mut ActorContext) -> usize {
let result = msg.a + msg.b;
tracing::info!("adding: {} + {} = {}", msg.a, msg.b, result);
result
}
}
/// Message carrying two operands.
struct AddMessage {
    /// First operand.
    a: usize,
    /// Second operand.
    b: usize,
}

impl Message for AddMessage {
    /// Handling this message yields a `usize`.
    type Result = usize;
}
And the docker-compose setup:
docker-compose.yaml
# based on https://github.com/grafana/tempo/blob/main/example/docker-compose/local/docker-compose.yaml
# Runs Tempo, Prometheus and Grafana; config files are mounted from ./shared.
version: "3"
services:
  tempo:
    image: grafana/tempo:latest
    command: [ "-config.file=/etc/tempo.yaml" ]
    volumes:
      - ./shared/tempo.yaml:/etc/tempo.yaml
      - ./target/tempo-data:/tmp/tempo
    ports:
      - "14268:14268" # jaeger ingest
      - "3200:3200" # tempo
      - "4317:4317" # otlp grpc
      - "4318:4318" # otlp http
      - "9411:9411" # zipkin

  prometheus:
    image: prom/prometheus:latest
    command:
      - --config.file=/etc/prometheus.yaml
      - --web.enable-remote-write-receiver
      - --enable-feature=exemplar-storage
    volumes:
      - ./shared/prometheus.yaml:/etc/prometheus.yaml
    ports:
      - "9090:9090"

  grafana:
    image: grafana/grafana:9.4.3
    volumes:
      - ./shared/grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
    environment:
      - GF_AUTH_ANONYMOUS_ENABLED=true
      - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
      - GF_AUTH_DISABLE_LOGIN_FORM=true
      - GF_FEATURE_TOGGLES_ENABLE=traceqlEditor
    ports:
      - "3001:3000" # host port 3001 -> container port 3000
and other config files 👍
grafana-datasources.yaml
# Grafana datasource provisioning: a Prometheus datasource and a Tempo
# datasource (the default) whose service map points at Prometheus.
apiVersion: 1

datasources:
  - name: Prometheus
    type: prometheus
    uid: prometheus
    access: proxy
    orgId: 1
    url: http://prometheus:9090
    basicAuth: false
    isDefault: false
    version: 1
    editable: false
    jsonData:
      httpMethod: GET
  - name: Tempo
    type: tempo
    access: proxy
    orgId: 1
    url: http://tempo:3200
    basicAuth: false
    isDefault: true
    version: 1
    editable: false
    apiVersion: 1
    uid: tempo
    jsonData:
      httpMethod: GET
      serviceMap:
        datasourceUid: prometheus
prometheus.yaml
# Prometheus configuration: scrapes itself and Tempo every 15 seconds.
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: [ 'localhost:9090' ]
  - job_name: 'tempo'
    static_configs:
      - targets: [ 'tempo:3200' ]
tempo.yaml
# Tempo configuration for the local docker-compose demo.
server:
  http_listen_port: 3200
  grpc_server_max_recv_msg_size: 100000000
  grpc_server_max_send_msg_size: 100000000

querier:
  frontend_worker:
    grpc_client_config:
      max_recv_msg_size: 100000000
      max_send_msg_size: 100000000

distributor:
  receivers:                 # this configuration will listen on all ports and protocols that tempo is capable of.
    jaeger:                  # the receivers all come from the OpenTelemetry collector. more configuration information can
      protocols:             # be found there: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver
        thrift_http:         #
        grpc:                # for a production deployment you should only enable the receivers you need!
        thrift_binary:
        thrift_compact:
    zipkin:
    otlp:
      protocols:
        http:
        grpc:
          max_recv_msg_size_mib: 100
    opencensus:

ingester:
  max_block_duration: 5m     # cut the headblock when this much time passes. this is being set for demo purposes and should probably be left alone normally

compactor:
  compaction:
    block_retention: 1h      # overall Tempo trace retention. set for demo purposes

metrics_generator:
  registry:
    external_labels:
      source: tempo
      cluster: docker-compose
  storage:
    path: /tmp/tempo/generator/wal
    remote_write:
      - url: http://prometheus:9090/api/v1/write
        send_exemplars: true

storage:
  trace:
    backend: local           # backend configuration to use
    wal:
      path: /tmp/tempo/wal   # where to store the wal locally
    local:
      path: /tmp/tempo/blocks

overrides:
  metrics_generator_processors: [service-graphs, span-metrics] # enables metrics generator
  max_bytes_per_trace: 500000000 # 500mb (default is 5mb)
kesavkolla commented
I hope these PRs get merged.
LeonHartley commented
This has been merged :) Cheers guys