astra.thread_manager

Thread management for device operations within the Astra framework.

Key capabilities:
  • Start and manage threads for device operations

  • Track thread status and provide summaries

  • Safely stop and clean up threads

Classes

ThreadManager()

Manages threads for device operations within the Astra framework.

class astra.thread_manager.ThreadManager

Bases: object

Manages threads for device operations within the Astra framework.

threads

List of dictionaries containing thread information.

Type:

List[Dict[str, Any]]

start_thread(target: Callable, args: tuple = (), thread_type: str = '', device_name: str = '', thread_id: Any = None, daemon: bool = True) -> Thread

Start a new thread for the specified target function.
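The call shape can be illustrated without importing astra. The sketch below uses a hypothetical stand-in class, SketchThreadManager, that copies only the documented signature; the "thread" dictionary key, the read_device function, and the example identifiers are assumptions made for illustration, not part of the Astra API.

```python
import threading
from typing import Any, Callable, Dict, List


class SketchThreadManager:
    """Hypothetical stand-in that mirrors the documented start_thread signature."""

    def __init__(self) -> None:
        # One dict per thread; the "thread" key is an assumption of this sketch.
        self.threads: List[Dict[str, Any]] = []

    def start_thread(
        self,
        target: Callable,
        args: tuple = (),
        thread_type: str = "",
        device_name: str = "",
        thread_id: Any = None,
        daemon: bool = True,
    ) -> threading.Thread:
        thread = threading.Thread(target=target, args=args, daemon=daemon)
        self.threads.append(
            {
                "type": thread_type,
                "device_name": device_name,
                "id": thread_id,
                "thread": thread,
            }
        )
        thread.start()
        return thread


results: List[str] = []


def read_device(name: str) -> None:
    # Placeholder for a real device operation.
    results.append(f"read {name}")


manager = SketchThreadManager()
worker = manager.start_thread(
    target=read_device,
    args=("camera_1",),
    thread_type="readout",
    device_name="camera_1",
    thread_id="camera_1_readout",
)
worker.join()  # wait for the worker before inspecting its result
```

Because daemon defaults to True, such a thread would not keep the interpreter alive at shutdown, so work that must finish should be joined explicitly.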

join_thread(thread_id: Any) -> None

Wait for the specified thread to complete.

remove_dead_threads() -> None

Remove threads that have completed from the threads list.
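Joining by ID and pruning finished threads can be sketched with the standard library alone. The list-of-dicts layout follows the threads attribute described above, but the "thread" key and the two helper functions are assumptions of this sketch rather than the actual implementation.

```python
import threading
from typing import Any, Dict, List

release = threading.Event()

# Two entries: "short" returns immediately, "long" blocks until released.
threads: List[Dict[str, Any]] = []
for thread_id, target in (("short", lambda: None), ("long", release.wait)):
    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    threads.append({"id": thread_id, "thread": thread})


def join_by_id(entries: List[Dict[str, Any]], thread_id: Any) -> None:
    # Block until the thread registered under thread_id has finished.
    for entry in entries:
        if entry["id"] == thread_id:
            entry["thread"].join()


def prune_dead(entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Keep only the entries whose thread is still running.
    return [entry for entry in entries if entry["thread"].is_alive()]


join_by_id(threads, "short")
remaining = prune_dead(threads)
remaining_ids = [entry["id"] for entry in remaining]

release.set()  # let the long-running worker exit
```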

get_thread_ids() -> List[Any]

Return a list of all thread IDs.

get_thread(thread_id: Any) -> Thread | None

Return the thread object for the specified thread ID, or None if not found.

stop_thread(thread_id: Any) -> None

Stop and remove the specified thread from the threads list.

stop_all() -> None

Stop and remove all threads from the threads list.
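This page does not say how a running thread is made to stop. Python's threading module offers no way to kill a thread from outside, so the target function normally has to cooperate. The following is a minimal sketch of one common pattern, assuming a threading.Event is passed to the worker through args; the poll_device function and the flag are hypothetical and not part of the documented API.

```python
import threading
import time

stop_requested = threading.Event()


def poll_device(stop_flag: threading.Event) -> None:
    # Loop until the flag is set; wait() doubles as an interruptible sleep.
    while not stop_flag.is_set():
        # A real worker would talk to the device here.
        stop_flag.wait(0.01)


worker = threading.Thread(target=poll_device, args=(stop_requested,), daemon=True)
worker.start()

time.sleep(0.05)          # let the worker run for a moment
stop_requested.set()      # ask the worker to finish
worker.join(timeout=1.0)  # then wait for it to exit
```

With this pattern the caller sets the flag before joining, so the join returns promptly instead of waiting on a loop that never ends.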

is_thread_running(schedule: str) -> bool

Return True if any thread whose type matches schedule is currently alive.

get_thread_summary() -> list[dict]

Return a summary list of all threads with type, device_name, and id.
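The two status queries can be sketched as plain functions over the thread list. The summary keys (type, device_name, id) come from the description above; the "thread" key, the make_entry helper, and the example values are assumptions made for illustration.

```python
import threading
from typing import Any, Callable, Dict, List

release = threading.Event()


def make_entry(
    thread_type: str, device_name: str, thread_id: Any, target: Callable
) -> Dict[str, Any]:
    # Start a thread and return a record describing it.
    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    return {
        "type": thread_type,
        "device_name": device_name,
        "id": thread_id,
        "thread": thread,
    }


threads = [
    make_entry("schedule", "telescope_1", 1, release.wait),  # alive until released
    make_entry("readout", "camera_1", 2, lambda: None),      # finishes at once
]
threads[1]["thread"].join()


def is_type_running(entries: List[Dict[str, Any]], thread_type: str) -> bool:
    # True if at least one live thread has the requested type.
    return any(
        e["type"] == thread_type and e["thread"].is_alive() for e in entries
    )


def summarize(entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Drop the Thread object so the result contains only descriptive fields.
    return [
        {"type": e["type"], "device_name": e["device_name"], "id": e["id"]}
        for e in entries
    ]


schedule_running = is_type_running(threads, "schedule")
readout_running = is_type_running(threads, "readout")
summary = summarize(threads)

release.set()  # let the blocked worker exit
```

Note that in this sketch the summary still lists the finished readout thread; it only disappears once dead threads are pruned from the list.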