Source code for astra.queue_manager
import multiprocessing
from astra.database_manager import DatabaseManager
from astra.logger import ObservatoryLogger
from astra.thread_manager import ThreadManager
[docs]
class QueueManager:
"""Manages the multiprocessing queue and its running state.
Also starts and manages the thread that processes the queue.
Attributes:
manager (multiprocessing.Manager): Manager for creating shared objects.
queue (multiprocessing.Queue): The multiprocessing queue for inter-process communication.
queue_is_running (bool): Flag indicating if the queue processing is active.
thread (threading.Thread | None): The thread that processes the queue.
"""
def __init__(
self,
logger: ObservatoryLogger,
database_manager: DatabaseManager,
thread_manager: ThreadManager,
):
self.manager = multiprocessing.Manager()
self.queue = self.manager.Queue()
self.queue_is_running = True
self.database_manager = database_manager
self.thread_manager = thread_manager
self.logger = logger
[docs]
def start_queue_thread(
self,
):
self.thread_manager.start_thread(
target=self.queue_get,
thread_type="queue",
device_name="queue",
thread_id="queue",
)
[docs]
def queue_get(self) -> None:
"""
Process multiprocessing queue messages for database operations and logging.
Continuously monitors the multiprocessing queue for database queries and
log messages from device processes. Handles different message types and
maintains system operation by processing database operations and managing
thread cleanup.
Message Types Handled:
- 'query': Executes SQL database queries from device processes
- 'log': Processes log messages with different severity levels
- 'info', 'warning', 'error', 'debug' log levels supported
- Error messages are added to error_source for monitoring
Background Operations:
- Cleans up completed threads from the threads list
- Maintains database consistency across multiprocessing boundaries
- Ensures proper error propagation from device processes
Error Handling:
- Catches and logs queue processing errors
- Adds queue errors to error_source for monitoring
- Stops queue processing on fatal errors
Note:
- Runs continuously until queue_running flag is False
- Essential for multiprocessing communication with devices
- Handles both synchronous database operations and asynchronous logging
- Thread cleanup prevents memory leaks in long-running operations
"""
while self.queue_is_running:
try:
metadata, r = self.queue.get()
if r["type"] == "query":
self.database_manager.execute(r["data"])
elif r["type"] == "log":
if r["data"][0] == "info":
self.logger.info(r["data"][1])
elif r["data"][0] == "warning":
self.logger.warning(r["data"][1])
elif r["data"][0] == "error":
self.logger.report_device_issue(
device_type=metadata["device_type"],
device_name=metadata["device_name"],
message="Error for {metadata['device_name']}",
exception=r["data"][1],
)
elif r["data"][0] == "debug":
self.logger.debug(r["data"][1])
# pick up work of watchdog
self.thread_manager.remove_dead_threads()
except EOFError:
# This exception is raised when the multiprocessing queue's
# underlying connection is closed, which typically happens
# when the main process or the multiprocessing.Manager process
# is shutting down (such as at the end of a test run or program).
#
# If we did not catch EOFError here, the thread would log
# unnecessary errors during normal shutdown, cluttering logs
# and potentially causing confusion.
break
except Exception as e:
self.logger.report_device_issue(
device_type="Queue",
device_name="queue_get",
message=f"Error running queue_get for Queue, {type(e).__name__}",
exception=e,
)
self.queue_is_running = False