diff --git a/glances/exports/export.py b/glances/exports/export.py index bbd37306..c0cf7422 100644 --- a/glances/exports/export.py +++ b/glances/exports/export.py @@ -1,7 +1,7 @@ # # This file is part of Glances. # -# SPDX-FileCopyrightText: 2022 Nicolas Hennion +# SPDX-FileCopyrightText: 2026 Nicolas Hennion # # SPDX-License-Identifier: LGPL-3.0-only # diff --git a/glances/exports/export_asyncio.py b/glances/exports/export_asyncio.py new file mode 100644 index 00000000..71ec6a98 --- /dev/null +++ b/glances/exports/export_asyncio.py @@ -0,0 +1,168 @@ +# +# This file is part of Glances. +# +# SPDX-FileCopyrightText: 2026 Nicolas Hennion +# +# SPDX-License-Identifier: LGPL-3.0-only +# + +""" +I am your son... +...abstract class for AsyncIO-based Glances exports. +""" + +import asyncio +import threading +import time +from abc import abstractmethod + +from glances.exports.export import GlancesExport +from glances.logger import logger + + +class GlancesExportAsyncio(GlancesExport): + """Abstract class for AsyncIO-based export modules. + + This class manages a persistent event loop in a background thread, + allowing child classes to use AsyncIO operations for exporting data. + + Child classes must implement: + - async _async_init(): AsyncIO initialization (e.g., connection setup) + - async _async_exit(): AsyncIO cleanup (e.g., disconnection) + - async _async_export(name, columns, points): AsyncIO export operation + """ + + def __init__(self, config=None, args=None): + """Init the AsyncIO export interface.""" + super().__init__(config=config, args=args) + + # AsyncIO event loop management + self.loop = None + self._loop_ready = threading.Event() + self._loop_exception = None + self._shutdown = False + + # Start the background event loop thread + self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True) + self._loop_thread.start() + + # Wait for the loop to be ready + if not self._loop_ready.wait(timeout=10): + raise RuntimeError("AsyncIO event loop failed to start within timeout") + + if self._loop_exception: + raise RuntimeError(f"AsyncIO event loop creation failed: {self._loop_exception}") + + if self.loop is None: + raise RuntimeError("AsyncIO event loop is None after initialization") + + # Call child class AsyncIO initialization + future = asyncio.run_coroutine_threadsafe(self._async_init(), self.loop) + try: + future.result(timeout=10) + logger.debug(f"{self.export_name} AsyncIO export initialized successfully") + except Exception as e: + logger.warning(f"{self.export_name} AsyncIO initialization failed: {e}. Will retry in background.") + + def _run_event_loop(self): + """Run event loop in background thread.""" + try: + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self._loop_ready.set() + self.loop.run_forever() + except Exception as e: + self._loop_exception = e + self._loop_ready.set() + logger.error(f"{self.export_name} AsyncIO event loop thread error: {e}") + finally: + # Clean up pending tasks + pending = asyncio.all_tasks(self.loop) + for task in pending: + task.cancel() + if pending: + self.loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + self.loop.close() + + @abstractmethod + async def _async_init(self): + """AsyncIO initialization method. + + Child classes should implement this method to perform AsyncIO-based + initialization such as connecting to servers, setting up clients, etc. + + This method is called once during __init__ after the event loop is ready. + """ + pass + + @abstractmethod + async def _async_exit(self): + """AsyncIO cleanup method. + + Child classes should implement this method to perform AsyncIO-based + cleanup such as disconnecting from servers, closing clients, etc. + + This method is called during exit() before stopping the event loop. + """ + pass + + @abstractmethod + async def _async_export(self, name, columns, points): + """AsyncIO export method. + + Child classes must implement this method to perform the actual + export operation using AsyncIO. + + :param name: plugin name + :param columns: list of column names + :param points: list of values corresponding to columns + """ + pass + + def exit(self): + """Close the AsyncIO export module.""" + super().exit() + self._shutdown = True + logger.info(f"{self.export_name} AsyncIO export shutting down") + + # Call child class cleanup + if self.loop: + future = asyncio.run_coroutine_threadsafe(self._async_exit(), self.loop) + try: + future.result(timeout=5) + except Exception as e: + logger.error(f"{self.export_name} Error in AsyncIO cleanup: {e}") + + # Stop the event loop + if self.loop: + self.loop.call_soon_threadsafe(self.loop.stop) + time.sleep(0.5) + + logger.debug(f"{self.export_name} AsyncIO export shutdown complete") + + def export(self, name, columns, points): + """Export data using AsyncIO. + + This method bridges the synchronous export() interface with + the AsyncIO _async_export() implementation. + """ + if self._shutdown: + logger.debug(f"{self.export_name} Export called during shutdown, skipping") + return + + if not self.loop or not self.loop.is_running(): + logger.error(f"{self.export_name} AsyncIO event loop is not running") + return + + # Submit the export operation to the background event loop + try: + future = asyncio.run_coroutine_threadsafe( + self._async_export(name, columns, points), + self.loop + ) + # Don't block forever - use a short timeout + future.result(timeout=1) + except asyncio.TimeoutError: + logger.warning(f"{self.export_name} AsyncIO export timeout for {name}") + except Exception as e: + logger.error(f"{self.export_name} AsyncIO export error for {name}: {e}", exc_info=True) diff --git a/glances/exports/glances_nats/__init__.py b/glances/exports/glances_nats/__init__.py index 47d44338..97941cfd 100644 --- a/glances/exports/glances_nats/__init__.py +++ b/glances/exports/glances_nats/__init__.py @@ -1,96 +1,57 @@ # # This file is part of Glances. # -# SPDX-FileCopyrightText: 2022 Nicolas Hennion +# SPDX-FileCopyrightText: 2026 Nicolas Hennion # # SPDX-License-Identifier: LGPL-3.0-only # """NATS interface class.""" -import asyncio -import threading -import time - from nats.aio.client import Client as NATS from nats.errors import ConnectionClosedError from nats.errors import TimeoutError as NatsTimeoutError -from glances.exports.export import GlancesExport +from glances.exports.export_asyncio import GlancesExportAsyncio from glances.globals import json_dumps from glances.logger import logger -class Export(GlancesExport): +class Export(GlancesExportAsyncio): """This class manages the NATS export module.""" def __init__(self, config=None, args=None): """Init the NATS export IF.""" - super().__init__(config=config, args=args) + # Load the NATS configuration file before calling super().__init__ + # because super().__init__ will call _async_init() which needs config + self.config = config + self.args = args + self.export_name = self.__class__.__module__ - # Load the NATS configuration file - self.export_enable = self.load_conf( + export_enable = self.load_conf( 'nats', mandatories=['host'], options=['prefix'], ) - if not self.export_enable: + if not export_enable: exit('Missing NATS config') self.prefix = self.prefix or 'glances' # Host is a comma-separated list of NATS servers self.hosts = self.host - # Create a persistent event loop in a background thread - self.loop = None + # NATS-specific attributes self.client = None self._connected = False - self._shutdown = False - self._loop_ready = threading.Event() - self._loop_exception = None self._publish_count = 0 - self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True) - self._loop_thread.start() + # Call parent __init__ which will start event loop and call _async_init() + super().__init__(config=config, args=args) - # Wait for the loop to be ready - if not self._loop_ready.wait(timeout=10): - exit("NATS event loop failed to start within timeout") + # Restore export_enable after super().__init__() resets it to False + self.export_enable = export_enable - if self._loop_exception: - exit(f"NATS event loop creation failed: {self._loop_exception}") - - if self.loop is None: - exit("NATS event loop is None after initialization") - - # Initial connection attempt - future = asyncio.run_coroutine_threadsafe(self._connect(), self.loop) - try: - future.result(timeout=10) - logger.info("NATS export initialized successfully") - except Exception as e: - logger.warning(f"NATS Initial connection failed: {e}. Will retry in background.") - - def _run_event_loop(self): - """Run event loop in background thread.""" - try: - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.loop) - self._loop_ready.set() - self.loop.run_forever() - except Exception as e: - self._loop_exception = e - self._loop_ready.set() - logger.error(f"NATS Export Event loop thread error: {e}") - finally: - pending = asyncio.all_tasks(self.loop) - for task in pending: - task.cancel() - if pending: - self.loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) - self.loop.close() - - async def _connect(self): + async def _async_init(self): """Connect to NATS with error handling.""" try: if self.client: @@ -114,7 +75,7 @@ class Export(GlancesExport): ) self._connected = True - logger.debug(f"NATS Successfully connected to servers: {self.hosts}") + logger.info(f"NATS Successfully connected to servers: {self.hosts}") except Exception as e: self._connected = False logger.error(f"NATS connection error: {e}") @@ -134,46 +95,19 @@ class Export(GlancesExport): self._connected = True logger.debug("NATS reconnected callback") - def exit(self): - """Close the NATS connection.""" - super().exit() - self._shutdown = True - logger.info("NATS export shutting down") - - if self.loop and self.client: - future = asyncio.run_coroutine_threadsafe(self._disconnect(), self.loop) - try: - future.result(timeout=5) - except Exception as e: - logger.error(f"NATS Error disconnecting from server: {e}") - - if self.loop: - self.loop.call_soon_threadsafe(self.loop.stop) - time.sleep(0.5) - - logger.debug(f"NATS export shutdown complete. Total messages published: {self._publish_count}") - - async def _disconnect(self): + async def _async_exit(self): """Disconnect from NATS.""" try: if self.client and self._connected: await self.client.drain() await self.client.close() self._connected = False - logger.debug("NATS disconnected cleanly") + logger.debug(f"NATS disconnected cleanly. Total messages published: {self._publish_count}") except Exception as e: logger.error(f"NATS Error in disconnect: {e}") - def export(self, name, columns, points): - """Write the points in NATS.""" - if self._shutdown: - logger.debug("NATS Export called during shutdown, skipping") - return - - if not self.loop or not self.loop.is_running(): - logger.error("NATS event loop is not running") - return - + async def _async_export(self, name, columns, points): + """Write the points to NATS using AsyncIO.""" if not self._connected: logger.warning("NATS not connected, skipping export") return @@ -181,33 +115,19 @@ class Export(GlancesExport): subject_name = f"{self.prefix}.{name}" subject_data = dict(zip(columns, points)) - # Submit the publish operation to the background event loop - try: - future = asyncio.run_coroutine_threadsafe( - self._publish(subject_name, json_dumps(subject_data)), - self.loop - ) - # Don't block forever - use a short timeout - future.result(timeout=1) - self._publish_count += 1 - except asyncio.TimeoutError: - logger.warning(f"NATS publish timeout for {subject_name}") - except Exception as e: - logger.error(f"NATS publish error for {subject_name}: {e}", exc_info=True) - - async def _publish(self, subject, data): - """Publish data to NATS.""" + # Publish data to NATS try: if not self._connected: raise ConnectionClosedError("NATS Not connected to server") - await self.client.publish(subject, data) - await asyncio.wait_for(self.client.flush(), timeout=2.0) + await self.client.publish(subject_name, json_dumps(subject_data)) + await self.client.flush(timeout=2.0) + self._publish_count += 1 except (ConnectionClosedError, NatsTimeoutError) as e: self._connected = False - logger.error(f"NATS publish failed: {e}") + logger.error(f"NATS publish failed for {subject_name}: {e}") raise except Exception as e: - logger.error(f"NATS Unexpected error in _publish: {e}", exc_info=True) + logger.error(f"NATS Unexpected error publishing {subject_name}: {e}", exc_info=True) raise # End of glances/exports/glances_nats/__init__.py