Source code for astra.autofocus

"""Automated telescope autofocus system for astronomical observations.

This module provides a comprehensive autofocus system for astronomical telescopes,
integrating with ALPACA devices and the astrafocus library. It supports various
focus measurement operators, automatic target selection using Gaia catalog data,
and sophisticated focusing algorithms for optimal image quality.

Key components:
- Camera, focuser, and telescope interfaces for ALPACA devices
- Automated target selection using Gaia star catalog
- Multiple focus measurement algorithms (HFR, FFT, variance-based)
- Defocusing and refocusing capabilities
- Comprehensive logging and result visualization

Classes:
    AstraCamera: Camera interface for autofocus operations
    AstraFocuser: Focuser interface with position control
    AstraTelescope: Telescope interface for positioning
    AstraAutofocusDeviceManager: Device coordination for autofocus
    Defocuser: Controlled defocusing and refocusing operations
    Autofocuser: Main autofocus orchestrator with target selection
"""

import logging
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Optional, Union

import astropy.io.fits as fits
import astropy.units as u
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from astrafocus import ExtremumEstimatorRegistry, FocusMeasureOperatorRegistry
from astrafocus.autofocuser import (
    AnalyticResponseAutofocuser,
    NonParametricResponseAutofocuser,
)
from astrafocus.interface import (
    AutofocusDeviceManager,
    CameraInterface,
    FocuserInterface,
    TelescopeInterface,
)
from astrafocus.star_size_focus_measure_operators import StarSizeFocusMeasure
from astrafocus.targeting import (
    ZenithNeighbourhoodQuery,
)
from astropy.coordinates import AltAz, Angle, SkyCoord
from astropy.time import Time
from scipy import ndimage
from scipy.ndimage import median_filter

import astra
from astra.action_configs import AutofocusConfig, SelectionMethod
from astra.alpaca_device_process import AlpacaDevice
from astra.config import Config
from astra.logger import DatabaseLoggingHandler
from astra.paired_devices import PairedDevices
from astra.scheduler import Action

# Public API of this module: only the high-level ``Autofocuser`` orchestrator
# is exported by ``from astra.autofocus import *``.
__all__ = ["Autofocuser"]


class AstraCamera(CameraInterface):
    """Camera interface for autofocus operations with ALPACA devices.

    Exposures are delegated to ``observatory.perform_exposure`` and the
    resulting FITS file is read back as a numpy array. The outcome of the
    most recent exposure is kept in ``self.success``.

    Args:
        observatory: Observatory instance for device control and logging.
        alpaca_device_camera (AlpacaDevice): ALPACA camera device instance.
        action (Action): Action configuration for the camera.
        maxadu: Maximum ADU value for camera; read from the device's
            ``MaxADU`` property if None.
    """

    def __init__(
        self,
        observatory: Any,
        alpaca_device_camera: AlpacaDevice,
        action: Action,
        maxadu: Optional[int] = None,
    ) -> None:
        self.observatory = observatory
        self.alpaca_device_camera = alpaca_device_camera

        self.action = action
        # Result of the most recent exposure; updated by _perform_exposure.
        self.success = True

        self.maxadu = (
            maxadu if maxadu is not None else alpaca_device_camera.get("MaxADU")
        )

        super().__init__()

    def perform_exposure(
        self, texp: float, log_option: Optional[str] = None, use_light: bool = True
    ) -> np.ndarray:
        """Capture an image and return its pixel data.

        Args:
            texp (float): Exposure time in seconds.
            log_option (Optional[str]): Logging option forwarded to the observatory.
            use_light (bool): Whether to use light frame (vs bias frame).

        Returns:
            np.ndarray: Data of the primary HDU of the saved FITS file, or an
            empty array if the exposure failed.
        """
        exposure_successful, filepath = self._perform_exposure(
            texp=texp, log_option=log_option, use_light=use_light
        )
        if not exposure_successful:
            # Callers receive an empty array rather than an exception.
            return np.array([])

        with fits.open(filepath) as hdul:
            image = hdul[0].data  # type: ignore

        # NOTE: hot pixels are not removed here; see remove_hot_pixels.
        return image

    @staticmethod
    def remove_hot_pixels(
        image: np.ndarray, threshold: float = 5, kernel_size: int = 3
    ) -> np.ndarray:
        """Remove hot pixels from image using median filtering.

        A pixel is considered hot when it exceeds the median-filtered image by
        more than ``threshold`` standard deviations of the difference image.
        Hot pixels are replaced by the median-filtered value.

        Args:
            image (np.ndarray): 2D image array to process.
            threshold (float): Threshold factor to identify hot pixels.
            kernel_size (int): Size of median filter kernel.

        Returns:
            np.ndarray: Copy of the image (same dtype) with hot pixels replaced
            by smoothed values. The input is not modified.
        """
        cleaned_image = np.copy(image)
        if cleaned_image.size == 0:
            # Nothing to filter (e.g. the result of a failed exposure).
            return cleaned_image

        smoothed_image = median_filter(cleaned_image, size=kernel_size)

        # Compute the difference in floating point: camera frames are usually
        # unsigned integers, for which `a - b` wraps around when b > a and
        # would make pixels *darker* than their surroundings look hot.
        difference = cleaned_image.astype(np.float64) - smoothed_image.astype(
            np.float64
        )

        hot_pixels = difference > (threshold * np.std(difference))
        cleaned_image[hot_pixels] = smoothed_image[hot_pixels]

        return cleaned_image

    def _perform_exposure(
        self, texp: float, log_option: Optional[str] = None, use_light: bool = True
    ) -> tuple[bool, Path | None]:
        """Perform the exposure through the observatory and record the outcome.

        Args:
            texp (float): Exposure time in seconds.
            log_option (Optional[str]): Logging option for the exposure.
            use_light (bool): Whether to use light frame (vs bias frame).

        Returns:
            tuple[bool, Path | None]: Success flag and file path exactly as
            returned by ``observatory.perform_exposure``.
        """
        exposure_successful, filepath = self.observatory.perform_exposure(
            camera=self.alpaca_device_camera,
            exptime=texp,
            maxadu=self.maxadu,
            action=self.action,
            use_light=use_light,
            log_option=log_option,
        )
        self.success = exposure_successful

        if not exposure_successful:
            # Failure is logged, not raised; callers inspect the return value
            # or self.success.
            self.observatory.logger.warning("Exposure failed.")

        return exposure_successful, filepath


class AstraFocuser(FocuserInterface):
    """Focuser interface for Astra observatory automation system.

    Provides position control interface for telescope focuser through ALPACA protocol.

    Args:
        observatory (astra.Observatory): Main Astra instance for system coordination.
        alpaca_device_focuser (AlpacaDevice): ALPACA focuser device interface.
        action (Action): Action configuration for the focuser.
        settle_time (float): Seconds to wait after each move has finished.

    Raises:
        ValueError: If the device does not report itself as an absolute focuser.
    """

    def __init__(
        self,
        observatory: "astra.observatory.Observatory",
        alpaca_device_focuser: AlpacaDevice,
        action: Optional[Action] = None,
        settle_time: float = 3,
    ) -> None:
        if not alpaca_device_focuser.get("Absolute"):
            raise ValueError("Focuser must be absolute for autofocusing to work.")

        self.observatory = observatory
        self.action = action
        self.alpaca_device_focuser = alpaca_device_focuser
        self.settle_time = settle_time

        current_position = self.get_current_position()
        # The allowed range spans from 0 to the device's reported MaxStep.
        allowed_range = (0, alpaca_device_focuser.get("MaxStep"))
        super().__init__(current_position=current_position, allowed_range=allowed_range)

    def move_focuser_to_position(
        self, new_position: int, hard_timeout: float = 120
    ) -> None:
        """Move focuser to specified position with timeout protection.

        The requested position is first clamped to the allowed range. The
        method then polls ``IsMoving`` until the move has finished and finally
        sleeps for ``settle_time`` seconds.

        Args:
            new_position (int): Target focuser position.
            hard_timeout (float): Maximum time to wait for movement in seconds.

        Raises:
            TimeoutError: If movement takes longer than hard_timeout.
        """
        target_position = self._project_to_allowed_range(new_position)

        self.alpaca_device_focuser.get("Move", Position=target_position)
        start_time = time.time()
        while self.alpaca_device_focuser.get("IsMoving"):
            if time.time() - start_time > hard_timeout:
                raise TimeoutError(
                    f"Focuser move to position {target_position} timed out "
                    f"after {hard_timeout} s."
                )
            time.sleep(0.1)

        time.sleep(self.settle_time)
        return None

    def get_current_position(self) -> int:
        """Get the current focuser position.

        Returns:
            int: Value of the device's ``Position`` property.
        """
        return self.alpaca_device_focuser.get("Position")

    def _project_to_allowed_range(self, new_position: int) -> int:
        """Project position to the allowed focuser range.

        Clamps the requested position to ``self.allowed_range`` and logs a
        warning naming the originally requested value whenever it had to be
        adjusted. A bound of None is treated as "no limit".

        Args:
            new_position (int): Requested focuser position.

        Returns:
            int: Position adjusted to allowed range.
        """
        lower, upper = self.allowed_range[0], self.allowed_range[1]

        if lower is not None and new_position < lower:
            # Log before overwriting so the message shows the requested value.
            self.observatory.logger.warning(
                f"Requested focuser position {new_position} is below the allowed range. "
                f"Moving focuser to {lower} instead."
            )
            new_position = lower
        if upper is not None and new_position > upper:
            self.observatory.logger.warning(
                f"Requested focuser position {new_position} is above the allowed range. "
                f"Moving focuser to MaxStep {upper} instead."
            )
            new_position = upper

        return new_position


class AstraTelescope(TelescopeInterface):
    """ALPACA-backed telescope adapter used by the autofocus routines.

    Implements pointing for the astrafocus ``TelescopeInterface`` by issuing
    asynchronous slews to the wrapped ALPACA telescope device.

    Args:
        observatory (astra.observatory.Observatory): Observatory instance.
        alpaca_device_telescope (AlpacaDevice): ALPACA telescope device interface.
        action (Action): Action configuration for the telescope.
    """

    def __init__(
        self,
        observatory: "astra.observatory.Observatory",
        alpaca_device_telescope: AlpacaDevice,
        action: Action,
    ) -> None:
        self.action = action
        self.observatory = observatory
        self.alpaca_device_telescope = alpaca_device_telescope
        super().__init__()

    def set_telescope_position(
        self, coordinates: SkyCoord, hard_timeout: float = 120
    ) -> None:
        """Slew to ``coordinates`` and wait until the mount stops slewing.

        The wait ends early, without raising, as soon as the observatory
        reports that conditions for ``self.action`` are no longer met.

        Args:
            coordinates (SkyCoord): Target celestial coordinates.
            hard_timeout (float): Maximum time to wait for slew completion in seconds.

        Raises:
            TimeoutError: If the mount is still slewing after ``hard_timeout`` seconds.
        """
        mount = self.alpaca_device_telescope
        # The device is handed right ascension in hours and declination in degrees.
        mount.get(
            "SlewToCoordinatesAsync",
            RightAscension=coordinates.ra.hour,
            Declination=coordinates.dec.deg,
        )

        slew_started = time.time()
        while True:
            if not mount.get("Slewing"):
                return
            elapsed = time.time() - slew_started
            if elapsed > hard_timeout:
                raise TimeoutError("Slew timeout")
            if not self.observatory.check_conditions(self.action):
                # Stop waiting; the slew itself is not cancelled here.
                return
            time.sleep(1)


class AstraAutofocusDeviceManager(AutofocusDeviceManager):
    """Bundle of camera, focuser and telescope adapters for astrafocus.

    Besides the three device adapters handed to the astrafocus base class,
    the manager keeps the observatory and the scheduled action so that
    observing conditions can be re-checked.

    Args:
        observatory (astra.observatory.Observatory): Observatory instance.
        action_value (dict): Configuration values for autofocus action.
        action (Action): Action configuration for the autofocus.
        astra_camera (AstraCamera): Camera interface for image capture.
        astra_focuser (AstraFocuser): Focuser interface for position control.
        astra_telescope (Optional[AstraTelescope]): Telescope interface for positioning.
    """

    def __init__(
        self,
        observatory: "astra.observatory.Observatory",
        action_value: dict,
        action: Action,
        astra_camera: AstraCamera,
        astra_focuser: AstraFocuser,
        astra_telescope: Optional[AstraTelescope] = None,
    ) -> None:
        self.action = action
        self.action_value = action_value
        self.observatory = observatory
        super().__init__(
            camera=astra_camera,
            focuser=astra_focuser,
            telescope=astra_telescope,
        )

    @classmethod
    def from_action(
        cls,
        observatory: "astra.observatory.Observatory",
        action: Action,
        paired_devices: PairedDevices,
    ) -> "AstraAutofocusDeviceManager":
        """Build a device manager for a scheduled autofocus action.

        The camera is looked up by ``action.device_name``; focuser and
        telescope are looked up by the names stored in ``paired_devices``.

        Args:
            observatory (astra.observatory.Observatory): Astra Observatory instance.
            action (Action): Scheduled autofocus action.
            paired_devices (PairedDevices): Device manager instance.

        Returns:
            AstraAutofocusDeviceManager: Configured device manager instance.
        """
        action_value = action.action_value

        # Resolve the raw ALPACA devices first.
        camera_device = observatory.devices["Camera"][action.device_name]
        focuser_name = paired_devices["Focuser"]
        focuser_device = observatory.devices["Focuser"][focuser_name]
        telescope_name = paired_devices["Telescope"]
        telescope_device = observatory.devices["Telescope"][telescope_name]

        # Wrap each device in its astrafocus-compatible adapter.
        camera = AstraCamera(
            observatory, alpaca_device_camera=camera_device, action=action
        )
        # Settle time falls back to 3 when the focuser config has no entry.
        focuser_config = paired_devices.get_device_config("Focuser")
        settle_time = focuser_config.get("settle_time", 3)
        focuser = AstraFocuser(
            observatory,
            alpaca_device_focuser=focuser_device,
            action=action,
            settle_time=settle_time,
        )
        telescope = AstraTelescope(
            observatory,
            alpaca_device_telescope=telescope_device,
            action=action,
        )

        return cls(
            observatory=observatory,
            action_value=action_value,
            action=action,
            astra_camera=camera,
            astra_focuser=focuser,
            astra_telescope=telescope,
        )

    def check_conditions(self) -> bool:
        """Ask the observatory whether conditions still allow this action.

        Returns:
            bool: Result of ``observatory.check_conditions`` for ``self.action``.
        """
        conditions_ok = self.observatory.check_conditions(action=self.action)
        return conditions_ok


class Defocuser:
    """Defocusing control for creating intentionally out-of-focus images.

    Used for specific observational techniques requiring defocused stellar images,
    such as photometry of very bright stars or specific calibration procedures.

    Args:
        observatory (astra.observatory.Observatory): Astra observatory instance.
        paired_devices (PairedDevices): Device manager for observatory components.
        action (Optional[Action]): Action configuration for the focuser.

    Raises:
        ValueError: If the focuser configuration holds no ``focus_position``
            (raised by ``load_best_focus_position_from_config``), or if the
            focuser is not an absolute focuser (raised by ``AstraFocuser``).
    """

    def __init__(
        self,
        observatory: Any,
        paired_devices: "PairedDevices",
        action: "Optional[Action]" = None,
    ) -> None:
        self.observatory = observatory
        self.action = action
        self.paired_devices = paired_devices

        self._focuser = AstraFocuser(
            observatory=observatory,
            alpaca_device_focuser=paired_devices.focuser,
            action=action,
        )
        self.best_focus_position = self.load_best_focus_position_from_config()

    @property
    def focuser_name(self) -> str:
        """Get the name of the connected focuser device.

        Returns:
            str: Name identifier of the focuser device.
        """
        return self.paired_devices["Focuser"]

    def load_best_focus_position_from_config(self) -> int:
        """Load the best focus position from device configuration.

        Returns:
            int: Value of ``focus_position`` in the focuser configuration.

        Raises:
            ValueError: If focuser configuration is not found or missing focus_position.
        """
        focuser_config = self.paired_devices.get_device_config("Focuser")
        # `not focuser_config` also covers a missing (None) or empty config,
        # which would otherwise fail with an unrelated TypeError.
        if not focuser_config or "focus_position" not in focuser_config:
            self.observatory.logger.warning(
                "No best focus position found in focuser configuration. "
                f"Focuser: {self.focuser_name}, focuser_config: {focuser_config}"
            )
            raise ValueError(
                "No 'focus_position' found in the configuration of focuser "
                f"{self.focuser_name}."
            )

        return focuser_config["focus_position"]

    @property
    def current_position(self) -> int:
        """Get the current focuser position.

        Returns:
            int: Current position of the focuser.
        """
        return self._focuser.get_current_position()

    def defocus(self, position: int) -> None:
        """Move focuser to a defocused position.

        Does nothing (apart from a debug message) when the focuser already is
        at ``position``.

        Args:
            position (int): Target defocus position.
        """
        # Query the device once so the comparison and the log message agree.
        current_position = self.current_position
        if position == current_position:
            self.observatory.logger.debug(
                f"Focuser {self.focuser_name} already at position "
                f"{position}. No change of focus needed."
            )
            return

        shift = position - current_position
        self.observatory.logger.info(
            f"Defocusing by {shift} steps from current position {current_position} "
            f"to new position {position}."
        )

        self._focuser.move_focuser_to_position(position)

    def refocus(self) -> None:
        """Return focuser to the best focus position.

        Does nothing (apart from a debug message) when the focuser already is
        at ``self.best_focus_position``.
        """
        # Query the device once so the comparison and the log message agree.
        current_position = self.current_position
        if current_position == self.best_focus_position:
            self.observatory.logger.debug(
                f"Focuser {self.focuser_name} already at best "
                f"focus position {self.best_focus_position}. No refocusing needed."
            )
            return
        self.observatory.logger.info(
            f"Refocusing from current position {current_position} "
            f"to the best focus position: {self.best_focus_position}."
        )
        self._focuser.move_focuser_to_position(self.best_focus_position)


class Autofocuser:
    """Main autofocus orchestration class for Astra observatory.

    Holds the scheduled action, its configuration and the astrafocus
    autofocuser instance, and tracks overall success of the operation in
    ``self.success``.

    Args:
        observatory (Any): Main Astra instance for system coordination.
        action (Action): Action configuration for the autofocus.
        paired_devices (PairedDevices): Device manager for observatory components.
        autofocuser (Optional[Union[NonParametricResponseAutofocuser, AnalyticResponseAutofocuser]]):
            Specific autofocus algorithm instance.
        success (bool): Success flag for autofocus operation.
    """

    def __init__(
        self,
        observatory: Any,
        action: Action,
        paired_devices: PairedDevices,
        autofocuser: Optional[
            Union[NonParametricResponseAutofocuser, AnalyticResponseAutofocuser]
        ] = None,
        success: bool = True,
    ) -> None:
        self.observatory = observatory
        self.action = action
        self.paired_devices = paired_devices
        self.action_value = action.action_value
        self.autofocuser = autofocuser
        self.success = success
        # The action value doubles as the typed autofocus configuration.
        self.config: AutofocusConfig = action.action_value  # type: ignore

        camera_config = paired_devices.get_device_config("Camera")
        camera_autofocus_defaults = camera_config.get("autofocus", {})
        logging.info(f"default_dict {camera_autofocus_defaults}")

        # A zero-sized field of view means "not configured": fill in defaults.
        calibration_field = self.config.calibration_field
        if calibration_field.fov_width == 0 or calibration_field.fov_height == 0:
            default_width, default_height = self.determine_default_field_of_view(
                paired_devices, camera_autofocus_defaults
            )
            calibration_field.fov_width = default_width
            calibration_field.fov_height = default_height

        self._initialise_logging()
        log = self.observatory.logger
        log.info(f"Loaded action values {self.action_value}")
        log.info(f"Autofocus configuration: {self.config}.")

    @property
    def best_focus_position(self) -> int:
        """Best focus position reported by the underlying autofocuser.

        Returns:
            int: ``best_focus_position`` of ``self.autofocuser``.

        Raises:
            ValueError: If autofocuser has not been set up yet.
        """
        active_autofocuser = self.autofocuser
        if active_autofocuser is None:
            raise ValueError("Autofocuser has not been set up yet.")
        return active_autofocuser.best_focus_position
def run(self) -> bool:
    """Execute the autofocus sequence.

    Nothing is run when an earlier step already failed (``self.success`` is
    False) or when the observatory reports unsuitable conditions.

    Returns:
        bool: Result of ``self.autofocuser.run()``; False if the run was
        skipped or raised. Any exception is reported through the observatory
        logger and sets ``self.success`` to False.
    """
    if not self.success:
        return False
    if not self.observatory.check_conditions(action=self.action):
        return False

    try:
        # Identity check (as in best_focus_position): a configured autofocuser
        # must not be rejected just because it happens to evaluate as falsy.
        if self.autofocuser is None:
            raise ValueError("Autofocuser has not been set up yet.")
        return self.autofocuser.run()
    except Exception as e:
        self.observatory.logger.report_device_issue(
            device_type="Autofocuser",
            device_name=self.paired_devices["Telescope"],
            message="Error running autofocus",
            exception=e,
        )
        self.success = False
        return False
def setup(self) -> None:
    """Prepare the autofocus run, reporting instead of raising on failure.

    Delegates to ``_setup`` when ``_check_conditions`` passes. Any exception
    raised there is reported through the observatory logger and marks the
    operation as failed via ``self.success``.
    """
    if self._check_conditions():
        try:
            self._setup()
        except Exception as e:
            self.observatory.logger.report_device_issue(
                device_type="Autofocuser",
                device_name=self.paired_devices["Telescope"],
                message="Error extracting action_value for autofocus",
                exception=e,
            )
            self.success = False
def _setup(self) -> None:
    """Build the astrafocus autofocuser from the action configuration.

    Creates the device manager, optionally shortens the exposure time,
    determines the start position for a relative search range and finally
    instantiates either an analytic or a non-parametric autofocuser,
    depending on the type of the focus measure operator.
    """
    cfg = self.config
    log = self.observatory.logger

    device_manager = AstraAutofocusDeviceManager.from_action(
        self.observatory,
        action=self.action,
        paired_devices=self.paired_devices,
    )
    operator_cls = self.determine_focus_measure_operator()

    # Optionally shorten the exposure time using test exposures.
    if cfg.reduce_exposure_time:
        cfg.exptime = self.reduce_exposure_time(
            autofocus_device_manager=device_manager,
            exposure_time=cfg.exptime,
            reduction_factor=2,
            max_reduction_steps=5,
            minimal_exposure_time=0.1,
        )

    # A relative search range needs a reference position: prefer the
    # configured focus position, otherwise use where the focuser is now.
    start_position = None
    if cfg.search_range_is_relative and cfg.search_range is not None:
        focuser_config = self.paired_devices.get_device_config("Focuser")
        start_position = focuser_config.get("focus_position", None)
        if start_position is None:
            start_position = device_manager.focuser.get_current_position()
            log.info(
                "No best focus position found in focuser configuration. "
                f"Using current position {start_position} "
                "to define autofocus search range instead."
            )

    # Keyword arguments shared by both autofocuser flavours.
    common_kwargs = {
        "autofocus_device_manager": device_manager,
        "search_range": cfg.search_range,  # None defaults to allowed focuser range
        "n_steps": cfg.n_steps,
        "n_exposures": cfg.n_exposures,
        "decrease_search_range": cfg.decrease_search_range,
        "exposure_time": cfg.exptime,
        "secondary_focus_measure_operators": cfg._secondary_focus_measure_operators,
        "focus_measure_operator_kwargs": cfg.focus_measure_operator_kwargs,
        "search_range_is_relative": cfg.search_range_is_relative,
        "initial_position": start_position,
        "keep_images": True,
    }
    log.debug(f"Autofocus arguments: {common_kwargs}")

    if not issubclass(operator_cls, StarSizeFocusMeasure):
        # Non-parametric response: takes an operator instance plus an estimator.
        extremum_estimator = self.determine_extremum_estimator()
        autofocuser = NonParametricResponseAutofocuser(
            focus_measure_operator=operator_cls(),
            extremum_estimator=extremum_estimator,
            **common_kwargs,
        )
        log.info(f"Using the extremum_estimator {extremum_estimator}")
    else:
        # Star-size operators: the operator class itself is passed on.
        autofocuser = AnalyticResponseAutofocuser(
            focus_measure_operator=operator_cls,
            percent_to_cut=cfg.percent_to_cut,
            **common_kwargs,
        )
        log.info(
            f"Using the focus_measure_operator {cfg.focus_measure_operator_name} "
        )

    log.debug(f"Using autofocuser {autofocuser}.")
    self.autofocuser = autofocuser
def determine_autofocus_calibration_field(self):
    """Select the sky field used for autofocus, reporting failures.

    Wraps ``_determine_autofocus_calibration_field``. The operation is marked
    as failed (``self.success = False``) when that method raises or when
    ``self.config.calibration_field.coordinates`` is not a ``SkyCoord``
    afterwards. Nothing is done when ``_check_conditions`` fails.

    The configured selection method decides which star is targeted:
        - 'single': Isolated star closest to zenith
        - 'maximal': Star with maximum neighbors in field
        - 'any': Any suitable star closest to zenith
    """
    if not self._check_conditions():
        return None

    try:
        self._determine_autofocus_calibration_field()
        selected_field = self.config.calibration_field.coordinates
        if not isinstance(selected_field, SkyCoord):
            self.success = False
    except Exception as e:
        self.success = False
        self.observatory.logger.report_device_issue(
            device_type="Autofocuser",
            device_name=self.paired_devices["Telescope"],
            message="Error determining autofocus calibration field",
            exception=e,
        )
def _determine_autofocus_calibration_field(self) -> None:
    """Determine the calibration field for the autofocus using config attributes.

    Order of preference:
        1. User-specified ``ra``/``dec`` from the calibration config (degrees).
        2. A star selected from the Gaia database near zenith, according to
           the configured selection method.
        3. The zenith itself, if the catalogue-based selection fails.

    The result is stored in ``self.config.calibration_field.coordinates``.
    The method returns without setting coordinates when the observatory
    conditions become unsuitable during the search.

    Raises:
        ValueError: If the observatory location or the zenith cannot be determined.
    """
    calibration_config = self.config.calibration_field

    # Use user-specified coordinates if present
    if calibration_config.ra is not None and calibration_config.dec is not None:
        self.observatory.logger.info(
            "Using user-specified calibration coordinates for autofocus."
        )
        calibration_config.coordinates = SkyCoord(
            ra=Angle(float(calibration_config.ra), u.deg),
            dec=Angle(float(calibration_config.dec), u.deg),
        )
        return

    self.observatory.logger.info("Determining autofocus calibration field.")

    try:
        observatory_location = (
            self.observatory.image_handler.get_observatory_location()
        )
        logging.info(
            f"Observatory location determined to be at {observatory_location}."
        )
    except Exception as e:
        raise ValueError(f"Error determining observatory location: {str(e)}.") from e

    try:
        if not Config().gaia_db.exists() or not calibration_config.use_gaia:
            raise ValueError("gaia_tmass_db_path not specified in config.")

        self.observatory.logger.info(
            f"Computing coordinates for the autofocus target with maximal zenith angle of "
            f"{calibration_config.maximal_zenith_angle}."
        )

        zenith_neighbourhood_query = (
            ZenithNeighbourhoodQuery.create_from_location_and_angle(
                db_path=Config().gaia_db,
                observatory_location=observatory_location,
                observation_time=calibration_config.observation_time,
                maximal_zenith_angle=calibration_config.maximal_zenith_angle,
                maximal_number_of_stars=calibration_config.maximal_number_of_stars,
            )
        )
        self.observatory.logger.info(
            "Zenith was determined to be at "
            f"{zenith_neighbourhood_query.zenith_neighbourhood.zenith.icrs!r}."
        )

        min_phot_g_mean_mag, max_phot_g_mean_mag = calibration_config.g_mag_range
        min_j_m, max_j_m = calibration_config.j_mag_range

        znqr = zenith_neighbourhood_query.query_shardwise(
            n_sub_div=20,
            min_phot_g_mean_mag=min_phot_g_mean_mag,
            max_phot_g_mean_mag=max_phot_g_mean_mag,
            min_j_m=min_j_m,
            max_j_m=max_j_m,
        )
        self.observatory.logger.info(
            f"Retrieved {len(znqr)} stars in the neighbourhood of the zenith from the database "
            "within the desired magnitude ranges.",
        )
        if not self.observatory.check_conditions(action=self.action):
            return

        znqr.determine_stars_in_neighbourhood(
            height=calibration_config.fov_height,
            width=calibration_config.fov_width,
        )
        if not self.observatory.check_conditions(action=self.action):
            return

        # Sort in place: sort_values returns a new object by default, so the
        # previously discarded result left the table unsorted.
        # NOTE(review): assumes the query result is a pandas DataFrame
        # (subclass); in-place sorting keeps its custom methods available.
        znqr.sort_values(
            ["zenith_angle", "n"], ascending=[True, True], inplace=True
        )

        selection_method = calibration_config.selection_method
        if selection_method == SelectionMethod.SINGLE:
            isolated = znqr.n == 1
            if not isolated.any():
                # argmax over an all-False mask yields 0, i.e. the first star.
                self.observatory.logger.warning(
                    "No isolated star found in the zenith neighbourhood. "
                    "Using the star closest to zenith instead."
                )
            centre_coordinates = znqr.get_sky_coord_of_select_star(
                np.argmax(isolated)
            )
        elif selection_method == SelectionMethod.MAXIMAL:
            centre_coordinates = znqr.get_sky_coord_of_select_star(
                np.argmax(znqr.n)
            )
        elif selection_method == SelectionMethod.ANY:
            centre_coordinates = znqr.get_sky_coord_of_select_star(0)
        else:
            # This should never happen due to enum selection
            raise ValueError(f"Unknown selection_method: {selection_method}")

        if centre_coordinates is None or not isinstance(
            centre_coordinates, SkyCoord
        ):
            raise ValueError("No suitable calibration field found.")

        calibration_config.coordinates = centre_coordinates

    except Exception as e:
        if not self.observatory.check_conditions(action=self.action):
            return

        # Catalogue-based selection failed: fall back to pointing at zenith.
        self.observatory.logger.warning(
            f"Error determining autofocus target coordinates: {str(e)}. "
            "Attempt to autofocus at zenith.",
        )
        try:
            calibration_config.coordinates = SkyCoord(
                AltAz(
                    obstime=Time.now(),
                    location=observatory_location,
                    alt=Angle(90.0, unit=u.deg),
                    az=Angle(0.0, unit=u.deg),
                )
            ).icrs  # type: ignore
            self.observatory.logger.info(
                "Autofocus target coordinates set to zenith."
            )
        except Exception as zenith_error:
            raise ValueError(
                f"Error determining zenith: {str(zenith_error)}."
                "This is likely due to an error in the observatory location in the header."
            ) from zenith_error
def slew_to_calibration_field(self) -> None:
    """Slew telescope to the determined autofocus calibration field.

    Writes the field's right ascension and declination (degrees) into
    ``self.action_value`` under ``"ra"`` and ``"dec"`` and calls
    ``observatory.setup_observatory``. Nothing is done when an earlier step
    has already failed. Any error, including missing calibration
    coordinates, is reported through the observatory logger and sets
    ``self.success`` to False.
    """
    if not self.success:
        return None

    coordinates = self.config.calibration_field.coordinates
    self.observatory.logger.debug(
        f"Slewing to autofocus calibration field: {coordinates!r}"
    )

    try:
        # Kept inside the try block so that unset/invalid coordinates are
        # reported like any other slew failure instead of escaping as an
        # unhandled AttributeError.
        self.action_value["ra"] = coordinates.ra.deg
        self.action_value["dec"] = coordinates.dec.deg

        self.observatory.setup_observatory(self.paired_devices, self.action_value)
    except Exception as e:
        self.observatory.logger.report_device_issue(
            device_type="Autofocuser",
            device_name=self.paired_devices["Telescope"],
            message="Error slewing to autofocus calibration field",
            exception=e,
        )
        self.success = False
def reduce_exposure_time(
    self,
    autofocus_device_manager: "AstraAutofocusDeviceManager",
    exposure_time: float,
    reduction_factor: float = 2,
    max_reduction_steps: int = 5,
    minimal_exposure_time: float = 0.1,
) -> float:
    """Automatically reduce exposure time to prevent saturation.

    Takes up to ``max_reduction_steps`` test exposures. After each one the
    image is median filtered, the per-row median is subtracted, and the
    result is compared against 90 % of the camera's ``maxadu``. While that
    level is exceeded, the exposure time is divided by ``reduction_factor``.

    Args:
        autofocus_device_manager (AstraAutofocusDeviceManager): Device manager for camera access.
        exposure_time (float): Initial exposure time in seconds.
        reduction_factor (float): Factor by which to reduce exposure time.
        max_reduction_steps (int): Maximum number of reduction iterations.
        minimal_exposure_time (float): Minimum allowed exposure time.

    Returns:
        float: Exposure time to use. Never longer than ``exposure_time``; if
        the next candidate would fall below ``minimal_exposure_time``, the
        last exposure time that was actually tested is returned (or
        ``exposure_time`` itself if none was tested).

    Raises:
        ValueError: If a test exposure fails and returns no image data.
    """
    camera = autofocus_device_manager.camera
    new_exposure_time = exposure_time
    last_tested_exposure_time = None
    saturated = False

    for _ in range(max_reduction_steps):
        if new_exposure_time < minimal_exposure_time:
            self.observatory.logger.warning(
                f"Minimal exposure time of {minimal_exposure_time} reached. "
                f"Cannot reduce exposure time further. Image might still be saturated.",
            )
            # Step back to the last exposure that was actually taken; never
            # hand back a longer exposure than the one requested.
            if last_tested_exposure_time is None:
                return exposure_time
            return last_tested_exposure_time

        image = camera.perform_exposure(texp=new_exposure_time, use_light=True)
        if image is None or np.size(image) == 0:
            # A failed exposure yields no data; fail with a clear message
            # instead of an obscure error from the filtering below.
            raise ValueError(
                f"Test exposure of {new_exposure_time} s failed; "
                "cannot check the image for saturation."
            )
        last_tested_exposure_time = new_exposure_time

        # Suppress isolated outliers and remove the per-row level before
        # measuring the peak.
        clean = ndimage.median_filter(image, size=4, mode="mirror")
        band_corr = np.median(clean, axis=1).reshape(-1, 1)
        band_clean = clean - band_corr

        saturated = bool(band_clean.max() > 0.9 * camera.maxadu)
        if not saturated:
            break
        new_exposure_time = new_exposure_time / reduction_factor

    if saturated:
        # All allowed steps were used and the last test image still saturated;
        # the returned (further reduced) exposure time has not been tested.
        self.observatory.logger.warning(
            f"Exposure of {last_tested_exposure_time} s was still saturating after "
            f"{max_reduction_steps} reduction steps. "
            f"Continuing with untested exposure time of {new_exposure_time} s."
        )
    elif new_exposure_time != exposure_time:
        self.observatory.logger.warning(
            f"Reduced exposure time from {exposure_time} to {new_exposure_time} "
            f"to avoid saturation.",
        )

    return new_exposure_time
def make_summary_plot(self) -> None:
    """Plot focus measure against focuser position and save it as a PNG.

    The output directory is ``self.config.save_path`` when set; otherwise
    the parent directory of the image handler's ``last_image_path`` for
    ``self.action.device_name``. The focus record is taken from
    ``self.autofocuser.focus_record`` when available, otherwise from the
    most recently modified ``.csv`` file in the output directory.

    The plot marks ``self.best_focus_position`` with a vertical line. The
    file is named after the CSV stem when a CSV was read, otherwise
    ``autofocus_summary_<timestamp>.png``.

    Nothing is done when ``self.success`` is ``False``. The method never
    raises: missing inputs are logged and skipped, and any other error is
    logged via ``logger.exception``.
    """
    try:
        if self.success is False:
            return

        # Resolve the output directory.
        save_dir: Path | None = None
        if self.config.save_path is not None:
            save_dir = Path(self.config.save_path)
        else:
            try:
                image_handler = self.observatory.get_image_handler(
                    self.action.device_name
                )
                last_image_path = getattr(image_handler, "last_image_path", None)
            except Exception:
                last_image_path = None

            if last_image_path is None:
                self.observatory.logger.warning(
                    "Skipping creation of summary plot: no save_path configured and no last image available."
                )
                return
            save_dir = Path(last_image_path).parent

        # Prefer the in-memory focus record; fall back to the newest CSV.
        df: pd.DataFrame | None = None
        try:
            df = getattr(getattr(self, "autofocuser", None), "focus_record", None)
        except Exception:
            df = None

        csv_path: Path | None = None
        if df is None:
            csv_files = sorted(
                (p for p in save_dir.iterdir() if p.suffix == ".csv"),
                key=lambda p: p.stat().st_mtime,
            )
            if not csv_files:
                self.observatory.logger.error(
                    f"No focus record CSV found in {save_dir}. Skipping summary plot."
                )
                return
            csv_path = csv_files[-1]
            df = pd.read_csv(csv_path)

        df = df.sort_values("focus_pos")

        # Name the plot after the CSV it was built from, else timestamp it.
        if csv_path is not None:
            out_name = f"{csv_path.stem}.png"
        else:
            out_name = (
                f"autofocus_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
            )

        matplotlib.use("Agg")
        fig, ax = plt.subplots(dpi=300)
        try:
            ax.plot(
                df["focus_pos"], df["focus_measure"], color="black", marker=".", ls=""
            )
            ax.set_xlabel("Focuser position")
            ax.set_ylabel(f"Focus measure ({self.config.focus_measure_operator_name})")
            ax.axvline(
                self.best_focus_position,
                color="red",
                ls="--",
                zorder=-1,
                label="Best focus position",
            )
            ax.legend()
            fig.savefig(save_dir / out_name)
        finally:
            # Always release the figure, even when plotting or saving fails,
            # so repeated autofocus runs do not accumulate open figures.
            plt.close(fig)
    except Exception as e:
        self.observatory.logger.exception(f"Error creating summary plot: {str(e)}")
def create_result_file(self) -> None:
    """Write a short text summary of the autofocus result.

    The file ``<timestr>_result.txt`` contains the best focus position,
    the configured focus measure operator name, and the string form of
    ``self.autofocuser``. It is written to ``self.config.save_path`` when
    set, otherwise to the parent directory of the image handler's
    ``last_image_path`` for ``self.action.device_name``.

    ``timestr`` is the part before the first underscore of the most
    recently modified ``.csv`` file's stem in that directory; when no CSV
    exists (or that part is empty) the current time formatted as
    ``%Y%m%d_%H%M%S`` is used.

    Nothing is done when ``self.success`` is ``False``. The method never
    raises: a missing output location is logged as an error, and any
    failure while scanning the directory or writing the file is logged via
    ``logger.exception``.
    """
    if self.success is False:
        return

    if self.config.save_path is not None:
        save_dir = Path(self.config.save_path)
    else:
        try:
            image_handler = self.observatory.get_image_handler(
                self.action.device_name
            )
            last_image_path = getattr(image_handler, "last_image_path", None)
        except Exception:
            last_image_path = None

        if last_image_path is None:
            self.observatory.logger.error(
                "Skipping creation of log file: no save_path configured and no last image available."
            )
            return
        save_dir = Path(last_image_path).parent

    try:
        # Scanning the directory can fail (e.g. it does not exist yet), so
        # it is guarded together with the write.
        csv_files = sorted(
            (p for p in save_dir.iterdir() if p.suffix == ".csv"),
            key=lambda p: p.stat().st_mtime,
        )
        timestr = csv_files[-1].stem.split("_")[0] if csv_files else ""
        if not timestr:
            timestr = datetime.now().strftime("%Y%m%d_%H%M%S")

        result_file_path = save_dir / f"{timestr}_result.txt"
        with open(result_file_path, "w", encoding="utf-8") as result_file:
            result_file.write(f"Best focus position: {self.best_focus_position}\n")
            result_file.write(
                f"Focus measure operator: {self.config.focus_measure_operator_name}\n"
            )
            result_file.write(f"Autofocuser: {self.autofocuser}\n")
    except Exception as e:
        self.observatory.logger.exception(f"Error creating log file: {str(e)}")
def _initialise_logging(self) -> None:
    """Route ``astrafocus`` log records into the observatory database.

    If the ``"astrafocus"`` logger reports having handlers, its own handler
    list is emptied. A ``DatabaseLoggingHandler`` bound to
    ``self.observatory.database_manager`` is then attached.
    """
    astrafocus_logger = logging.getLogger("astrafocus")

    # hasHandlers() also considers ancestor loggers; only this logger's own
    # handler list is cleared.
    if astrafocus_logger.hasHandlers():
        astrafocus_logger.handlers.clear()

    database_handler = DatabaseLoggingHandler(self.observatory.database_manager)
    astrafocus_logger.addHandler(database_handler)
def _check_conditions(self) -> bool:
    """Ask the observatory whether autofocus may proceed.

    Calls ``self.observatory.check_conditions`` for the current action.
    When that check fails, an error is logged and ``self.success`` is set
    to ``False``.

    Returns:
        bool: The current value of ``self.success``. Note that this is
        also falsy when ``self.success`` was already cleared before the
        call, even if the conditions check itself passes.
    """
    conditions_ok = self.observatory.check_conditions(action=self.action)
    if conditions_ok:
        return self.success

    self.observatory.logger.error("Autofocus aborted due to bad conditions.")
    self.success = False
    return self.success
def save_best_focus_position(self) -> None:
    """Persist the best focus position in the focuser configuration.

    Skipped when ``self.success`` or ``self.config.save`` is falsy.
    Otherwise the position is logged, converted with ``int()`` and stored
    under ``"focus_position"`` in the ``"Focuser"`` device configuration,
    after which the observatory configuration is saved.
    """
    should_save = self.success and self.config.save
    if not should_save:
        return

    best_position = self.best_focus_position
    self.observatory.logger.info(
        f"Saving best focus position {best_position} "
        f"of type {type(best_position)} "
    )

    focuser_config = self.paired_devices.get_device_config("Focuser")
    focuser_config["focus_position"] = int(best_position)
    self.paired_devices.observatory_config.save()
def determine_focus_measure_operator(self):
    """Look up the configured focus measure operator.

    Returns:
        Whatever ``FocusMeasureOperatorRegistry.from_name`` returns for
        ``self.config.focus_measure_operator``.
    """
    operator_name = self.config.focus_measure_operator
    return FocusMeasureOperatorRegistry.from_name(operator_name)
def determine_extremum_estimator(
    self,
):
    """Build the extremum estimator named in the configuration.

    The estimator class is looked up via
    ``ExtremumEstimatorRegistry.from_name(self.config.extremum_estimator)``
    and instantiated with ``self.config.extremum_estimator_kwargs`` as
    keyword arguments. The class name and parameters are logged at info
    level.

    Returns:
        The newly created estimator instance.
    """
    estimator_cls = ExtremumEstimatorRegistry.from_name(
        self.config.extremum_estimator
    )
    estimator_kwargs = self.config.extremum_estimator_kwargs
    estimator = estimator_cls(**estimator_kwargs)

    estimator_name = estimator.__class__.__name__
    self.observatory.logger.info(
        f"Initialised extremum estimator: {estimator_name} "
        f"with parameters: {estimator_kwargs}"
    )
    return estimator
def calculate_field_of_view(self, paired_devices):
    """Calculate the field of view of the camera-telescope system.

    The sensor size along each axis is the pixel size (``PixelSizeX`` /
    ``PixelSizeY``, converted from microns to meters) times the pixel
    count (``NumX`` / ``NumY``). The field of view is then
    ``2 * atan(sensor_size / (2 * focal_length))`` with the telescope's
    ``FocalLength`` in meters.

    Args:
        paired_devices: Object exposing ``camera`` and ``telescope``, each
            providing a ``get(name)`` accessor for the properties above.

    Returns:
        numpy.ndarray: ``[fov_x, fov_y]`` in degrees. ``[nan, nan]`` when
        the values cannot be read or the focal length is not positive; in
        that case the error is logged and nothing is raised.
    """
    try:
        camera = paired_devices.camera
        telescope = paired_devices.telescope

        # Convert microns to meters
        pixel_size = 1e-6 * np.array(
            [camera.get("PixelSizeX"), camera.get("PixelSizeY")]
        )
        number_of_pixels = np.array([camera.get("NumX"), camera.get("NumY")])
        focal_length = telescope.get("FocalLength")  # meters

        # A zero or negative focal length would otherwise silently yield a
        # meaningless 180 degree field of view.
        if not focal_length > 0:
            raise ValueError(f"Focal length must be positive, got {focal_length!r}")

        sensor_size = pixel_size * number_of_pixels  # [sx, sy] in meters
        return 2.0 * np.arctan(sensor_size / (2.0 * focal_length)) * (180.0 / np.pi)
    except Exception as e:
        # Log through the observatory logger, like the rest of this class.
        self.observatory.logger.error(
            f"Error calculating field of view from paired devices. Exception: {e}"
        )
        return np.array([np.nan, np.nan])
def determine_default_field_of_view(self, paired_devices, default_dict):
    """Resolve the field-of-view width and height to use.

    The field of view is always computed from the paired devices via
    ``self.calculate_field_of_view``. Entries ``"fov_width"`` and
    ``"fov_height"`` in ``default_dict`` take precedence over the computed
    values. Both results are converted with ``float()`` and logged.

    Args:
        paired_devices: Passed through to ``calculate_field_of_view``.
        default_dict: Mapping that may contain ``"fov_width"`` and/or
            ``"fov_height"`` overrides.

    Returns:
        tuple[float, float]: ``(fov_width, fov_height)``.
    """
    computed = self.calculate_field_of_view(paired_devices)

    width = default_dict.get("fov_width", computed[0])
    height = default_dict.get("fov_height", computed[1])
    fov_width, fov_height = float(width), float(height)

    self.observatory.logger.info(
        f"Determined field of view width={fov_width}, height={fov_height}."
    )
    return fov_width, fov_height