LeonHartley/Coerce-rs

How to use tracing with coerce-rs?

kesavkolla opened this issue · 4 comments

How can we set up the tracing crate with the actors? Can we get proper propagation of spans to the actors?

I need the same functionality, and I believe that #17 fixes it.

Here's a demo (assuming you are using PR #17):


use async_trait::async_trait;
use coerce::actor::{
    context::ActorContext,
    message::{Handler, Message},
    system::ActorSystem,
    Actor, LocalActorRef,
};
use color_eyre::Result;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Layer;

use opentelemetry::{
    global::shutdown_tracer_provider,
    sdk::{
        trace::{self, Sampler},
        Resource,
    },
};
use opentelemetry::KeyValue;
use opentelemetry_otlp::{Protocol, WithExportConfig};
use tokio::time::sleep;
use tracing::{instrument, Instrument};
use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};

use std::time::Duration;

#[tokio::main(flavor = "multi_thread", worker_threads = 5)]
pub async fn main() -> Result<()> {
    // Install a new OpenTelemetry trace pipeline

    let tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(
            opentelemetry_otlp::new_exporter()
                .tonic()
                .with_endpoint("http://localhost:4317")
                .with_protocol(Protocol::Grpc)
                .with_timeout(Duration::from_secs(5)),
        )
        .with_trace_config(
            trace::config()
                .with_sampler(Sampler::AlwaysOn)
                .with_max_events_per_span(2000)
                .with_max_attributes_per_span(100)
                .with_resource(Resource::new(vec![KeyValue::new(
                    "service.name",
                    "tracing-simple",
                )])),
        )
        .install_batch(opentelemetry::runtime::Tokio)?;
        // .install_simple()?;

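    // Bridge `tracing` spans into OpenTelemetry via the tracing-opentelemetry layer,
    // and add a pretty-printed console layer filtered by RUST_LOG.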
    let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
    let layer = tracing_subscriber::fmt::layer()
        .pretty()
        .with_span_events(FmtSpan::NONE)
        .with_filter(EnvFilter::from_default_env());

    let subscriber = tracing_subscriber::registry().with(layer).with(telemetry);
    tracing::subscriber::set_global_default(subscriber).expect("unable to set global subscriber");

    let system = ActorSystem::builder().system_name("tracing-simple").build();

    let a = system.new_tracked_actor(AddActor {}).await?;
    let p = system
        .new_tracked_actor(PrintActor { add_actor: a })
        .await?;

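    // Wrap the send in an explicit span; with the span propagation from PR #17,
    // the actor handlers' spans should show up as children of `send_message_to_actor`.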
    async {
        p.send(PrintMessage {
            message: "Hi".into(),
        })
        .await
        .unwrap();
    }
    .instrument(tracing::info_span!("send_message_to_actor"))
    .await;

    println!("sleeping");
    sleep(Duration::from_millis(1000)).await;
    println!("Shutting down tracing provider");
    shutdown_tracer_provider();

    println!("done");

    Ok(())
}

struct PrintActor {
    add_actor: LocalActorRef<AddActor>,
}
impl Actor for PrintActor {}

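// Assuming PR #17's span propagation, this handler's span is nested under
// whatever span was active when the message was sent.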
#[async_trait]
impl Handler<PrintMessage> for PrintActor {
    #[instrument(name = "PrintActor::PrintMessage", skip_all)]
    async fn handle(&mut self, msg: PrintMessage, _ctx: &mut ActorContext) {
        tracing::info!("print message: {}", msg.message);
        println!("print message: {}", msg.message);
        
        sleep(Duration::from_millis(100)).await;

        let sum = self.add_actor.send(AddMessage { a: 1, b: 2 }).await.unwrap();
        tracing::info!("sum: {}", sum);
        sleep(Duration::from_millis(100)).await;

        let sum = self.add_actor.send(AddMessage { a: 2, b: 3 }).await.unwrap();
        tracing::info!("sum2: {}", sum);
        sleep(Duration::from_millis(100)).await;
    }
}
struct PrintMessage {
    message: String,
}
impl Message for PrintMessage {
    type Result = ();
}

struct AddActor {}
impl Actor for AddActor {}

#[async_trait]
impl Handler<AddMessage> for AddActor {
    #[instrument(name = "AddActor::AddMessage", skip_all)]
    async fn handle(&mut self, msg: AddMessage, _ctx: &mut ActorContext) -> usize {
        let result = msg.a + msg.b;
        tracing::info!("adding: {} + {} = {}", msg.a, msg.b, result);
        result
    }
}
struct AddMessage {
    a: usize,
    b: usize,
}
impl Message for AddMessage {
    type Result = usize;
}
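
For reference, a dependency set along these lines should work; the versions below are an assumption and just need to be a matching opentelemetry / opentelemetry-otlp / tracing-opentelemetry trio, since the code above targets the pre-1.0 opentelemetry SDK API.

Cargo.toml

[dependencies]
async-trait = "0.1"
coerce = { git = "https://github.com/LeonHartley/Coerce-rs" } # or a checkout of PR #17 until it lands
color-eyre = "0.6"
opentelemetry = { version = "0.18", features = ["rt-tokio"] } # rt-tokio enables install_batch(runtime::Tokio)
opentelemetry-otlp = "0.11"                                   # pairs with opentelemetry 0.18
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-opentelemetry = "0.18"                                # pairs with opentelemetry 0.18
tracing-subscriber = { version = "0.3", features = ["env-filter"] } # env-filter is needed for EnvFilter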

And the docker-compose setup:

docker-compose.yaml

# based on https://github.com/grafana/tempo/blob/main/example/docker-compose/local/docker-compose.yaml

version: "3"
services:

  tempo:
    image: grafana/tempo:latest
    command: [ "-config.file=/etc/tempo.yaml" ]
    volumes:
      - ./shared/tempo.yaml:/etc/tempo.yaml
      - ./target/tempo-data:/tmp/tempo
    ports:
      - "14268:14268"  # jaeger ingest
      - "3200:3200"   # tempo
      - "4317:4317"  # otlp grpc
      - "4318:4318"  # otlp http
      - "9411:9411"   # zipkin

  prometheus:
    image: prom/prometheus:latest
    command:
      - --config.file=/etc/prometheus.yaml
      - --web.enable-remote-write-receiver
      - --enable-feature=exemplar-storage
    volumes:
      - ./shared/prometheus.yaml:/etc/prometheus.yaml
    ports:
      - "9090:9090"

  grafana:
    image: grafana/grafana:9.4.3
    volumes:
      - ./shared/grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
    environment:
      - GF_AUTH_ANONYMOUS_ENABLED=true
      - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
      - GF_AUTH_DISABLE_LOGIN_FORM=true
      - GF_FEATURE_TOGGLES_ENABLE=traceqlEditor
    ports:
      - "3001:3000"

And the other config files:

grafana-datasources.yaml

apiVersion: 1

datasources:
- name: Prometheus
  type: prometheus
  uid: prometheus
  access: proxy
  orgId: 1
  url: http://prometheus:9090
  basicAuth: false
  isDefault: false
  version: 1
  editable: false
  jsonData:
    httpMethod: GET
- name: Tempo
  type: tempo
  access: proxy
  orgId: 1
  url: http://tempo:3200
  basicAuth: false
  isDefault: true
  version: 1
  editable: false
  apiVersion: 1
  uid: tempo
  jsonData:
    httpMethod: GET
    serviceMap:
      datasourceUid: prometheus

prometheus.yaml

global:
  scrape_interval:     15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: [ 'localhost:9090' ]
  - job_name: 'tempo'
    static_configs:
      - targets: [ 'tempo:3200' ]

tempo.yaml

server:
  http_listen_port: 3200
  grpc_server_max_recv_msg_size: 100000000
  grpc_server_max_send_msg_size: 100000000

querier:
  frontend_worker:
    grpc_client_config:
      max_recv_msg_size: 100000000
      max_send_msg_size: 100000000

distributor:
  receivers:                           # this configuration will listen on all ports and protocols that tempo is capable of.
    jaeger:                            # the receivers all come from the OpenTelemetry collector. more configuration information can
      protocols:                       # be found there: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver
        thrift_http:                   #
        grpc:                          # for a production deployment you should only enable the receivers you need!
        thrift_binary:
        thrift_compact:
    zipkin:
    otlp:
      protocols:
        http:
        grpc:
          max_recv_msg_size_mib: 100

    opencensus:

ingester:
  max_block_duration: 5m               # cut the headblock when this much time passes. this is being set for demo purposes and should probably be left alone normally

compactor:
  compaction:
    block_retention: 1h                # overall Tempo trace retention. set for demo purposes

metrics_generator:
  registry:
    external_labels:
      source: tempo
      cluster: docker-compose
  storage:
    path: /tmp/tempo/generator/wal
    remote_write:
      - url: http://prometheus:9090/api/v1/write
        send_exemplars: true

storage:
  trace:
    backend: local                     # backend configuration to use
    wal:
      path: /tmp/tempo/wal             # where to store the wal locally
    local:
      path: /tmp/tempo/blocks

overrides:
  metrics_generator_processors: [service-graphs, span-metrics] # enables metrics generator
  max_bytes_per_trace: 500000000 # 500mb (default is 5mb)

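To run the demo end to end (assuming the config files live under ./shared/ as referenced in the compose volumes):

# start Tempo, Prometheus and Grafana
docker-compose up -d

# run the example; EnvFilter::from_default_env() picks the level up from RUST_LOG
RUST_LOG=info cargo run

# Grafana is exposed on http://localhost:3001 (anonymous admin access);
# query the Tempo datasource for the "tracing-simple" service to see the nested spans
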
I hope these PRs get merged.

This has been merged :) Cheers guys