
How should I log while using multiprocessing in Python?

Right now I have a central module in a framework that spawns multiple processes using the Python 2.6 multiprocessing module. Because it uses multiprocessing, there is a module-level, multiprocessing-aware logger, LOG = multiprocessing.get_logger(). Per the docs, this logger (EDIT) does not have process-shared locks, so things can get garbled in sys.stderr (or whatever filehandle) when multiple processes write to it simultaneously.

The issue I have now is that the other modules in the framework are not multiprocessing-aware. The way I see it, I need to make all dependencies on this central module use multiprocessing-aware logging. That's annoying within the framework, let alone for all clients of the framework. Are there alternatives I'm not thinking of?

The docs you link to state the exact opposite of what you say: the logger has no process-shared locks and things do get mixed up, a problem I had as well.
See the examples in the stdlib docs: Logging to a single file from multiple processes. The recipes don't require other modules to be multiprocessing-aware.
So, what is the use case for multiprocessing.get_logger()? Based on these other ways of doing logging, it seems the logging functionality in multiprocessing is of little value.
get_logger() is the logger used by the multiprocessing module itself. It is useful if you want to debug a multiprocessing issue.
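For reference, a minimal sketch of that debugging use case (my illustration, not from the thread): multiprocessing.log_to_stderr() attaches a stderr handler to that internal logger so you can watch the module's own messages.

import logging
import multiprocessing

def work(x):
    return x * x

if __name__ == '__main__':
    # Enable multiprocessing's internal logger; this is for debugging
    # multiprocessing itself, not for application logging.
    multiprocessing.log_to_stderr(logging.DEBUG)
    with multiprocessing.Pool(2) as pool:
        print(pool.map(work, range(4)))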

zzzeek

I just now wrote a log handler of my own that just feeds everything to the parent process via a pipe. I've only been testing it for ten minutes but it seems to work pretty well.

(Note: This is hardcoded to RotatingFileHandler, which is my own use case.)

Update: @javier now maintains this approach as a package available on PyPI - see multiprocessing-logging on PyPI, GitHub at https://github.com/jruere/multiprocessing-logging

Update: Implementation!

This now uses a queue for correct handling of concurrency, and also recovers from errors correctly. I've now been using this in production for several months, and the current version below works without issue.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)

The above handler does all the file writing from the parent process and uses just one thread to receive messages passed from child processes. If you invoke the handler itself from a spawned child process then that's using it incorrectly, and you'll get all the same issues as RotatingFileHandler. I've used the above code for years with no issue.
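For context, a minimal usage sketch (my addition, assuming a fork-based start method so children inherit the root logger and its queue; the file name and formatter are placeholders):

import logging
import multiprocessing

def worker(n):
    # children just use normal logging; records travel over the queue
    logging.getLogger().info('hello from worker %d', n)

if __name__ == '__main__':
    handler = MultiProcessingLog('app.log', 'a', 10 * 1024 * 1024, 5)
    handler.setFormatter(logging.Formatter('%(asctime)s %(processName)s %(message)s'))
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(handler)

    procs = [multiprocessing.Process(target=worker, args=(i,)) for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()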
Unfortunately this approach doesn't work on Windows. From docs.python.org/library/multiprocessing.html 16.6.2.12 "Note that on Windows child processes will only inherit the level of the parent process’s logger – any other customization of the logger will not be inherited." Subprocesses won't inherit the handler, and you can't pass it explicitly because it's not pickleable.
It's worth noting that multiprocessing.Queue uses a feeder thread in put(). So do not invoke put() (i.e., log a message using the MultiProcessingLog handler) before creating all subprocesses; otherwise the thread will be dead in the child process. One solution is to call Queue._after_fork() at the start of each child process, or to use multiprocessing.queues.SimpleQueue instead, which does not involve a thread but is blocking.
Could you add a simple example that shows initialization, as well as usage from a hypothetical child process? I'm not quite sure how the child process is supposed to get access to the queue without instantiating another instance of your class.
@zzzeek, this solution is good but I couldn't find a package with it or something similar so I created one called multiprocessing-logging.
Zearin

The only way to deal with this non-intrusively is to:

Spawn each worker process such that its log goes to a different file descriptor (to disk or to a pipe). Ideally, all log entries should be timestamped. Your controller process can then do one of the following:

If using disk files: coalesce the log files at the end of the run, sorted by timestamp (see the sketch after this list).

If using pipes (recommended): coalesce log entries on-the-fly from all pipes into a central log file. (E.g., periodically select on the pipes' file descriptors, perform a merge-sort on the available log entries, and flush to the centralized log. Repeat.)
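A rough sketch of the disk-file coalescing step (my illustration, not from the answer; it assumes each worker wrote timestamp-prefixed lines, e.g. ISO 8601 so they sort lexicographically, to its own already-sorted file, and the file pattern is a placeholder):

import glob
import heapq

def merge_logs(pattern='worker-*.log', out_path='combined.log'):
    # heapq.merge does a streaming merge-sort of the already-sorted inputs
    files = [open(path) for path in glob.glob(pattern)]
    try:
        with open(out_path, 'w') as out:
            for line in heapq.merge(*files):
                out.write(line)
    finally:
        for f in files:
            f.close()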


Nice, that was 35s before I thought of that (thought I'd use atexit :-). Problem is that it won't give you a realtime readout. This may be part of the price of multiprocessing as opposed to multithreading.
@cdleary, using the piped approach it would be as near-realtime as one can get (especially if stderr is not buffered in the spawned processes.)
Incidentally, big assumption here: not Windows. Are you on Windows?
@BrandonRhodes - As I said, non-intrusively. Using multiprocessing.Queue will not be simpler if there is a lot of code to rewire to use multiprocessing.Queue, and/or if performance is an issue
@schlamar you may want to re-read the OP before commenting; I do not assume a log stream, rather the OP clearly states that the legacy code already writes to a stream (stderr) and that he still expects aggregate logging to go to a stream, albeit with some degree of line-level atomicity (non-garbled). Do you now see why this method is non-intrusive for the OP? As for the benchmark, the number of pipes is not relevant; the gains come from buffering reducing the actual number of system calls (and impact on client performance) in exchange for extra latency in the aggregator proc.
fantabolous

QueueHandler is native in Python 3.2+, and does exactly this. It is easily replicated in previous versions.

Python docs have two complete examples: Logging to a single file from multiple processes

For those using Python < 3.2, just copy QueueHandler into your own code from: https://gist.github.com/vsajip/591589 or alternatively import logutils.

Each process (including the parent process) puts its logging on the Queue, and then a listener thread or process (one example is provided for each) picks those up and writes them all to a file - no risk of corruption or garbling.


this should be the accepted answer, at least since the advent of QueueHandler. It is not intrusive, it is transparent, and it works regardless of what logger configuration(s) the main process is using. The workers always log to their configured QueueHandler. It also does not require any logger configuration to be passed from the parent to the spawned child process.
user2133814

Below is another solution with a focus on simplicity, for anyone else (like me) who gets here from Google. Logging should be easy! This works only on Python 3.2 or higher.

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()

The QueueHandler and QueueListener classes can be used on Python 2.7 as well, available in the logutils package.
The logger of the main process should also use a QueueHandler. In your current code, the main process is bypassing the queue so there can be race conditions between the main process and workers ones. Everyone should log to the queue (via a QueueHandler) and only the QueueListener should be allowed to log to the StreamHandler.
Also, you don't have to initialize the logger in each child. Just initialize the logger in the parent process and get the logger in each child process.
Zearin

Yet another alternative might be the various non-file-based logging handlers in the logging package:

SocketHandler

DatagramHandler

SyslogHandler

(and others)

This way, you could easily have a logging daemon somewhere that you could write to safely and would handle the results correctly. (E.g., a simple socket server that just unpickles the message and emits it to its own rotating file handler.)

The SyslogHandler would take care of this for you, too. Of course, you could use your own instance of syslog, not the system one.
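A minimal client-side sketch of that idea (my own; it assumes a log receiver such as the socket server from the logging cookbook is already listening on the default TCP logging port):

import logging
import logging.handlers

# Records are pickled and sent over TCP to the logging daemon, which can
# unpickle them and emit them to its own (single) rotating file handler.
socket_handler = logging.handlers.SocketHandler(
    'localhost', logging.handlers.DEFAULT_TCP_LOGGING_PORT)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(socket_handler)
root.info('safe to call from any process')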


Iopheam

As of 2020 it seems there is a simpler way of logging with multiprocessing.

This function will create the logger. You can set the format here and where you want your output to go (file, stdout):

def create_logger():
    import multiprocessing, logging
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
        '[%(asctime)s| %(levelname)s| %(processName)s] %(message)s')
    handler = logging.FileHandler('logs/your_file_name.log')
    handler.setFormatter(formatter)

    # this bit will make sure you won't have 
    # duplicated messages in the output
    if not len(logger.handlers): 
        logger.addHandler(handler)
    return logger

In the init you instantiate the logger:

if __name__ == '__main__': 
    from multiprocessing import Pool
    logger = create_logger()
    logger.info('Starting pooling')
    p = Pool()
    # rest of the code

Now, you only need to add this reference in each function where you need logging:

logger = create_logger()

And output messages:

logger.info(f'My message from {something}')

Hope this helps.


This seems like the most straightforward solution now. Note that the "if not len(logger.handlers)" part is assuming you will use a single handler. If you want to have more than one handler to, e.g., send all messages to a file but only INFO and above to stdout, then you'll need to adjust that part.
Normally you have vast amounts of code that just does import logging and then uses things like logging.info("whatever") - there's no place you can pass a logger object to anything, and there's no chance you can retrofit that code.
This works but it's not very flexible. For example, once you put create_logger() into all your functions, there's no way to turn off logging in case someone else wants to use your library with their own application. Best practice for libraries is to never force anyone to see the log messages.
@JamesMoore I haven't tried using logging.info(..) with multiprocessing. If this works, I'm happy to update the answer.
@medley56 Agree. Perhaps this code isn't intended to work in a library. I used it when I needed to debug a multiprocessing scraper for an ad-hoc task.
ironhacker

A variant of the others that keeps the logging and queue thread separate.

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses' loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import os
import sys
import time
import traceback
import multiprocessing
import threading
import logging

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not reliable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)

I like the idea of fetching the logger name from the queue record. This allows one to use a conventional fileConfig() in the MainProcess and a barely configured logger in the PoolWorkers (with only setLevel(logging.NOTSET)). As I mentioned in another comment, I'm using Pool, so I had to obtain my Queue (proxy) from a Manager instead of multiprocessing so it can be pickled. This allows me to pass the queue to a worker inside a dictionary (most of which is derived from an argparse object using vars()). I feel like in the end this is the best approach for MS Windows, which lacks fork() and breaks @zzzeek's solution.
@mlt I think you could also put a multiprocessing Queue in the init instead of using a Manager (see answer to stackoverflow.com/questions/25557686/… - it's about Locks but I believe it works for Queues as well)
@fantabolous That won't work on MS Windows or any other platform that lacks fork. That way each process will have its own independent, useless queue. The second approach in the linked Q/A won't work on such platforms. It is a recipe for non-portable code.
@mlt Interesting. I'm using Windows and it seems to work ok for me - not long after I last commented I set up a pool of processes sharing a multiprocessing.Queue with the main process and I've been using it constantly since. Won't claim to understand why it works though.
schlamar

All current solutions are too coupled to the logging configuration by using a handler. My solution has the following architecture and features:

You can use any logging configuration you want

Logging is done in a daemon thread

Safe shutdown of the daemon by using a context manager

Communication to the logging thread is done by multiprocessing.Queue

In subprocesses, logging.Logger (and already defined instances) are patched to send all records to the queue

New: format traceback and message before sending to queue to prevent pickling errors

Code with usage example and output can be found at the following Gist: https://gist.github.com/schlamar/7003737
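The full code lives in the Gist; as a rough sketch of the same general shape (not the Gist's code, and assuming Python 3.2+ so QueueListener is available), the listener-plus-context-manager piece could look like:

import contextlib
import logging.handlers
import multiprocessing

@contextlib.contextmanager
def queue_logging(*handlers):
    # The queue carries records out of subprocesses; the listener thread
    # feeds them to whatever handlers the main process already configured.
    queue = multiprocessing.Queue()
    listener = logging.handlers.QueueListener(queue, *handlers)
    listener.start()
    try:
        yield queue
    finally:
        listener.stop()  # drains remaining records, then joins the thread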


Unless I'm missing something, this isn't actually a daemon thread, since you never set daemon_thread.daemon to True. I needed to do that in order to get my Python program to exit properly when an exception occurs within the context manager.
I also needed to catch, log and swallow exceptions thrown by the target func in logged_call, otherwise the exception would get garbled with other logged output. Here's my modified version of this: gist.github.com/blah238/8ab79c4fe9cdb254f5c37abfc5dc85bf
@blah238 If you set @schlamar 's daemon (let's call it the QueueListener, for a better naming) as an actual daemon thread, you risk that being abruptly stopped when the main program exits. Imagine the queue has buffered quite a lot of messages, the main program comes to the end, exits the context manager, the None sentinel is added on top of the full queue, and then the main process terminates before the listener (daemon) is able to dequeue and handle all log messages. You would lose those messages. How are you handling this situation in your code?
Samuel

Since we can represent multiprocess logging as many publishers and one subscriber (listener), using ZeroMQ to implement PUB-SUB messaging is indeed an option.

Moreover, the PyZMQ module, the Python bindings for ZMQ, implements PUBHandler, which is an object for publishing logging messages over a zmq.PUB socket.

There's a solution on the web for centralized logging from a distributed application using PyZMQ and PUBHandler, which can easily be adapted for working locally with multiple publishing processes.

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

# This one will be used by publishing processes
class PUBLogger:
    def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

# This one will be used by listener process
class SUBLogger:
    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.DEBUG)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{}'.format(port))
        self._sub.setsockopt(zmq.SUBSCRIBE, "")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

    @property
    def sub(self):
        return self._sub

    @property
    def logger(self):
        return self._logger

#  And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            log_msg = getattr(logging, topic.lower())
            log_msg(message)
        except zmq.ZMQError as zmq_error:
            if zmq_error.errno == zmq.EAGAIN:
                pass


# Publisher processes loggers should be initialized as follows:

class Publisher:
    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = pub_logger.PUBLogger('127.0.0.1').logger

    def run(self):
        self._logger.info("{0} - Sending message".format(self.proc_id))

def run_worker(event, proc_id):
    worker = Publisher(event, proc_id)
    worker.run()

# Starting subscriber process so we won't lose publisher's messages
sub_logger_process = Process(target=run_sub_logger,
                                 args=('127.0.0.1', stop_event,))
sub_logger_process.start()

#Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                                 args=(stop_event, i,)))
for p in processes:
    p.start()

Community

I also like zzzeek's answer but Andre is correct that a queue is required to prevent garbling. I had some luck with the pipe, but did see garbling which is somewhat expected. Implementing it turned out to be harder than I thought, particularly due to running on Windows, where there are some additional restrictions about global variables and stuff (see: How's Python Multiprocessing Implemented on Windows?)

But, I finally got it working. This example probably isn't perfect, so comments and suggestions are welcome. It also does not support setting the formatter or anything other than the root logger. Basically, you have to reinit the logger in each of the pool processes with the queue and set up the other attributes on the logger.

Again, any suggestions on how to make the code better are welcome. I certainly don't know all the Python tricks yet :-)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty, e:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.

def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)

def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)

if __name__ == '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger('').addHandler(handler)
    logging.getLogger('').setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when building the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0,50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()

I wonder if if 'MainProcess' == multiprocessing.current_process().name: can be used in place of passing child?
In case someone else is trying to use a process pool instead of separate process objects on Windows, it's worth mentioning that a Manager must be used to pass the queue to subprocesses, as it is not picklable directly.
This implementation worked well for me. I modified it to work with an arbitrary number of handlers. This way you can configure your root handler in a non-multiprocessing fashion, then, where it is safe to make the queue, pass the root handlers to this, delete them, and make this the only handler.
rudaoshi

I'd like to suggest to use the logger_tt library: https://github.com/Dragon2fly/logger_tt

The multiprocessing_logging library is not working on my macOS, while logger_tt does.


I don't know why there is a downvote on my answer. The logger_tt library is certainly the most friendly logging library for multiprocessing.
Javier

Just publish your instance of the logger somewhere. That way, the other modules and clients can use your API to get the logger without having to import multiprocessing.
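For example, a hypothetical accessor module (my illustration of the idea, not code from the answer):

# mylog.py -- the one place that knows about multiprocessing
import multiprocessing

LOG = multiprocessing.get_logger()

def get_logger():
    # Clients call this instead of importing multiprocessing themselves.
    return LOG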


The problem with this is that the multiprocessing loggers appear unnamed, so you won't be able to decipher the message stream easily. Maybe it would be possible to name them after creation, which would make it more reasonable to look at.
well, publish one logger for each module, or better, export different closures that use the logger with the module name. The point is to let other modules use your API
Definitely reasonable (and +1 from me!), but I would miss being able to just import logging; logging.basicConfig(level=logging.DEBUG); logging.debug('spam!') from anywhere and have it work properly.
It's an interesting phenomenon that I see when I use Python, that we get so used to being able to do what we want in 1 or 2 simple lines that the simple and logical approach in other languages (eg. to publish the multiprocessing logger or wrap it in an accessor) still feels like a burden. :)
André Cruz

I liked zzzeek's answer. I would just substitute the Pipe for a Queue since if multiple threads/processes use the same pipe end to generate log messages they will get garbled.


I was having some issues with the handler, though it wasn't that messages were garbled; it's just that the whole thing would stop working. I changed Pipe to Queue since that is more appropriate. However, the errors I was getting weren't resolved by that - ultimately I added a try/except to the receive() method - very rarely, an attempt to log exceptions will fail and wind up being caught there. Once I added the try/except, it runs for weeks with no problem, and a stderr file will grab about two errant exceptions per week.
np8

The concurrent-log-handler package seems to do the job perfectly. Tested on Windows. It also supports POSIX systems.

Main idea

Create a separate file with a function that returns a logger. The logger must have a fresh instance of ConcurrentRotatingFileHandler for each process. An example function, get_logger(), is given below.

Creating the loggers is done at the initialization of the process. For a multiprocessing.Process subclass this means the beginning of the run() method.

Detailed instructions

In this example, I will use the following file structure

.
│-- child.py        <-- For a child process
│-- logs.py         <-- For setting up the logs for the app
│-- main.py         <-- For a main process
│-- myapp.py        <-- For starting the app
│-- somemodule.py   <-- For an example, a "3rd party module using standard logging"

Code

Child process

# child.py 

import multiprocessing as mp
import time
from somemodule import do_something


class ChildProcess(mp.Process):
    def __init__(self):
        self.logger = None
        super().__init__()

    def run(self):
        from logs import get_logger
        self.logger = get_logger()


        while True:
            time.sleep(1)
            self.logger.info("Child process")
            do_something()

A simple child process that inherits from multiprocessing.Process and simply logs the text "Child process" to a file.

Important: get_logger() is called inside run(), or elsewhere inside the child process (not at module level or in __init__()). This is required because get_logger() creates a ConcurrentRotatingFileHandler instance, and a new instance is needed for each process.

The do_something is used just to demonstrate that this works with 3rd party library code which does not have any clue that you are using concurrent-log-handler.

Main Process

# main.py

import logging
import multiprocessing as mp
import time

from child import ChildProcess
from somemodule import do_something


class MainProcess(mp.Process):
    def __init__(self):
        self.logger = logging.getLogger()
        super().__init__()

    def run(self):
        from logs import get_logger

        self.logger = get_logger()
        self.child = ChildProcess()
        self.child.daemon = True
        self.child.start()

        while True:
            time.sleep(0.5)
            self.logger.critical("Main process")
            do_something()


The main process, which logs "Main process" to the file twice a second. It also inherits from multiprocessing.Process.

Same comments for get_logger() and do_something() apply as for the child process.

Logger setup

# logs.py

import logging
import os

from concurrent_log_handler import ConcurrentRotatingFileHandler

LOGLEVEL = logging.DEBUG


def get_logger():
    logger = logging.getLogger()

    if logger.handlers:
        return logger

    # Use an absolute path to prevent file rotation trouble.
    logfile = os.path.abspath("mylog.log")

    logger.setLevel(LOGLEVEL)

    # Rotate log after reaching 512K, keep 5 old copies.
    filehandler = ConcurrentRotatingFileHandler(
        logfile, mode="a", maxBytes=512 * 1024, backupCount=5, encoding="utf-8"
    )
    filehandler.setLevel(LOGLEVEL)

    # create also handler for displaying output in the stdout
    ch = logging.StreamHandler()
    ch.setLevel(LOGLEVEL)

    formatter = logging.Formatter(
        "%(asctime)s - %(module)s - %(levelname)s - %(message)s [Process: %(process)d, %(filename)s:%(funcName)s(%(lineno)d)]"
    )

    # add formatter to ch
    ch.setFormatter(formatter)
    filehandler.setFormatter(formatter)

    logger.addHandler(ch)
    logger.addHandler(filehandler)

    return logger

This uses the ConcurrentRotatingFileHandler from the concurrent-log-handler package. Each process needs a fresh ConcurrentRotatingFileHandler instance.

Note that all the arguments for the ConcurrentRotatingFileHandler should be the same in every process.

Example app

# myapp.py 

if __name__ == "__main__":
    from main import MainProcess

    p = MainProcess()
    p.start()

Just a simple example of how to start the multiprocessing application.

Example of 3rd party module using standard logging

# somemodule.py 

import logging

logger = logging.getLogger("somemodule")

def do_something():
    logger.info("doing something")

Just a simple example to test if loggers from 3rd party code will work normally.

Example output

2021-04-19 19:02:29,425 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,427 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:29,929 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,931 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,133 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:30,137 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:30,436 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,439 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,944 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,946 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:31,142 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:31,145 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:31,449 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:31,451 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]


Sawan

How about delegating all the logging to another process that reads all log entries from a Queue?

import logging
import multiprocessing

LOG_QUEUE = multiprocessing.JoinableQueue()

class CentralLogger(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.log = logging.getLogger('some_config')
        self.log.info("Started Central Logging process")

    def run(self):
        while True:
            log_level, message = self.queue.get()
            if log_level is None:
                self.log.info("Shutting down Central Logging process")
                break
            else:
                self.log.log(log_level, message)

central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()

Simply share LOG_QUEUE via any of the multiprocess mechanisms or even inheritance and it all works out fine!
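As a hypothetical usage sketch (my addition): a worker only needs a handle on the queue, and the parent sends a None level as the shutdown sentinel, matching the check in run() above.

import logging

def worker(task_id, log_queue):
    # any process holding the queue can log like this
    log_queue.put((logging.INFO, "worker %d starting" % task_id))

# ... later, from the parent, once all workers have finished:
LOG_QUEUE.put((None, None))  # tells CentralLogger.run() to break out of its loop
central_logger_process.join()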


user6336812

Below is a class that can be used in a Windows environment; it requires ActivePython (for the win32event bindings). You can also apply the same approach to other logging handlers (StreamHandler, etc.).

class SyncronizedFileHandler(logging.FileHandler):
    MUTEX_NAME = 'logging_mutex'

    def __init__(self , *args , **kwargs):

        self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME)
        return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs)

    def emit(self, *args , **kwargs):
        try:
            win32event.WaitForSingleObject(self.mutex , win32event.INFINITE)
            ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs)
        finally:
            win32event.ReleaseMutex(self.mutex)
        return ret

And here is an example that demonstrates usage:

import logging
import random , time , os , sys , datetime
from string import letters
import win32api , win32event
from multiprocessing import Pool

def f(i):
    time.sleep(random.randint(0,10) * 0.1)
    ch = random.choice(letters)
    logging.info( ch * 30)


def init_logging():
    '''
    initialize the loggers
    '''
    formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    file_handler = SyncronizedFileHandler(sys.argv[1])
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

#must be called in the parent and in every worker process
init_logging() 

if __name__ == '__main__':
    #multiprocessing stuff
    pool = Pool(processes=10)
    imap_result = pool.imap(f , range(30))
    for i , _ in enumerate(imap_result):
        pass

Probably using multiprocessing.Lock() instead of Windows Mutex would make the solution portable.
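A rough sketch of that portable variant (my assumption, untested): a module-level multiprocessing.Lock only helps where children inherit it via fork, or if you pass the lock to each process explicitly.

import logging
import multiprocessing

_LOCK = multiprocessing.Lock()  # inherited by children on fork platforms

class LockedFileHandler(logging.FileHandler):
    def emit(self, record):
        # serialize writes across processes with the shared lock
        with _LOCK:
            super(LockedFileHandler, self).emit(record)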
Richard Jones

I have a solution that's similar to ironhacker's, except that I use logging.exception in some of my code and found that I needed to format the exception before passing it back over the Queue, since tracebacks aren't pickleable:

import cStringIO
import logging
import traceback

class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)
    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s

I found a complete example along these lines here.
nmz787

Here's my simple hack/workaround... not the most comprehensive, but easily modifiable and, I think, simpler to read and understand than any of the other answers I found before writing this:

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name gets discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)

juan Isaza

There is this great package

Package: https://pypi.python.org/pypi/multiprocessing-logging/

code: https://github.com/jruere/multiprocessing-logging

Install:

pip install multiprocessing-logging

Then add:

import multiprocessing_logging

# This enables logging from inside worker processes
multiprocessing_logging.install_mp_handler()
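
A slightly fuller usage sketch (my own, assuming a fork start method; per the comment below it does not work with spawn):

import logging
import multiprocessing
import multiprocessing_logging

logging.basicConfig(level=logging.INFO, filename='app.log')
multiprocessing_logging.install_mp_handler()  # wraps the existing handlers

def work(i):
    logging.info('record %s from a worker', i)
    return i

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        pool.map(work, range(8))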

This library is literally based off of another comment on the current SO post: stackoverflow.com/a/894284/1698058.
Origins: stackoverflow.com/a/894284/1663382. I appreciate the example usage of the module, in addition to the documentation on the homepage.
This module does not work if the multiprocessing context is spawn (the default on macOS with Python 3.8+)
Orsiris de Jong

For whoever might need this, I wrote a decorator for multiprocessing_logging package that adds the current process name to logs, so it becomes clear who logs what.

It also runs install_mp_handler(), so it becomes unnecessary to run it yourself before creating a pool.

This allows me to see which worker creates which log messages.

Here's the blueprint with an example:

import sys
import logging
from functools import wraps
import multiprocessing
import multiprocessing_logging

# Setup basic console logger as 'logger'
logger = logging.getLogger()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(u'%(asctime)s :: %(levelname)s :: %(message)s'))
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)


# Create a decorator for functions that are called via multiprocessing pools
def logs_mp_process_names(fn):
    class MultiProcessLogFilter(logging.Filter):
        def filter(self, record):
            try:
                process_name = multiprocessing.current_process().name
            except BaseException:
                process_name = __name__
            record.msg = f'{process_name} :: {record.msg}'
            return True

    multiprocessing_logging.install_mp_handler()
    f = MultiProcessLogFilter()

    # Wraps is needed here so apply / apply_async know the function name
    @wraps(fn)
    def wrapper(*args, **kwargs):
        logger.removeFilter(f)
        logger.addFilter(f)
        return fn(*args, **kwargs)

    return wrapper


# Create a test function and decorate it
@logs_mp_process_names
def test(argument):
    logger.info(f'test function called via: {argument}')


# You can also wrap previously undecorated functions by redefining them
def undecorated_function():
    logger.info('I am not decorated')


@logs_mp_process_names
def redecorated(*args, **kwargs):
    return undecorated_function(*args, **kwargs)


# Enjoy
if __name__ == '__main__':
    with multiprocessing.Pool() as mp_pool:
        # Also works with apply_async
        mp_pool.apply(test, ('mp pool',))
        mp_pool.apply(redecorated)
        logger.info('some main logs')
        test('main program')

cdleary

One of the alternatives is to write the multiprocessing logging to a known file and register an atexit handler to join those processes and read it back out on stderr; however, you won't get a real-time flow of the output messages on stderr that way.


is the approach you are proposing below identical to the one from your comment here stackoverflow.com/questions/641420/…
Community

If you have deadlocks occurring in a combination of locks, threads and forks in the logging module, that is reported in bug report 6721 (see also related SO question).

There is a small fixup solution posted here.

However, that will just fix any potential deadlocks in logging. It will not fix the fact that things may still get garbled. See the other answers presented here.


user1460675

Simplest idea as mentioned:

Grab the filename and the process id of the current process.

Set up a WatchedFileHandler. The reasons for this handler are discussed in detail here, but in short: there are certain worse race conditions with the other logging handlers, and this one has the shortest window for the race condition.

Choose a path to save the logs to, such as /var/log/...
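A minimal sketch of those steps (my illustration; the path and logger name are hypothetical):

import logging
import os
from logging.handlers import WatchedFileHandler

def get_process_logger():
    # one log file per process, keyed by pid, so writers never collide
    pid = os.getpid()
    logger = logging.getLogger('myapp.%d' % pid)
    if not logger.handlers:
        handler = WatchedFileHandler('/var/log/myapp/worker-%d.log' % pid)
        handler.setFormatter(logging.Formatter('%(asctime)s %(process)d %(levelname)s %(message)s'))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger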