add: generic timeout wrappers

For use with blocking functions and utilities

Co-authored-by: Raz Crimson <52282402+razcrimson@users.noreply.github.com>
This commit is contained in:
Nicolas Hennion 2025-09-28 05:51:32 +05:30 committed by RazCrimson
parent 420cf2bdde
commit da700c8cfc
3 changed files with 102 additions and 0 deletions

View File

@ -27,6 +27,7 @@ import weakref
from collections import OrderedDict
from configparser import ConfigParser, NoOptionError, NoSectionError
from datetime import datetime
from multiprocessing import Process, Queue
from operator import itemgetter, methodcaller
from statistics import mean
from typing import Any, Optional, Union
@ -584,3 +585,32 @@ def atoi(text):
def natural_keys(text):
"""Return a text in a natural/human readable format."""
return [atoi(c) for c in re.split(r'(\d+)', text)]
def exit_after(seconds, default=None):
"""Exit the function if it takes more than 'seconds' seconds to complete.
In this case, return the value of 'default' (default: None)."""
def handler(q, func, args, kwargs):
q.put(func(*args, **kwargs))
def decorator(func):
def wraps(*args, **kwargs):
q = Queue()
p = Process(target=handler, args=(q, func, args, kwargs))
p.start()
p.join(timeout=seconds)
if not p.is_alive():
return q.get()
p.terminate()
p.join(timeout=0.1)
if p.is_alive():
# Kill in case processes doesn't terminate
# Happens with cases like broken NFS connections
p.kill()
return default
return wraps
return decorator

View File

@ -14,6 +14,12 @@ I am your father...
import copy
import re
import threading
try:
import thread
except ImportError:
import _thread as thread
from glances.actions import GlancesActions
from glances.events_list import glances_events
@ -1212,7 +1218,25 @@ class GlancesPluginModel:
return wrapper
def _exit_after(second):
"""Exit the function if it takes more than 'second' seconds to complete."""
def outer(fn):
def inner(*args, **kwargs):
timer = threading.Timer(second, thread.interrupt_main, args=[fn.__name__])
timer.start()
try:
result = fn(*args, **kwargs)
finally:
timer.cancel()
return result
return inner
return outer
# Mandatory to call the decorator in child classes
_check_decorator = staticmethod(_check_decorator)
_log_result_decorator = staticmethod(_log_result_decorator)
_manage_rate = staticmethod(_manage_rate)
_exit_after = staticmethod(_exit_after)

View File

@ -0,0 +1,48 @@
import time
from multiprocessing import Process, Queue
import psutil
def exit_after(seconds, default=None):
"""Exit the function if it takes more than 'second' seconds to complete.
In this case, return the value of 'default' (default: None)."""
def handler(q, func, args, kwargs):
q.put(func(*args, **kwargs))
def decorator(func):
def wraps(*args, **kwargs):
q = Queue()
p = Process(target=handler, args=(q, func, args, kwargs))
p.start()
p.join(timeout=seconds)
if not p.is_alive():
return q.get()
p.terminate()
p.join(timeout=0.1)
if p.is_alive():
# Kill in case processes doesn't terminate
# Happens with cases like broken NFS connections
p.kill()
return default
return wraps
return decorator
class Issue3290:
@exit_after(1, default=None)
def blocking_io_call(self, fs):
try:
return psutil.disk_usage(fs)
except OSError:
return None
issue = Issue3290()
while True:
print(f"{time.time()} {issue.blocking_io_call('/home/nicolargo/tmp/hang')}")
time.sleep(1)