Delgan/loguru

Passing loguru function to multiprocessing

TinyTheBrontosaurus opened this issue · 8 comments

When passing logger.info into a multiprocessing.Pool, as in the following:

    with multiprocessing.Pool(1) as pool:
        for my_id in range(5):
            pool.apply_async(do_something, (my_id, logger.info))
        pool.close()
        pool.join()

the following exception occurs:

Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "/home/enelson/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/enelson/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/enelson/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/enelson/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/queues.py", line 354, in get
    return _ForkingPickler.loads(res)
AttributeError: 'Logger' object has no attribute 'log_function'

A fully reproducible test case is here

This is using Python 3.7.3 via pyenv on Ubuntu 18.04.
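
For reference, here is a self-contained version of the snippet above; the do_something worker is just a stand-in, since the linked test case is not reproduced here:

import multiprocessing

from loguru import logger


def do_something(my_id, log_fn):
    # Stand-in worker: simply call whatever logging function it was given.
    log_fn("working on {}".format(my_id))


if __name__ == "__main__":
    with multiprocessing.Pool(1) as pool:
        for my_id in range(5):
            pool.apply_async(do_something, (my_id, logger.info))
        pool.close()
        pool.join()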

Wow, thanks a lot for such a good minimal reproducible example!

I will investigate this and give an update on what is causing this weird bug. Maybe it is related to my usage of @staticmethod and __func__().

I don't know what is happening, but it seems that defining a function at module scope that wraps the call to logger.info works in this case:

def log(message):
    logger.info(message)
with multiprocessing.Pool(1) as pool:
    for my_id in range(5):
        pool.apply_async(do_something, (my_id, log))
    pool.close()
    pool.join()

Using a lambda or defining it in a local scope doesn't work.

Thanks for this interesting workaround, @AnesBenmerzoug.

I suppose the multiprocessing module needs to somehow "serialize" the objects passed to apply_async(). It may have trouble doing this with logger.info() for some reason, but this is a purely speculative guess for now.

I managed to reduce the test case to just this:

import pickle
from loguru import logger

pickle.loads(pickle.dumps(logger.info))

or this (to mimic more closely what is happening inside apply_async):

from multiprocessing import SimpleQueue
from loguru import logger

q = SimpleQueue()
q.put(logger.info)
q.get()

It seems apply_async puts the given arguments, pickled by a ForkingPickler, into a multiprocessing.SimpleQueue object.
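
The same failure can be reproduced with that pickler directly (assuming an affected loguru version; on a fixed one this just round-trips):

from multiprocessing.reduction import ForkingPickler

from loguru import logger

# ForkingPickler is the pickler multiprocessing uses when queuing task
# arguments; with an affected loguru version this raises the same
# AttributeError as the pool worker does.
ForkingPickler.loads(ForkingPickler.dumps(logger.info))

Both forms point at the same underlying pickling failure.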

import pickle


class Test:
    def log(self, *args, **kwargs):
        print("oups wrong method...")

    @staticmethod
    def _make_function1():
        def log(_self, x):
            print("local function 1: " + str(x))
        return log

    @staticmethod
    def _make_function2():
        def log_function(_self, x):
            print("local function 2: " + str(x))
        return log_function

    info1 = _make_function1.__func__()
    info2 = _make_function2.__func__()


if __name__ == "__main__":
    test = Test()
    # Method info1 works
    test.info1("test1 before")
    func = pickle.loads(pickle.dumps(test.info1))
    func("test1 after")
    # Method info2 fails
    test.info2("test2 before")
    func = pickle.loads(pickle.dumps(test.info2))
    func("test2 after")

It seems that defining a normal method with the same name as the local function defined in _make_log_function(), even if it does nothing, allows us to pickle the other methods.

Admittedly this would feel more like a hack than an actual solution.

EDIT:
And here's the corresponding output:

local function 1: test1 before
oups wrong method...
local function 2: test2 before
Traceback (most recent call last):
  File "test.py", line 33, in <module>
    func = pickle.loads(pickle.dumps(test.info2))
AttributeError: 'Test' object has no attribute 'log_function'

Again, thanks a lot for helping me understand the issue, @AnesBenmerzoug! 👍

Python seems to have a problem pickling closure functions defined inside other functions. A probably related question on SO: Python: pickling nested functions.
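
A quick loguru-free illustration of that limitation:

import pickle


def make_closure():
    def inner(x):
        return x
    return inner


# Functions are pickled by name (they must be importable at load time),
# so a function created inside another function cannot be located again:
try:
    pickle.dumps(make_closure())
except (pickle.PicklingError, AttributeError) as error:
    print(error)  # e.g. Can't pickle local object 'make_closure.<locals>.inner'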

The problem with adding a no-op function with the same name is that it makes pickle serialize the wrong function: func = pickle.loads(pickle.dumps(test.info1)) followed by func("Some message") ends up calling the dummy log method rather than the real one, so this does not really solve the issue.
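
This is consistent with bound methods being pickled by the name of their underlying function (at least on the Python 3.7 used here): whatever that name resolves to on the instance at load time is what comes back. A minimal sketch of that observation, independent of loguru:

import pickle


class A:
    def target(self):
        return "the method defined on the class"


def factory():
    def target(self):  # shares its __name__ with A.target
        return "the closure returned by the factory"
    return target


A.other = factory()

if __name__ == "__main__":
    a = A()
    print(a.other())                            # the closure returned by the factory
    restored = pickle.loads(pickle.dumps(a.other))
    print(restored())                           # the method defined on the class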

I guess I will need to refactor the _make_log_function() function to get rid of this pickle limitation.

@Delgan you're welcome.
I started using loguru and thought I could help, and right at that moment I found this issue.
I forgot to get the return value from pickle.loads, thanks for noticing that. I updated the example in my previous comment.

Ok, I fixed this by refactoring the logger and removing the closure functions generated by _make_log_function().
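
For anyone curious, the general shape of such a refactoring looks roughly like this. It is a simplified, hypothetical sketch rather than the actual loguru code: each level-specific method is a plain method delegating to a single internal call, so bound methods resolve by name and pickle cleanly.

import pickle


class SketchLogger:
    def _log(self, level, message, *args, **kwargs):
        print("{}: {}".format(level, message.format(*args, **kwargs)))

    def debug(self, message, *args, **kwargs):
        self._log("DEBUG", message, *args, **kwargs)

    def info(self, message, *args, **kwargs):
        self._log("INFO", message, *args, **kwargs)


logger = SketchLogger()

# Bound methods of plain methods survive a pickle round-trip.
func = pickle.loads(pickle.dumps(logger.info))
func("Hello from {}", "the unpickled function")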

The fix will be available in the next v0.3.0 release, thanks to both of you!