Source code for astra.calibrate_guiding

"""Guiding calibration system for telescope autoguiding setup.

This module provides automated calibration of telescope guiding systems by
measuring pixel-to-time scales and determining camera orientation relative
to telescope mount axes. It performs systematic nudges in cardinal directions
and analyzes the resulting star field shifts to create calibration parameters.

Classes:
    CustomImageClass: Enhanced image processing with background subtraction
    GuidingCalibrator: Main calibration orchestrator for guiding systems
"""

import time
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, Tuple

import numpy as np
from alpaca.telescope import GuideDirections
from donuts import Donuts
from ruamel.yaml import YAML

import astra
from astra.config import Config
from astra.image_handler import ImageHandler
from astra.paired_devices import PairedDevices
from astra.scheduler import Action
from astra.utils import CustomImageClass


[docs] class GuidingCalibrator: """Automated telescope guiding calibration system. Orchestrates the complete guiding calibration process by systematically pulsing the telescope mount in cardinal directions and measuring the resulting star field shifts to determine pixel-to-time scales and camera orientation relative to mount axes. Attributes: astra_observatory: Observatory instance for device control. action: Action instance containing calibration information. paired_devices: Dictionary of paired device names. hdr: FITS header data for images. save_path: Directory for saving calibration data and images. pulse_time: Duration of guide pulses in milliseconds. exptime: Exposure time for calibration images. settle_time: Wait time after pulses before exposing. number_of_cycles: Number of calibration cycles to perform. """ def __init__( self, astra_observatory: "astra.observatory.Observatory", # type: ignore action: Action, paired_devices: Dict[str, str], image_handler: ImageHandler, save_path: Path | None = None, pulse_time: float = 5000, exptime: float = 5, settle_time: float = 10, number_of_cycles: int = 10, ): self.astra_observatory = astra_observatory self.action = action self.paired_devices = paired_devices self.image_handler = image_handler self.image_handler.image_directory = ( save_path if save_path is not None else (Config().paths.images) ) self.pulse_time = action.action_value.get("pulse_time", pulse_time) self.exptime = action.action_value.get("exptime", exptime) self.settle_time = action.action_value.get("settle_time", settle_time) self.number_of_cycles = action.action_value.get( "number_of_cycles", number_of_cycles ) self._directions = defaultdict(list) self._scales = defaultdict(list) self._calibration_config = {} self._camera = astra_observatory.devices["Camera"][action.device_name] self._telescope = astra_observatory.devices["Telescope"][ paired_devices["Telescope"] ] self.image_handler.image_directory.mkdir(parents=True, exist_ok=True)
[docs] def run(self) -> None: """Execute complete guiding calibration sequence. Performs telescope slewing, calibration cycles, configuration completion, and saves results to observatory configuration. """ self.slew_telescope_one_hour_east_of_sidereal_meridian() success = self.perform_calibration_cycles() if success: self.complete_calibration_config() self.save_calibration_config() self.update_observatory_config()
[docs] def slew_telescope_one_hour_east_of_sidereal_meridian(self) -> None: """Position telescope one hour east of meridian for calibration. Slews telescope to RA = LST - 1 hour, Dec = 0 degrees to provide optimal conditions for guiding calibration with good star tracking and minimal atmospheric effects. Raises: ValueError: If telescope slewing fails. """ local_sidereal_time = self._telescope.get("SiderealTime") target_right_ascension = local_sidereal_time - 1 # Normalize RA to 0-24 hours if target_right_ascension < 0: target_right_ascension += 24 elif target_right_ascension >= 24: target_right_ascension -= 24 self.astra_observatory.logger.info( f"Local sidereal time: {local_sidereal_time:.2f} hours. " f"Slewing one hour east to: RA = {target_right_ascension:.2f} hours, " "Dec = 0 degrees..." ) try: self._telescope.get( "SlewToCoordinatesAsync", RightAscension=target_right_ascension, Declination=0, ) time.sleep(1) # Wait for slew to finish self.astra_observatory.wait_for_slew(self.paired_devices) except Exception as e: raise ValueError(f"Failed to slew telescope: {e}")
[docs] def perform_calibration_cycles(self) -> None: """Execute systematic guiding calibration cycles. Performs multiple cycles of telescope nudges in North, South, East, and West directions, measuring star field shifts to determine pixel scales and camera orientation. Each cycle improves measurement accuracy. """ success = False image_path = self._perform_exposure() if image_path is None: return success donuts_ref = self._apply_donuts(image_path) for i in range(self.number_of_cycles): self.astra_observatory.logger.info( f"=== Starting cycle {i + 1} of {self.number_of_cycles} ===" ) for direction in [ GuideDirections.guideNorth, GuideDirections.guideSouth, GuideDirections.guideEast, GuideDirections.guideWest, ]: # Nudging to determine the scale and orientation of the camera self._pulse_guide_telescope(direction, self.pulse_time) image_path = self._perform_exposure() if image_path is None: return success shift = donuts_ref.measure_shift(image_path) direction_literal, magnitude = self._determine_shift_direction( shift, self.astra_observatory.logger ) direction_name = direction.name.removeprefix( "guide" ) # North, South, East, West self._directions[direction_name].append(direction_literal) self._scales[direction_name].append(magnitude) self.astra_observatory.logger.info( f"Shift {direction_name} results in direction {direction_literal} " f"of {magnitude:.2f} pixels (raw: x={shift.x.value:.2f}, y={shift.y.value:.2f})" ) donuts_ref = self._apply_donuts(image_path) self.astra_observatory.logger.info("Calibration cycles complete.") self.astra_observatory.logger.info( f"Directions: {str(self._directions)}; Scales: {str(self._scales)}" ) success = True return success
[docs] def complete_calibration_config(self) -> None: """Generate final calibration configuration from measurements. Processes collected direction and scale measurements to create PIX2TIME conversion factors, determine RA axis orientation, and validate measurement consistency across cycles. Raises: ValueError: If direction measurements are inconsistent across cycles. """ calibration_config = { "PIX2TIME": {"+x": None, "-x": None, "+y": None, "-y": None}, "RA_AXIS": None, "DIRECTIONS": {"+x": None, "-x": None, "+y": None, "-y": None}, } self.astra_observatory.logger.info("Checking directions...") # Validate that we have all four cardinal directions detected_directions = set() for direction_name in self._directions: # Check that the directions are the same every time for each orientation if len(set(self._directions[direction_name])) != 1: raise ValueError( "Directions must be the same across all cycles. " f"Direction number {direction_name} has {self._directions[direction_name]}." ) direction_literal = self._directions[direction_name][0] detected_directions.add(direction_literal) if direction_name == "East": calibration_config["RA_AXIS"] = "x" if "x" in direction_literal else "y" calibration_config["PIX2TIME"][direction_literal] = float( self.pulse_time / np.average(self._scales[direction_name]) ) calibration_config["DIRECTIONS"][direction_literal] = direction_name # Validate that we detected movements on both axes has_x_axis = any("x" in d for d in detected_directions) has_y_axis = any("y" in d for d in detected_directions) if not (has_x_axis and has_y_axis): self.astra_observatory.logger.error( f"Calibration failed: Only detected movements on one axis. " f"Detected directions: {detected_directions}. " f"North/South/East/West mappings: {dict(self._directions)}" ) raise ValueError( f"Calibration error: movements detected on only one camera axis. " f"Expected movements on both x and y axes, but got: {detected_directions}. " f"This suggests an issue with camera orientation, mount behavior, or shift detection. " f"Check the raw shift values in the logs." ) self.astra_observatory.logger.info("Directions are consistent") self._calibration_config.update(calibration_config)
[docs] def save_calibration_config(self) -> None: """Save calibration configuration to YAML file with nice formatting. Uses ruamel.yaml to create readable output with proper indentation, preserved structure, and better formatting for nested dictionaries. """ output_path = self.image_handler.image_directory / "calibration_config.yaml" yaml_writer = YAML() yaml_writer.default_flow_style = False yaml_writer.preserve_quotes = True yaml_writer.indent(mapping=2, sequence=2, offset=0) yaml_writer.width = 4096 # Prevent line wrapping with open(output_path, "w") as file: yaml_writer.dump(self._calibration_config, file) self.astra_observatory.logger.info(f"Calibration config saved to {output_path}")
[docs] def update_observatory_config(self) -> None: """Update observatory configuration with calibration results. Integrates the calculated calibration parameters into the observatory configuration file for the specific camera being calibrated. """ paired_devices = PairedDevices.from_observatory( observatory=self.astra_observatory, camera_name=self.action.device_name, ) telescope_config = paired_devices.get_device_config("Telescope") telescope_config["guider"].update(self._calibration_config) paired_devices.observatory_config.save() self.astra_observatory.logger.info("Observatory config updated.")
[docs] @staticmethod def _determine_shift_direction(shift: Any, logger: Any = None) -> Tuple[str, float]: """Analyze donuts shift measurement to determine direction and magnitude. Processes shift measurements to identify the primary axis of movement and calculate the pixel displacement magnitude for calibration. Args: shift (Any): Donuts shift measurement object with x and y value attributes. logger (Any): Optional logger for debug output. Returns: Tuple[str, float]: Direction literal ('+x', '-x', '+y', '-y') and pixel displacement magnitude. """ sx = shift.x.value sy = shift.y.value if logger: logger.debug(f"Raw shift values: sx={sx:.4f}, sy={sy:.4f}") if abs(sx) > abs(sy): if sx > 0: direction_literal = "-x" else: direction_literal = "+x" magnitude = abs(sx) else: if sy > 0: direction_literal = "-y" else: direction_literal = "+y" magnitude = abs(sy) return direction_literal, magnitude
[docs] def _pulse_guide_telescope( self, guide_direction: GuideDirections, duration: float ) -> None: """Execute telescope guide pulse in specified direction. Sends guide pulse command to telescope mount and waits for completion. Logs telescope position after pulse for verification. Args: guide_direction (GuideDirections): Cardinal direction for guide pulse from GuideDirections enum. duration (float): Pulse duration in milliseconds. Raises: ValueError: If guide direction is invalid. """ if guide_direction not in GuideDirections: raise ValueError("Invalid direction") self.astra_observatory.logger.info( f"Pulse guiding {guide_direction.name} for {duration} ms" ) self._telescope.get("PulseGuide")(guide_direction, duration) while self._telescope.get("IsPulseGuiding"): self.astra_observatory.logger.debug("Pulse guiding...") time.sleep(0.1) while self._telescope.get("Slewing"): self.astra_observatory.logger.debug("Slewing...") time.sleep(0.1) ra = (self._telescope.get("RightAscension") / 24) * 360 dec = self._telescope.get("Declination") self.astra_observatory.logger.info(f"RA: {ra:.8f} deg, DEC: {dec:.8f} deg")
[docs] @staticmethod def _apply_donuts(image_path: Path) -> Donuts: """Create Donuts instance for image shift measurement. Configures Donuts with custom image processing for accurate star shift detection during guiding calibration. Args: image_path (Path): Path object pointing to FITS image file. Returns: Donuts: Configured Donuts instance for shift measurements. """ return Donuts( image_path, normalise=False, subtract_bkg=False, downweight_edges=False, image_class=CustomImageClass, )
[docs] def _perform_exposure(self) -> Path: """Capture calibration image with specified parameters. Waits for telescope settling, then captures image using configured exposure time and saves to calibration directory. Returns: Path: Path to captured FITS image file. Raises: ValueError: If image exposure fails. """ self.astra_observatory.logger.info(f"Waiting {self.settle_time} s to settle...") time.sleep(self.settle_time) success, file_path = self.astra_observatory.perform_exposure( camera=self._camera, exptime=self.exptime, maxadu=self._camera.get("MaxADU"), action=self.action, use_light=True, log_option=None, maximal_sleep_time=0.1, wcs=None, ) if not success: return None return file_path