"""ALPACA device multiprocessing wrapper for astronomical device control.
This module provides a multiprocessing-based wrapper for ALPACA astronomical
devices, enabling concurrent device polling, method execution, and data logging.
It implements a producer-consumer pattern with pipes for communication between
the main process and device-specific subprocesses.
Classes:
AlpacaDevice: Multiprocessing wrapper for ALPACA astronomical devices
The module supports various ALPACA device types including telescopes, cameras,
filter wheels, focusers, domes, and environmental monitoring equipment.
"""
import os
import signal
import time
from datetime import UTC, datetime
from multiprocessing import Lock, Pipe, Process
from threading import Lock as ThreadingLock
from threading import Thread
from typing import Any, Dict, List, Optional, Union
import requests
from alpaca.camera import Camera
from alpaca.covercalibrator import CoverCalibrator
from alpaca.dome import Dome
from alpaca.filterwheel import FilterWheel
from alpaca.focuser import Focuser
from alpaca.observingconditions import ObservingConditions
from alpaca.rotator import Rotator
from alpaca.safetymonitor import SafetyMonitor
from alpaca.switch import Switch
from alpaca.telescope import Telescope
# Lookup table from the ALPACA device-type name (as passed to AlpacaDevice)
# to the alpaca client class that is instantiated for it.
ALPACA_DEVICE_TYPES = dict(
    Telescope=Telescope,
    Camera=Camera,
    CoverCalibrator=CoverCalibrator,
    Dome=Dome,
    FilterWheel=FilterWheel,
    Focuser=Focuser,
    ObservingConditions=ObservingConditions,
    Rotator=Rotator,
    SafetyMonitor=SafetyMonitor,
    Switch=Switch,
)
# https://medium.com/@sampsa.riikonen/doing-python-multiprocessing-the-right-way-a54c1880e300
# https://stackoverflow.com/questions/27435284/multiprocessing-vs-multithreading-vs-asyncio
[docs]
class AlpacaDeviceError(Exception):
    """Base error for AlpacaDevice failures (IPC or remote)."""

    @staticmethod
    def from_device(
        device: "AlpacaDevice",
        exc: Exception,
        method_name: str,
        method: str | None = None,
    ) -> "AlpacaDeviceError":
        """Create an appropriate AlpacaDeviceError subclass from a device and original exception.

        Args:
            device: Device wrapper the failure belongs to; its ``device_type``
                and ``device_name`` are included in the message.
            exc: The exception reported for the failed operation.
            method_name: Name of the frontend operation (e.g. ``"get"``).
            method: Optional device method/property the operation targeted.

        Returns:
            AlpacaDeviceError: A ``RemoteDeviceError`` when ``exc`` looks like
            a network/HTTP failure, otherwise an ``AlpacaDeviceIPCError``.
            ``exc`` is attached as ``__cause__``.
        """
        # Render the failed operation as e.g. get('RightAscension'): the
        # frontend operation first, the device method as its argument.
        operation = method_name if method is None else f"{method_name}('{method}')"
        msg = f"{device.device_type} {device.device_name}: '{operation}' failed: {str(exc)}"

        # Classify the exception as remote/network if it is a requests error.
        is_remote = False
        try:
            is_remote = isinstance(exc, requests.RequestException)
        except (NameError, AttributeError):
            # requests may not be available; fall back to message heuristics.
            pass

        if not is_remote:
            # Exceptions that crossed the pipe are re-created as plain
            # Exception objects, so the text is the only remaining evidence.
            text = str(exc).lower()
            remote_markers = (
                "connection refused",
                "max retries exceeded",
                "newconnectionerror",
            )
            is_remote = any(marker in text for marker in remote_markers)

        error_cls = RemoteDeviceError if is_remote else AlpacaDeviceIPCError
        error = error_cls(msg)
        # Preserve the original exception as cause so callers can inspect it.
        error.__cause__ = exc
        return error
[docs]
class AlpacaDeviceIPCError(AlpacaDeviceError):
    """Raised when talking to the device subprocess over IPC fails."""
[docs]
class RemoteDeviceError(AlpacaDeviceError):
    """Raised when the device subprocess cannot reach the ALPACA HTTP server (remote/network failure)."""
[docs]
class AlpacaDevice(Process):
"""Multiprocessing wrapper for ALPACA astronomical devices.
Provides a process-based interface for ALPACA devices with concurrent
polling capabilities, method execution, and inter-process communication
via pipes. Supports automatic retry logic and comprehensive error handling.
Args:
ip (str): IP address of the ALPACA device server.
device_type (str): Type of device (e.g., 'Telescope', 'Camera').
device_number (int): Device number on the ALPACA server.
device_name (str): User-friendly name for the device.
queue: Multiprocessing queue for logging and data communication.
connectable (bool): Whether to attempt initial connection to the device.
debug (bool): Enable debug logging for device operations.
Attributes:
device: ALPACA device instance.
metadata: Device identification information.
front_pipe: Communication pipe for main process.
back_pipe: Communication pipe for device subprocess.
lock: Thread lock for pipe synchronization.
"""
def __init__(
self,
ip: str,
device_type: str,
device_number: int,
device_name: str,
queue: Any,
connectable: bool = True,
debug: bool = False,
) -> None:
super().__init__()
self.front_pipe, self.back_pipe = Pipe()
self.lock = Lock()
self.queue = queue
self.debug = debug
if device_type in [
"Telescope",
"Camera",
"CoverCalibrator",
"Dome",
"FilterWheel",
"Focuser",
"ObservingConditions",
"Rotator",
"SafetyMonitor",
"Switch",
]:
self.device = ALPACA_DEVICE_TYPES[device_type](ip, device_number)
else:
self.queue.put(
(
{
"ip": ip,
"device_type": device_type,
"device_number": device_number,
"device_name": device_name,
},
{
"type": "log",
"data": (
"warning",
f"{device_type} is not a valid device type",
),
},
)
)
self.ip = ip
self.device_number = device_number
self.device_type = device_type
self.device_name = device_name
self.metadata = {
"ip": ip,
"device_type": device_type,
"device_number": device_number,
"device_name": device_name,
}
self.connectable = connectable
self._poll_list: List[str] = []
self._poll_latest: Dict[str, Dict[str, Any]] = {}
self._poll_pause = False
# _poll_lock is created in run() because threading.Lock cannot be pickled
# for subprocess spawning on macOS
self._poll_lock: Optional[ThreadingLock] = None
# Cache for poll_latest in main process - allows non-blocking reads
# when get/set operations hold the lock
self._cached_poll_latest: Dict[str, Dict[str, Any]] = {}
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": ("info", f"{device_type} {device_name} loaded"),
},
)
)
## FRONTEND METHODS
[docs]
def get(self, method: str, **kwargs) -> Any:
"""Execute device method with automatic retry and error handling.
Args:
method (str): Name of the device method to execute.
**kwargs: Keyword arguments to pass to the method.
Returns:
Any: Result from the device method execution.
Raises:
Exception: If the device method execution fails.
"""
## method getter
with self.lock:
self.front_pipe.send(["get", {"method": method, **kwargs}])
msg = self.front_pipe.recv()
if isinstance(msg, Exception):
raise AlpacaDeviceError.from_device(
self, msg, method_name="get", method=method
)
else:
return msg
[docs]
def set(self, method: str, value: Any) -> Any:
"""Set device property value with error handling.
Args:
method (str): Name of the device property to set.
value (Any): Value to assign to the property.
Returns:
Any: Result from the property setter.
Raises:
Exception: If the property setting fails.
"""
## property setter
with self.lock:
self.front_pipe.send(["set", {"method": method, "value": value}])
msg = self.front_pipe.recv()
if isinstance(msg, Exception):
raise AlpacaDeviceError.from_device(
self, msg, method_name="set", method=method
)
else:
return msg
[docs]
def start_poll(self, method: str, delay: float) -> None:
"""Start continuous polling of a device method.
Args:
method (str): Name of the device method to poll.
delay (float): Polling interval in seconds.
"""
with self.lock:
self.front_pipe.send(["start_poll", {"method": method, "delay": delay}])
[docs]
def stop_poll(self, method: Optional[str] = None) -> None:
"""Stop polling for a specific method or all methods.
Args:
method (Optional[str]): Method name to stop polling, or None for all.
"""
with self.lock:
self.front_pipe.send(["stop_poll", {"method": method}])
[docs]
def pause_polls(self) -> None:
"""Temporarily pause all active polling operations."""
with self.lock:
self.front_pipe.send("pause_polls")
[docs]
def resume_polls(self) -> None:
"""Resume all paused polling operations."""
with self.lock:
self.front_pipe.send("resume_polls")
[docs]
def poll_list(self) -> List[str]:
"""Get list of currently active polling methods.
Returns:
List[str]: List of method names being polled.
Raises:
Exception: If polling list retrieval fails.
"""
with self.lock:
self.front_pipe.send("poll_list")
msg = self.front_pipe.recv()
if isinstance(msg, Exception):
raise AlpacaDeviceError.from_device(
device=self, exc=msg, method_name="poll_list"
)
else:
return msg
[docs]
def poll_latest(self) -> Dict[str, Dict[str, Any]]:
"""Get latest polling results for all active methods.
Uses non-blocking lock acquisition to avoid blocking the WebSocket
when get/set operations are in progress. If the lock is unavailable,
returns cached data from the last successful read.
Returns:
Dict[str, Dict[str, Any]]: Dictionary mapping method names to
their latest values and timestamps.
Returns None if cache is empty and lock
is unavailable.
"""
# Try non-blocking lock acquisition
acquired = self.lock.acquire(block=False)
if not acquired:
# Lock held by get/set - return cached data immediately
# This prevents WebSocket blocking during slow device operations
# Return None if cache is empty to match caller expectations
if not self._cached_poll_latest:
return None # type: ignore[return-value]
return dict(self._cached_poll_latest)
try:
self.front_pipe.send("poll_latest")
msg = self.front_pipe.recv()
if isinstance(msg, Exception):
# On error, still return cached data rather than raising
# to keep WebSocket responsive
if not self._cached_poll_latest:
return None # type: ignore[return-value]
return dict(self._cached_poll_latest)
else:
# Update cache with fresh data
self._cached_poll_latest = msg
return msg
finally:
self.lock.release()
[docs]
def stop(self) -> None:
"""Stop the device process and clean up resources."""
with self.lock:
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"info",
f"AlpacaDevice {self.device_type} {self.device_number} stopping",
),
},
)
)
self.front_pipe.send("stop")
self.join()
## BACKEND CORE
[docs]
def run(self) -> None:
"""Main process loop for handling device operations.
Runs in the device subprocess to handle method calls, polling,
and inter-process communication. Sets up signal handlers for
graceful shutdown.
"""
# Create threading lock here because it cannot be pickled for subprocess spawning
self._poll_lock = ThreadingLock()
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"info",
f"AlpacaDevice {self.device_type} {self.device_number} started "
f"with pid [{os.getpid()}]",
),
},
)
)
self.active = True
signal.signal(signal.SIGINT, self.stop_poll__) # type: ignore
signal.signal(signal.SIGTERM, self.stop_poll__) # type: ignore
while self.active:
self.active = self.listenFront__()
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"info",
f"AlpacaDevice {self.device_type} {self.device_number} stopped",
),
},
)
)
[docs]
def listenFront__(self) -> bool:
"""Listen for and process messages from the main process.
Handles various message types including method calls, polling
control, and shutdown commands.
Returns:
bool: True to continue running, False to stop the process.
"""
try:
r = self.back_pipe.recv()
message = r[0] if len(r) == 2 else r
if message == "get":
self.get__(**r[1])
return True
elif message == "set":
self.set__(**r[1])
return True
elif message == "start_poll":
self.start_poll__(**r[1])
return True
elif message == "stop_poll":
self.stop_poll__(**r[1])
return True
elif message == "pause_polls":
self._poll_pause = True
return True
elif message == "resume_polls":
self._poll_pause = False
return True
elif message == "poll_list":
self.poll_list__()
return True
elif message == "poll_latest":
self.poll_latest__()
return True
elif message == "stop":
return False
else:
print("listenFront__ : unknown message", message)
return True
except OSError:
return False
## BACKEND METHODS
[docs]
def get__(
self, method: str, pipe: bool = True, **kwargs
) -> Union[Any, Dict[str, Any]]:
"""Backend method execution with retry logic and error handling.
Args:
method (str): Name of the device method to execute.
pipe (bool): Whether to send result via pipe or return directly.
**kwargs: Keyword arguments for the method.
Returns:
Union[Any, Dict[str, Any]]: Method result or status dictionary.
"""
## method getter
try:
# permit 3 attempts
data = "not get"
if self.debug:
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"debug",
f"Getting method: {self.device_type}, {self.device_name}, {method}",
),
},
)
)
for _ in range(2):
try:
if data == "not get":
data = getattr(self.device, method)
# if kwargs, call method with kwargs
if kwargs:
if "no_kwargs" in kwargs:
data = data()
else:
data = data(**kwargs)
if self.debug:
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"debug",
"Get method success: "
f"{self.device_type}, {self.device_name}, {method}",
),
},
)
)
except Exception as e:
time.sleep(0)
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"warning",
f"Get method failed with data {str(data)}: "
f"{self.device_type}, {self.device_name}, {method}, "
f"{str(e)}, trying again...",
),
},
)
)
time.sleep(1)
continue
time.sleep(0)
# final run. If error, caught by try/except
if data == "not get":
data = getattr(self.device, method)
# if kwargs, call method with kwargs
if kwargs:
if "no_kwargs" in kwargs:
data = data()
else:
data = data(**kwargs)
if self.debug:
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"debug",
"Get method success: "
f"{self.device_type}, {self.device_name}, {method}",
),
},
)
)
time.sleep(0)
if pipe:
self.back_pipe.send(data) # check if valid, need args?
else:
return {"status": "success", "data": data, "message": ""}
except Exception as e:
if pipe:
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"error",
f"Get method error with data {str(data)}: "
f"{self.device_type}, {self.device_name}, {method}, {str(e)}",
),
},
)
)
# send a plain Exception with the string message to avoid
# pickling issues when the original exception class isn't
# importable in the parent process (e.g. DriverException)
self.back_pipe.send(Exception(str(e)))
else:
return {
"status": "error",
"data": "null",
"message": f"Get method error: {str(e)}",
}
[docs]
def set__(self, method: str, value: Any) -> None:
"""Backend property setter with retry logic and error handling.
Args:
method (str): Name of the device property to set.
value (Any): Value to assign to the property.
"""
## property setter
try:
# permit 3 attempts
data = "not set"
if self.debug:
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"debug",
f"Setting method: {self.device_type}, {self.device_name}, {method}",
),
},
)
)
for i in range(2):
try:
if data == "not set":
data = setattr(self.device, method, value)
if self.debug:
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"debug",
f"Set method success: {self.device_type}, "
f"{self.device_name}, {method} with data {str(data)}",
),
},
)
)
except Exception as e:
time.sleep(0)
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"warning",
f"Set method failed with data {str(data)}: "
f"{self.device_type}, {self.device_name}, {method}, "
f"{str(e)}, trying again...",
),
},
)
)
time.sleep(1)
continue
time.sleep(0)
# final run. If error, caught by try/except
if data == "not set":
data = setattr(self.device, method, value)
if self.debug:
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"debug",
f"Set method success: {self.device_type}, "
f"{self.device_name}, {method}, with data {str(data)}",
),
},
)
)
time.sleep(0)
self.back_pipe.send(data) # check if valid, need args?
except Exception as e:
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"error",
f"Set method error: {self.device_type}, {self.device_name}, "
f"{method}, {str(e)}",
),
},
)
)
self.back_pipe.send(Exception(str(e)))
[docs]
def loop__(self, method: str, delay: float) -> None:
"""Continuous polling loop for a specific device method.
Runs in a separate thread to continuously poll a device method
at the specified interval, logging results to the database.
Args:
method (str): Name of the device method to poll.
delay (float): Polling interval in seconds.
"""
with self._poll_lock:
self._poll_list.append(method)
self._poll_latest[method] = {"value": None, "datetime": None}
try:
while True:
# Check if we should continue polling (under lock)
with self._poll_lock:
if method not in self._poll_list:
break
if not self._poll_pause:
get = self.get__(method, pipe=False)
if get["status"] == "success":
val = get["data"]
else:
time.sleep(1)
## try again, just in case...
get = self.get__(method, pipe=False)
if get["status"] == "success":
val = get["data"]
else:
raise ValueError(get)
time.sleep(0)
dt = datetime.now(UTC)
dt_str = dt.strftime("%Y-%m-%d %H:%M:%S.%f")
# Safely enqueue the polling result; if the queue is closed or
# otherwise unavailable, stop this poll thread gracefully instead
# of letting the exception crash the thread.
try:
self.queue.put(
(
self.metadata,
{
"type": "query",
"data": (
f"INSERT INTO polling VALUES "
f"('{self.device_type}', '{self.device_name}', "
f"'{method}', '{val}', '{dt_str}')"
),
},
)
)
except Exception as q_exc:
# Common failures include BrokenPipeError when the manager
# or parent process has exited. Record a minimal local
# failure state and stop polling this method.
dt = datetime.now(UTC)
with self._poll_lock:
if method in self._poll_latest:
self._poll_latest[method]["datetime"] = dt
self._poll_latest[method]["value"] = "null"
if method in self._poll_list:
self._poll_list.remove(method)
try:
print(
f"loop__: queue.put failed for {self.device_type} {self.device_name} {method}: {q_exc}"
)
except Exception:
# best effort to not raise from the exception handler
pass
break
with self._poll_lock:
if method in self._poll_latest:
self._poll_latest[method]["value"] = val
self._poll_latest[method]["datetime"] = dt
time.sleep(delay)
time.sleep(0)
except Exception as e:
dt = datetime.now(UTC)
with self._poll_lock:
if method in self._poll_latest:
self._poll_latest[method]["datetime"] = dt
self._poll_latest[method]["value"] = "null"
if method in self._poll_list:
self._poll_list.remove(method)
# try to enqueue an error log; if the queue is gone, fallback to
# printing and stop polling this method.
try:
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"error",
f"Loop error: {self.device_type}, {self.device_name}, "
f"{method}, {str(e)}",
),
},
)
)
except Exception as q_exc:
try:
print(
f"loop__: failed to queue loop error for {self.device_type} {self.device_name} {method}: {q_exc}"
)
except Exception:
pass
[docs]
def start_poll__(self, method: str, delay: float) -> None:
"""Start a new polling thread for the specified method.
Args:
method (str): Name of the device method to start polling.
delay (float): Polling interval in seconds.
"""
if method not in self._poll_list:
Thread(target=self.loop__, args=(method, delay), daemon=True).start()
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"info",
f"{self.device_type}, {self.device_name}, {method} "
f"poll started with {delay} second cadence",
),
},
)
)
[docs]
def stop_poll__(self, method: Optional[str] = None, *args) -> None:
"""Stop polling for a specific method or all methods.
Args:
method (Optional[str]): Method name to stop polling, or None for all.
*args: Additional arguments (used for signal handlers).
"""
with self._poll_lock:
if method is None:
self._poll_list = []
self._poll_latest = {}
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"info",
f"{self.device_type}, {self.device_name}, all polls stopped",
),
},
)
)
elif method in self._poll_list:
self._poll_list = list(filter((method).__ne__, self._poll_list))
del self._poll_latest[method]
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"info",
f"{self.device_type}, {self.device_name}, {method} poll stopped."
f"{self._poll_list} left in poll list, and {self._poll_latest} "
"left in poll dict",
),
},
)
)
else:
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"warning",
f"Stop poll error: {self.device_type}, {self.device_name}, "
f"{method} not in poll list.",
),
},
)
)
[docs]
def poll_list__(self) -> None:
"""Send current polling list via pipe."""
try:
with self._poll_lock:
# Send a copy to avoid iteration issues during pickling
snapshot = list(self._poll_list)
self.back_pipe.send(snapshot)
except Exception as e:
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"error",
f"poll_list error: {self.device_type}, {self.device_name}, "
f"{str(e)}",
),
},
)
)
self.back_pipe.send(Exception(str(e)))
[docs]
def poll_latest__(self) -> None:
"""Send latest polling results via pipe."""
try:
with self._poll_lock:
# Send a deep copy to avoid iteration issues during pickling
snapshot = {k: dict(v) for k, v in self._poll_latest.items()}
self.back_pipe.send(snapshot)
except Exception as e:
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"error",
f"poll_latest error: {self.device_type}, {self.device_name}, {str(e)}",
),
},
)
)
self.back_pipe.send(Exception(str(e)))
[docs]
def stop__(self, *args) -> None:
"""Stop the device process and close communication pipes.
Args:
*args: Additional arguments (used for signal handlers).
"""
self.active = False
# close pipes
self.front_pipe.close()
self.back_pipe.close()
[docs]
def force_poll(self, method: str, **kwargs) -> None:
"""Immediately poll a device method once and write the result to the database via the queue."""
try:
val = self.get(method, **kwargs)
dt = datetime.now(UTC)
dt_str = dt.strftime("%Y-%m-%d %H:%M:%S.%f")
self.queue.put(
(
self.metadata,
{
"type": "query",
"data": (
f"INSERT INTO polling VALUES "
f"('{self.device_type}', '{self.device_name}', "
f"'{method}', '{val}', '{dt_str}')"
),
},
)
)
except Exception as e:
self.queue.put(
(
self.metadata,
{
"type": "log",
"data": (
"error",
f"Force poll error: {self.device_type}, {self.device_name}, {method}, {str(e)}",
),
},
)
)