diff --git a/glances/globals.py b/glances/globals.py index f7d4c752..a0c62404 100644 --- a/glances/globals.py +++ b/glances/globals.py @@ -17,6 +17,7 @@ import base64 import errno import functools import importlib +import multiprocessing import os import platform import queue @@ -27,7 +28,6 @@ import weakref from collections import OrderedDict from configparser import ConfigParser, NoOptionError, NoSectionError from datetime import datetime -from multiprocessing import Process, Queue from operator import itemgetter, methodcaller from statistics import mean from typing import Any, Optional, Union @@ -97,6 +97,11 @@ viewkeys = methodcaller('keys') viewvalues = methodcaller('values') viewitems = methodcaller('items') +# Multiprocessing start method (on POSIX system) +if LINUX or BSD or SUNOS or MACOS: + ctx_mp_fork = multiprocessing.get_context('fork') +else: + ctx_mp_fork = multiprocessing.get_context() ################### # GLOBALS FUNCTIONS @@ -600,8 +605,8 @@ def exit_after(seconds, default=None): return func def wraps(*args, **kwargs): - q = Queue() - p = Process(target=handler, args=(q, func, args, kwargs)) + q = ctx_mp_fork.Queue() + p = ctx_mp_fork.Process(target=handler, args=(q, func, args, kwargs)) p.start() p.join(timeout=seconds) if not p.is_alive(): diff --git a/tests-data/issues/issue3319.py b/tests-data/issues/issue3319.py new file mode 100644 index 00000000..8846f7fd --- /dev/null +++ b/tests-data/issues/issue3319.py @@ -0,0 +1,57 @@ +#################################################################################### +# +# Migration of the code issue3290.py to use multiprocessing with 'fork' start method +# +#################################################################################### + +import multiprocessing +import time + +import psutil + +# multiprocessing.set_start_method("fork") +ctx_mp_fork = multiprocessing.get_context('fork') + + +def exit_after(seconds, default=None): + """Exit the function if it takes more than 'second' seconds to complete. + In this case, return the value of 'default' (default: None).""" + + def handler(q, func, args, kwargs): + q.put(func(*args, **kwargs)) + + def decorator(func): + def wraps(*args, **kwargs): + q = ctx_mp_fork.Queue() + p = ctx_mp_fork.Process(target=handler, args=(q, func, args, kwargs)) + p.start() + p.join(timeout=seconds) + if not p.is_alive(): + return q.get() + + p.terminate() + p.join(timeout=0.1) + if p.is_alive(): + # Kill in case processes doesn't terminate + # Happens with cases like broken NFS connections + p.kill() + return default + + return wraps + + return decorator + + +class Issue3290: + @exit_after(1, default=None) + def blocking_io_call(self, fs): + try: + return psutil.disk_usage(fs) + except OSError: + return None + + +issue = Issue3290() +while True: + print(f"{time.time()} {issue.blocking_io_call('/home/nicolargo/tmp/hang')}") + time.sleep(1)