diff --git a/README.rst b/README.rst
index 2adf580c..a7d780f0 100644
--- a/README.rst
+++ b/README.rst
@@ -242,7 +242,7 @@ Glances can export stats to:
 
 - files: ``CSV`` and ``JSON``
 - databases: ``InfluxDB``, ``ElasticSearch``, ``PostgreSQL/TimeScale``, ``Cassandra``, ``CouchDB``, ``OpenTSDB``, ``Prometheus``, ``StatsD``, ``Riemann`` and ``Graphite``
-- brokers: ``RabbitMQ/ActiveMQ``, ``ZeroMQ`` and ``Kafka``
+- brokers: ``RabbitMQ/ActiveMQ``, ``NATS``, ``ZeroMQ`` and ``Kafka``
 - others: ``RESTful`` endpoint
 
 Installation 🚀
@@ -574,6 +574,7 @@ Extra dependencies:
 - ``influxdb`` (for the InfluxDB version 1 export module)
 - ``influxdb-client`` (for the InfluxDB version 2 export module)
 - ``kafka-python`` (for the Kafka export module)
+- ``nats-py`` (for the NATS export module)
 - ``netifaces2`` (for the IP plugin)
 - ``nvidia-ml-py`` (for the GPU plugin)
 - ``pycouchdb`` (for the CouchDB export module)
diff --git a/conf/glances.conf b/conf/glances.conf
index 6a0fb795..fba18a73 100644
--- a/conf/glances.conf
+++ b/conf/glances.conf
@@ -890,6 +890,14 @@ password=password
 # Most of the time, you should not overwrite this value
 #hostname=mycomputer
 
+[nats]
+# Configuration for the --export nats option
+# https://nats.io/
+# Host is a comma-separated list of NATS nodes
+host=nats://localhost:4222
+# Prefix for the subjects (default is 'glances')
+prefix=glances
+
 ##############################################################################
 # AMPS
 # * enable: Enable (true) or disable (false) the AMP
diff --git a/docs/gw/index.rst b/docs/gw/index.rst
index db2dc976..c4c8947c 100644
--- a/docs/gw/index.rst
+++ b/docs/gw/index.rst
@@ -36,6 +36,7 @@ This section describes the available exporters and how to configure them:
     kafka
     mqtt
     mongodb
+    nats
     opentsdb
     prometheus
     rabbitmq
diff --git a/docs/gw/nats.rst b/docs/gw/nats.rst
new file mode 100644
index 00000000..9843c09b
--- /dev/null
+++ b/docs/gw/nats.rst
@@ -0,0 +1,68 @@
+.. _nats:
+
+NATS
+====
+
+NATS is a message broker.
+
+You can export statistics to a ``NATS`` server.
+
+The connection should be defined in the Glances configuration file as
+follows:
+
+.. code-block:: ini
+
+    [nats]
+    host=nats://localhost:4222
+    prefix=glances
+
+and run Glances with:
+
+.. code-block:: console
+
+    $ glances --export nats
+
+Data model
+----------
+
+Glances stats are published as JSON messages to the following subjects::
+
+    <prefix>.<plugin_name>
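+
+The body of each message is the plugin's statistics serialized as a flat
+JSON object (one key per exported field), so a subscriber can decode it
+with ``json.loads(msg.data)``. The snippet below is an illustrative
+sketch; the exact field names depend on the plugin:
+
+.. code-block:: python
+
+    import json
+
+    async def cb(msg):
+        # msg.data holds the JSON-encoded stats, e.g. b'{"total": 12.3, ...}'
+        stats = json.loads(msg.data)
+        print(msg.subject, stats)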
+
+Example:
+
+    CPU stats are published to the ``glances.cpu`` subject.
+
+So a simple Python client can subscribe to this subject with:
+
+.. code-block:: python
+
+    import asyncio
+
+    import nats
+
+
+    async def main():
+        nc = nats.NATS()
+
+        await nc.connect(servers=["nats://localhost:4222"])
+
+        future = asyncio.Future()
+
+        async def cb(msg):
+            nonlocal future
+            future.set_result(msg)
+
+        await nc.subscribe("glances.cpu", cb=cb)
+
+        # Wait for a message to come in
+        print("Waiting (max 30 seconds) for a message on the 'glances.cpu' subject...")
+        msg = await asyncio.wait_for(future, 30)
+        print(msg.subject, msg.data)
+
+    if __name__ == '__main__':
+        asyncio.run(main())
+
+To subscribe to all Glances stats, use a wildcard::
+
+    await nc.subscribe("glances.*", cb=cb)
+
diff --git a/glances/exports/glances_nats/__init__.py b/glances/exports/glances_nats/__init__.py
new file mode 100644
index 00000000..3cdec291
--- /dev/null
+++ b/glances/exports/glances_nats/__init__.py
@@ -0,0 +1,221 @@
+"""NATS interface class."""
+
+import asyncio
+import threading
+import time
+
+from nats.aio.client import Client as NATS
+from nats.errors import ConnectionClosedError
+from nats.errors import TimeoutError as NatsTimeoutError
+
+from glances.exports.export import GlancesExport
+from glances.globals import json_dumps
+from glances.logger import logger
+
+
+class Export(GlancesExport):
+    """This class manages the NATS export module."""
+
+    def __init__(self, config=None, args=None):
+        """Init the NATS export IF."""
+        super().__init__(config=config, args=args)
+
+        # Load the NATS configuration file
+        self.export_enable = self.load_conf(
+            'nats',
+            mandatories=['host'],
+            options=['prefix'],
+        )
+        if not self.export_enable:
+            exit('Missing NATS config')
+
+        self.prefix = self.prefix or 'glances'
+        self.hosts = self.host
+
+        logger.info(f"Initializing NATS export with prefix '{self.prefix}' to {self.hosts}")
+
+        # Create a persistent event loop in a background thread
+        self.loop = None
+        self.client = None
+        self._connected = False
+        self._shutdown = False
+        self._loop_ready = threading.Event()
+        self._loop_exception = None
+        self._publish_count = 0
+
+        self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
+        self._loop_thread.start()
+
+        # Wait for the loop to be ready
+        if not self._loop_ready.wait(timeout=10):
+            exit("NATS event loop failed to start within timeout")
+
+        if self._loop_exception:
+            exit(f"NATS event loop creation failed: {self._loop_exception}")
+
+        if self.loop is None:
+            exit("NATS event loop is None after initialization")
+
+        # Initial connection attempt
+        future = asyncio.run_coroutine_threadsafe(self._connect(), self.loop)
+        try:
+            future.result(timeout=10)
+            logger.info("NATS export initialized successfully")
+        except Exception as e:
+            logger.warning(f"Initial NATS connection failed: {e}. Will retry in background.")
+
+    def _run_event_loop(self):
+        """Run event loop in background thread."""
+        try:
+            self.loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(self.loop)
+            self._loop_ready.set()
+            logger.info("NATS event loop started")
+            self.loop.run_forever()
+        except Exception as e:
+            self._loop_exception = e
+            self._loop_ready.set()
+            logger.error(f"Event loop thread error: {e}")
+        finally:
+            # Clean up any pending tasks
+            pending = asyncio.all_tasks(self.loop)
+            for task in pending:
+                task.cancel()
+            if pending:
+                self.loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
+            self.loop.close()
+            logger.info("NATS event loop stopped")
+
+    async def _connect(self):
+        """Connect to NATS with error handling."""
+        try:
+            if self.client:
+                try:
+                    await self.client.close()
+                except Exception as e:
+                    logger.debug(f"Error closing existing NATS client: {e}")
+
+            self.client = NATS()
+
+            logger.info(f"Connecting to NATS servers: {self.hosts}")
+
+            # Configure with reconnection callbacks
+            await self.client.connect(
+                servers=[s.strip() for s in self.hosts.split(',')],
+                reconnect_time_wait=2,
+                max_reconnect_attempts=60,
+                error_cb=self._error_callback,
+                disconnected_cb=self._disconnected_callback,
+                reconnected_cb=self._reconnected_callback,
+            )
+
+            self._connected = True
+            logger.info(f"Successfully connected to NATS: {self.hosts}")
+        except Exception as e:
+            self._connected = False
+            logger.error(f"NATS connection error: {e}")
+            raise
+
+    async def _error_callback(self, e):
+        """Called when NATS client encounters an error."""
+        logger.error(f"NATS error callback: {e}")
+
+    async def _disconnected_callback(self):
+        """Called when disconnected from NATS."""
+        self._connected = False
+        logger.warning("NATS disconnected callback")
+
+    async def _reconnected_callback(self):
+        """Called when reconnected to NATS."""
+        self._connected = True
+        logger.info("NATS reconnected callback")
+
+    def exit(self):
+        """Close the NATS connection."""
+        super().exit()
+        self._shutdown = True
+        logger.info("NATS export shutting down")
+
+        if self.loop and self.client:
+            future = asyncio.run_coroutine_threadsafe(self._disconnect(), self.loop)
+            try:
+                future.result(timeout=5)
+            except Exception as e:
+                logger.error(f"Error disconnecting from NATS: {e}")
+
+        if self.loop:
+            self.loop.call_soon_threadsafe(self.loop.stop)
+            time.sleep(0.5)
+
+        logger.info(f"NATS export shutdown complete. Total messages published: {self._publish_count}")
+
+    async def _disconnect(self):
+        """Disconnect from NATS."""
+        try:
+            if self.client and self._connected:
+                await self.client.drain()
+                await self.client.close()
+                self._connected = False
+                logger.info("NATS disconnected cleanly")
+        except Exception as e:
+            logger.error(f"Error in disconnect: {e}")
+
+    def export(self, name, columns, points):
+        """Write the points in NATS."""
+        logger.info(f"Export called: name={name}, columns={columns}, connected={self._connected}")
+
+        if self._shutdown:
+            logger.warning("Export called during shutdown, skipping")
+            return
+
+        if not self.loop or not self.loop.is_running():
+            logger.error("NATS event loop is not running")
+            return
+
+        if not self._connected:
+            logger.warning("NATS not connected, skipping export")
+            return
+
+        subject_name = f"{self.prefix}.{name}"
+        subject_data = dict(zip(columns, points))
+
+        logger.info(f"Publishing to subject: {subject_name}")
+
+        # Submit the publish operation to the background event loop
+        try:
+            future = asyncio.run_coroutine_threadsafe(
+                self._publish(subject_name, json_dumps(subject_data)),
+                self.loop
+            )
+            # Don't block forever - use a short timeout
+            future.result(timeout=1)
+            self._publish_count += 1
+            logger.info(f"Successfully published message #{self._publish_count} to {subject_name}")
+        except asyncio.TimeoutError:
+            logger.warning(f"NATS publish timeout for {subject_name}")
+        except Exception as e:
+            logger.error(f"NATS publish error for {subject_name}: {e}", exc_info=True)
+
+    async def _publish(self, subject, data):
+        """Publish data to NATS."""
+        try:
+            logger.info(f"_publish called: subject={subject}, data_length={len(data)}")
+
+            if not self._connected:
+                raise ConnectionClosedError("Not connected to NATS")
+
+            logger.info(f"Calling client.publish for {subject}")
+            await self.client.publish(subject, data)
+
+            logger.info(f"Calling client.flush for {subject}")
+            await asyncio.wait_for(self.client.flush(), timeout=2.0)
+
+            logger.info(f"Successfully published and flushed to '{subject}'")
+        except (ConnectionClosedError, NatsTimeoutError) as e:
+            self._connected = False
+            logger.error(f"NATS publish failed: {e}")
+            raise
+        except Exception as e:
+            logger.error(f"Unexpected error in _publish: {e}", exc_info=True)
+            raise
+
+# End of glances/exports/glances_nats/__init__.py
diff --git a/pyproject.toml b/pyproject.toml
index b0bcdbb3..5e702e7a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -83,6 +83,7 @@ export = [
     "influxdb>=1.0.0",
     "influxdb3-python",
     "kafka-python",
+    "nats-py",
     "paho-mqtt",
     "pika",
     "potsdb",
diff --git a/tests-data/issues/issue3341-NATS/pub.py b/tests-data/issues/issue3341-NATS/pub.py
new file mode 100644
index 00000000..0123b08c
--- /dev/null
+++ b/tests-data/issues/issue3341-NATS/pub.py
@@ -0,0 +1,17 @@
+import asyncio
+
+import nats
+
+
+async def main():
+    nc = nats.NATS()
+
+    await nc.connect(servers=["nats://localhost:4222"])
+
+    await nc.publish("glances.test", b'A test')
+    await nc.flush()
+
+if __name__ == '__main__':
+    asyncio.run(main())
+
+# To run this test script, make sure you have a NATS server running locally.
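+# One way to start one is with Docker, mirroring tests/test_export_nats.sh
+# (illustrative command, adjust to your environment):
+#   docker run -d --name nats-for-glances -p 4222:4222 nats:latest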
diff --git a/tests-data/issues/issue3341-NATS/sub.py b/tests-data/issues/issue3341-NATS/sub.py
new file mode 100644
index 00000000..39d84101
--- /dev/null
+++ b/tests-data/issues/issue3341-NATS/sub.py
@@ -0,0 +1,27 @@
+import asyncio
+
+import nats
+
+
+async def main():
+    nc = nats.NATS()
+
+    await nc.connect(servers=["nats://localhost:4222"])
+
+    future = asyncio.Future()
+
+    async def cb(msg):
+        nonlocal future
+        future.set_result(msg)
+
+    await nc.subscribe("glances.*", cb=cb)
+
+    # Wait for message to come in
+    print("Waiting (max 30s) for a message on the 'glances.*' subjects...")
+    msg = await asyncio.wait_for(future, 30)
+    print(msg.subject, msg.data)
+
+if __name__ == '__main__':
+    asyncio.run(main())
+
+# To run this test script, make sure you have a NATS server running locally.
diff --git a/tests/test_export_nats.sh b/tests/test_export_nats.sh
new file mode 100755
index 00000000..32497682
--- /dev/null
+++ b/tests/test_export_nats.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+# Pre-requisites:
+# - docker
+# - jq
+
+# Exit on error
+set -e
+
+echo "Stop previous nats container..."
+docker stop nats-for-glances || true
+docker rm nats-for-glances || true
+
+echo "Starting nats container..."
+docker run -d \
+    --name nats-for-glances \
+    -p 4222:4222 \
+    -p 8222:8222 \
+    -p 6222:6222 \
+    nats:latest
+
+# Wait for NATS to be ready (5 seconds)
+echo "Waiting for nats to start (~ 5 seconds)..."
+sleep 5
+
+# Run glances with export to nats, stopping after 10 writes
+# This runs synchronously since we're using --stop-after
+echo "Running Glances to export system stats to nats (duration: ~ 20 seconds)"
+.venv/bin/python -m glances --config ./conf/glances.conf --export nats --stop-after 10 --quiet
+
+# Stop and remove the nats container
+echo "Stopping and removing nats container..."
+docker stop nats-for-glances && docker rm nats-for-glances
+
+echo "Script completed successfully!"
diff --git a/tests/test_export_timescaledb.sh b/tests/test_export_timescaledb.sh
index 4865cf60..6f080070 100755
--- a/tests/test_export_timescaledb.sh
+++ b/tests/test_export_timescaledb.sh
@@ -38,6 +38,6 @@ docker exec timescaledb-for-glances psql -d "postgres://postgres:password@localh
 
 # Stop and remove the TimescaleDB container
 echo "Stopping and removing TimescaleDB container..."
-# docker stop timescaledb-for-glances && docker rm timescaledb-for-glances
+docker stop timescaledb-for-glances && docker rm timescaledb-for-glances
 
 echo "Script completed successfully!"
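
Note on the exporter's concurrency model: the NATS export module above runs a dedicated
asyncio event loop in a daemon thread and submits publish coroutines to it from the
synchronous ``export()`` callback via ``asyncio.run_coroutine_threadsafe``. The following
is a minimal, self-contained sketch of that pattern, independent of Glances and NATS
(class and function names here are illustrative, not part of the code base)::

    import asyncio
    import threading


    class BackgroundLoop:
        """Run an asyncio event loop in a daemon thread."""

        def __init__(self):
            self.loop = asyncio.new_event_loop()
            self._ready = threading.Event()
            self._thread = threading.Thread(target=self._run, daemon=True)
            self._thread.start()
            self._ready.wait(timeout=5)

        def _run(self):
            # Executed in the background thread: adopt the loop and run it forever.
            asyncio.set_event_loop(self.loop)
            self._ready.set()
            self.loop.run_forever()

        def submit(self, coro, timeout=1):
            # Called from synchronous code (like Export.export()): schedule the
            # coroutine on the background loop and wait briefly for its result.
            future = asyncio.run_coroutine_threadsafe(coro, self.loop)
            return future.result(timeout=timeout)

        def stop(self):
            self.loop.call_soon_threadsafe(self.loop.stop)
            self._thread.join(timeout=2)


    async def fake_publish(subject):
        await asyncio.sleep(0.1)  # stands in for an awaitable client.publish()
        return f"published to {subject}"


    if __name__ == '__main__':
        bg = BackgroundLoop()
        print(bg.submit(fake_publish("glances.cpu")))
        bg.stop()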