Source code for astra.action_configs

"""Action configuration dataclasses for observatory operations.

Key capabilities:
    - Define structured configurations for various observatory actions
    - Validate required fields and types for action parameters
    - Provide defaults from observatory configuration
    - Support dictionary-like access to action configuration fields
"""

import typing
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, ClassVar, List, Optional, Union

import astropy.units as u
import numpy as np
from astropy.coordinates import AltAz, Angle, EarthLocation, SkyCoord
from astropy.time import Time

from astra.config import Config, ObservatoryConfig


[docs] @dataclass class BaseActionConfig: """Base class for action configurations. This class serves as a base for specific action configurations, providing validation and dictionary-like access to its fields. It supports type validation, required fields, and merging with default values, centralizing common functionality for all action configurations and ensuring that the action values passed by the user are valid. Examples: >>> from astra.action_configs import AutofocusConfig >>> autofocus_config = AutofocusConfig(exptime=3.0) >>> exptime in autofocus_config True >>> autofocus_config['exptime'] 3.0 >>> autofocus_config.get('not_available') """ EXAMPLE_SCHEDULE: ClassVar[dict] = { "device_name": "camera_name", "action_value": {}, "start_time": "2025-01-01 00:00:00.000", "end_time": "2025-02-01 00:00:00.000", } def __post_init__(self): self.validate()
[docs] @classmethod def from_dict(cls, config_dict: dict, default_dict: dict = {}, logger=None): """Create an instance from a dictionary, merging with defaults.""" kwargs = cls.merge_config_dicts(config_dict, default_dict) if logger is not None: logger.debug(f"Extracting action values {kwargs} for {cls.__name__}") return cls(**kwargs)
[docs] @classmethod def defaults_from_observatory_config( cls, device_name: str, device_type: str = "Camera", observatory_config: object | None = None, ) -> dict: """ Retrieve default values for this action from the observatory configuration. Returns a dict suitable for passing as `default_dict` into from_dict. """ # lazy-import to avoid cycle at module import time oc = ( observatory_config if observatory_config is not None else Config().observatory_config ) if not isinstance(oc, ObservatoryConfig): return {} action_key = cls.__name__.lower().replace("config", "") try: if hasattr(oc, "get_device_config"): device_conf = oc.get_device_config(device_type, device_name) if isinstance(device_conf, dict): return device_conf.get(action_key, {}) or {} return {} except Exception: # Fall through to empty fallback if accessor fails return {} return {}
[docs] def validate(self): """Validate required fields and types of all fields. Raises: ValueError: If required fields are missing. TypeError: If any field has an incorrect type. """ missing = [] type_errors = [] for f in self.__dataclass_fields__.values(): val = getattr(self, f.name) # Check required fields if f.metadata.get("required") and val is None: missing.append(f.name) # Type validation for all fields err = self._validate_type(f) if err: type_errors.append(err) if missing: raise ValueError( f"Missing required fields: {missing} in {self.__class__.__name__}" ) if type_errors: raise TypeError(f"Type errors in fields: {type_errors}")
def _validate_type(self, f): val = getattr(self, f.name) expected_type = self.__annotations__.get(f.name) if val is None or expected_type is None: return None origin = typing.get_origin(expected_type) args = typing.get_args(expected_type) if expected_type is float and isinstance(val, int): return None # Handle Optional/Union types if origin is Union: allowed_types = [t for t in args if t is not type(None)] for t in allowed_types: t_origin = typing.get_origin(t) if t_origin: if isinstance(val, t_origin): break elif isinstance(val, t): break else: return self.format_type_error(f, allowed_types, type(val)) # Handle lists and tuples elif origin in (list, tuple): if not isinstance(val, origin): return self.format_type_error(f, origin, type(val)) elem_type = args[0] if args else None if elem_type: # Handle Union types inside lists (e.g., List[float | int]) elem_origin = typing.get_origin(elem_type) elem_args = typing.get_args(elem_type) if ( elem_origin is Union or isinstance(elem_type, type) and elem_type.__module__ == "types" and elem_type.__name__ == "UnionType" ): allowed_elem_types = tuple( t for t in elem_args if isinstance(t, type) ) elif elem_type in (float, int): allowed_elem_types = (float, int) else: allowed_elem_types = (elem_type,) for v in val: if not isinstance(v, allowed_elem_types): return self.format_type_error( f, allowed_elem_types, type(v), specifier="elements" ) # Handle dicts elif origin is dict: if not isinstance(val, dict): return self.format_type_error(f, dict, type(val)) key_type, value_type = args if len(args) == 2 else (None, None) if key_type: key_origin = typing.get_origin(key_type) for k in val.keys(): if key_origin: if not isinstance(k, key_origin): return self.format_type_error( f, key_origin, type(k), specifier="keys" ) elif not isinstance(k, key_type): return self.format_type_error( f, key_type, type(k), specifier="keys" ) if value_type: value_origin = typing.get_origin(value_type) for v in val.values(): if value_origin: if not isinstance(v, value_origin): return self.format_type_error( f, value_origin, type(v), specifier="values" ) elif not isinstance(v, value_type): return self.format_type_error( f, value_type, type(v), specifier="values" ) # Handle enums elif isinstance(expected_type, type) and issubclass(expected_type, Enum): if not isinstance(val, expected_type): return self.format_type_error(f, expected_type, type(val)) # Handle all other types (non-parameterized) elif isinstance(expected_type, type): if not isinstance(val, expected_type): return self.format_type_error(f, expected_type, type(val)) # Otherwise, skip type check return None
[docs] @staticmethod def format_type_error(f, expected_type, val, specifier=None): return ( f"{f.name}: " + (f"{specifier} " if specifier else "") + f"expected {expected_type}, got {val}" )
[docs] def get(self, key: str, default=None): """ Get attribute value by key with optional default. Args: key: Attribute name to retrieve. default: Value to return if attribute is not found. Returns: Attribute value or default if not found. """ return getattr(self, key, default)
def __getitem__(self, key: str): return getattr(self, key) def __setitem__(self, key: str, value): return setattr(self, key, value) def __contains__(self, key: str): return hasattr(self, key)
[docs] def keys(self) -> List[str]: """Return list of field names in the dataclass.""" return [ item for item in self.__dataclass_fields__.keys() if not item.startswith("_") ]
def __iter__(self): return iter(self.keys()) def __len__(self): return len(self.keys())
[docs] def validate_filters(self, filterwheel_names: dict[str, list[str]]) -> None: """Validate that filter(s) exist in the available filterwheels. Args: filterwheel_names: Dict mapping filterwheel device names to lists of filter names. e.g., {"fw1": ["Clear", "Red", "Green", "Blue"]} Raises: ValueError: If a filter is specified but doesn't exist in any filterwheel. """ # Get filter value from the config filter_value = self.get("filter") if filter_value is None or not filterwheel_names: return # No filter specified or no filterwheels available # Handle both single filter and list of filters filters_to_check = ( [filter_value] if isinstance(filter_value, str) else filter_value ) # Collect all available filter names from all filterwheels all_available_filters = set() for fw_filters in filterwheel_names.values(): all_available_filters.update(fw_filters) # Check each filter invalid_filters = [] for f in filters_to_check: if f not in all_available_filters: invalid_filters.append(f) if invalid_filters: raise ValueError( f"Filter(s) {invalid_filters} not found in available filters: " f"{sorted(all_available_filters)}" )
[docs] def validate_subframe(self) -> None: """Validate subframe parameters. Raises: ValueError: If subframe parameters are invalid. """ subframe_width = self.get("subframe_width") subframe_height = self.get("subframe_height") subframe_center_x = self.get("subframe_center_x", 0.5) subframe_center_y = self.get("subframe_center_y", 0.5) # Check dimensions are positive if specified if subframe_width is not None and subframe_width <= 0: raise ValueError(f"subframe_width must be positive, got {subframe_width}") if subframe_height is not None and subframe_height <= 0: raise ValueError(f"subframe_height must be positive, got {subframe_height}") # Check center coordinates are in valid range [0, 1] if not (0.0 <= subframe_center_x <= 1.0): # type: ignore raise ValueError( f"subframe_center_x must be between 0 and 1, got {subframe_center_x}" ) if not (0.0 <= subframe_center_y <= 1.0): # type: ignore raise ValueError( f"subframe_center_y must be between 0 and 1, got {subframe_center_y}" ) # If only one dimension is specified, require both if (subframe_width is None) != (subframe_height is None): raise ValueError( "Both subframe_width and subframe_height must be specified together. " f"Got: width={subframe_width}, height={subframe_height}" )
[docs] def validate_visibility( self, start_time: Time, end_time: Time, observatory_location: EarthLocation, min_altitude: float = 0.0, ): """Validate that the target is visible during the scheduled observation window. Checks target visibility at the beginning, middle, and end of the planned observation to ensure the target remains observable throughout. Only implemented for object actions; override in subclasses as needed. """ return None
[docs] def has_subframe(self) -> bool: """Check if subframing is enabled. Returns: True if subframe_width and subframe_height are specified, False otherwise. """ return ( self.get("subframe_width") is not None and self.get("subframe_height") is not None )
[docs] @classmethod def merge_config_dicts(cls, config_dict: dict, default_dict: dict) -> dict: """Merge default_dict and config_dict, keeping only keys in dataclass.""" if not isinstance(config_dict, dict): config_dict = {} if not isinstance(default_dict, dict): default_dict = {} return {k: v for k, v in default_dict.items() if k in cls.__annotations__} | { k: v for k, v in config_dict.items() if k in cls.__annotations__ }
[docs] def to_jsonable(self): def convert(val): if isinstance(val, Angle): return val.deg elif isinstance(val, SkyCoord): return {"ra": val.ra.deg, "dec": val.dec.deg} # type: ignore elif isinstance(val, Time): return val.isot elif isinstance(val, Enum): return val.value elif isinstance(val, dict): return {k: convert(v) for k, v in val.items()} elif isinstance(val, list): return [convert(v) for v in val] elif hasattr(val, "__dataclass_fields__"): return {k: convert(getattr(val, k)) for k in val.__dataclass_fields__} else: return val return convert(self)
[docs] @dataclass class OpenActionConfig(BaseActionConfig): """Open the observatory for observations. Steps: 1. Opens dome shutter 2. Unparks telescope 3. Cools camera """ EXAMPLE_SCHEDULE: ClassVar[dict] = { "device_name": "camera_name", "action_type": "open", "action_value": {}, "start_time": "2025-01-01 00:00:00.000", "end_time": "2025-02-01 00:00:00.000", }
[docs] def validate(self): pass
[docs] @dataclass class CloseActionConfig(BaseActionConfig): """Close the observatory safely. Steps: 1. Stop any active guiding operations 2. Stop telescope slewing and tracking 3. Park the telescope 4. Park the dome and close shutter 5. Cools camera """ EXAMPLE_SCHEDULE: ClassVar[dict] = { "device_name": "camera_name", "action_type": "close", "action_value": {}, "start_time": "2025-01-01 00:00:00.000", "end_time": "2025-02-01 00:00:00.000", }
[docs] def validate(self): pass
[docs] @dataclass class CompleteHeadersActionConfig(BaseActionConfig): """Complete FITS headers after exposures finish. Uses paired device polled data to fill in FITS header fields that were unavailable at exposure time. Automatically executed at the end of every schedule. """ EXAMPLE_SCHEDULE: ClassVar[dict] = { "device_name": "camera_name", "action_type": "complete_headers", "action_value": {}, "start_time": "2025-01-01 00:00:00.000", "end_time": "2025-02-01 00:00:00.000", }
[docs] def validate(self): pass
[docs] @dataclass class CoolCameraActionConfig(BaseActionConfig): """Configuration for the ``cool_camera`` schedule action. Activates the camera cooler and sets the target temperature with specified tolerance and timeout from observatory configuration. """ EXAMPLE_SCHEDULE: ClassVar[dict] = { "device_name": "camera_name", "action_type": "cool_camera", "action_value": {}, "start_time": "2025-01-01 00:00:00.000", "end_time": "2025-02-01 00:00:00.000", }
[docs] def validate(self): pass
[docs] @dataclass class ObjectActionConfig(BaseActionConfig): """Capture a sequence of light frames. Workflow: 1. Pre-sequence setup (pointing, filters, focus, binning, sub-framing, headers) - Observatory opens if not already done by a prior action if coordinates specified 2. Capture exposures in succession 3. Perform pointing correction if ``pointing=true`` 4. Start autoguiding if ``guiding=true`` 5. Stop exposures, guiding, and tracking at completion """ object: str = field(metadata={"required": True}) exptime: float = field(metadata={"required": True}) ra: Optional[float] = None dec: Optional[float] = None alt: Optional[float] = None az: Optional[float] = None lookup_name: Optional[str] = None filter: Optional[str] = None focus_shift: Optional[float] = None focus_position: Optional[float] = None n: Optional[int] = None guiding: bool = False pointing: bool = False bin: int = 1 dir: Optional[str] = None execute_parallel: bool = False disable_telescope_movement: bool = False reset_guiding_reference: bool = True subframe_width: Optional[int] = None subframe_height: Optional[int] = None subframe_center_x: float = 0.5 subframe_center_y: float = 0.5 FIELD_DESCRIPTIONS: ClassVar[dict[str, str]] = { "object": "Target name.", "exptime": "Exposure time per frame in seconds.", "ra": "Right Ascension to slew to", "dec": "Declination to slew to", "alt": "Altitude coordinate when issuing Alt/Az pointings.", "az": "Azimuth coordinate when issuing Alt/Az pointings.", "lookup_name": "Instead of specifying ra/dec or alt/az, use SIMBAD/Astropy to look up coordinates for celestial body to observe (e.g., 'mars', 'M31').", "filter": "Filter name to load before imaging.", "focus_shift": "Focus offset relative to the stored best focus.", "focus_position": "Absolute focus position override.", "n": "Number of exposures in the sequence. If not specified, defaults to infinite exposures until end_time.", "guiding": "Start autoguiding with Donuts before imaging.", "pointing": "Perform pointing correction with twirl before imaging.", "bin": "Camera binning factor.", "dir": "Base directory path for saving images.", "execute_parallel": "Execute action in parallel mode when supported.", "disable_telescope_movement": "Prevent any telescope motion during the sequence.", "reset_guiding_reference": "Acquire a fresh guiding reference frame at the start.", "subframe_width": "Width of the requested subframe in binned pixels.", "subframe_height": "Height of the requested subframe in binned pixels.", "subframe_center_x": "Horizontal location of the subframe center (0=left, 1=right).", "subframe_center_y": "Vertical location of the subframe center (0=top, 1=bottom).", } EXAMPLE_SCHEDULE: ClassVar[dict] = { "device_name": "camera_name", "action_type": "object", "action_value": { "object": "M42", "exptime": 60.0, "ra": 83.82208, "dec": -5.39111, "filter": "V", "n": 3, "guiding": True, "pointing": True, }, "start_time": "2025-01-01 00:00:00.000", "end_time": "2025-02-01 00:00:00.000", }
[docs] def validate(self): missing = [] for f in self.__dataclass_fields__.values(): if f.metadata.get("required") and getattr(self, f.name) is None: missing.append(f.name) if missing: raise ValueError( f"Missing required fields: {missing} in {self.__class__.__name__}" ) # Coordinate system validation has_radec = self.ra is not None or self.dec is not None has_altaz = self.alt is not None or self.az is not None # Can't mix coordinate systems if has_radec and has_altaz: raise ValueError( "Cannot specify both RA/Dec and Alt/Az coordinates. " "Use either 'ra' and 'dec' OR 'alt' and 'az', not both." ) # Must provide complete coordinate pairs if (self.ra is not None and self.dec is None) or ( self.ra is None and self.dec is not None ): raise ValueError( f"Both 'ra' and 'dec' must be provided together. Got: ra={self.ra}, dec={self.dec}" ) if (self.alt is not None and self.az is None) or ( self.alt is None and self.az is not None ): raise ValueError( f"Both 'alt' and 'az' must be provided together. Got: alt={self.alt}, az={self.az}" ) # Subframe validation self.validate_subframe()
[docs] def validate_visibility( self, start_time: Time, end_time: Time, observatory_location: EarthLocation, min_altitude: float = 0.0, ) -> None: """Validate that the target is visible during the scheduled observation window. Checks target visibility at the beginning, middle, and end of the planned observation to ensure the target remains observable throughout. Args: start_time: Observation start time as astropy Time object end_time: Observation end time as astropy Time object observatory_location: Observatory location as EarthLocation object min_altitude: Minimum altitude in degrees for target to be considered visible (default: 0°) Raises: ValueError: If RA/Dec are not provided or if target is below minimum altitude at any of the three check points (start, middle, end) Note: If RA/Dec are not provided, attempts to resolve them from 'lookup_name' or 'alt'/'az' parameters using the start time. """ ra = self.ra dec = self.dec # Try to resolve coordinates if RA/Dec are not explicitly provided if ra is None or dec is None: if self.lookup_name is not None: from astra.utils import get_body_coordinates # Resolve body position at start time target_coord = get_body_coordinates( body_name=self.lookup_name, obs_time=start_time, obs_location=observatory_location, ) ra = target_coord.ra.deg dec = target_coord.dec.deg elif self.alt is not None and self.az is not None: # Convert Alt/Az to RA/Dec for proper visibility checking over time # We assume the telescope will track the RA/Dec coordinate corresponding # to this Alt/Az at the start time. altaz_coord = SkyCoord( alt=u.Quantity(self.alt, u.deg), az=u.Quantity(self.az, u.deg), frame=AltAz(obstime=start_time, location=observatory_location), ) radec_coord = altaz_coord.transform_to("icrs") ra = radec_coord.ra.deg dec = radec_coord.dec.deg # Only check visibility if we have valid coordinates if ra is None or dec is None: return # Create target coordinate target = SkyCoord( ra=u.Quantity(ra, "deg"), dec=u.Quantity(dec, "deg"), frame="icrs", ) # Check times: start, middle, end mid_time = Time( (start_time.unix + end_time.unix) / 2, format="unix", ) check_times = [ ("start", start_time), ("middle", mid_time), ("end", end_time), ] visibility_issues = [] for label, check_time in check_times: # Transform to horizontal coordinates altaz_frame = AltAz(obstime=check_time, location=observatory_location) target_altaz = target.transform_to(altaz_frame) altitude = target_altaz.alt.deg # type: ignore if altitude < min_altitude: # type: ignore visibility_issues.append( f"{label}: altitude {altitude:.1f}° (below {min_altitude:.1f}° limit)" ) if visibility_issues: raise ValueError( f"Target '{self.object}' at RA={ra:.2f}°, Dec={dec:.2f}° " f"is not visible during observation window:\n " + "\n ".join(visibility_issues) )
[docs] @dataclass class CalibrationActionConfig(BaseActionConfig): """Capture a sequence of calibration images (bias/dark).""" exptime: List[float] = field(default_factory=list, metadata={"required": True}) n: List[int] = field(default_factory=list, metadata={"required": True}) filter: Optional[str] = None dir: Optional[str] = None bin: int = 1 execute_parallel: bool = False subframe_width: Optional[int] = None subframe_height: Optional[int] = None subframe_center_x: float = 0.5 subframe_center_y: float = 0.5 FIELD_DESCRIPTIONS: ClassVar[dict[str, str]] = { "exptime": "Exposure times (seconds) to iterate.", "n": "Exposure counts aligned with each exposure time.", "filter": "Filter name to load before imaging.", "dir": "Base directory path for saving images.", "bin": "Camera binning factor.", "execute_parallel": "Execute action in parallel mode when supported.", "subframe_width": "Width of the requested subframe in binned pixels.", "subframe_height": "Height of the requested subframe in binned pixels.", "subframe_center_x": "Horizontal subframe center (0=left, 1=right).", "subframe_center_y": "Vertical subframe center (0=top, 1=bottom).", } EXAMPLE_SCHEDULE: ClassVar[dict] = { "device_name": "camera_name", "action_type": "calibration", "action_value": {"exptime": [0.0, 5.0, 30.0], "n": [10, 5, 3]}, "start_time": "2025-01-01 00:00:00.000", "end_time": "2025-02-01 00:00:00.000", }
[docs] def validate(self): missing = [] for f in self.__dataclass_fields__.values(): if f.metadata.get("required") and ( getattr(self, f.name) is None or getattr(self, f.name) == [] ): missing.append(f.name) if missing: raise ValueError( f"Missing required fields: {missing} in {self.__class__.__name__}" ) # ensure exptime and n have the same length if len(self.exptime) != len(self.n): raise ValueError( f"'exptime' and 'n' must have the same length. Got: exptime={self.exptime}, n={self.n}" ) # Subframe validation self.validate_subframe()
[docs] @dataclass class FlatsActionConfig(BaseActionConfig): """Capture a sequence of sky flats as the sky brightness evolves. Steps: 1. Wait for Sun altitude between -1° and -12° 2. Point to a near-uniform patch of sky opposite the Sun - Opens observatory if not already done by a prior action 3. Capture exposures and re-position between frames 4. Iterate through requested filters while auto-adjusting exposure times """ filter: List[str] = field(default_factory=list, metadata={"required": True}) n: List[int] = field(default_factory=list, metadata={"required": True}) dir: Optional[str] = None bin: int = 1 execute_parallel: bool = False disable_telescope_movement: bool = False subframe_width: Optional[int] = None subframe_height: Optional[int] = None subframe_center_x: float = 0.5 subframe_center_y: float = 0.5 FIELD_DESCRIPTIONS: ClassVar[dict[str, str]] = { "filter": "Filters to iterate while capturing flats.", "n": "Number of flats to capture per filter.", "dir": "Base directory path for saving images.", "bin": "Camera binning factor.", "execute_parallel": "Execute action in parallel mode when supported.", "disable_telescope_movement": "Prevent telescope motion during the sequence.", "subframe_width": "Width of the requested subframe in binned pixels.", "subframe_height": "Height of the requested subframe in binned pixels.", "subframe_center_x": "Horizontal subframe center (0=left, 1=right).", "subframe_center_y": "Vertical subframe center (0=top, 1=bottom).", } EXAMPLE_SCHEDULE: ClassVar[dict] = { "device_name": "camera_name", "action_type": "flats", "action_value": {"filter": ["V", "R"], "n": [10, 10]}, "start_time": "2025-01-01 00:00:00.000", "end_time": "2025-02-01 00:00:00.000", }
[docs] def validate(self): missing = [] for f in self.__dataclass_fields__.values(): if f.metadata.get("required") and ( getattr(self, f.name) is None or getattr(self, f.name) == [] ): missing.append(f.name) if missing: raise ValueError( f"Missing required fields: {missing} in {self.__class__.__name__}" ) # ensure filter and n have the same length if len(self.filter) != len(self.n): raise ValueError( f"'filter' and 'n' must have the same length. Got: filter={self.filter}, n={self.n}" ) # Subframe validation self.validate_subframe()
[docs] @dataclass class CalibrateGuidingActionConfig(BaseActionConfig): """Calibrate guiding parameters using timed guide pulses. Steps: 1. Slews telescope to RA = LST - 1 hour, Dec = 0° at the start of sequence - Opens observatory if not already done by a prior action 2. Issues a series of guide pulses in each cardinal direction with specified duration and settling time 3. Captures exposures after each pulse and measures star shifts to determine pixel-to-time scales and camera orientation relative to mount axes 4. Averages results over specified number of cycles 5. Saves calibration parameters in the observatory configuration for use in guiding """ filter: Optional[str] = None pulse_time: int = 5000 exptime: float = 1.0 settle_time: float = 1.0 number_of_cycles: int = 10 focus_shift: Optional[float] = None focus_position: Optional[float] = None bin: int = 1 subframe_width: Optional[int] = None subframe_height: Optional[int] = None subframe_center_x: float = 0.5 subframe_center_y: float = 0.5 FIELD_DESCRIPTIONS: ClassVar[dict[str, str]] = { "filter": "Filter to use during calibration.", "pulse_time": "Duration of guide pulses in milliseconds.", "exptime": "Exposure time for calibration images.", "settle_time": "Wait time after pulses before exposing.", "number_of_cycles": "How many calibration cycles to take average over.", "focus_shift": "Focus offset relative to best focus.", "focus_position": "Absolute focus position override.", "bin": "Camera binning factor.", "subframe_width": "Width of the requested subframe in binned pixels.", "subframe_height": "Height of the requested subframe in binned pixels.", "subframe_center_x": "Horizontal subframe center (0=left, 1=right).", "subframe_center_y": "Vertical subframe center (0=top, 1=bottom).", } EXAMPLE_SCHEDULE: ClassVar[dict] = { "device_name": "camera_name", "action_type": "calibrate_guiding", "action_value": {}, "start_time": "2025-01-01 00:00:00.000", "end_time": "2025-02-01 00:00:00.000", }
[docs] def validate(self): self.validate_subframe()
[docs] @dataclass class PointingModelActionConfig(BaseActionConfig): """Aid building a telescope pointing model. Astra itself does not build or maintain a pointing model. Captures a spiral of points from zenith down to 30° altitude while avoiding positions within 20° of the Moon. Plate solves each pointing and sends SyncToCoordinates commands to the mount. The receipt of these commands can be used to build a pointing model in the mount control software. The action can be configured to use the local star catalog for plate solving to speed up the process if the online Gaia catalog is unavailable or slow. """ n: int = 50 exptime: float = 3.0 dark_subtraction: bool = False object: str = "Pointing Model" use_local_db: bool = False filter: Optional[str] = None focus_shift: Optional[float] = None focus_position: Optional[float] = None bin: int = 1 dir: Optional[str] = None subframe_width: Optional[int] = None subframe_height: Optional[int] = None subframe_center_x: float = 0.5 subframe_center_y: float = 0.5 FIELD_DESCRIPTIONS: ClassVar[dict[str, str]] = { "n": "Number of points to include in the model.", "exptime": "Exposure time for each pointing image.", "dark_subtraction": "Enable dark subtraction using previously taken calibration frames of same exposure time in the same date folder.", "object": "Descriptive label for the pointing run.", "use_local_db": "Use local star catalog database for plate solving (faster).", "filter": "Filter to use for exposures.", "focus_shift": "Focus offset relative to best focus.", "focus_position": "Absolute focus position override.", "bin": "Camera binning factor.", "dir": "Directory path for saving images.", "subframe_width": "Width of the requested subframe in binned pixels.", "subframe_height": "Height of the requested subframe in binned pixels.", "subframe_center_x": "Horizontal subframe center (0=left, 1=right).", "subframe_center_y": "Vertical subframe center (0=top, 1=bottom).", } EXAMPLE_SCHEDULE: ClassVar[dict] = { "device_name": "camera_name", "action_type": "pointing_model", "action_value": {}, "start_time": "2025-01-01 00:00:00.000", "end_time": "2025-02-01 00:00:00.000", }
[docs] def validate(self): self.validate_subframe()
[docs] class SelectionMethod(Enum): SINGLE = "single" MAXIMAL = "maximal" ANY = "any"
[docs] @classmethod def from_string(cls, key: str, logger=None) -> "SelectionMethod": key = key.upper() if key in cls.__members__: return cls[key] if logger is not None: logger.warning(f"Unknown selection_method: {key}. Fall back to 'SINGLE'.") return cls.SINGLE
[docs] @dataclass class AutofocusCalibrationFieldConfig(BaseActionConfig): """Configuration for automated autofocus calibration field selection.""" maximal_zenith_angle: Optional[float | int | Angle] = None airmass_threshold: float = 1.01 g_mag_range: List[float | int] = field(default_factory=lambda: [0, 10]) j_mag_range: List[float | int] = field(default_factory=lambda: [0, 10]) fov_height: float | int = 0 fov_width: float | int = 0 selection_method: SelectionMethod | str = "single" use_gaia: bool = True observation_time: Optional[Time] = None maximal_number_of_stars: int = 100_000 ra: Optional[float | int] = None dec: Optional[float | int] = None _coordinates: Optional[SkyCoord] = None FIELD_DESCRIPTIONS: ClassVar[dict[str, str]] = { "maximal_zenith_angle": "Maximum zenith angle allowed when selecting autofocus fields.", "airmass_threshold": "Highest acceptable airmass for autofocus candidates.", "g_mag_range": "Inclusive Gaia G magnitude range to consider.", "j_mag_range": "Inclusive 2MASS J magnitude range to consider.", "fov_height": "Height of the field of view in degrees.", "fov_width": "Width of the field of view in degrees.", "selection_method": "Strategy for selecting stars (single, maximal, any).", "use_gaia": "Whether to rely on Gaia catalog sources.", "observation_time": "Observation time used when evaluating constraints.", "maximal_number_of_stars": "Maximum number of stars to query or consider.", "ra": "Fixed Right Ascension used to bypass automatic selection.", "dec": "Fixed Declination used to bypass automatic selection.", } def __post_init__(self): from astrafocus.targeting import find_airmass_threshold_crossover if self.maximal_zenith_angle is None: self.maximal_zenith_angle = Angle( find_airmass_threshold_crossover( airmass_threshold=self.airmass_threshold ) * 180 / np.pi, unit=u.deg, ) elif isinstance(self.maximal_zenith_angle, (float, int)): self.maximal_zenith_angle = Angle(self.maximal_zenith_angle, unit=u.deg) elif isinstance(self.maximal_zenith_angle, Angle): pass else: raise ValueError("maximal_zenith_angle must be of type float, int.") if not isinstance(self.selection_method, SelectionMethod): self.selection_method = SelectionMethod.from_string(self.selection_method) self.validate() @property def coordinates(self) -> SkyCoord: if self._coordinates is not None: return self._coordinates raise ValueError("Calibration coordinates have not been set.") @coordinates.setter def coordinates(self, value: SkyCoord) -> None: self._coordinates = value
[docs] @classmethod def from_dict( cls, config_dict: dict, logger=None, default_dict: dict = {} ) -> "AutofocusCalibrationFieldConfig": kwargs = cls.merge_config_dicts(config_dict, default_dict) if "selection_method" in kwargs and not isinstance( kwargs["selection_method"], SelectionMethod ): kwargs["selection_method"] = SelectionMethod.from_string( kwargs["selection_method"], logger=logger ) return cls(**kwargs)
[docs] @dataclass class AutofocusConfig(BaseActionConfig): """Perform an autofocus sweep to determine the optimal focus position. Steps: 1. Select a suitable autofocus field (or use provided coordinates) - Opens observatory if not already done by a prior action 2. Move the telescope if needed 3. Capture images at different focus positions 4. Measure star sharpness in each image 5. Fit a curve to determine optimal focus 6. Save plots/results and save the best focus position in the observatory configuration """ exptime: float | int = field(default=3.0) filter: Optional[str] = None bin: int = 1 reduce_exposure_time: bool = False search_range: Optional[List[int] | int] = None search_range_is_relative: bool = False n_steps: List[int] = field(default_factory=lambda: [30, 20]) n_exposures: List[int] | int = field(default_factory=lambda: [1, 1]) decrease_search_range: bool = True star_find_threshold: float | int = 5.0 fwhm: int = 8 percent_to_cut: int = 60 focus_measure_operator: str = "HFR" save: bool = True extremum_estimator: str = "LOWESS" extremum_estimator_kwargs: dict[str, Any] = field(default_factory=dict) secondary_focus_measure_operators: List[str] = field( default_factory=lambda: [ "fft", "normalized_variance", "tenengrad", ] ) calibration_field: AutofocusCalibrationFieldConfig = field( default_factory=AutofocusCalibrationFieldConfig, metadata={"required": True, "flatten": True}, ) save_path: Optional[Path] = None subframe_width: Optional[int] = None subframe_height: Optional[int] = None subframe_center_x: float = 0.5 subframe_center_y: float = 0.5 _focus_measure_operator = None _secondary_focus_measure_operators = {} FIELD_DESCRIPTIONS: ClassVar[dict[str, str]] = { "exptime": "Exposure time for focus frames in seconds.", "filter": "Filter to use during autofocus procedure.", "bin": "Camera binning factor.", "search_range": "Range of focus positions to search. Accepts a single width or explicit bounds.", "search_range_is_relative": "Interpret search_range relative to the current focus position.", "n_steps": "Number of steps for each sweep.", "n_exposures": ( "Number of exposures at each focus position or an array specifying exposures for each sweep. " "If an integer is given, the same number of exposures is used for each sweep. " "If an array is given, the length of the array must match the number of sweeps. " ), "decrease_search_range": "Reduce the search range after each sweep.", "star_find_threshold": "DAOStarFinder threshold for star detection.", "fwhm": "DAOStarFinder FWHM of the Gaussian kernel in pixels.", "percent_to_cut": "Percentage of worst-performing focus samples to drop when shrinking the range.", "focus_measure_operator": "Focus metric to optimize (e.g., hfr, gauss, tenengrad, fft, normalized_variance).", "reduce_exposure_time": "Automatically shorten exposures to prevent saturation.", "save": "Persist the optimal focus position back into observatory configuration.", "extremum_estimator": "Curve-fitting method used to determine the minimum (LOWESS, medianfilter, spline, rbf).", "extremum_estimator_kwargs": "Additional keyword overrides for the extremum estimator.", "secondary_focus_measure_operators": "Additional focus metrics to compute for diagnostics.", "save_path": "Directory override for saving autofocus results.", "subframe_width": "Width of the requested subframe in binned pixels.", "subframe_height": "Height of the requested subframe in binned pixels.", "subframe_center_x": "Horizontal subframe center (0=left, 1=right).", "subframe_center_y": "Vertical subframe center (0=top, 1=bottom).", } EXAMPLE_SCHEDULE: ClassVar[dict] = { "device_name": "camera_name", "action_type": "autofocus", "action_value": { "exptime": 1.0, "filter": "V", "search_range_is_relative": True, "search_range": 1000, "n_steps": [30, 20], "n_exposures": [1, 1], }, "start_time": "2025-01-01 00:00:00.000", "end_time": "2025-02-01 00:00:00.000", } def __post_init__(self) -> None: from astrafocus import FocusMeasureOperatorRegistry # Store operator classes, not instances, to avoid premature initialization self._secondary_focus_measure_operators = { FocusMeasureOperatorRegistry.get( key ).name: FocusMeasureOperatorRegistry.get(key) for key in self.secondary_focus_measure_operators if key in FocusMeasureOperatorRegistry.list() } self._focus_measure_operator = FocusMeasureOperatorRegistry.from_name( self.focus_measure_operator ) self.validate() # Validate subframe after base validation self.validate_subframe()
[docs] @classmethod def from_dict( cls, config_dict: dict, logger=None, default_dict: dict = {} ) -> "AutofocusConfig": autofocus_calibration_field = AutofocusCalibrationFieldConfig.from_dict( config_dict, logger=logger, default_dict=default_dict, ) kwargs = cls.merge_config_dicts(config_dict, default_dict) kwargs["calibration_field"] = autofocus_calibration_field if len(kwargs["n_exposures"]) != len(kwargs["n_steps"]): if logger is not None: logger.warning( "'n_exposures' length does not match 'n_steps' length. " "Defaulting to 1 exposure per step." ) kwargs["n_exposures"] = [1] * len(kwargs["n_steps"]) return cls(**kwargs)
@property def focus_measure_operator_kwargs(self) -> dict: return { "star_find_threshold": self.star_find_threshold, "fwhm": self.fwhm, } @property def focus_measure_operator_name(self) -> str: return ( self._focus_measure_operator.name if self._focus_measure_operator else "Unknown" )
ACTION_CONFIGS = { "object": ObjectActionConfig, "calibration": CalibrationActionConfig, "flats": FlatsActionConfig, "calibrate_guiding": CalibrateGuidingActionConfig, "autofocus": AutofocusConfig, "pointing_model": PointingModelActionConfig, "open": OpenActionConfig, "close": CloseActionConfig, "cool_camera": CoolCameraActionConfig, "complete_headers": CompleteHeadersActionConfig, }