mirror of https://github.com/nicolargo/glances.git
Remove debug message
This commit is contained in:
parent
b7c6cce373
commit
12083f33fc
|
|
@ -1,4 +1,13 @@
|
||||||
|
#
|
||||||
|
# This file is part of Glances.
|
||||||
|
#
|
||||||
|
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <nicolas@nicolargo.com>
|
||||||
|
#
|
||||||
|
# SPDX-License-Identifier: LGPL-3.0-only
|
||||||
|
#
|
||||||
|
|
||||||
"""NATS interface class."""
|
"""NATS interface class."""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
@ -29,10 +38,9 @@ class Export(GlancesExport):
|
||||||
exit('Missing NATS config')
|
exit('Missing NATS config')
|
||||||
|
|
||||||
self.prefix = self.prefix or 'glances'
|
self.prefix = self.prefix or 'glances'
|
||||||
|
# Host is a comma-separated list of NATS servers
|
||||||
self.hosts = self.host
|
self.hosts = self.host
|
||||||
|
|
||||||
logger.info(f"Initializing NATS export with prefix '{self.prefix}' to {self.hosts}")
|
|
||||||
|
|
||||||
# Create a persistent event loop in a background thread
|
# Create a persistent event loop in a background thread
|
||||||
self.loop = None
|
self.loop = None
|
||||||
self.client = None
|
self.client = None
|
||||||
|
|
@ -61,7 +69,7 @@ class Export(GlancesExport):
|
||||||
future.result(timeout=10)
|
future.result(timeout=10)
|
||||||
logger.info("NATS export initialized successfully")
|
logger.info("NATS export initialized successfully")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Initial NATS connection failed: {e}. Will retry in background.")
|
logger.warning(f"NATS Initial connection failed: {e}. Will retry in background.")
|
||||||
|
|
||||||
def _run_event_loop(self):
|
def _run_event_loop(self):
|
||||||
"""Run event loop in background thread."""
|
"""Run event loop in background thread."""
|
||||||
|
|
@ -69,21 +77,18 @@ class Export(GlancesExport):
|
||||||
self.loop = asyncio.new_event_loop()
|
self.loop = asyncio.new_event_loop()
|
||||||
asyncio.set_event_loop(self.loop)
|
asyncio.set_event_loop(self.loop)
|
||||||
self._loop_ready.set()
|
self._loop_ready.set()
|
||||||
logger.info("NATS event loop started")
|
|
||||||
self.loop.run_forever()
|
self.loop.run_forever()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self._loop_exception = e
|
self._loop_exception = e
|
||||||
self._loop_ready.set()
|
self._loop_ready.set()
|
||||||
logger.error(f"Event loop thread error: {e}")
|
logger.error(f"NATS Export Event loop thread error: {e}")
|
||||||
finally:
|
finally:
|
||||||
# Clean up any pending tasks
|
|
||||||
pending = asyncio.all_tasks(self.loop)
|
pending = asyncio.all_tasks(self.loop)
|
||||||
for task in pending:
|
for task in pending:
|
||||||
task.cancel()
|
task.cancel()
|
||||||
if pending:
|
if pending:
|
||||||
self.loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
|
self.loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
|
||||||
self.loop.close()
|
self.loop.close()
|
||||||
logger.info("NATS event loop stopped")
|
|
||||||
|
|
||||||
async def _connect(self):
|
async def _connect(self):
|
||||||
"""Connect to NATS with error handling."""
|
"""Connect to NATS with error handling."""
|
||||||
|
|
@ -92,11 +97,11 @@ class Export(GlancesExport):
|
||||||
try:
|
try:
|
||||||
await self.client.close()
|
await self.client.close()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug(f"Error closing existing NATS client: {e}")
|
logger.debug(f"NATS Error closing existing client: {e}")
|
||||||
|
|
||||||
self.client = NATS()
|
self.client = NATS()
|
||||||
|
|
||||||
logger.info(f"Connecting to NATS servers: {self.hosts}")
|
logger.debug(f"NATS Connecting to servers: {self.hosts}")
|
||||||
|
|
||||||
# Configure with reconnection callbacks
|
# Configure with reconnection callbacks
|
||||||
await self.client.connect(
|
await self.client.connect(
|
||||||
|
|
@ -109,7 +114,7 @@ class Export(GlancesExport):
|
||||||
)
|
)
|
||||||
|
|
||||||
self._connected = True
|
self._connected = True
|
||||||
logger.info(f"Successfully connected to NATS: {self.hosts}")
|
logger.debug(f"NATS Successfully connected to servers: {self.hosts}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self._connected = False
|
self._connected = False
|
||||||
logger.error(f"NATS connection error: {e}")
|
logger.error(f"NATS connection error: {e}")
|
||||||
|
|
@ -122,12 +127,12 @@ class Export(GlancesExport):
|
||||||
async def _disconnected_callback(self):
|
async def _disconnected_callback(self):
|
||||||
"""Called when disconnected from NATS."""
|
"""Called when disconnected from NATS."""
|
||||||
self._connected = False
|
self._connected = False
|
||||||
logger.warning("NATS disconnected callback")
|
logger.debug("NATS disconnected callback")
|
||||||
|
|
||||||
async def _reconnected_callback(self):
|
async def _reconnected_callback(self):
|
||||||
"""Called when reconnected to NATS."""
|
"""Called when reconnected to NATS."""
|
||||||
self._connected = True
|
self._connected = True
|
||||||
logger.info("NATS reconnected callback")
|
logger.debug("NATS reconnected callback")
|
||||||
|
|
||||||
def exit(self):
|
def exit(self):
|
||||||
"""Close the NATS connection."""
|
"""Close the NATS connection."""
|
||||||
|
|
@ -140,13 +145,13 @@ class Export(GlancesExport):
|
||||||
try:
|
try:
|
||||||
future.result(timeout=5)
|
future.result(timeout=5)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error disconnecting from NATS: {e}")
|
logger.error(f"NATS Error disconnecting from server: {e}")
|
||||||
|
|
||||||
if self.loop:
|
if self.loop:
|
||||||
self.loop.call_soon_threadsafe(self.loop.stop)
|
self.loop.call_soon_threadsafe(self.loop.stop)
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
|
|
||||||
logger.info(f"NATS export shutdown complete. Total messages published: {self._publish_count}")
|
logger.debug(f"NATS export shutdown complete. Total messages published: {self._publish_count}")
|
||||||
|
|
||||||
async def _disconnect(self):
|
async def _disconnect(self):
|
||||||
"""Disconnect from NATS."""
|
"""Disconnect from NATS."""
|
||||||
|
|
@ -155,16 +160,14 @@ class Export(GlancesExport):
|
||||||
await self.client.drain()
|
await self.client.drain()
|
||||||
await self.client.close()
|
await self.client.close()
|
||||||
self._connected = False
|
self._connected = False
|
||||||
logger.info("NATS disconnected cleanly")
|
logger.debug("NATS disconnected cleanly")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error in disconnect: {e}")
|
logger.error(f"NATS Error in disconnect: {e}")
|
||||||
|
|
||||||
def export(self, name, columns, points):
|
def export(self, name, columns, points):
|
||||||
"""Write the points in NATS."""
|
"""Write the points in NATS."""
|
||||||
logger.info(f"Export called: name={name}, columns={columns}, connected={self._connected}")
|
|
||||||
|
|
||||||
if self._shutdown:
|
if self._shutdown:
|
||||||
logger.warning("Export called during shutdown, skipping")
|
logger.debug("NATS Export called during shutdown, skipping")
|
||||||
return
|
return
|
||||||
|
|
||||||
if not self.loop or not self.loop.is_running():
|
if not self.loop or not self.loop.is_running():
|
||||||
|
|
@ -178,8 +181,6 @@ class Export(GlancesExport):
|
||||||
subject_name = f"{self.prefix}.{name}"
|
subject_name = f"{self.prefix}.{name}"
|
||||||
subject_data = dict(zip(columns, points))
|
subject_data = dict(zip(columns, points))
|
||||||
|
|
||||||
logger.info(f"Publishing to subject: {subject_name}")
|
|
||||||
|
|
||||||
# Submit the publish operation to the background event loop
|
# Submit the publish operation to the background event loop
|
||||||
try:
|
try:
|
||||||
future = asyncio.run_coroutine_threadsafe(
|
future = asyncio.run_coroutine_threadsafe(
|
||||||
|
|
@ -189,7 +190,6 @@ class Export(GlancesExport):
|
||||||
# Don't block forever - use a short timeout
|
# Don't block forever - use a short timeout
|
||||||
future.result(timeout=1)
|
future.result(timeout=1)
|
||||||
self._publish_count += 1
|
self._publish_count += 1
|
||||||
logger.info(f"Successfully published message #{self._publish_count} to {subject_name}")
|
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
logger.warning(f"NATS publish timeout for {subject_name}")
|
logger.warning(f"NATS publish timeout for {subject_name}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
@ -198,24 +198,16 @@ class Export(GlancesExport):
|
||||||
async def _publish(self, subject, data):
|
async def _publish(self, subject, data):
|
||||||
"""Publish data to NATS."""
|
"""Publish data to NATS."""
|
||||||
try:
|
try:
|
||||||
logger.info(f"_publish called: subject={subject}, data_length={len(data)}")
|
|
||||||
|
|
||||||
if not self._connected:
|
if not self._connected:
|
||||||
raise ConnectionClosedError("Not connected to NATS")
|
raise ConnectionClosedError("NATS Not connected to server")
|
||||||
|
|
||||||
logger.info(f"Calling client.publish for {subject}")
|
|
||||||
await self.client.publish(subject, data)
|
await self.client.publish(subject, data)
|
||||||
|
|
||||||
logger.info(f"Calling client.flush for {subject}")
|
|
||||||
await asyncio.wait_for(self.client.flush(), timeout=2.0)
|
await asyncio.wait_for(self.client.flush(), timeout=2.0)
|
||||||
|
|
||||||
logger.info(f"Successfully published and flushed to '{subject}'")
|
|
||||||
except (ConnectionClosedError, NatsTimeoutError) as e:
|
except (ConnectionClosedError, NatsTimeoutError) as e:
|
||||||
self._connected = False
|
self._connected = False
|
||||||
logger.error(f"NATS publish failed: {e}")
|
logger.error(f"NATS publish failed: {e}")
|
||||||
raise
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Unexpected error in _publish: {e}", exc_info=True)
|
logger.error(f"NATS Unexpected error in _publish: {e}", exc_info=True)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# End of glances/exports/glances_nats/__init__.py
|
# End of glances/exports/glances_nats/__init__.py
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,9 @@ import nats
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
duration = 30
|
||||||
|
subject = "glances.*"
|
||||||
|
|
||||||
nc = nats.NATS()
|
nc = nats.NATS()
|
||||||
|
|
||||||
await nc.connect(servers=["nats://localhost:4222"])
|
await nc.connect(servers=["nats://localhost:4222"])
|
||||||
|
|
@ -11,15 +14,16 @@ async def main():
|
||||||
future = asyncio.Future()
|
future = asyncio.Future()
|
||||||
|
|
||||||
async def cb(msg):
|
async def cb(msg):
|
||||||
nonlocal future
|
subject = msg.subject
|
||||||
future.set_result(msg)
|
reply = msg.reply
|
||||||
|
data = msg.data.decode()
|
||||||
|
print(f"Received a message on '{subject} {reply}': {data}")
|
||||||
|
|
||||||
await nc.subscribe("glances.*", cb=cb)
|
print(f"Receiving message from {subject} during {duration} seconds...")
|
||||||
|
await nc.subscribe(subject, cb=cb)
|
||||||
|
await asyncio.wait_for(future, duration)
|
||||||
|
|
||||||
# Wait for message to come in
|
await nc.close()
|
||||||
print("Waiting (max 30s) for a message on 'glances' subject...")
|
|
||||||
msg = await asyncio.wait_for(future, 30)
|
|
||||||
print(msg.subject, msg.data)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue