astra.alpaca_device_process#

ALPACA device multiprocessing wrapper for astronomical device control.

This module provides a multiprocessing-based wrapper for ALPACA astronomical devices, enabling concurrent device polling, method execution, and data logging. It implements a producer-consumer pattern with pipes for communication between the main process and device-specific subprocesses.

Classes:

AlpacaDevice: Multiprocessing wrapper for ALPACA astronomical devices

The module supports various ALPACA device types including telescopes, cameras, filter wheels, focusers, domes, and environmental monitoring equipment.

Classes

AlpacaDevice(ip, device_type, device_number, ...)

Multiprocessing wrapper for ALPACA astronomical devices.

Exceptions

AlpacaDeviceError

Base error for AlpacaDevice failures (IPC or remote).

AlpacaDeviceIPCError

Error communicating with the device subprocess (IPC).

RemoteDeviceError

Remote/network error when the device subprocess fails to contact ALPACA HTTP server.

exception astra.alpaca_device_process.AlpacaDeviceError[source]#

Bases: Exception

Base error for AlpacaDevice failures (IPC or remote).

static from_device(device: AlpacaDevice, exc: Exception, method_name: str, method: str | None = None) AlpacaDeviceError[source]#

Create an appropriate AlpacaDeviceError subclass from a device and original exception.

exception astra.alpaca_device_process.AlpacaDeviceIPCError[source]#

Bases: AlpacaDeviceError

Error communicating with the device subprocess (IPC).

exception astra.alpaca_device_process.RemoteDeviceError[source]#

Bases: AlpacaDeviceError

Remote/network error when the device subprocess fails to contact ALPACA HTTP server.

class astra.alpaca_device_process.AlpacaDevice(ip: str, device_type: str, device_number: int, device_name: str, queue: Any, connectable: bool = True, debug: bool = False)[source]#

Bases: Process

Multiprocessing wrapper for ALPACA astronomical devices.

Provides a process-based interface for ALPACA devices with concurrent polling capabilities, method execution, and inter-process communication via pipes. Supports automatic retry logic and comprehensive error handling.

Parameters:
  • ip (str) – IP address of the ALPACA device server.

  • device_type (str) – Type of device (e.g., ‘Telescope’, ‘Camera’).

  • device_number (int) – Device number on the ALPACA server.

  • device_name (str) – User-friendly name for the device.

  • queue – Multiprocessing queue for logging and data communication.

  • connectable (bool) – Whether to attempt initial connection to the device.

  • debug (bool) – Enable debug logging for device operations.

device#

ALPACA device instance.

metadata#

Device identification information.

front_pipe#

Communication pipe for main process.

back_pipe#

Communication pipe for device subprocess.

lock#

Thread lock for pipe synchronization.

get(method: str, **kwargs) Any[source]#

Execute device method with automatic retry and error handling.

Parameters:
  • method (str) – Name of the device method to execute.

  • **kwargs – Keyword arguments to pass to the method.

Returns:

Any – Result from the device method execution.

Raises:

Exception – If the device method execution fails.

set(method: str, value: Any) Any[source]#

Set device property value with error handling.

Parameters:
  • method (str) – Name of the device property to set.

  • value (Any) – Value to assign to the property.

Returns:

Any – Result from the property setter.

Raises:

Exception – If the property setting fails.

start_poll(method: str, delay: float) None[source]#

Start continuous polling of a device method.

Parameters:
  • method (str) – Name of the device method to poll.

  • delay (float) – Polling interval in seconds.

stop_poll(method: str | None = None) None[source]#

Stop polling for a specific method or all methods.

Parameters:

method (Optional[str]) – Method name to stop polling, or None for all.

pause_polls() None[source]#

Temporarily pause all active polling operations.

resume_polls() None[source]#

Resume all paused polling operations.

poll_list() List[str][source]#

Get list of currently active polling methods.

Returns:

List[str] – List of method names being polled.

Raises:

Exception – If polling list retrieval fails.

poll_latest() Dict[str, Dict[str, Any]][source]#

Get latest polling results for all active methods.

Uses non-blocking lock acquisition to avoid blocking the WebSocket when get/set operations are in progress. If the lock is unavailable, returns cached data from the last successful read.

Returns:

Dict[str, Dict[str, Any]]

Dictionary mapping method names to

their latest values and timestamps. Returns None if cache is empty and lock is unavailable.

stop() None[source]#

Stop the device process and clean up resources.

run() None[source]#

Main process loop for handling device operations.

Runs in the device subprocess to handle method calls, polling, and inter-process communication. Sets up signal handlers for graceful shutdown.

listenFront__() bool[source]#

Listen for and process messages from the main process.

Handles various message types including method calls, polling control, and shutdown commands.

Returns:

bool – True to continue running, False to stop the process.

get__(method: str, pipe: bool = True, **kwargs) Any | Dict[str, Any][source]#

Backend method execution with retry logic and error handling.

Parameters:
  • method (str) – Name of the device method to execute.

  • pipe (bool) – Whether to send result via pipe or return directly.

  • **kwargs – Keyword arguments for the method.

Returns:

Union[Any, Dict[str, Any]] – Method result or status dictionary.

set__(method: str, value: Any) None[source]#

Backend property setter with retry logic and error handling.

Parameters:
  • method (str) – Name of the device property to set.

  • value (Any) – Value to assign to the property.

loop__(method: str, delay: float) None[source]#

Continuous polling loop for a specific device method.

Runs in a separate thread to continuously poll a device method at the specified interval, logging results to the database.

Parameters:
  • method (str) – Name of the device method to poll.

  • delay (float) – Polling interval in seconds.

start_poll__(method: str, delay: float) None[source]#

Start a new polling thread for the specified method.

Parameters:
  • method (str) – Name of the device method to start polling.

  • delay (float) – Polling interval in seconds.

stop_poll__(method: str | None = None, *args) None[source]#

Stop polling for a specific method or all methods.

Parameters:
  • method (Optional[str]) – Method name to stop polling, or None for all.

  • *args – Additional arguments (used for signal handlers).

poll_list__() None[source]#

Send current polling list via pipe.

poll_latest__() None[source]#

Send latest polling results via pipe.

stop__(*args) None[source]#

Stop the device process and close communication pipes.

Parameters:

*args – Additional arguments (used for signal handlers).

force_poll(method: str, **kwargs) None[source]#

Immediately poll a device method once and write the result to the database via the queue.