astra.thread_manager

Classes

class astra.thread_manager.ThreadManager

Bases: object

start_thread(target: Callable, args: tuple = (), thread_type: str = '', device_name: str = '', thread_id: Any = None, daemon: bool = True) -> Thread
join_thread(thread_id: Any) -> None
remove_dead_threads() -> None
get_thread_ids() -> List[Any]
get_thread(thread_id: Any) -> Thread | None
stop_thread(thread_id: Any) -> None
stop_all() -> None
is_thread_running(schedule: str) -> bool

Return True if any thread of the given type is currently alive.

get_thread_summary() -> list[dict]

Return a summary list of all threads with type, device_name, and id.
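
The signatures above do not say how the manager tracks its threads, so the following is only a minimal sketch of how a class with this interface could behave. `SketchThreadManager` is a hypothetical stand-in written for illustration, not astra's implementation. The internal dictionary, the auto-assigned integer ids, and the assumption that the `schedule` argument of `is_thread_running` is matched against `thread_type` are all guesses based on the method names and the two docstrings.

```python
import threading
from typing import Any, Callable, Dict, List


class SketchThreadManager:
    """Hypothetical stand-in mirroring some of the documented method names.

    NOT astra's implementation: all bookkeeping below is assumed.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._threads: Dict[Any, dict] = {}  # thread_id -> metadata (assumed layout)
        self._next_id = 0

    def start_thread(self, target: Callable, args: tuple = (),
                     thread_type: str = "", device_name: str = "",
                     thread_id: Any = None, daemon: bool = True) -> threading.Thread:
        thread = threading.Thread(target=target, args=args, daemon=daemon)
        with self._lock:
            if thread_id is None:  # assumed: auto-assign an id when none is given
                thread_id = self._next_id
                self._next_id += 1
            self._threads[thread_id] = {
                "thread": thread,
                "type": thread_type,
                "device_name": device_name,
            }
            thread.start()
        return thread

    def join_thread(self, thread_id: Any) -> None:
        with self._lock:
            entry = self._threads.get(thread_id)
        if entry is not None:
            entry["thread"].join()  # join outside the lock so other calls are not blocked

    def remove_dead_threads(self) -> None:
        with self._lock:
            self._threads = {
                tid: e for tid, e in self._threads.items() if e["thread"].is_alive()
            }

    def is_thread_running(self, schedule: str) -> bool:
        # Assumed: `schedule` is compared against the stored thread_type.
        with self._lock:
            return any(
                e["type"] == schedule and e["thread"].is_alive()
                for e in self._threads.values()
            )

    def get_thread_summary(self) -> List[dict]:
        with self._lock:
            return [
                {"type": e["type"], "device_name": e["device_name"], "id": tid}
                for tid, e in self._threads.items()
            ]


# Usage: a worker that blocks on an Event until it is released.
release = threading.Event()
manager = SketchThreadManager()
manager.start_thread(target=release.wait, thread_type="schedule",
                     device_name="camera_1", thread_id="job-1")
running_before = manager.is_thread_running("schedule")  # worker is still waiting
summary = manager.get_thread_summary()
release.set()                     # let the worker finish
manager.join_thread("job-1")
running_after = manager.is_thread_running("schedule")
manager.remove_dead_threads()
remaining = manager.get_thread_summary()
```

The sketch leaves out `stop_thread` and `stop_all` because Python threads cannot be terminated from outside, and the page does not say which cooperative mechanism the real class uses.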