Error running simple asyncio example
akzincsystems opened this issue · 6 comments
Requirement - what kind of business use case are you trying to solve?
Running jaeger tracing with asyncio. Intention is to run functions in AWS Lambda.
Problem - what in Jaeger blocks you from solving the requirement?
Error gets thrown:
Traceback (most recent call last):
  File "/home/xxx/PycharmProjects/JaegerTest/main.py", line 55, in <module>
    tracer.close()
  File "/home/xxx/PycharmProjects/JaegerTest/venv/lib/python3.8/site-packages/jaeger_client/tracer.py", line 284, in close
    return self.reporter.close()
  File "/home/xxx/PycharmProjects/JaegerTest/venv/lib/python3.8/site-packages/jaeger_client/reporter.py", line 260, in close
    future = Future()
  File "/usr/lib/python3.8/asyncio/events.py", line 639, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'MainThread'.
Proposal - what do you suggest to solve the problem or improve the existing situation?
No suggestion. Help sought
Code being run is a simple example.
#!/usr/bin/env python3
import asyncio
import logging
import time

from jaeger_client import Config
from opentracing.scope_managers.contextvars import ContextVarsScopeManager

# jaeger tracer
tracer = None


def init_jaeger_tracer(serviceName='jeagertest'):
    config = Config(
        config={  # usually read from some yaml config
            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'local_agent': {
                'reporting_host': 'localhost',
                'reporting_port': '6831',
            },
            'logging': True,
        },
        service_name=serviceName,
        validate=True,
        scope_manager=ContextVarsScopeManager()
    )
    return config.initialize_tracer()


async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")


async def main():
    await asyncio.gather(count(), count(), count())


def init_logging():
    log_level = logging.DEBUG
    logging.getLogger('').handlers = []
    logging.basicConfig(format='%(asctime)s %(message)s', level=log_level)


if __name__ == "__main__":
    init_logging()
    tracer = init_jaeger_tracer()
    with tracer.start_span('Root') as span:
        s = time.perf_counter()
        asyncio.run(main())
        elapsed = time.perf_counter() - s
        print(f"{__file__} executed in {elapsed:0.2f} seconds.")
    time.sleep(2)
    tracer.close()
I've searched everywhere for a possible solution but none found. Any suggestions welcome.
cc @obataku not sure if it's related
thanks for pinging @yurishkuro; this doesn't appear to be directly related to my change in #294 fortunately
@akzincsystems I believe the problem here is that asyncio.run will explicitly close the event loop after completion of main(), whereas tracer.close() ultimately tries to initialize a new Future (the future = Future() line in reporter.py shown in your traceback) and thus requires that there be an event loop associated with the current thread. you can fix this by not closing your thread's event loop until after you've called tracer.close():
if __name__ == "__main__":
    init_logging()
    tracer = init_jaeger_tracer()
    loop = asyncio.get_event_loop()
    with tracer.start_span('Root') as span:
        s = time.perf_counter()
        loop.run_until_complete(main())
        elapsed = time.perf_counter() - s
        print(f"{__file__} executed in {elapsed:0.2f} seconds.")
    time.sleep(2)
    tracer.close()
    loop.close()
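To see the underlying behavior without Jaeger involved, here is a minimal sketch using only the standard library. It assumes the Future in question behaves like asyncio.Future (which needs a current event loop when none is passed in); the helper names make_future, future_after_run and future_with_open_loop are made up for illustration.

```python
import asyncio
import warnings


async def work():
    await asyncio.sleep(0)
    return "done"


def make_future():
    """Create a Future without passing a loop, so asyncio must look one up."""
    with warnings.catch_warnings():
        # Some Python versions emit a DeprecationWarning during the lookup.
        warnings.simplefilter("ignore")
        return asyncio.Future()


def future_after_run():
    """Return the error raised when a Future is created after asyncio.run()."""
    # asyncio.run() creates a loop, runs work(), then unsets and closes the loop.
    asyncio.run(work())
    try:
        make_future()
    except RuntimeError as exc:
        return exc
    return None


def future_with_open_loop():
    """Create the Future while the thread's loop is still set and open."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(work())
        fut = make_future()  # works: the loop has not been closed yet
        return fut.get_loop() is loop
    finally:
        asyncio.set_event_loop(None)
        loop.close()


if __name__ == "__main__":
    print(repr(future_after_run()))
    print(future_with_open_loop())
```

The first helper mirrors the failing script (close the loop, then build a Future); the second mirrors the fix above (keep the loop around until the Future-creating call is done).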
alternatively, consider encapsulating all of the above logic in a coroutine that we can pass at the top level to asyncio.run:
async def run_traced():
    tracer = init_jaeger_tracer()
    with tracer.start_span('Root') as span:
        s = time.perf_counter()
        await main()
        elapsed = time.perf_counter() - s
        print(f"{__file__} executed in {elapsed:0.2f} seconds.")
    await asyncio.sleep(2)  # ensure pending spans are flushed
    tracer.close()


if __name__ == "__main__":
    init_logging()
    asyncio.run(run_traced())
as an aside: currently, the tracer will spawn a separate Tornado I/O loop (in Tornado 5.0+, backed by an asyncio event loop) in a separate background thread to run its tracing coroutines; you can also configure the client to use the existing thread & event loop for tracing by passing IOLoop.current() to initialize_tracer():
from tornado.ioloop import IOLoop


def init_jaeger_tracer(serviceName='jeagertest'):
    ...
    return config.initialize_tracer(io_loop=IOLoop.current())

...

async def run_traced():
    ...
    await asyncio.sleep(2)  # ensure pending spans are flushed
    await tracer.close()  # we can now safely wait for tracer to close, too


if __name__ == "__main__":
    init_logging()
    asyncio.run(run_traced())
note this also lets us safely await the future returned by tracer.close() as it'll be updated & used in our single thread.
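As a rough illustration of that single-thread pattern, the sketch below uses a made-up ToyTracer whose close() returns a future that is resolved by a callback on the same running loop. It is only a stand-in to show the await mechanics, not how jaeger_client implements close().

```python
import asyncio


class ToyTracer:
    """Hypothetical stand-in for a tracer; not jaeger_client's implementation."""

    def __init__(self):
        self.closed = False

    def close(self):
        # Create the future on the loop that is currently running and
        # schedule its completion on that same loop (same thread).
        loop = asyncio.get_running_loop()
        done = loop.create_future()
        loop.call_soon(self._finish, done)
        return done

    def _finish(self, done):
        self.closed = True
        done.set_result(True)


async def run_traced(tracer):
    # Awaiting is safe here: the future is created, resolved and awaited
    # on one event loop in one thread.
    result = await tracer.close()
    return result, tracer.closed


def demo():
    return asyncio.run(run_traced(ToyTracer()))


if __name__ == "__main__":
    print(demo())
```

Because asyncio futures are not thread-safe, this kind of direct await only makes sense when the code that resolves the future runs on the loop that awaits it, which is what passing the current loop to the tracer is meant to arrange.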
@yurishkuro: when the change I added to #294 lands in a versioned release, one hopefully won't need to sleep for a period to ensure pending spans are flushed before calling close() anymore (so long as they await it)
hi @akzincsystems, any luck with the above suggestions?
@obataku Sorry, chap - that didn't work. Late in the night here - will revisit
@obataku Got to it.
- Don't think asyncio.sleep() does anything
- await tracer.close() seems to work
here's refactored code (added prometheus in - not a factor)
#!/usr/bin/env python3
import asyncio
import logging
import time

from jaeger_client import Config
from jaeger_client.metrics.prometheus import PrometheusMetricsFactory
from opentracing.scope_managers.contextvars import ContextVarsScopeManager

# jaeger tracer
tracer = None


def init_jaeger_tracer(serviceName='jeagertest'):
    config = Config(
        config={  # usually read from some yaml config
            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'local_agent': {
                'reporting_host': '172.16.238.160',
                'reporting_port': '5775',
            },
            'logging': True,
        },
        service_name=serviceName,
        validate=True,
        scope_manager=ContextVarsScopeManager(),
        metrics_factory=PrometheusMetricsFactory(namespace=serviceName)
    )
    return config.initialize_tracer()


async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")


async def main():
    await asyncio.gather(count(), count(), count())


def init_logging():
    log_level = logging.DEBUG
    logging.getLogger('').handlers = []
    logging.basicConfig(format='%(asctime)s %(message)s', level=log_level)


async def run_main():
    with tracer.start_span('Root') as span:
        s = time.perf_counter()
        await main()
        elapsed = time.perf_counter() - s
        print(f"{__file__} executed in {elapsed:0.2f} seconds.")
    # await asyncio.sleep(2)  # ensure pending spans are flushed
    await tracer.close()


if __name__ == "__main__":
    init_logging()
    tracer = init_jaeger_tracer()
    asyncio.run(run_main())
happy to hear it's working for you, @akzincsystems
unfortunately a minor sleep before close-ing is currently necessary if you want to better ensure that your Root span you just finished gets a chance to be reported (at least until #294 lands in a release). if some unreliability there isn't a big problem for you, though, feel free to not sleep; just keep in mind you may wind up dropping recently finished spans!
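A toy model of why the short sleep matters, assuming a reporter that queues finished spans and sends them from a background task. ToyReporter and finish_and_close are hypothetical names invented for this sketch, and the real client's internals may well differ.

```python
import asyncio


class ToyReporter:
    """Hypothetical model of a queued span reporter; not jaeger_client code."""

    def __init__(self):
        self.pending = asyncio.Queue()
        self.sent = []
        # Background task that drains the queue on the running loop.
        self._worker = asyncio.get_running_loop().create_task(self._send_loop())

    def report(self, span_name):
        self.pending.put_nowait(span_name)

    async def _send_loop(self):
        while True:
            span_name = await self.pending.get()
            await asyncio.sleep(0.01)  # pretend to send over the network
            self.sent.append(span_name)

    async def close(self):
        # Stops the worker immediately; anything still queued is dropped.
        self._worker.cancel()
        try:
            await self._worker
        except asyncio.CancelledError:
            pass


async def finish_and_close(grace_seconds):
    reporter = ToyReporter()
    reporter.report("Root")  # the span we just finished
    if grace_seconds:
        await asyncio.sleep(grace_seconds)  # give the worker time to send
    await reporter.close()
    return reporter.sent


if __name__ == "__main__":
    print(asyncio.run(finish_and_close(0)))    # closed right away
    print(asyncio.run(finish_and_close(0.1)))  # closed after a short grace period
```

In this model, closing right away leaves the just-finished span unsent, while a short grace period lets the worker send it first. A close() that itself waits for the queue to drain would remove the need for the sleep.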