jaegertracing/jaeger-client-python

Error running simple asyncio example

akzincsystems opened this issue · 6 comments

Requirement - what kind of business use case are you trying to solve?

Running Jaeger tracing with asyncio. The intention is to run the functions in AWS Lambda.

Problem - what in Jaeger blocks you from solving the requirement?

The following error is thrown:

Traceback (most recent call last):
  File "/home/xxx/PycharmProjects/JaegerTest/main.py", line 55, in <module>
    tracer.close()
  File "/home/xxx/PycharmProjects/JaegerTest/venv/lib/python3.8/site-packages/jaeger_client/tracer.py", line 284, in close
    return self.reporter.close()
  File "/home/xxx/PycharmProjects/JaegerTest/venv/lib/python3.8/site-packages/jaeger_client/reporter.py", line 260, in close
    future = Future()
  File "/usr/lib/python3.8/asyncio/events.py", line 639, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'MainThread'.

Proposal - what do you suggest to solve the problem or improve the existing situation?

No suggestion; help sought.

The code being run is a simple example:

#!/usr/bin/env python3

import asyncio
import logging
import time
from jaeger_client import Config
from opentracing.scope_managers.contextvars import ContextVarsScopeManager

# jaeger tracer
tracer = None

def init_jaeger_tracer(serviceName='jeagertest'):
    config = Config(
        config={  # usually read from some yaml config
            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'local_agent': {
                'reporting_host': 'localhost',
                'reporting_port': '6831',
            },
            'logging': True,
        },
        service_name=serviceName,
        validate=True,
        scope_manager=ContextVarsScopeManager()
    )
    return config.initialize_tracer()

async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")

async def main():
    await asyncio.gather(count(), count(), count())

def init_logging():
    log_level = logging.DEBUG
    logging.getLogger('').handlers = []
    logging.basicConfig(format='%(asctime)s %(message)s', level=log_level)

if __name__ == "__main__":
    init_logging()
    tracer = init_jaeger_tracer()

    with tracer.start_span('Root') as span:
        s = time.perf_counter()
        asyncio.run(main())
        elapsed = time.perf_counter() - s
        print(f"{__file__} executed in {elapsed:0.2f} seconds.")

    time.sleep(2)
    tracer.close()

I've searched everywhere for a possible solution but found none. Any suggestions are welcome.

cc @obataku, not sure if it's related

thanks for pinging, @yurishkuro; fortunately, this doesn't appear to be directly related to my change in #294

@akzincsystems I believe the problem here is that asyncio.run closes the event loop once main() completes (and unsets it as the thread's current loop), whereas tracer.close() ultimately tries to create a new Future (the future = Future() call in jaeger_client/reporter.py shown in the traceback above) and thus requires that there be an event loop associated with the current thread. you can fix this by not closing your thread's event loop until after you've called tracer.close():

if __name__ == "__main__":
    init_logging()
    tracer = init_jaeger_tracer()

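    # create and register the loop explicitly; calling asyncio.get_event_loop()
    # with no running loop is deprecated in newer Python versions
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)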

    with tracer.start_span('Root') as span:
        s = time.perf_counter()
        loop.run_until_complete(main())
        elapsed = time.perf_counter() - s
        print(f"{__file__} executed in {elapsed:0.2f} seconds.")

    time.sleep(2)
    tracer.close()

    loop.close()

alternatively, consider encapsulating all of the above logic in a coroutine that we can pass at the top level to asyncio.run:

async def run_traced():
    tracer = init_jaeger_tracer()

    with tracer.start_span('Root') as span:
        s = time.perf_counter()
        await main()
        elapsed = time.perf_counter() - s
        print(f"{__file__} executed in {elapsed:0.2f} seconds.")

    await asyncio.sleep(2)  # ensure pending spans are flushed
    tracer.close()

if __name__ == "__main__":
    init_logging()
    asyncio.run(run_traced())
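To see the mechanism without Jaeger involved, here is a minimal sketch in which a bare asyncio.Future() call stands in for the future = Future() line inside tracer.close() (per the traceback). It contrasts the failing asyncio.run() pattern with both fixes suggested above; the function names are illustrative only, and the exact RuntimeError wording may vary across Python versions.

```python
import asyncio


async def main():
    await asyncio.sleep(0)


def close_after_asyncio_run():
    """Mirrors the failing script: asyncio.run() unsets and closes its loop first."""
    asyncio.run(main())
    try:
        asyncio.Future()  # stands in for the Future() created by tracer.close()
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"
    return "ok"


def close_before_loop_close():
    """Mirrors the first fix: keep our own loop current until the Future exists."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
        asyncio.Future()  # the loop is still set for this thread, so this succeeds
        return "ok"
    finally:
        asyncio.set_event_loop(None)
        loop.close()


async def close_inside_coroutine():
    """Mirrors the second fix: everything, including close, runs inside asyncio.run()."""
    await main()
    asyncio.Future()  # a loop is running, so the Future binds to it
    return "ok"


if __name__ == "__main__":
    print(close_after_asyncio_run())
    print(close_before_loop_close())
    print(asyncio.run(close_inside_coroutine()))
```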

as an aside: currently, the tracer spawns its own Tornado I/O loop (with Tornado 5.0+, backed by an asyncio event loop) in a background thread to run its tracing coroutines; you can instead configure the client to use the existing thread and event loop for tracing by passing IOLoop.current() to initialize_tracer():

from tornado.ioloop import IOLoop

def init_jaeger_tracer(serviceName='jeagertest'):
    ...
    return config.initialize_tracer(io_loop=IOLoop.current())

...

async def run_traced():
    ...
    await asyncio.sleep(2)  # ensure pending spans are flushed
    await tracer.close()  # we can now safely wait for tracer to close, too

if __name__ == "__main__":
    init_logging()
    asyncio.run(run_traced())

note this also lets us safely await the future returned by tracer.close(), since it will be created and resolved on our single thread.
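The single-thread point reflects general asyncio behaviour rather than anything specific to jaeger_client: an asyncio Future belongs to one event loop and is not thread-safe, so a result produced on another thread has to be handed back through loop.call_soon_threadsafe(). The sketch below contrasts the two cases; it makes no claim about how jaeger_client's background-thread reporter resolves its futures internally.

```python
import asyncio
import threading


async def result_from_same_thread():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Resolved by a callback on the owning loop and thread: plain set_result is safe.
    loop.call_soon(fut.set_result, "same thread")
    return await fut


async def result_from_other_thread():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def worker():
        # A foreign thread must not call fut.set_result() directly; it schedules
        # the call on the loop that owns the Future instead.
        loop.call_soon_threadsafe(fut.set_result, "other thread")

    t = threading.Thread(target=worker)
    t.start()
    value = await fut
    t.join()
    return value


async def demo():
    return await result_from_same_thread(), await result_from_other_thread()


print(asyncio.run(demo()))  # ('same thread', 'other thread')
```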

@yurishkuro: once the change I added in #294 lands in a versioned release, it hopefully won't be necessary to sleep before calling close() to ensure pending spans are flushed (as long as callers await it)
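Both regimes (sleeping first today, relying on an awaited close() later) can be wrapped in one small helper. close_tracer and StubTracer below are hypothetical names, not part of jaeger_client; the stub only assumes, as discussed in this thread, that close() may return an awaitable future. The timeout is an extra assumption, useful in environments such as AWS Lambda where shutdown should not hang on an unreachable agent.

```python
import asyncio
import inspect


async def close_tracer(tracer, grace=0.0, timeout=5.0):
    """Hypothetical helper: optional grace sleep, then close and await if possible."""
    if grace:
        # Only needed while close() does not itself wait for pending spans.
        await asyncio.sleep(grace)
    result = tracer.close()
    if inspect.isawaitable(result):
        # Bound the wait so a slow or unreachable agent cannot block shutdown forever.
        await asyncio.wait_for(result, timeout)


class StubTracer:
    """Hypothetical stand-in whose close() returns a Future resolved on the running loop."""

    def __init__(self):
        self.closed = False

    def close(self):
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        loop.call_later(0.01, self._finish, fut)  # simulate a short flush
        return fut

    def _finish(self, fut):
        self.closed = True
        fut.set_result(None)


async def demo():
    tracer = StubTracer()
    await close_tracer(tracer)
    return tracer.closed


print(asyncio.run(demo()))  # True
```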

hi @akzincsystems, any luck with the above suggestions?

@obataku Sorry, chap, that didn't work. It's late at night here; will revisit.

@obataku Got to it.

  • Don't think asyncio.sleep() does anything
  • await tracer.close() seems to work

here's the refactored code (I added Prometheus metrics as well, but that's not a factor):

#!/usr/bin/env python3

import asyncio
import logging
import time
from jaeger_client import Config
from jaeger_client.metrics.prometheus import PrometheusMetricsFactory
from opentracing.scope_managers.contextvars import ContextVarsScopeManager

# jaeger tracer
tracer = None

def init_jaeger_tracer(serviceName='jeagertest'):
    config = Config(
        config={  # usually read from some yaml config
            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'local_agent': {
                'reporting_host': '172.16.238.160',
                'reporting_port': '5775',
            },
            'logging': True,
        },
        service_name=serviceName,
        validate=True,
        scope_manager=ContextVarsScopeManager(),
        metrics_factory=PrometheusMetricsFactory(namespace=serviceName)
    )
    return config.initialize_tracer()

async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")

async def main():
    await asyncio.gather(count(), count(), count())

def init_logging():
    log_level = logging.DEBUG
    logging.getLogger('').handlers = []
    logging.basicConfig(format='%(asctime)s %(message)s', level=log_level)

async def run_main():
    with tracer.start_span('Root') as span:
        s = time.perf_counter()
        await main()
        elapsed = time.perf_counter() - s
        print(f"{__file__} executed in {elapsed:0.2f} seconds.")

    # await asyncio.sleep(2)  # ensure pending spans are flushed
    await tracer.close()

if __name__ == "__main__":
    init_logging()
    tracer = init_jaeger_tracer()
    asyncio.run(run_main())

happy to hear it's working for you, @akzincsystems

unfortunately, a short sleep before calling close() is currently necessary if you want to better ensure that the Root span you just finished gets a chance to be reported (at least until #294 lands in a release). if some unreliability there isn't a big problem for you, though, feel free to skip the sleep; just keep in mind that you may end up dropping recently finished spans!
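Why the sleep matters can be illustrated with a toy model, not jaeger_client's actual Reporter: finished spans are queued and sent by a background task, so anything still queued when the program stops is lost unless close() waits for the queue to drain. ToyReporter, close_without_flush and demo are hypothetical names for illustration, and the per-span delay merely stands in for network I/O to the agent.

```python
import asyncio


class ToyReporter:
    """Hypothetical reporter: queues spans and 'sends' them from a background task."""

    def __init__(self, send_delay=0.01):
        self.queue = asyncio.Queue()
        self.sent = []
        self.send_delay = send_delay
        self._task = asyncio.get_running_loop().create_task(self._consume())

    def report_span(self, span):
        self.queue.put_nowait(span)

    async def _consume(self):
        while True:
            span = await self.queue.get()
            await asyncio.sleep(self.send_delay)  # simulated network I/O
            self.sent.append(span)
            self.queue.task_done()

    async def close(self):
        # Wait until every queued span has been sent, then stop the consumer.
        await self.queue.join()
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass

    def close_without_flush(self):
        # Like exiting before the reporter catches up: queued spans are dropped.
        self._task.cancel()


async def demo(flush):
    reporter = ToyReporter()
    for name in ("count-1", "count-2", "Root"):
        reporter.report_span(name)
    if flush:
        await reporter.close()
    else:
        reporter.close_without_flush()
        await asyncio.sleep(0)
    return reporter.sent


print(asyncio.run(demo(flush=True)))   # ['count-1', 'count-2', 'Root']
print(asyncio.run(demo(flush=False)))  # []
```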