Refactor NATS export to use async base class

Replace custom event loop threading implementation with GlancesExportAsyncio
base class. This simplifies the code by delegating async lifecycle management
to the parent class and using standardized _async_init(), _async_exit(), and
_async_export() methods instead of manual thread and event loop handling.
This commit is contained in:
nicolargo 2026-01-02 17:28:12 +01:00
parent 6cedd16753
commit 4bd36a34a1
3 changed files with 196 additions and 108 deletions

View File

@ -1,7 +1,7 @@
#
# This file is part of Glances.
#
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <nicolas@nicolargo.com>
# SPDX-FileCopyrightText: 2026 Nicolas Hennion <nicolas@nicolargo.com>
#
# SPDX-License-Identifier: LGPL-3.0-only
#

View File

@ -0,0 +1,168 @@
#
# This file is part of Glances.
#
# SPDX-FileCopyrightText: 2026 Nicolas Hennion <nicolas@nicolargo.com>
#
# SPDX-License-Identifier: LGPL-3.0-only
#
"""
I am your son...
...abstract class for AsyncIO-based Glances exports.
"""
import asyncio
import threading
import time
from abc import abstractmethod
from glances.exports.export import GlancesExport
from glances.logger import logger
class GlancesExportAsyncio(GlancesExport):
"""Abstract class for AsyncIO-based export modules.
This class manages a persistent event loop in a background thread,
allowing child classes to use AsyncIO operations for exporting data.
Child classes must implement:
- async _async_init(): AsyncIO initialization (e.g., connection setup)
- async _async_exit(): AsyncIO cleanup (e.g., disconnection)
- async _async_export(name, columns, points): AsyncIO export operation
"""
def __init__(self, config=None, args=None):
"""Init the AsyncIO export interface."""
super().__init__(config=config, args=args)
# AsyncIO event loop management
self.loop = None
self._loop_ready = threading.Event()
self._loop_exception = None
self._shutdown = False
# Start the background event loop thread
self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._loop_thread.start()
# Wait for the loop to be ready
if not self._loop_ready.wait(timeout=10):
raise RuntimeError("AsyncIO event loop failed to start within timeout")
if self._loop_exception:
raise RuntimeError(f"AsyncIO event loop creation failed: {self._loop_exception}")
if self.loop is None:
raise RuntimeError("AsyncIO event loop is None after initialization")
# Call child class AsyncIO initialization
future = asyncio.run_coroutine_threadsafe(self._async_init(), self.loop)
try:
future.result(timeout=10)
logger.debug(f"{self.export_name} AsyncIO export initialized successfully")
except Exception as e:
logger.warning(f"{self.export_name} AsyncIO initialization failed: {e}. Will retry in background.")
def _run_event_loop(self):
"""Run event loop in background thread."""
try:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self._loop_ready.set()
self.loop.run_forever()
except Exception as e:
self._loop_exception = e
self._loop_ready.set()
logger.error(f"{self.export_name} AsyncIO event loop thread error: {e}")
finally:
# Clean up pending tasks
pending = asyncio.all_tasks(self.loop)
for task in pending:
task.cancel()
if pending:
self.loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
self.loop.close()
@abstractmethod
async def _async_init(self):
"""AsyncIO initialization method.
Child classes should implement this method to perform AsyncIO-based
initialization such as connecting to servers, setting up clients, etc.
This method is called once during __init__ after the event loop is ready.
"""
pass
@abstractmethod
async def _async_exit(self):
"""AsyncIO cleanup method.
Child classes should implement this method to perform AsyncIO-based
cleanup such as disconnecting from servers, closing clients, etc.
This method is called during exit() before stopping the event loop.
"""
pass
@abstractmethod
async def _async_export(self, name, columns, points):
"""AsyncIO export method.
Child classes must implement this method to perform the actual
export operation using AsyncIO.
:param name: plugin name
:param columns: list of column names
:param points: list of values corresponding to columns
"""
pass
def exit(self):
"""Close the AsyncIO export module."""
super().exit()
self._shutdown = True
logger.info(f"{self.export_name} AsyncIO export shutting down")
# Call child class cleanup
if self.loop:
future = asyncio.run_coroutine_threadsafe(self._async_exit(), self.loop)
try:
future.result(timeout=5)
except Exception as e:
logger.error(f"{self.export_name} Error in AsyncIO cleanup: {e}")
# Stop the event loop
if self.loop:
self.loop.call_soon_threadsafe(self.loop.stop)
time.sleep(0.5)
logger.debug(f"{self.export_name} AsyncIO export shutdown complete")
def export(self, name, columns, points):
"""Export data using AsyncIO.
This method bridges the synchronous export() interface with
the AsyncIO _async_export() implementation.
"""
if self._shutdown:
logger.debug(f"{self.export_name} Export called during shutdown, skipping")
return
if not self.loop or not self.loop.is_running():
logger.error(f"{self.export_name} AsyncIO event loop is not running")
return
# Submit the export operation to the background event loop
try:
future = asyncio.run_coroutine_threadsafe(
self._async_export(name, columns, points),
self.loop
)
# Don't block forever - use a short timeout
future.result(timeout=1)
except asyncio.TimeoutError:
logger.warning(f"{self.export_name} AsyncIO export timeout for {name}")
except Exception as e:
logger.error(f"{self.export_name} AsyncIO export error for {name}: {e}", exc_info=True)

View File

@ -1,96 +1,57 @@
#
# This file is part of Glances.
#
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <nicolas@nicolargo.com>
# SPDX-FileCopyrightText: 2026 Nicolas Hennion <nicolas@nicolargo.com>
#
# SPDX-License-Identifier: LGPL-3.0-only
#
"""NATS interface class."""
import asyncio
import threading
import time
from nats.aio.client import Client as NATS
from nats.errors import ConnectionClosedError
from nats.errors import TimeoutError as NatsTimeoutError
from glances.exports.export import GlancesExport
from glances.exports.export_asyncio import GlancesExportAsyncio
from glances.globals import json_dumps
from glances.logger import logger
class Export(GlancesExport):
class Export(GlancesExportAsyncio):
"""This class manages the NATS export module."""
def __init__(self, config=None, args=None):
"""Init the NATS export IF."""
super().__init__(config=config, args=args)
# Load the NATS configuration file before calling super().__init__
# because super().__init__ will call _async_init() which needs config
self.config = config
self.args = args
self.export_name = self.__class__.__module__
# Load the NATS configuration file
self.export_enable = self.load_conf(
export_enable = self.load_conf(
'nats',
mandatories=['host'],
options=['prefix'],
)
if not self.export_enable:
if not export_enable:
exit('Missing NATS config')
self.prefix = self.prefix or 'glances'
# Host is a comma-separated list of NATS servers
self.hosts = self.host
# Create a persistent event loop in a background thread
self.loop = None
# NATS-specific attributes
self.client = None
self._connected = False
self._shutdown = False
self._loop_ready = threading.Event()
self._loop_exception = None
self._publish_count = 0
self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._loop_thread.start()
# Call parent __init__ which will start event loop and call _async_init()
super().__init__(config=config, args=args)
# Wait for the loop to be ready
if not self._loop_ready.wait(timeout=10):
exit("NATS event loop failed to start within timeout")
# Restore export_enable after super().__init__() resets it to False
self.export_enable = export_enable
if self._loop_exception:
exit(f"NATS event loop creation failed: {self._loop_exception}")
if self.loop is None:
exit("NATS event loop is None after initialization")
# Initial connection attempt
future = asyncio.run_coroutine_threadsafe(self._connect(), self.loop)
try:
future.result(timeout=10)
logger.info("NATS export initialized successfully")
except Exception as e:
logger.warning(f"NATS Initial connection failed: {e}. Will retry in background.")
def _run_event_loop(self):
"""Run event loop in background thread."""
try:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self._loop_ready.set()
self.loop.run_forever()
except Exception as e:
self._loop_exception = e
self._loop_ready.set()
logger.error(f"NATS Export Event loop thread error: {e}")
finally:
pending = asyncio.all_tasks(self.loop)
for task in pending:
task.cancel()
if pending:
self.loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
self.loop.close()
async def _connect(self):
async def _async_init(self):
"""Connect to NATS with error handling."""
try:
if self.client:
@ -114,7 +75,7 @@ class Export(GlancesExport):
)
self._connected = True
logger.debug(f"NATS Successfully connected to servers: {self.hosts}")
logger.info(f"NATS Successfully connected to servers: {self.hosts}")
except Exception as e:
self._connected = False
logger.error(f"NATS connection error: {e}")
@ -134,46 +95,19 @@ class Export(GlancesExport):
self._connected = True
logger.debug("NATS reconnected callback")
def exit(self):
"""Close the NATS connection."""
super().exit()
self._shutdown = True
logger.info("NATS export shutting down")
if self.loop and self.client:
future = asyncio.run_coroutine_threadsafe(self._disconnect(), self.loop)
try:
future.result(timeout=5)
except Exception as e:
logger.error(f"NATS Error disconnecting from server: {e}")
if self.loop:
self.loop.call_soon_threadsafe(self.loop.stop)
time.sleep(0.5)
logger.debug(f"NATS export shutdown complete. Total messages published: {self._publish_count}")
async def _disconnect(self):
async def _async_exit(self):
"""Disconnect from NATS."""
try:
if self.client and self._connected:
await self.client.drain()
await self.client.close()
self._connected = False
logger.debug("NATS disconnected cleanly")
logger.debug(f"NATS disconnected cleanly. Total messages published: {self._publish_count}")
except Exception as e:
logger.error(f"NATS Error in disconnect: {e}")
def export(self, name, columns, points):
"""Write the points in NATS."""
if self._shutdown:
logger.debug("NATS Export called during shutdown, skipping")
return
if not self.loop or not self.loop.is_running():
logger.error("NATS event loop is not running")
return
async def _async_export(self, name, columns, points):
"""Write the points to NATS using AsyncIO."""
if not self._connected:
logger.warning("NATS not connected, skipping export")
return
@ -181,33 +115,19 @@ class Export(GlancesExport):
subject_name = f"{self.prefix}.{name}"
subject_data = dict(zip(columns, points))
# Submit the publish operation to the background event loop
try:
future = asyncio.run_coroutine_threadsafe(
self._publish(subject_name, json_dumps(subject_data)),
self.loop
)
# Don't block forever - use a short timeout
future.result(timeout=1)
self._publish_count += 1
except asyncio.TimeoutError:
logger.warning(f"NATS publish timeout for {subject_name}")
except Exception as e:
logger.error(f"NATS publish error for {subject_name}: {e}", exc_info=True)
async def _publish(self, subject, data):
"""Publish data to NATS."""
# Publish data to NATS
try:
if not self._connected:
raise ConnectionClosedError("NATS Not connected to server")
await self.client.publish(subject, data)
await asyncio.wait_for(self.client.flush(), timeout=2.0)
await self.client.publish(subject_name, json_dumps(subject_data))
await self.client.flush(timeout=2.0)
self._publish_count += 1
except (ConnectionClosedError, NatsTimeoutError) as e:
self._connected = False
logger.error(f"NATS publish failed: {e}")
logger.error(f"NATS publish failed for {subject_name}: {e}")
raise
except Exception as e:
logger.error(f"NATS Unexpected error in _publish: {e}", exc_info=True)
logger.error(f"NATS Unexpected error publishing {subject_name}: {e}", exc_info=True)
raise
# End of glances/exports/glances_nats/__init__.py