diff --git a/Makefile b/Makefile index f97cbc4d..90264121 100644 --- a/Makefile +++ b/Makefile @@ -134,7 +134,10 @@ test-export-influxdb-v3: ## Run interface tests with InfluxDB version 3 (Core) test-export-timescaledb: ## Run interface tests with TimescaleDB /bin/bash ./tests/test_export_timescaledb.sh -test-exports: test-export-csv test-export-json test-export-influxdb-v1 test-export-influxdb-v3 test-export-timescaledb ## Tests all exports +test-export-nats: ## Run interface tests with NATS + /bin/bash ./tests/test_export_nats.sh + +test-exports: test-export-csv test-export-json test-export-influxdb-v1 test-export-influxdb-v3 test-export-timescaledb test-export-nats ## Tests all exports # =================================================================== # Linters, profilers and cyber security diff --git a/README.rst b/README.rst index 2adf580c..a7d780f0 100644 --- a/README.rst +++ b/README.rst @@ -242,7 +242,7 @@ Glances can export stats to: - files: ``CSV`` and ``JSON`` - databases: ``InfluxDB``, ``ElasticSearch``, ``PostgreSQL/TimeScale``, ``Cassandra``, ``CouchDB``, ``OpenTSDB``, ``Prometheus``, ``StatsD``, ``Riemann`` and ``Graphite`` -- brokers: ``RabbitMQ/ActiveMQ``, ``ZeroMQ`` and ``Kafka`` +- brokers: ``RabbitMQ/ActiveMQ``, ``NATS``, ``ZeroMQ`` and ``Kafka`` - others: ``RESTful`` endpoint Installation 🚀 @@ -574,6 +574,7 @@ Extra dependencies: - ``influxdb`` (for the InfluxDB version 1 export module) - ``influxdb-client`` (for the InfluxDB version 2 export module) - ``kafka-python`` (for the Kafka export module) +- ``nats-py`` (for the NATS export module) - ``netifaces2`` (for the IP plugin) - ``nvidia-ml-py`` (for the GPU plugin) - ``pycouchdb`` (for the CouchDB export module) diff --git a/conf/glances.conf b/conf/glances.conf index 6a0fb795..fba18a73 100644 --- a/conf/glances.conf +++ b/conf/glances.conf @@ -890,6 +890,14 @@ password=password # Most of the time, you should not overwrite this value #hostname=mycomputer +[nats] +# Configuration for the --export nats option +# https://nats.io/ +# Host is a separated list of NATS nodes +host=nats://localhost:4222 +# Prefix for the subjects (default is 'glances') +prefix=glances + ############################################################################## # AMPS # * enable: Enable (true) or disable (false) the AMP diff --git a/docs/gw/index.rst b/docs/gw/index.rst index db2dc976..c4c8947c 100644 --- a/docs/gw/index.rst +++ b/docs/gw/index.rst @@ -36,6 +36,7 @@ This section describes the available exporters and how to configure them: kafka mqtt mongodb + nats opentsdb prometheus rabbitmq diff --git a/docs/gw/nats.rst b/docs/gw/nats.rst new file mode 100644 index 00000000..9843c09b --- /dev/null +++ b/docs/gw/nats.rst @@ -0,0 +1,68 @@ +.. _nats: + +NATS +==== + +NATS is a message broker. + +You can export statistics to a ``NATS`` server. + +The connection should be defined in the Glances configuration file as +following: + +.. code-block:: ini + + [nats] + host=nats://localhost:4222 + prefix=glances + +and run Glances with: + +.. code-block:: console + + $ glances --export nats + +Data model +----------- + +Glances stats are published as JSON messagesto the following subjects: + + . + +Example: + + CPU stats are published to glances.cpu + +So a simple Python client will subscribe to this subject with: + + + import asyncio + + import nats + + + async def main(): + nc = nats.NATS() + + await nc.connect(servers=["nats://localhost:4222"]) + + future = asyncio.Future() + + async def cb(msg): + nonlocal future + future.set_result(msg) + + await nc.subscribe("glances.cpu", cb=cb) + + # Wait for message to come in + print("Waiting (max 30 seconds) for a message on 'glances' subject...") + msg = await asyncio.wait_for(future, 30) + print(msg.subject, msg.data) + + if __name__ == '__main__': + asyncio.run(main()) + +To subscribe to all Glannces stats use wildcard: + + await nc.subscribe("glances.*", cb=cb) + diff --git a/glances/exports/export.py b/glances/exports/export.py index bbd37306..c0cf7422 100644 --- a/glances/exports/export.py +++ b/glances/exports/export.py @@ -1,7 +1,7 @@ # # This file is part of Glances. # -# SPDX-FileCopyrightText: 2022 Nicolas Hennion +# SPDX-FileCopyrightText: 2026 Nicolas Hennion # # SPDX-License-Identifier: LGPL-3.0-only # diff --git a/glances/exports/export_asyncio.py b/glances/exports/export_asyncio.py new file mode 100644 index 00000000..71ec6a98 --- /dev/null +++ b/glances/exports/export_asyncio.py @@ -0,0 +1,168 @@ +# +# This file is part of Glances. +# +# SPDX-FileCopyrightText: 2026 Nicolas Hennion +# +# SPDX-License-Identifier: LGPL-3.0-only +# + +""" +I am your son... +...abstract class for AsyncIO-based Glances exports. +""" + +import asyncio +import threading +import time +from abc import abstractmethod + +from glances.exports.export import GlancesExport +from glances.logger import logger + + +class GlancesExportAsyncio(GlancesExport): + """Abstract class for AsyncIO-based export modules. + + This class manages a persistent event loop in a background thread, + allowing child classes to use AsyncIO operations for exporting data. + + Child classes must implement: + - async _async_init(): AsyncIO initialization (e.g., connection setup) + - async _async_exit(): AsyncIO cleanup (e.g., disconnection) + - async _async_export(name, columns, points): AsyncIO export operation + """ + + def __init__(self, config=None, args=None): + """Init the AsyncIO export interface.""" + super().__init__(config=config, args=args) + + # AsyncIO event loop management + self.loop = None + self._loop_ready = threading.Event() + self._loop_exception = None + self._shutdown = False + + # Start the background event loop thread + self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True) + self._loop_thread.start() + + # Wait for the loop to be ready + if not self._loop_ready.wait(timeout=10): + raise RuntimeError("AsyncIO event loop failed to start within timeout") + + if self._loop_exception: + raise RuntimeError(f"AsyncIO event loop creation failed: {self._loop_exception}") + + if self.loop is None: + raise RuntimeError("AsyncIO event loop is None after initialization") + + # Call child class AsyncIO initialization + future = asyncio.run_coroutine_threadsafe(self._async_init(), self.loop) + try: + future.result(timeout=10) + logger.debug(f"{self.export_name} AsyncIO export initialized successfully") + except Exception as e: + logger.warning(f"{self.export_name} AsyncIO initialization failed: {e}. Will retry in background.") + + def _run_event_loop(self): + """Run event loop in background thread.""" + try: + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self._loop_ready.set() + self.loop.run_forever() + except Exception as e: + self._loop_exception = e + self._loop_ready.set() + logger.error(f"{self.export_name} AsyncIO event loop thread error: {e}") + finally: + # Clean up pending tasks + pending = asyncio.all_tasks(self.loop) + for task in pending: + task.cancel() + if pending: + self.loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + self.loop.close() + + @abstractmethod + async def _async_init(self): + """AsyncIO initialization method. + + Child classes should implement this method to perform AsyncIO-based + initialization such as connecting to servers, setting up clients, etc. + + This method is called once during __init__ after the event loop is ready. + """ + pass + + @abstractmethod + async def _async_exit(self): + """AsyncIO cleanup method. + + Child classes should implement this method to perform AsyncIO-based + cleanup such as disconnecting from servers, closing clients, etc. + + This method is called during exit() before stopping the event loop. + """ + pass + + @abstractmethod + async def _async_export(self, name, columns, points): + """AsyncIO export method. + + Child classes must implement this method to perform the actual + export operation using AsyncIO. + + :param name: plugin name + :param columns: list of column names + :param points: list of values corresponding to columns + """ + pass + + def exit(self): + """Close the AsyncIO export module.""" + super().exit() + self._shutdown = True + logger.info(f"{self.export_name} AsyncIO export shutting down") + + # Call child class cleanup + if self.loop: + future = asyncio.run_coroutine_threadsafe(self._async_exit(), self.loop) + try: + future.result(timeout=5) + except Exception as e: + logger.error(f"{self.export_name} Error in AsyncIO cleanup: {e}") + + # Stop the event loop + if self.loop: + self.loop.call_soon_threadsafe(self.loop.stop) + time.sleep(0.5) + + logger.debug(f"{self.export_name} AsyncIO export shutdown complete") + + def export(self, name, columns, points): + """Export data using AsyncIO. + + This method bridges the synchronous export() interface with + the AsyncIO _async_export() implementation. + """ + if self._shutdown: + logger.debug(f"{self.export_name} Export called during shutdown, skipping") + return + + if not self.loop or not self.loop.is_running(): + logger.error(f"{self.export_name} AsyncIO event loop is not running") + return + + # Submit the export operation to the background event loop + try: + future = asyncio.run_coroutine_threadsafe( + self._async_export(name, columns, points), + self.loop + ) + # Don't block forever - use a short timeout + future.result(timeout=1) + except asyncio.TimeoutError: + logger.warning(f"{self.export_name} AsyncIO export timeout for {name}") + except Exception as e: + logger.error(f"{self.export_name} AsyncIO export error for {name}: {e}", exc_info=True) diff --git a/glances/exports/glances_nats/__init__.py b/glances/exports/glances_nats/__init__.py new file mode 100644 index 00000000..97941cfd --- /dev/null +++ b/glances/exports/glances_nats/__init__.py @@ -0,0 +1,133 @@ +# +# This file is part of Glances. +# +# SPDX-FileCopyrightText: 2026 Nicolas Hennion +# +# SPDX-License-Identifier: LGPL-3.0-only +# + +"""NATS interface class.""" + +from nats.aio.client import Client as NATS +from nats.errors import ConnectionClosedError +from nats.errors import TimeoutError as NatsTimeoutError + +from glances.exports.export_asyncio import GlancesExportAsyncio +from glances.globals import json_dumps +from glances.logger import logger + + +class Export(GlancesExportAsyncio): + """This class manages the NATS export module.""" + + def __init__(self, config=None, args=None): + """Init the NATS export IF.""" + # Load the NATS configuration file before calling super().__init__ + # because super().__init__ will call _async_init() which needs config + self.config = config + self.args = args + self.export_name = self.__class__.__module__ + + export_enable = self.load_conf( + 'nats', + mandatories=['host'], + options=['prefix'], + ) + if not export_enable: + exit('Missing NATS config') + + self.prefix = self.prefix or 'glances' + # Host is a comma-separated list of NATS servers + self.hosts = self.host + + # NATS-specific attributes + self.client = None + self._connected = False + self._publish_count = 0 + + # Call parent __init__ which will start event loop and call _async_init() + super().__init__(config=config, args=args) + + # Restore export_enable after super().__init__() resets it to False + self.export_enable = export_enable + + async def _async_init(self): + """Connect to NATS with error handling.""" + try: + if self.client: + try: + await self.client.close() + except Exception as e: + logger.debug(f"NATS Error closing existing client: {e}") + + self.client = NATS() + + logger.debug(f"NATS Connecting to servers: {self.hosts}") + + # Configure with reconnection callbacks + await self.client.connect( + servers=[s.strip() for s in self.hosts.split(',')], + reconnect_time_wait=2, + max_reconnect_attempts=60, + error_cb=self._error_callback, + disconnected_cb=self._disconnected_callback, + reconnected_cb=self._reconnected_callback, + ) + + self._connected = True + logger.info(f"NATS Successfully connected to servers: {self.hosts}") + except Exception as e: + self._connected = False + logger.error(f"NATS connection error: {e}") + raise + + async def _error_callback(self, e): + """Called when NATS client encounters an error.""" + logger.error(f"NATS error callback: {e}") + + async def _disconnected_callback(self): + """Called when disconnected from NATS.""" + self._connected = False + logger.debug("NATS disconnected callback") + + async def _reconnected_callback(self): + """Called when reconnected to NATS.""" + self._connected = True + logger.debug("NATS reconnected callback") + + async def _async_exit(self): + """Disconnect from NATS.""" + try: + if self.client and self._connected: + await self.client.drain() + await self.client.close() + self._connected = False + logger.debug(f"NATS disconnected cleanly. Total messages published: {self._publish_count}") + except Exception as e: + logger.error(f"NATS Error in disconnect: {e}") + + async def _async_export(self, name, columns, points): + """Write the points to NATS using AsyncIO.""" + if not self._connected: + logger.warning("NATS not connected, skipping export") + return + + subject_name = f"{self.prefix}.{name}" + subject_data = dict(zip(columns, points)) + + # Publish data to NATS + try: + if not self._connected: + raise ConnectionClosedError("NATS Not connected to server") + await self.client.publish(subject_name, json_dumps(subject_data)) + await self.client.flush(timeout=2.0) + self._publish_count += 1 + except (ConnectionClosedError, NatsTimeoutError) as e: + self._connected = False + logger.error(f"NATS publish failed for {subject_name}: {e}") + raise + except Exception as e: + logger.error(f"NATS Unexpected error publishing {subject_name}: {e}", exc_info=True) + raise + +# End of glances/exports/glances_nats/__init__.py diff --git a/pyproject.toml b/pyproject.toml index b0bcdbb3..5e702e7a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -83,6 +83,7 @@ export = [ "influxdb>=1.0.0", "influxdb3-python", "kafka-python", + "nats-py", "paho-mqtt", "pika", "potsdb", diff --git a/tests-data/issues/issue3341-NATS/pub.py b/tests-data/issues/issue3341-NATS/pub.py new file mode 100644 index 00000000..0123b08c --- /dev/null +++ b/tests-data/issues/issue3341-NATS/pub.py @@ -0,0 +1,17 @@ +import asyncio + +import nats + + +async def main(): + nc = nats.NATS() + + await nc.connect(servers=["nats://localhost:4222"]) + + await nc.publish("glances.test", b'A test') + await nc.flush() + +if __name__ == '__main__': + asyncio.run(main()) + +# To run this test script, make sure you have a NATS server running locally. diff --git a/tests-data/issues/issue3341-NATS/sub.py b/tests-data/issues/issue3341-NATS/sub.py new file mode 100644 index 00000000..07cf5b62 --- /dev/null +++ b/tests-data/issues/issue3341-NATS/sub.py @@ -0,0 +1,31 @@ +import asyncio + +import nats + + +async def main(): + duration = 30 + subject = "glances.*" + + nc = nats.NATS() + + await nc.connect(servers=["nats://localhost:4222"]) + + future = asyncio.Future() + + async def cb(msg): + subject = msg.subject + reply = msg.reply + data = msg.data.decode() + print(f"Received a message on '{subject} {reply}': {data}") + + print(f"Receiving message from {subject} during {duration} seconds...") + await nc.subscribe(subject, cb=cb) + await asyncio.wait_for(future, duration) + + await nc.close() + +if __name__ == '__main__': + asyncio.run(main()) + +# To run this test script, make sure you have a NATS server running locally. diff --git a/tests/test_export_nats.sh b/tests/test_export_nats.sh new file mode 100755 index 00000000..32497682 --- /dev/null +++ b/tests/test_export_nats.sh @@ -0,0 +1,34 @@ +#!/bin/bash +# Pre-requisites: +# - docker +# - jq + +# Exit on error +set -e + +echo "Stop previous nats container..." +docker stop nats-for-glances || true +docker rm nats-for-glances || true + +echo "Starting nats container..." +docker run -d \ + --name nats-for-glances \ + -p 4222:4222 \ + -p 8222:8222 \ + -p 6222:6222 \ + nats:latest + +# Wait for InfluxDB to be ready (5 seconds) +echo "Waiting for nats to start (~ 5 seconds)..." +sleep 5 + +# Run glances with export to nats, stopping after 10 writes +# This will run synchronously now since we're using --stop-after +echo "Glances to export system stats to nats (duration: ~ 20 seconds)" +.venv/bin/python -m glances --config ./conf/glances.conf --export nats --stop-after 10 --quiet + +# Stop and remove the nats container +echo "Stopping and removing nats container..." +docker stop nats-for-glances && docker rm nats-for-glances + +echo "Script completed successfully!" diff --git a/tests/test_export_timescaledb.sh b/tests/test_export_timescaledb.sh index 4865cf60..6f080070 100755 --- a/tests/test_export_timescaledb.sh +++ b/tests/test_export_timescaledb.sh @@ -38,6 +38,6 @@ docker exec timescaledb-for-glances psql -d "postgres://postgres:password@localh # Stop and remove the TimescaleDB container echo "Stopping and removing TimescaleDB container..." -# docker stop timescaledb-for-glances && docker rm timescaledb-for-glances +docker stop timescaledb-for-glances && docker rm timescaledb-for-glances echo "Script completed successfully!"