ChatGPT解决这个技术问题 Extra ChatGPT

在 Python 中使用多处理时我应该如何记录?

现在我在一个框架中有一个中心模块,它使用 Python 2.6 multiprocessing module 生成多个进程。因为它使用 multiprocessing,所以存在模块级多处理感知日志 LOG = multiprocessing.get_logger()。根据 the docs,此记录器 (EDIT) 确实 not 具有进程共享锁,因此您不会在 sys.stderr(或任何文件句柄)中通过有多个进程同时写入它。

我现在遇到的问题是框架中的其他模块不支持多处理。在我看来,我需要让这个中央模块的所有依赖项都使用多处理感知日志记录。这在框架内很烦人,更不用说框架的所有客户端了。有没有我没有想到的替代方案?

您链接到的文档与您所说的完全相反,记录器没有进程共享锁并且事情变得混乱 - 我也遇到了一个问题。
请参阅 stdlib 文档中的示例:Logging to a single file from multiple processes。配方不需要其他模块能够感知多处理。
那么,multiprocessing.get_logger() 的用例是什么?似乎基于这些其他日志记录方式的 multiprocessing 中的日志记录功能价值不大。
get_logger()multiprocessing 模块本身使用的记录器。如果您想调试 multiprocessing 问题,这很有用。

z
zzzeek

我刚刚编写了一个自己的日志处理程序,它只是通过管道将所有内容提供给父进程。我只测试了十分钟,但它似乎工作得很好。

注意:这是硬编码为 RotatingFileHandler,这是我自己的用例。)

更新:@javier 现在将这种方法作为 Pypi 上可用的包进行维护 - 请参阅 Pypi 上的 multiprocessing-logging,github 上 https://github.com/jruere/multiprocessing-logging

更新:实施!

这现在使用队列来正确处理并发,并且还可以正确地从错误中恢复。我现在已经在生产中使用它几个月了,下面的当前版本可以正常工作。

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)

上述处理程序从父进程执行所有文件写入,并仅使用一个线程接收从子进程传递的消息。如果您从派生的子进程调用处理程序本身,则说明使用不正确,您将遇到与 RotatingFileHandler 相同的所有问题。我已经使用上面的代码多年没有问题。
不幸的是,这种方法不适用于 Windows。从 docs.python.org/library/multiprocessing.html 16.6.2.12 “请注意,在 Windows 上,子进程将仅继承父进程的记录器的级别 - 记录器的任何其他自定义都不会被继承。”子进程不会继承处理程序,并且您不能显式传递它,因为它不可腌制。
值得注意的是,multiprocessing.Queueput() 中使用了一个线程。因此,在创建所有子进程之前不要调用 put(即使用 MultiProcessingLog 处理程序记录消息)。否则线程将死在子进程中。一种解决方案是在每个子进程开始时调用 Queue._after_fork(),或者改用 multiprocessing.queues.SimpleQueue,这不涉及线程,而是阻塞。
您能否添加一个简单的示例来显示初始化以及假设子进程的用法?我不太确定子进程应该如何在不实例化您的类的另一个实例的情况下访问队列。
@zzzeek,这个解决方案很好,但我找不到包含它的包或类似的东西,所以我创建了一个名为 multiprocessing-logging 的包。
Z
Zearin

以非侵入方式处理此问题的唯一方法是:

生成每个工作进程,使其日志转到不同的文件描述符(磁盘或管道)。理想情况下,所有日志条目都应该加上时间戳。然后,您的控制器进程可以执行以下操作之一: 如果使用磁盘文件:在运行结束时合并日志文件,按时间戳排序 如果使用管道(推荐):将所有管道中的日志条目即时合并到一个中央日志文件。 (例如,定期从管道的文件描述符中选择,对可用的日志条目执行合并排序,然后刷新到集中式日志。重复。)


很好,那是我想到之前的 35 秒(我想我会使用 atexit :-)。问题是它不会给你一个实时读数。与多线程相比,这可能是多处理成本的一部分。
@cdleary,使用管道方法将尽可能接近实时(特别是如果 stderr 在生成的进程中没有缓冲。)
顺便说一句,这里有一个很大的假设:不是 Windows。你在 Windows 上吗?
@BrandonRhodes - 正如我所说,非侵入式。如果有很多代码需要重新连接以使用 multiprocessing.Queue,和/或如果 performance is an issue
@schlamar 您可能想在评论之前重新阅读 OP;我不假设日志流,而是 OP明确声明遗留代码已经写入流 (stderr) 并且他仍然希望聚合日志记录到流中,尽管具有一定程度的行级原子性(非乱码)。你现在明白为什么这种方法对 OP 来说是非侵入性的了吗?至于基准,管道的 number 是不相关的;收益来自 缓冲 减少了系统调用的实际数量(以及对 客户端 性能的影响),以换取聚合器 proc 中的额外延迟。
f
fantabolous

QueueHandler 在 Python 3.2+ 中是本机的,并且正是这样做的。它很容易在以前的版本中复制。

Python 文档有两个完整的示例:Logging to a single file from multiple processes

对于那些使用 Python < 3.2,只需将 QueueHandler 复制到您自己的代码中:https://gist.github.com/vsajip/591589 或导入 logutils

每个进程(包括父进程)将其日志记录放在 Queue 上,然后一个 listener 线程或进程(为每个进程提供一个示例)拾取这些并将它们全部写入文件 - 没有损坏或乱码。


至少从 QueueHandler 出现开始,这应该是公认的答案。它不是侵入性的、透明的,并且无论主进程使用什么记录器配置都可以工作。工作人员总是记录到他们配置的 QueueHandler 也不期望从父进程到生成子进程的任何类型的记录器配置
u
user2133814

下面是另一个解决方案,重点是为从 Google 来到这里的其他任何人(比如我)提供简单性。记录应该很容易!仅适用于 3.2 或更高版本。

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()

QueueHandlerQueueListener 类也可用于 Python 2.7,可在 logutils 包中找到。
主进程的记录器也应该使用 QueueHandler。在您当前的代码中,主进程绕过队列,因此主进程和工作进程之间可能存在竞争条件。每个人都应该登录到队列(通过 QueueHandler),并且应该只允许 QueueListener 登录到 StreamHandler。
此外,您不必在每个孩子中初始化记录器。只需在父进程中初始化记录器,并在每个子进程中获取记录器。
Z
Zearin

另一种选择可能是 logging package 中的各种非基于文件的日志记录处理程序:

套接字处理器

数据报处理程序

系统日志处理程序

(和别的)

这样,您可以轻松地在可以安全写入并正确处理结果的某个地方拥有一个日志记录守护程序。 (例如,一个简单的套接字服务器,它只是解开消息并将其发送到它自己的旋转文件处理程序。)

SyslogHandler 也会为您解决这个问题。当然,您可以使用自己的 syslog 实例,而不是系统实例。


I
Iopheam

截至 2020 年,似乎有一种更简单的多处理日志记录方式。

此函数将创建记录器。您可以在此处设置格式以及您希望输出的位置(文件、标准输出):

def create_logger():
    import multiprocessing, logging
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(\
        '[%(asctime)s| %(levelname)s| %(processName)s] %(message)s')
    handler = logging.FileHandler('logs/your_file_name.log')
    handler.setFormatter(formatter)

    # this bit will make sure you won't have 
    # duplicated messages in the output
    if not len(logger.handlers): 
        logger.addHandler(handler)
    return logger

在 init 中实例化记录器:

if __name__ == '__main__': 
    from multiprocessing import Pool
    logger = create_logger()
    logger.info('Starting pooling')
    p = Pool()
    # rest of the code

现在,您只需在需要记录的每个函数中添加此引用:

logger = create_logger()

并输出消息:

logger.info(f'My message from {something}')

希望这可以帮助。


这似乎是现在最直接的解决方案。请注意,“if not len(logger.handlers)”部分假设您将使用单个处理程序。如果您希望有多个处理程序,例如,将所有消息发送到一个文件,但仅将 INFO 及以上信息发送到标准输出,那么您需要调整该部分。
通常你有大量的代码只是导入日志然后使用诸如'logging.info("whatever")"之类的东西——没有地方可以将记录器对象传递给任何东西,也没有机会改造该代码。
这可行,但不是很灵活。例如,一旦将 create_logger() 放入所有函数中,就无法关闭日志记录,以防其他人想在他们自己的应用程序中使用您的库。库的最佳实践是永远不要强迫任何人查看日志消息。
@JamesMoore 我没有尝试将 logging.info(..) 与多处理一起使用。如果这可行,我很乐意更新答案。
@medley56 同意。也许,此代码不适用于库中。当我需要为临时任务调试多处理抓取工具时,我使用了它。
i
ironhacker

将日志记录和队列线程分开的其他变体。

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not realiable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)

我喜欢从队列记录中获取记录器名称的想法。这允许在 MainProcess 中使用传统的 fileConfig(),在 PoolWorkers 中使用几乎没有配置的记录器(只有 setLevel(logging.NOTSET))。正如我在另一条评论中提到的,我使用的是 Pool,所以我必须从 Manager 获取我的队列(代理)而不是 multiprocessing 以便它可以被腌制。这允许我将队列传递给字典内的工作人员(其中大部分是使用 vars() 从 argsparse 对象派生的)。最后,我觉得这是缺少 fork() 并破坏 @zzzeak 解决方案的 MS Windows 的最佳方法。
@mlt 我认为您也可以在 init 中放置一个多处理队列,而不是使用管理器(请参阅对 stackoverflow.com/questions/25557686/… 的回答 - 它与锁有关,但我相信它也适用于队列)
@fantabolous 这不适用于 MS Windows 或任何其他缺少 fork 的平台。这样每个进程都会有自己独立的无用队列。链接问答中的第二种方法不适用于此类平台。这是一种不可移植代码的方式。
@mlt 有趣。我正在使用 Windows,它对我来说似乎工作正常 - 在我上次发表评论后不久,我设置了一个与主进程共享 multiprocessing.Queue 的进程池,从那以后我一直在使用它。不会声称理解它为什么起作用。
s
schlamar

通过使用处理程序,所有当前的解决方案都过于耦合到日志记录配置。我的解决方案具有以下架构和功能:

您可以使用任何您想要的日志记录配置

日志记录在守护线程中完成

使用上下文管理器安全关闭守护进程

与日志线程的通信由 multiprocessing.Queue 完成

在子流程中,logging.Logger(和已经定义的实例)被修补以将所有记录发送到队列

新:在发送到队列之前格式化回溯和消息以防止酸洗错误

可以在以下 Gist 中找到包含使用示例和输出的代码:https://gist.github.com/schlamar/7003737


除非我遗漏了什么,否则这实际上不是一个守护线程,因为您从未将 daemon_thread.daemon 设置为 True。当上下文管理器中发生异常时,我需要这样做以便让我的 Python 程序正确退出。
我还需要在 logged_call 中捕获、记录和吞下目标 func 抛出的异常,否则异常会与其他记录的输出混淆。这是我的修改版本:gist.github.com/blah238/8ab79c4fe9cdb254f5c37abfc5dc85bf
@blah238 如果您将 @schlamar 的 daemon (我们称其为 QueueListener,以便更好地命名)作为实际的守护线程,则可能会在主程序退出时突然停止。想象一下队列已经缓冲了相当多的消息,主程序结束,退出上下文管理器,None标记被添加到完整队列的顶部,然后主进程在侦听器之前终止({1 }) 能够出列并处理所有日志消息。你会丢失这些消息。您如何在代码中处理这种情况?
S
Samuel

由于我们可以将多进程日志记录表示为多个发布者和一个订阅者(侦听器),因此使用 ZeroMQ 来实现 PUB-SUB 消息传递确实是一种选择。

此外,PyZMQ 模块(ZMQ 的 Python 绑定)实现了 PUBHandler,它是通过 zmq.PUB 套接字发布日志消息的对象。

有一个 solution on the web,用于使用 PyZMQ 和 PUBHandler 从分布式应用程序集中记录日志,可以轻松采用在本地处理多个发布进程。

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

# This one will be used by publishing processes
class PUBLogger:
    def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

# This one will be used by listener process
class SUBLogger:
    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.DEBUG)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{1}'.format(ip, port))
        self._sub.setsockopt(zmq.SUBSCRIBE, "")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

  @property
  def sub(self):
      return self._sub

  @property
  def logger(self):
      return self._logger

#  And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            log_msg = getattr(logging, topic.lower())
            log_msg(message)
        except zmq.ZMQError as zmq_error:
            if zmq_error.errno == zmq.EAGAIN:
                pass


# Publisher processes loggers should be initialized as follows:

class Publisher:
    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = pub_logger.PUBLogger('127.0.0.1').logger

     def run(self):
         self._logger.info("{0} - Sending message".format(proc_id))

def run_worker(event, proc_id):
    worker = Publisher(event, proc_id)
    worker.run()

# Starting subscriber process so we won't loose publisher's messages
sub_logger_process = Process(target=run_sub_logger,
                                 args=('127.0.0.1'), stop_event,))
sub_logger_process.start()

#Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                                 args=(stop_event, i,)))
for p in processes:
    p.start()

C
Community

我也喜欢 zzzeek 的回答,但安德烈是正确的,需要一个队列来防止乱码。我对管道有一些运气,但确实看到了一些意料之中的乱码。实现它比我想象的要难,特别是由于在 Windows 上运行,那里有一些关于全局变量和东西的额外限制(参见:How's Python Multiprocessing Implemented on Windows?

但是,我终于让它工作了。这个例子可能并不完美,所以欢迎提出意见和建议。它也不支持设置格式化程序或除根记录器以外的任何内容。基本上,您必须使用队列重新启动每个池进程中的记录器,并在记录器上设置其他属性。

同样,欢迎任何有关如何使代码更好的建议。我当然还不知道所有的 Python 技巧 :-)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty, e:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.

def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)

def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)

if __name__ == '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger('').addHandler(handler)
    logging.getLogger('').setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0,50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()

我想知道是否可以使用 if 'MainProcess' == multiprocessing.current_process().name: 代替传递 child
如果其他人试图在 Windows 上使用进程池而不是单独的进程对象,值得一提的是,Manager 应用于将队列传递给子进程,因为它不能直接选择。
这个实现对我来说效果很好。我修改它以使用任意数量的处理程序。通过这种方式,您可以以非多处理方式配置您的根处理程序,然后在可以安全的地方创建队列,将根处理程序传递给它,删除它们,并使其成为唯一的处理程序。
r
rudaoshi

我想建议使用 logger_tt 库:https://github.com/Dragon2fly/logger_tt

multiporcessing_logging 库在我的 macOSX 上不起作用,而 logger_tt 可以。


我不知道为什么有人不同意我的回答。 logger_tt 库无疑是最友好的多处理日志库。
J
Javier

只需在某处发布您的记录器实例。这样,其他模块和客户端可以使用您的 API 来获取记录器,而无需 import multiprocessing


这样做的问题是多处理记录器似乎未命名,因此您将无法轻松破译消息流。也许可以在创建之后给它们命名,这样看起来更合理。
好吧,为每个模块发布一个记录器,或者更好的是,导出使用带有模块名称的记录器的不同闭包。关键是让其他模块使用您的 API
绝对合理(而且我 +1!),但我会想念能够从任何地方import logging; logging.basicConfig(level=logging.DEBUG); logging.debug('spam!')并让它正常工作。
这是我在使用 Python 时看到的一个有趣的现象,我们已经习惯于用 1 或 2 行简单的代码来做我们想做的事情,而其他语言中的简单而合乎逻辑的方法(例如,发布多处理记录器或包装它在访问器中)仍然感觉像是一种负担。 :)
A
André Cruz

我喜欢 zzzeek 的回答。我只是将管道替换为队列,因为如果多个线程/进程使用相同的管道端来生成日志消息,它们会出现乱码。


我在处理程序方面遇到了一些问题,尽管消息不是乱码,只是整个事情都会停止工作。我将 Pipe 更改为 Queue ,因为这样更合适。然而,我得到的错误并没有得到解决——最终我在 receive() 方法中添加了一个 try/except ——很少,尝试记录异常会失败并最终被捕获。一旦我添加了 try/except,它会运行数周而没有问题,并且一个标准错误文件每周将抓取大约两个错误异常。
n
np8

concurrent-log-handler 似乎完美地完成了这项工作。在 Windows 上测试。还支持 POSIX 系统。

大意

使用返回记录器的函数创建一个单独的文件。记录器必须为每个进程提供新的 ConcurrentRotatingFileHandler 实例。下面给出了示例函数 get_logger()。

创建记录器是在进程初始化时完成的。对于 multiprocessing.Process 子类,它意味着 run() 方法的开始。

详细说明

我这个例子,我会使用下面的文件结构

.
│-- child.py        <-- For a child process
│-- logs.py         <-- For setting up the logs for the app
│-- main.py         <-- For a main process
│-- myapp.py        <-- For starting the app
│-- somemodule.py   <-- For an example, a "3rd party module using standard logging"

代码

子进程

# child.py 

import multiprocessing as mp
import time
from somemodule import do_something


class ChildProcess(mp.Process):
    def __init__(self):
        self.logger = None
        super().__init__()

    def run(self):
        from logs import get_logger
        self.logger = get_logger()


        while True:
            time.sleep(1)
            self.logger.info("Child process")
            do_something()

继承 multiprocessing.Process 并简单地记录到文件文本“子进程”的简单子进程

重要提示:get_logger() 在 run() 内部或子进程内部的其他位置(不是模块级别或在 __init__() 中调用)。这是必需的,因为 get_logger() 创建 ConcurrentRotatingFileHandler 实例,并且每个进程都需要新实例.

do_something 仅用于演示这适用于 3rd 方库代码,它没有任何线索表明您正在使用并发日志处理程序。

主要流程

# main.py

import logging
import multiprocessing as mp
import time

from child import ChildProcess
from somemodule import do_something


class MainProcess(mp.Process):
    def __init__(self):
        self.logger = logging.getLogger()
        super().__init__()

    def run(self):
        from logs import get_logger

        self.logger = get_logger()
        self.child = ChildProcess()
        self.child.daemon = True
        self.child.start()

        while True:
            time.sleep(0.5)
            self.logger.critical("Main process")
            do_something()


每秒两次登录文件的主进程“主进程”。也继承自 multiprocessing.Process。

get_logger() 和 do_something() 的注释与子进程相同。

记录仪设置

# logs.py

import logging
import os

from concurrent_log_handler import ConcurrentRotatingFileHandler

LOGLEVEL = logging.DEBUG


def get_logger():
    logger = logging.getLogger()

    if logger.handlers:
        return logger

    # Use an absolute path to prevent file rotation trouble.
    logfile = os.path.abspath("mylog.log")

    logger.setLevel(LOGLEVEL)

    # Rotate log after reaching 512K, keep 5 old copies.
    filehandler = ConcurrentRotatingFileHandler(
        logfile, mode="a", maxBytes=512 * 1024, backupCount=5, encoding="utf-8"
    )
    filehandler.setLevel(LOGLEVEL)

    # create also handler for displaying output in the stdout
    ch = logging.StreamHandler()
    ch.setLevel(LOGLEVEL)

    formatter = logging.Formatter(
        "%(asctime)s - %(module)s - %(levelname)s - %(message)s [Process: %(process)d, %(filename)s:%(funcName)s(%(lineno)d)]"
    )

    # add formatter to ch
    ch.setFormatter(formatter)
    filehandler.setFormatter(formatter)

    logger.addHandler(ch)
    logger.addHandler(filehandler)

    return logger

这使用了 concurrent-log-handler 包中的 ConcurrentRotatingFileHandler。每个进程都需要一个新的 ConcurrentRotatingFileHandler 实例。

请注意,ConcurrentRotatingFileHandler 的所有参数在每个进程中都应该相同。

示例应用

# myapp.py 

if __name__ == "__main__":
    from main import MainProcess

    p = MainProcess()
    p.start()

只是一个关于如何启动多进程应用程序的简单示例

使用标准日志记录的第 3 方模块示例

# somemodule.py 

import logging

logger = logging.getLogger("somemodule")

def do_something():
    logging.info("doing something")

只是一个简单的例子来测试来自 3rd 方代码的记录器是否可以正常工作。

示例输出

2021-04-19 19:02:29,425 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,427 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:29,929 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,931 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,133 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:30,137 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:30,436 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,439 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,944 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,946 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:31,142 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:31,145 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:31,449 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:31,451 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]


S
Sawan

将所有日志记录委托给另一个从队列中读取所有日志条目的进程怎么样?

LOG_QUEUE = multiprocessing.JoinableQueue()

class CentralLogger(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.log = logger.getLogger('some_config')
        self.log.info("Started Central Logging process")

    def run(self):
        while True:
            log_level, message = self.queue.get()
            if log_level is None:
                self.log.info("Shutting down Central Logging process")
                break
            else:
                self.log.log(log_level, message)

central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()

只需通过任何多进程机制甚至继承共享 LOG_QUEUE 即可,一切正常!


u
user6336812

下面是一个可以在 Windows 环境下使用的类,需要 ActivePython。您还可以继承其他日志处理程序(StreamHandler 等)

class SyncronizedFileHandler(logging.FileHandler):
    MUTEX_NAME = 'logging_mutex'

    def __init__(self , *args , **kwargs):

        self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME)
        return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs)

    def emit(self, *args , **kwargs):
        try:
            win32event.WaitForSingleObject(self.mutex , win32event.INFINITE)
            ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs)
        finally:
            win32event.ReleaseMutex(self.mutex)
        return ret

这是一个演示用法的示例:

import logging
import random , time , os , sys , datetime
from string import letters
import win32api , win32event
from multiprocessing import Pool

def f(i):
    time.sleep(random.randint(0,10) * 0.1)
    ch = random.choice(letters)
    logging.info( ch * 30)


def init_logging():
    '''
    initilize the loggers
    '''
    formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    file_handler = SyncronizedFileHandler(sys.argv[1])
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

#must be called in the parent and in every worker process
init_logging() 

if __name__ == '__main__':
    #multiprocessing stuff
    pool = Pool(processes=10)
    imap_result = pool.imap(f , range(30))
    for i , _ in enumerate(imap_result):
        pass

可能使用 multiprocessing.Lock() 而不是 Windows Mutex 会使解决方案具有可移植性。
R
Richard Jones

我有一个类似于 Ironhacker 的解决方案,除了我在我的一些代码中使用 logging.exception 并发现我需要在将异常传递回队列之前格式化异常,因为回溯不是 pickle'able:

class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)
    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s

我找到了一个完整的例子here
n
nmz787

这是我的简单黑客/解决方法......不是最全面的,但我认为比我在写这篇文章之前找到的任何其他答案更容易修改和更容易阅读和理解:

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name get's discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)

j
juan Isaza

有这个很棒的包裹

包:https://pypi.python.org/pypi/multiprocessing-logging/

代码:https://github.com/jruere/multiprocessing-logging

安装:

pip install multiprocessing-logging

然后加:

import multiprocessing_logging

# This enables logs inside process
multiprocessing_logging.install_mp_handler()

这个库实际上是基于对当前 SO 帖子的另一条评论:stackoverflow.com/a/894284/1698058
来源:stackoverflow.com/a/894284/1663382 除了主页上的文档外,我还欣赏该模块的示例用法。
如果多处理上下文为 spawn,则此模块不起作用(MacOS 的默认值 > 3.8)
O
Orsiris de Jong

对于可能需要这个的人,我为 multiprocessing_logging 包编写了一个装饰器,它将当前进程名称添加到日志中,因此可以清楚谁记录了什么。

它还运行 install_mp_handler() 因此在创建池之前运行它变得无用。

这让我可以看到哪个工作人员创建了哪些日志消息。

这是带有示例的蓝图:

import sys
import logging
from functools import wraps
import multiprocessing
import multiprocessing_logging

# Setup basic console logger as 'logger'
logger = logging.getLogger()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(u'%(asctime)s :: %(levelname)s :: %(message)s'))
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)


# Create a decorator for functions that are called via multiprocessing pools
def logs_mp_process_names(fn):
    class MultiProcessLogFilter(logging.Filter):
        def filter(self, record):
            try:
                process_name = multiprocessing.current_process().name
            except BaseException:
                process_name = __name__
            record.msg = f'{process_name} :: {record.msg}'
            return True

    multiprocessing_logging.install_mp_handler()
    f = MultiProcessLogFilter()

    # Wraps is needed here so apply / apply_async know the function name
    @wraps(fn)
    def wrapper(*args, **kwargs):
        logger.removeFilter(f)
        logger.addFilter(f)
        return fn(*args, **kwargs)

    return wrapper


# Create a test function and decorate it
@logs_mp_process_names
def test(argument):
    logger.info(f'test function called via: {argument}')


# You can also redefine undecored functions
def undecorated_function():
    logger.info('I am not decorated')


@logs_mp_process_names
def redecorated(*args, **kwargs):
    return undecorated_function(*args, **kwargs)


# Enjoy
if __name__ == '__main__':
    with multiprocessing.Pool() as mp_pool:
        # Also works with apply_async
        mp_pool.apply(test, ('mp pool',))
        mp_pool.apply(redecorated)
        logger.info('some main logs')
        test('main program')

c
cdleary

一种替代方法是将多处理日志写入已知文件并注册一个 atexit 处理程序以加入这些进程并在 stderr 上读取它;但是,您不会以这种方式获得到 stderr 上输出消息的实时流。


您在下面提出的方法是否与您在此处的评论中的方法相同stackoverflow.com/questions/641420/…
C
Community

如果您在 logging 模块中的锁、线程和分叉的组合中发生死锁,则会在 bug report 6721 中报告(另请参阅 related SO question)。

发布了一个小的修复解决方案 here

但是,这只会修复 logging 中的任何潜在死锁。这并不能解决事情可能会出现乱码的问题。请参阅此处提供的其他答案。


u
user1460675

提到的最简单的想法:

获取当前进程的文件名和进程 ID。

设置一个 [WatchedFileHandler][1]。此处详细讨论了此处理程序的原因,但简而言之,其他日志处理程序存在某些更糟糕的竞争条件。这个具有最短的竞争条件窗口。选择将日志保存到的路径,例如 /var/log/...

选择将日志保存到的路径,例如 /var/log/...