astra.queue_manager#
Manages the multiprocessing queue and its running state.
Also starts and manages the thread that processes the queue.
- Key capabilities:
Create and manage a multiprocessing queue for inter-process communication
Maintain the running state of the queue processing
Start a dedicated thread to monitor and process queue messages
Handle database operations and logging from device processes
Ensure proper error handling and logging for queue operations
Classes
|
Manages the multiprocessing queue and its running state. |
- class astra.queue_manager.QueueManager(logger: ObservatoryLogger, database_manager: DatabaseManager, thread_manager: ThreadManager)[source]#
Bases:
objectManages the multiprocessing queue and its running state.
Also starts and manages the thread that processes the queue.
- manager#
Manager for creating shared objects.
- Type:
multiprocessing.Manager
- queue#
The multiprocessing queue for inter-process communication.
- Type:
- thread#
The thread that processes the queue.
- Type:
threading.Thread | None
- queue_get() None[source]#
Process multiprocessing queue messages for database operations and logging.
Continuously monitors the multiprocessing queue for database queries and log messages from device processes. Handles different message types and maintains system operation by processing database operations and managing thread cleanup.
- Message Types Handled:
‘query’: Executes SQL database queries from device processes
- ‘log’: Processes log messages with different severity levels
‘info’, ‘warning’, ‘error’, ‘debug’ log levels supported
Error messages are added to error_source for monitoring
- Background Operations:
Cleans up completed threads from the threads list
Maintains database consistency across multiprocessing boundaries
Ensures proper error propagation from device processes
- Error Handling:
Catches and logs queue processing errors
Adds queue errors to error_source for monitoring
Stops queue processing on fatal errors
Note
Runs continuously until queue_running flag is False
Essential for multiprocessing communication with devices
Handles both synchronous database operations and asynchronous logging
Thread cleanup prevents memory leaks in long-running operations