Refactored `glances.plugins.ports.PluginModel.msg_curse()`

Part of #2801.

Signed-off-by: Ariel Otilibili <otilibil@eurecom.fr>
This commit is contained in:
Ariel Otilibili 2024-10-04 20:39:33 +02:00
parent 3ede32a359
commit cbd166bf0b
1 changed files with 47 additions and 34 deletions

View File

@ -14,6 +14,7 @@ import socket
import subprocess
import threading
import time
from functools import partial, reduce
from glances.globals import BSD, MACOS, WINDOWS, bool_type
from glances.logger import logger
@ -166,14 +167,55 @@ class PluginModel(GlancesPluginModel):
return ret
def set_status_if_host(self, p):
if p['host'] is None:
status = 'None'
elif p['status'] is None:
status = 'Scanning'
elif isinstance(p['status'], bool_type) and p['status'] is True:
status = 'Open'
elif p['status'] == 0:
status = 'Timeout'
else:
# Convert second to ms
status = '{:.0f}ms'.format(p['status'] * 1000.0)
return status
def set_status_if_url(self, p):
if isinstance(p['status'], numbers.Number):
status = 'Code {}'.format(p['status'])
elif p['status'] is None:
status = 'Scanning'
else:
status = p['status']
return status
def build_str(self, name_max_width, ret, p):
if 'host' in p:
helper = self.get_ports_alert
status = self.set_status_if_host(p)
elif 'url' in p:
helper = self.get_web_alert
status = self.set_status_if_url(p)
msg = '{:{width}}'.format(p['description'][0:name_max_width], width=name_max_width)
ret.append(self.curse_add_line(msg))
msg = f'{status:>9}'
ret.append(self.curse_add_line(msg, helper(p, header=p['indice'] + '_rtt')))
ret.append(self.curse_new_line())
return ret
def msg_curse(self, args=None, max_width=None):
"""Return the dict to display in the curse interface."""
# Init the return message
# Only process if stats exist and display plugin enable...
ret = []
init = []
if not self.stats or args.disable_ports:
return ret
return init
# Max size for the interface name
if max_width:
@ -181,40 +223,11 @@ class PluginModel(GlancesPluginModel):
else:
# No max_width defined, return an empty curse message
logger.debug(f"No max_width defined for the {self.plugin_name} plugin, it will not be displayed.")
return ret
return init
# Build the string message
for p in self.stats:
if 'host' in p:
if p['host'] is None:
status = 'None'
elif p['status'] is None:
status = 'Scanning'
elif isinstance(p['status'], bool_type) and p['status'] is True:
status = 'Open'
elif p['status'] == 0:
status = 'Timeout'
else:
# Convert second to ms
status = '{:.0f}ms'.format(p['status'] * 1000.0)
msg = '{:{width}}'.format(p['description'][0:name_max_width], width=name_max_width)
ret.append(self.curse_add_line(msg))
msg = f'{status:>9}'
ret.append(self.curse_add_line(msg, self.get_ports_alert(p, header=p['indice'] + '_rtt')))
ret.append(self.curse_new_line())
elif 'url' in p:
msg = '{:{width}}'.format(p['description'][0:name_max_width], width=name_max_width)
ret.append(self.curse_add_line(msg))
if isinstance(p['status'], numbers.Number):
status = 'Code {}'.format(p['status'])
elif p['status'] is None:
status = 'Scanning'
else:
status = p['status']
msg = f'{status:>9}'
ret.append(self.curse_add_line(msg, self.get_web_alert(p, header=p['indice'] + '_rtt')))
ret.append(self.curse_new_line())
build_str_with_this_max_width = partial(self.build_str, name_max_width)
ret = reduce(build_str_with_this_max_width, self.stats, init)
# Delete the last empty line
try: