First version ok. Log message should be removed. Code should be tested.

nicolargo 2025-12-30 19:04:07 +01:00
parent ba932e72bd
commit b7c6cce373
10 changed files with 380 additions and 2 deletions


@@ -242,7 +242,7 @@ Glances can export stats to:
- files: ``CSV`` and ``JSON``
- databases: ``InfluxDB``, ``ElasticSearch``, ``PostgreSQL/TimeScale``, ``Cassandra``, ``CouchDB``, ``OpenTSDB``, ``Prometheus``, ``StatsD``, ``Riemann`` and ``Graphite``
- brokers: ``RabbitMQ/ActiveMQ``, ``ZeroMQ`` and ``Kafka``
- brokers: ``RabbitMQ/ActiveMQ``, ``NATS``, ``ZeroMQ`` and ``Kafka``
- others: ``RESTful`` endpoint
Installation 🚀
@@ -574,6 +574,7 @@ Extra dependencies:
- ``influxdb`` (for the InfluxDB version 1 export module)
- ``influxdb-client`` (for the InfluxDB version 2 export module)
- ``kafka-python`` (for the Kafka export module)
- ``nats-py`` (for the NATS export module)
- ``netifaces2`` (for the IP plugin)
- ``nvidia-ml-py`` (for the GPU plugin)
- ``pycouchdb`` (for the CouchDB export module)


@@ -890,6 +890,14 @@ password=password
# Most of the time, you should not overwrite this value
#hostname=mycomputer
[nats]
# Configuration for the --export nats option
# https://nats.io/
# Host is a comma-separated list of NATS nodes
host=nats://localhost:4222
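# Example with several NATS nodes (illustrative host names), comma-separated:
#host=nats://node1:4222,nats://node2:4222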
# Prefix for the subjects (default is 'glances')
prefix=glances
##############################################################################
# AMPS
# * enable: Enable (true) or disable (false) the AMP


@@ -36,6 +36,7 @@ This section describes the available exporters and how to configure them:
kafka
mqtt
mongodb
nats
opentsdb
prometheus
rabbitmq

docs/gw/nats.rst Normal file

@@ -0,0 +1,68 @@
.. _nats:

NATS
====

NATS is a message broker.

You can export statistics to a ``NATS`` server.
The connection should be defined in the Glances configuration file as
follows:

.. code-block:: ini

    [nats]
    host=nats://localhost:4222
    prefix=glances

and run Glances with:

.. code-block:: console

    $ glances --export nats
Data model
----------

Glances stats are published as JSON messages to the following subjects:

    <prefix>.<plugin>

Each message body is a flat JSON object mapping stat field names to their
values (for the CPU plugin, for example, fields such as ``total`` or ``user``).

Example: CPU stats are published to ``glances.cpu``.
So a simple Python client can subscribe to this subject with:

.. code-block:: python

    import asyncio

    import nats


    async def main():
        nc = nats.NATS()
        await nc.connect(servers=["nats://localhost:4222"])

        future = asyncio.Future()

        async def cb(msg):
            nonlocal future
            future.set_result(msg)

        await nc.subscribe("glances.cpu", cb=cb)

        # Wait for a message to come in
        print("Waiting (max 30 seconds) for a message on the 'glances.cpu' subject...")
        msg = await asyncio.wait_for(future, 30)
        print(msg.subject, msg.data)


    if __name__ == '__main__':
        asyncio.run(main())
To subscribe to all Glances stats, use a wildcard:

.. code-block:: python

    await nc.subscribe("glances.*", cb=cb)


@@ -0,0 +1,221 @@
"""NATS interface class."""
import asyncio
import threading
import time
from nats.aio.client import Client as NATS
from nats.errors import ConnectionClosedError
from nats.errors import TimeoutError as NatsTimeoutError
from glances.exports.export import GlancesExport
from glances.globals import json_dumps
from glances.logger import logger

class Export(GlancesExport):
    """This class manages the NATS export module."""

    def __init__(self, config=None, args=None):
        """Init the NATS export IF."""
        super().__init__(config=config, args=args)

        # Load the NATS configuration file
        self.export_enable = self.load_conf(
            'nats',
            mandatories=['host'],
            options=['prefix'],
        )
        if not self.export_enable:
            exit('Missing NATS config')

        self.prefix = self.prefix or 'glances'
        self.hosts = self.host

        logger.info(f"Initializing NATS export with prefix '{self.prefix}' to {self.hosts}")

        # Create a persistent event loop in a background thread
        self.loop = None
        self.client = None
        self._connected = False
        self._shutdown = False
        self._loop_ready = threading.Event()
        self._loop_exception = None
        self._publish_count = 0

        self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
        self._loop_thread.start()

        # Wait for the loop to be ready
        if not self._loop_ready.wait(timeout=10):
            exit("NATS event loop failed to start within timeout")
        if self._loop_exception:
            exit(f"NATS event loop creation failed: {self._loop_exception}")
        if self.loop is None:
            exit("NATS event loop is None after initialization")

        # Initial connection attempt
        future = asyncio.run_coroutine_threadsafe(self._connect(), self.loop)
        try:
            future.result(timeout=10)
            logger.info("NATS export initialized successfully")
        except Exception as e:
            logger.warning(f"Initial NATS connection failed: {e}. Will retry in background.")

    def _run_event_loop(self):
        """Run event loop in background thread."""
        try:
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            self._loop_ready.set()
            logger.info("NATS event loop started")
            self.loop.run_forever()
        except Exception as e:
            self._loop_exception = e
            self._loop_ready.set()
            logger.error(f"Event loop thread error: {e}")
        finally:
            # Clean up any pending tasks
            pending = asyncio.all_tasks(self.loop)
            for task in pending:
                task.cancel()
            if pending:
                self.loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
            self.loop.close()
            logger.info("NATS event loop stopped")

    async def _connect(self):
        """Connect to NATS with error handling."""
        try:
            if self.client:
                try:
                    await self.client.close()
                except Exception as e:
                    logger.debug(f"Error closing existing NATS client: {e}")

            self.client = NATS()
            logger.info(f"Connecting to NATS servers: {self.hosts}")

            # Configure with reconnection callbacks
            await self.client.connect(
                servers=[s.strip() for s in self.hosts.split(',')],
                reconnect_time_wait=2,
                max_reconnect_attempts=60,
                error_cb=self._error_callback,
                disconnected_cb=self._disconnected_callback,
                reconnected_cb=self._reconnected_callback,
            )
            self._connected = True
            logger.info(f"Successfully connected to NATS: {self.hosts}")
        except Exception as e:
            self._connected = False
            logger.error(f"NATS connection error: {e}")
            raise

    async def _error_callback(self, e):
        """Called when NATS client encounters an error."""
        logger.error(f"NATS error callback: {e}")

    async def _disconnected_callback(self):
        """Called when disconnected from NATS."""
        self._connected = False
        logger.warning("NATS disconnected callback")

    async def _reconnected_callback(self):
        """Called when reconnected to NATS."""
        self._connected = True
        logger.info("NATS reconnected callback")

    def exit(self):
        """Close the NATS connection."""
        super().exit()
        self._shutdown = True
        logger.info("NATS export shutting down")

        if self.loop and self.client:
            future = asyncio.run_coroutine_threadsafe(self._disconnect(), self.loop)
            try:
                future.result(timeout=5)
            except Exception as e:
                logger.error(f"Error disconnecting from NATS: {e}")

        if self.loop:
            self.loop.call_soon_threadsafe(self.loop.stop)
            time.sleep(0.5)

        logger.info(f"NATS export shutdown complete. Total messages published: {self._publish_count}")

    async def _disconnect(self):
        """Disconnect from NATS."""
        try:
            if self.client and self._connected:
                await self.client.drain()
                await self.client.close()
                self._connected = False
                logger.info("NATS disconnected cleanly")
        except Exception as e:
            logger.error(f"Error in disconnect: {e}")
    def export(self, name, columns, points):
        """Write the points in NATS."""
        logger.info(f"Export called: name={name}, columns={columns}, connected={self._connected}")

        if self._shutdown:
            logger.warning("Export called during shutdown, skipping")
            return
        if not self.loop or not self.loop.is_running():
            logger.error("NATS event loop is not running")
            return
        if not self._connected:
            logger.warning("NATS not connected, skipping export")
            return

        subject_name = f"{self.prefix}.{name}"
        subject_data = dict(zip(columns, points))

        logger.info(f"Publishing to subject: {subject_name}")

        # Submit the publish operation to the background event loop
        try:
            future = asyncio.run_coroutine_threadsafe(
                self._publish(subject_name, json_dumps(subject_data)),
                self.loop
            )
            # Don't block forever - use a short timeout
            future.result(timeout=1)
            self._publish_count += 1
            logger.info(f"Successfully published message #{self._publish_count} to {subject_name}")
        except concurrent.futures.TimeoutError:
            # run_coroutine_threadsafe returns a concurrent.futures.Future,
            # so a timeout here raises concurrent.futures.TimeoutError
            logger.warning(f"NATS publish timeout for {subject_name}")
        except Exception as e:
            logger.error(f"NATS publish error for {subject_name}: {e}", exc_info=True)
    async def _publish(self, subject, data):
        """Publish data to NATS."""
        try:
            logger.info(f"_publish called: subject={subject}, data_length={len(data)}")
            if not self._connected:
                raise ConnectionClosedError("Not connected to NATS")

            logger.info(f"Calling client.publish for {subject}")
            await self.client.publish(subject, data)

            logger.info(f"Calling client.flush for {subject}")
            await asyncio.wait_for(self.client.flush(), timeout=2.0)

            logger.info(f"Successfully published and flushed to '{subject}'")
        except (ConnectionClosedError, NatsTimeoutError) as e:
            self._connected = False
            logger.error(f"NATS publish failed: {e}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error in _publish: {e}", exc_info=True)
            raise


# End of glances/exports/glances_nats/__init__.py


@@ -83,6 +83,7 @@ export = [
"influxdb>=1.0.0",
"influxdb3-python",
"kafka-python",
"nats-py",
"paho-mqtt",
"pika",
"potsdb",


@@ -0,0 +1,17 @@
import asyncio

import nats


async def main():
    nc = nats.NATS()
    await nc.connect(servers=["nats://localhost:4222"])

    await nc.publish("glances.test", b'A test')
    await nc.flush()


if __name__ == '__main__':
    asyncio.run(main())
# To run this test script, make sure you have a NATS server running locally.


@@ -0,0 +1,27 @@
import asyncio

import nats


async def main():
    nc = nats.NATS()
    await nc.connect(servers=["nats://localhost:4222"])

    future = asyncio.Future()

    async def cb(msg):
        nonlocal future
        future.set_result(msg)

    await nc.subscribe("glances.*", cb=cb)

    # Wait for message to come in
    print("Waiting (max 30s) for a message on 'glances' subject...")
    msg = await asyncio.wait_for(future, 30)
    print(msg.subject, msg.data)


if __name__ == '__main__':
    asyncio.run(main())
# To run this test script, make sure you have a NATS server running locally.

tests/test_export_nats.sh Executable file

@@ -0,0 +1,34 @@
#!/bin/bash
# Pre-requisites:
# - docker
# - jq
# Exit on error
set -e
echo "Stop previous nats container..."
docker stop nats-for-glances || true
docker rm nats-for-glances || true
echo "Starting nats container..."
docker run -d \
  --name nats-for-glances \
  -p 4222:4222 \
  -p 8222:8222 \
  -p 6222:6222 \
  nats:latest
# Wait for NATS to be ready (5 seconds)
echo "Waiting for nats to start (~ 5 seconds)..."
sleep 5
# Run glances with export to nats, stopping after 10 writes
# This will run synchronously now since we're using --stop-after
echo "Glances to export system stats to nats (duration: ~ 20 seconds)"
.venv/bin/python -m glances --config ./conf/glances.conf --export nats --stop-after 10 --quiet
# Stop and remove the nats container
echo "Stopping and removing nats container..."
docker stop nats-for-glances && docker rm nats-for-glances
echo "Script completed successfully!"


@@ -38,6 +38,6 @@ docker exec timescaledb-for-glances psql -d "postgres://postgres:password@localh
# Stop and remove the TimescaleDB container
echo "Stopping and removing TimescaleDB container..."
# docker stop timescaledb-for-glances && docker rm timescaledb-for-glances
docker stop timescaledb-for-glances && docker rm timescaledb-for-glances
echo "Script completed successfully!"