Source code for astra.thread_manager

"""Thread management for device operations within the Astra framework.

Key capabilities:
    - Start and manage threads for device operations
    - Track thread status and provide summaries
    - Safely stop and clean up threads
"""

from threading import Thread
from typing import Any, Callable, Dict, List


[docs] class ThreadManager: """Manages threads for device operations within the Astra framework. Attributes: threads (List[Dict[str, Any]]): List of dictionaries containing thread information. """ def __init__(self): self.threads: List[Dict[str, Any]] = []
[docs] def start_thread( self, target: Callable, args: tuple = (), thread_type: str = "", device_name: str = "", thread_id: Any = None, daemon: bool = True, ) -> Thread: """Start a new thread for the specified target function.""" th = Thread(target=target, args=args, daemon=daemon) th.start() thread_info = { "type": thread_type, "device_name": device_name, "thread": th, "id": thread_id if thread_id is not None else id(th), } self.threads.append(thread_info) return th
[docs] def join_thread(self, thread_id: Any) -> None: """Wait for the specified thread to complete.""" for th_info in self.threads: if th_info["id"] == thread_id: th_info["thread"].join() break
[docs] def remove_dead_threads(self) -> None: """Remove threads that have completed from the threads list.""" self.threads = [th for th in self.threads if th["thread"].is_alive()]
[docs] def get_thread_ids(self) -> List[Any]: """Return a list of all thread IDs.""" return [th_info["id"] for th_info in self.threads]
[docs] def get_thread(self, thread_id: Any) -> Thread | None: """Return the thread object for the specified thread ID, or None if not found.""" for th_info in self.threads: if th_info["id"] == thread_id: return th_info["thread"] return None
[docs] def stop_thread(self, thread_id: Any) -> None: """Stop and remove the specified thread from the threads list.""" for th_info in self.threads: if th_info["id"] == thread_id and th_info["thread"].is_alive(): th_info["thread"].join() self.threads.remove(th_info) break
[docs] def stop_all(self) -> None: """Stop and remove all threads from the threads list.""" for th_info in self.threads: if th_info["thread"].is_alive(): th_info["thread"].join() self.threads.clear()
[docs] def is_thread_running(self, schedule: str) -> bool: """Return True if any thread of the given type is currently alive.""" for th in self.threads: if th["id"] == schedule and th["thread"].is_alive(): return True return False
[docs] def get_thread_summary(self) -> list[dict]: """ Return a summary list of all threads with type, device_name, and id. """ return [self._thread_summary(th_info) for th_info in self.threads]
def _thread_summary(self, thread_info: dict) -> dict: return { "type": thread_info.get("type"), "device_name": thread_info.get("device_name"), "id": thread_info.get("id"), }