Merge branch 'issue3341-NATS' into develop

This commit is contained in:
nicolargo 2026-01-02 18:00:30 +01:00
commit 7160cd8ae4
13 changed files with 469 additions and 4 deletions

View File

@ -134,7 +134,10 @@ test-export-influxdb-v3: ## Run interface tests with InfluxDB version 3 (Core)
test-export-timescaledb: ## Run interface tests with TimescaleDB test-export-timescaledb: ## Run interface tests with TimescaleDB
/bin/bash ./tests/test_export_timescaledb.sh /bin/bash ./tests/test_export_timescaledb.sh
test-exports: test-export-csv test-export-json test-export-influxdb-v1 test-export-influxdb-v3 test-export-timescaledb ## Tests all exports test-export-nats: ## Run interface tests with NATS
/bin/bash ./tests/test_export_nats.sh
test-exports: test-export-csv test-export-json test-export-influxdb-v1 test-export-influxdb-v3 test-export-timescaledb test-export-nats ## Tests all exports
# =================================================================== # ===================================================================
# Linters, profilers and cyber security # Linters, profilers and cyber security

View File

@ -242,7 +242,7 @@ Glances can export stats to:
- files: ``CSV`` and ``JSON`` - files: ``CSV`` and ``JSON``
- databases: ``InfluxDB``, ``ElasticSearch``, ``PostgreSQL/TimeScale``, ``Cassandra``, ``CouchDB``, ``OpenTSDB``, ``Prometheus``, ``StatsD``, ``Riemann`` and ``Graphite`` - databases: ``InfluxDB``, ``ElasticSearch``, ``PostgreSQL/TimeScale``, ``Cassandra``, ``CouchDB``, ``OpenTSDB``, ``Prometheus``, ``StatsD``, ``Riemann`` and ``Graphite``
- brokers: ``RabbitMQ/ActiveMQ``, ``ZeroMQ`` and ``Kafka`` - brokers: ``RabbitMQ/ActiveMQ``, ``NATS``, ``ZeroMQ`` and ``Kafka``
- others: ``RESTful`` endpoint - others: ``RESTful`` endpoint
Installation 🚀 Installation 🚀
@ -574,6 +574,7 @@ Extra dependencies:
- ``influxdb`` (for the InfluxDB version 1 export module) - ``influxdb`` (for the InfluxDB version 1 export module)
- ``influxdb-client`` (for the InfluxDB version 2 export module) - ``influxdb-client`` (for the InfluxDB version 2 export module)
- ``kafka-python`` (for the Kafka export module) - ``kafka-python`` (for the Kafka export module)
- ``nats-py`` (for the NATS export module)
- ``netifaces2`` (for the IP plugin) - ``netifaces2`` (for the IP plugin)
- ``nvidia-ml-py`` (for the GPU plugin) - ``nvidia-ml-py`` (for the GPU plugin)
- ``pycouchdb`` (for the CouchDB export module) - ``pycouchdb`` (for the CouchDB export module)

View File

@ -890,6 +890,14 @@ password=password
# Most of the time, you should not overwrite this value # Most of the time, you should not overwrite this value
#hostname=mycomputer #hostname=mycomputer
[nats]
# Configuration for the --export nats option
# https://nats.io/
# Host is a separated list of NATS nodes
host=nats://localhost:4222
# Prefix for the subjects (default is 'glances')
prefix=glances
############################################################################## ##############################################################################
# AMPS # AMPS
# * enable: Enable (true) or disable (false) the AMP # * enable: Enable (true) or disable (false) the AMP

View File

@ -36,6 +36,7 @@ This section describes the available exporters and how to configure them:
kafka kafka
mqtt mqtt
mongodb mongodb
nats
opentsdb opentsdb
prometheus prometheus
rabbitmq rabbitmq

68
docs/gw/nats.rst Normal file
View File

@ -0,0 +1,68 @@
.. _nats:
NATS
====
NATS is a message broker.
You can export statistics to a ``NATS`` server.
The connection should be defined in the Glances configuration file as
following:
.. code-block:: ini
[nats]
host=nats://localhost:4222
prefix=glances
and run Glances with:
.. code-block:: console
$ glances --export nats
Data model
-----------
Glances stats are published as JSON messagesto the following subjects:
<prefix>.<plugin>
Example:
CPU stats are published to glances.cpu
So a simple Python client will subscribe to this subject with:
import asyncio
import nats
async def main():
nc = nats.NATS()
await nc.connect(servers=["nats://localhost:4222"])
future = asyncio.Future()
async def cb(msg):
nonlocal future
future.set_result(msg)
await nc.subscribe("glances.cpu", cb=cb)
# Wait for message to come in
print("Waiting (max 30 seconds) for a message on 'glances' subject...")
msg = await asyncio.wait_for(future, 30)
print(msg.subject, msg.data)
if __name__ == '__main__':
asyncio.run(main())
To subscribe to all Glannces stats use wildcard:
await nc.subscribe("glances.*", cb=cb)

View File

@ -1,7 +1,7 @@
# #
# This file is part of Glances. # This file is part of Glances.
# #
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <nicolas@nicolargo.com> # SPDX-FileCopyrightText: 2026 Nicolas Hennion <nicolas@nicolargo.com>
# #
# SPDX-License-Identifier: LGPL-3.0-only # SPDX-License-Identifier: LGPL-3.0-only
# #

View File

@ -0,0 +1,168 @@
#
# This file is part of Glances.
#
# SPDX-FileCopyrightText: 2026 Nicolas Hennion <nicolas@nicolargo.com>
#
# SPDX-License-Identifier: LGPL-3.0-only
#
"""
I am your son...
...abstract class for AsyncIO-based Glances exports.
"""
import asyncio
import threading
import time
from abc import abstractmethod
from glances.exports.export import GlancesExport
from glances.logger import logger
class GlancesExportAsyncio(GlancesExport):
"""Abstract class for AsyncIO-based export modules.
This class manages a persistent event loop in a background thread,
allowing child classes to use AsyncIO operations for exporting data.
Child classes must implement:
- async _async_init(): AsyncIO initialization (e.g., connection setup)
- async _async_exit(): AsyncIO cleanup (e.g., disconnection)
- async _async_export(name, columns, points): AsyncIO export operation
"""
def __init__(self, config=None, args=None):
"""Init the AsyncIO export interface."""
super().__init__(config=config, args=args)
# AsyncIO event loop management
self.loop = None
self._loop_ready = threading.Event()
self._loop_exception = None
self._shutdown = False
# Start the background event loop thread
self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._loop_thread.start()
# Wait for the loop to be ready
if not self._loop_ready.wait(timeout=10):
raise RuntimeError("AsyncIO event loop failed to start within timeout")
if self._loop_exception:
raise RuntimeError(f"AsyncIO event loop creation failed: {self._loop_exception}")
if self.loop is None:
raise RuntimeError("AsyncIO event loop is None after initialization")
# Call child class AsyncIO initialization
future = asyncio.run_coroutine_threadsafe(self._async_init(), self.loop)
try:
future.result(timeout=10)
logger.debug(f"{self.export_name} AsyncIO export initialized successfully")
except Exception as e:
logger.warning(f"{self.export_name} AsyncIO initialization failed: {e}. Will retry in background.")
def _run_event_loop(self):
"""Run event loop in background thread."""
try:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self._loop_ready.set()
self.loop.run_forever()
except Exception as e:
self._loop_exception = e
self._loop_ready.set()
logger.error(f"{self.export_name} AsyncIO event loop thread error: {e}")
finally:
# Clean up pending tasks
pending = asyncio.all_tasks(self.loop)
for task in pending:
task.cancel()
if pending:
self.loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
self.loop.close()
@abstractmethod
async def _async_init(self):
"""AsyncIO initialization method.
Child classes should implement this method to perform AsyncIO-based
initialization such as connecting to servers, setting up clients, etc.
This method is called once during __init__ after the event loop is ready.
"""
pass
@abstractmethod
async def _async_exit(self):
"""AsyncIO cleanup method.
Child classes should implement this method to perform AsyncIO-based
cleanup such as disconnecting from servers, closing clients, etc.
This method is called during exit() before stopping the event loop.
"""
pass
@abstractmethod
async def _async_export(self, name, columns, points):
"""AsyncIO export method.
Child classes must implement this method to perform the actual
export operation using AsyncIO.
:param name: plugin name
:param columns: list of column names
:param points: list of values corresponding to columns
"""
pass
def exit(self):
"""Close the AsyncIO export module."""
super().exit()
self._shutdown = True
logger.info(f"{self.export_name} AsyncIO export shutting down")
# Call child class cleanup
if self.loop:
future = asyncio.run_coroutine_threadsafe(self._async_exit(), self.loop)
try:
future.result(timeout=5)
except Exception as e:
logger.error(f"{self.export_name} Error in AsyncIO cleanup: {e}")
# Stop the event loop
if self.loop:
self.loop.call_soon_threadsafe(self.loop.stop)
time.sleep(0.5)
logger.debug(f"{self.export_name} AsyncIO export shutdown complete")
def export(self, name, columns, points):
"""Export data using AsyncIO.
This method bridges the synchronous export() interface with
the AsyncIO _async_export() implementation.
"""
if self._shutdown:
logger.debug(f"{self.export_name} Export called during shutdown, skipping")
return
if not self.loop or not self.loop.is_running():
logger.error(f"{self.export_name} AsyncIO event loop is not running")
return
# Submit the export operation to the background event loop
try:
future = asyncio.run_coroutine_threadsafe(
self._async_export(name, columns, points),
self.loop
)
# Don't block forever - use a short timeout
future.result(timeout=1)
except asyncio.TimeoutError:
logger.warning(f"{self.export_name} AsyncIO export timeout for {name}")
except Exception as e:
logger.error(f"{self.export_name} AsyncIO export error for {name}: {e}", exc_info=True)

View File

@ -0,0 +1,133 @@
#
# This file is part of Glances.
#
# SPDX-FileCopyrightText: 2026 Nicolas Hennion <nicolas@nicolargo.com>
#
# SPDX-License-Identifier: LGPL-3.0-only
#
"""NATS interface class."""
from nats.aio.client import Client as NATS
from nats.errors import ConnectionClosedError
from nats.errors import TimeoutError as NatsTimeoutError
from glances.exports.export_asyncio import GlancesExportAsyncio
from glances.globals import json_dumps
from glances.logger import logger
class Export(GlancesExportAsyncio):
"""This class manages the NATS export module."""
def __init__(self, config=None, args=None):
"""Init the NATS export IF."""
# Load the NATS configuration file before calling super().__init__
# because super().__init__ will call _async_init() which needs config
self.config = config
self.args = args
self.export_name = self.__class__.__module__
export_enable = self.load_conf(
'nats',
mandatories=['host'],
options=['prefix'],
)
if not export_enable:
exit('Missing NATS config')
self.prefix = self.prefix or 'glances'
# Host is a comma-separated list of NATS servers
self.hosts = self.host
# NATS-specific attributes
self.client = None
self._connected = False
self._publish_count = 0
# Call parent __init__ which will start event loop and call _async_init()
super().__init__(config=config, args=args)
# Restore export_enable after super().__init__() resets it to False
self.export_enable = export_enable
async def _async_init(self):
"""Connect to NATS with error handling."""
try:
if self.client:
try:
await self.client.close()
except Exception as e:
logger.debug(f"NATS Error closing existing client: {e}")
self.client = NATS()
logger.debug(f"NATS Connecting to servers: {self.hosts}")
# Configure with reconnection callbacks
await self.client.connect(
servers=[s.strip() for s in self.hosts.split(',')],
reconnect_time_wait=2,
max_reconnect_attempts=60,
error_cb=self._error_callback,
disconnected_cb=self._disconnected_callback,
reconnected_cb=self._reconnected_callback,
)
self._connected = True
logger.info(f"NATS Successfully connected to servers: {self.hosts}")
except Exception as e:
self._connected = False
logger.error(f"NATS connection error: {e}")
raise
async def _error_callback(self, e):
"""Called when NATS client encounters an error."""
logger.error(f"NATS error callback: {e}")
async def _disconnected_callback(self):
"""Called when disconnected from NATS."""
self._connected = False
logger.debug("NATS disconnected callback")
async def _reconnected_callback(self):
"""Called when reconnected to NATS."""
self._connected = True
logger.debug("NATS reconnected callback")
async def _async_exit(self):
"""Disconnect from NATS."""
try:
if self.client and self._connected:
await self.client.drain()
await self.client.close()
self._connected = False
logger.debug(f"NATS disconnected cleanly. Total messages published: {self._publish_count}")
except Exception as e:
logger.error(f"NATS Error in disconnect: {e}")
async def _async_export(self, name, columns, points):
"""Write the points to NATS using AsyncIO."""
if not self._connected:
logger.warning("NATS not connected, skipping export")
return
subject_name = f"{self.prefix}.{name}"
subject_data = dict(zip(columns, points))
# Publish data to NATS
try:
if not self._connected:
raise ConnectionClosedError("NATS Not connected to server")
await self.client.publish(subject_name, json_dumps(subject_data))
await self.client.flush(timeout=2.0)
self._publish_count += 1
except (ConnectionClosedError, NatsTimeoutError) as e:
self._connected = False
logger.error(f"NATS publish failed for {subject_name}: {e}")
raise
except Exception as e:
logger.error(f"NATS Unexpected error publishing {subject_name}: {e}", exc_info=True)
raise
# End of glances/exports/glances_nats/__init__.py

View File

@ -83,6 +83,7 @@ export = [
"influxdb>=1.0.0", "influxdb>=1.0.0",
"influxdb3-python", "influxdb3-python",
"kafka-python", "kafka-python",
"nats-py",
"paho-mqtt", "paho-mqtt",
"pika", "pika",
"potsdb", "potsdb",

View File

@ -0,0 +1,17 @@
import asyncio
import nats
async def main():
nc = nats.NATS()
await nc.connect(servers=["nats://localhost:4222"])
await nc.publish("glances.test", b'A test')
await nc.flush()
if __name__ == '__main__':
asyncio.run(main())
# To run this test script, make sure you have a NATS server running locally.

View File

@ -0,0 +1,31 @@
import asyncio
import nats
async def main():
duration = 30
subject = "glances.*"
nc = nats.NATS()
await nc.connect(servers=["nats://localhost:4222"])
future = asyncio.Future()
async def cb(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print(f"Received a message on '{subject} {reply}': {data}")
print(f"Receiving message from {subject} during {duration} seconds...")
await nc.subscribe(subject, cb=cb)
await asyncio.wait_for(future, duration)
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
# To run this test script, make sure you have a NATS server running locally.

34
tests/test_export_nats.sh Executable file
View File

@ -0,0 +1,34 @@
#!/bin/bash
# Pre-requisites:
# - docker
# - jq
# Exit on error
set -e
echo "Stop previous nats container..."
docker stop nats-for-glances || true
docker rm nats-for-glances || true
echo "Starting nats container..."
docker run -d \
--name nats-for-glances \
-p 4222:4222 \
-p 8222:8222 \
-p 6222:6222 \
nats:latest
# Wait for InfluxDB to be ready (5 seconds)
echo "Waiting for nats to start (~ 5 seconds)..."
sleep 5
# Run glances with export to nats, stopping after 10 writes
# This will run synchronously now since we're using --stop-after
echo "Glances to export system stats to nats (duration: ~ 20 seconds)"
.venv/bin/python -m glances --config ./conf/glances.conf --export nats --stop-after 10 --quiet
# Stop and remove the nats container
echo "Stopping and removing nats container..."
docker stop nats-for-glances && docker rm nats-for-glances
echo "Script completed successfully!"

View File

@ -38,6 +38,6 @@ docker exec timescaledb-for-glances psql -d "postgres://postgres:password@localh
# Stop and remove the TimescaleDB container # Stop and remove the TimescaleDB container
echo "Stopping and removing TimescaleDB container..." echo "Stopping and removing TimescaleDB container..."
# docker stop timescaledb-for-glances && docker rm timescaledb-for-glances docker stop timescaledb-for-glances && docker rm timescaledb-for-glances
echo "Script completed successfully!" echo "Script completed successfully!"