"""
Astronomical telescope autoguiding system with PID control.
This module provides automated guiding functionality for astronomical telescopes
using image-based tracking with PID control loops. It implements the complete
guiding workflow from image acquisition to telescope correction commands.
Key Features:
- Real-time star tracking using the Donuts image registration library
- PID control loops for precise telescope corrections
- Database logging of guiding performance and corrections
- Support for German Equatorial Mount (GEM) pier side changes
- Outlier rejection and statistical analysis of guiding errors
- Automatic reference image management per field/filter combination
- Background subtraction and image cleaning for robust star detection
The system continuously monitors incoming images, compares them to reference
images, calculates pointing errors, and applies corrective pulse guide commands
to keep the telescope accurately tracking celestial objects.
Components:
CustomImageClass: Image preprocessing for robust star detection
Guider: Main autoguiding class with PID control
PID: Discrete PID controller implementation
"""
import os
import time
from datetime import UTC, datetime
from math import cos, radians
from shutil import copyfile
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
from alpaca.telescope import AlignmentModes, GuideDirections, PierSide
from astropy.io import fits
from donuts import Donuts
from astra import Config
from astra.database_manager import DatabaseManager
from astra.image_handler import ImageHandler
from astra.logger import ObservatoryLogger
from astra.paired_devices import PairedDevices
from astra.thread_manager import ThreadManager
from astra.utils import CustomImageClass
# rejection buffer length
GUIDE_BUFFER_LENGTH = 20
# number images allowed during pull in period
IMAGES_TO_STABILISE = 3
# outlier rejection sigma
SIGMA_BUFFER = 10
# max allowed shift to correct
MAX_ERROR_PIXELS = 20
# max alloed shift to correct during stabilisation
MAX_ERROR_STABIL_PIXELS = 40
# IsPulseGuiding timeout
IS_PULSE_GUIDING_TIMEOUT = 120 # seconds
[docs]
class GuiderManager:
def __init__(self, guider_dict, logger, database_manager):
"""
guider_dict: dict[str, Guider] - mapping telescope names to Guider instances
logger: logging.Logger or ObservatoryLogger
database_manager: DatabaseManager
"""
self.guider = guider_dict
self.logger = logger
self.database_manager = database_manager
[docs]
@classmethod
def from_observatory(cls, observatory) -> "GuiderManager":
guider_dict: dict[str, Guider] = {}
if "Telescope" in observatory.config:
for device_name in observatory.devices["Telescope"]:
telescope = observatory.devices["Telescope"][device_name]
telescope_index = [
i
for i, d in enumerate(observatory.config["Telescope"])
if d["device_name"] == device_name
][0]
if "guider" in observatory.config["Telescope"][telescope_index]:
guider_params = observatory.config["Telescope"][telescope_index][
"guider"
]
guider_dict[device_name] = Guider(
telescope,
logger=observatory.logger,
database_manager=observatory.database_manager,
params=guider_params,
)
return cls(guider_dict, observatory.logger, observatory.database_manager)
[docs]
def start_guider(
self,
image_handler: ImageHandler,
paired_devices: PairedDevices,
thread_manager: ThreadManager,
reset_guiding_reference: bool = True,
) -> bool:
"""
Start the autoguiding system for telescope tracking.
Initializes and starts the guiding system to maintain accurate telescope
tracking during long exposures. Creates a separate thread for guiding
operations to run concurrently with image acquisition.
Parameters:
image_handler (ImageHandler): Image handler for managing image files.
paired_devices (PairedDevices): Object containing telescope and guide
camera devices for the guiding system.
thread_manager (ThreadManager): Thread manager for tracking guiding thread.
reset_guiding_reference (bool, optional): If True, clears existing reference
image from database before starting guiding, forcing creation of a new
reference. Defaults to True.
Returns:
bool: True if guider was started successfully, False otherwise.
Process:
1. Logs guiding start for the specified telescope
2. Optionally clears existing reference image if reset flag is set
3. Creates guider thread with appropriate parameters
4. Starts the guiding thread in background
5. Adds thread to observatory's thread tracking list
Note:
- Guiding runs in a separate thread to avoid blocking main operations
- Thread is tracked in self.threads for proper cleanup
- Filter name formatting removes single quotes for compatibility
"""
self.logger.info(f"Starting guiding for {paired_devices['Telescope']}")
# Clear reference image if requested
if reset_guiding_reference:
# Get current field info from image handler if available
try:
telescope_name = paired_devices["Telescope"]
camera_name = paired_devices["Camera"]
guider = self.guider[telescope_name]
# Try to get field info from the most recent header
field = image_handler.header.get("OBJECT")
filt = image_handler.header.get("FILTER")
exptime = str(image_handler.header.get("EXPTIME"))
pierside = paired_devices.telescope.get("SideOfPier")
if field and filt and exptime:
guider.clearReferenceImage(
field=field,
filt=filt,
exptime=exptime,
camera=camera_name,
pierside=pierside,
)
else:
self.logger.warning(
"Could not determine field/filter/exptime for clearing reference. "
"New reference will be created when guiding starts."
)
except Exception as e:
self.logger.warning(f"Error clearing guiding reference: {e}")
binning = paired_devices.camera.get("BinX")
thread_manager.start_thread(
target=self.guider[paired_devices["Telescope"]].guider_loop,
args=(
paired_devices["Camera"],
image_handler,
binning,
),
thread_type="guider",
device_name=paired_devices["Telescope"],
thread_id="guider",
)
return True
[docs]
def stop_guider(self, telescope_name, thread_manager):
"""
Stop guiding for a given telescope.
This function finds the correct guider thread using the telescope's
device name, sets its running flag to False, and then waits for the
thread to terminate.
Parameters:
telescope_name (str): The name of the telescope whose guider
should be stopped.
Returns:
bool: True if the guider was stopped successfully, False otherwise.
"""
for thread_info in thread_manager.threads:
if (
thread_info["type"] == "guider"
and thread_info["device_name"] == telescope_name
):
# Get the guider instance and set its running flag to False
guider_instance = self.guider[telescope_name]
if guider_instance.running:
self.logger.info(f"Stopping guiding for {telescope_name}")
guider_instance.running = False
# Wait for the thread to finish
thread_info["thread"].join()
# Remove the thread from the list
self.logger.info(
f"Guiding for {telescope_name} stopped successfully."
)
return True
return False
[docs]
class Guider:
"""
Automated telescope guiding system with PID control and statistical analysis.
Implements a complete autoguiding solution that continuously monitors telescope
pointing accuracy and applies corrective pulse guide commands. Features include
PID control loops, outlier rejection, database logging, and support for German
Equatorial Mounts with pier side changes.
The guider maintains statistical buffers for error analysis, handles field
stabilization periods, and manages reference images per field/filter combination.
Attributes:
telescope: Alpaca telescope device for pulse guiding commands
observatory: Astra observatory instance for logging and database access
PIX2TIME: Pixel-to-millisecond conversion factors for guide pulses
DIRECTIONS: Mapping of guide directions to Alpaca constants
RA_AXIS: Which axis (x/y) corresponds to Right Ascension
PID_COEFFS: PID controller coefficients for x and y axes
running: Flag to control guiding loop execution
Example:
>>> guider = Guider(telescope, astra_instance, {
... "PIX2TIME": {"+x": 100, "-x": 100, "+y": 100, "-y": 100},
... "DIRECTIONS": {"+x": "East", "-x": "West", "+y": "North", "-y": "South"},
... "RA_AXIS": "x",
... "PID_COEFFS": {"x": {"p": 0.8, "i": 0.1, "d": 0.1}, ...}
... })
>>> guider.guider_loop("camera1", "/data/*.fits")
"""
def __init__(
self,
telescope: Any,
logger: ObservatoryLogger,
database_manager: DatabaseManager,
params: Dict[str, Any],
) -> None:
"""
Initialize the autoguider with telescope, logging, and PID parameters.
Parameters:
telescope: Alpaca telescope device for sending pulse guide commands.
logger: Astra observatory logger for logging messages.
database_manager: Astra database manager for logging guiding data.
params (dict): Configuration dictionary containing:
- PIX2TIME: Pixel to millisecond conversion factors
- DIRECTIONS: Guide direction mappings
- RA_AXIS: Which axis corresponds to RA ("x" or "y")
- PID_COEFFS: PID controller coefficients for both axes
"""
# TODO: camera angle?
self.telescope = telescope
self.logger = logger
self.database_manager = database_manager
# set up the database
self.create_tables() # this is assuming we're using the same db. Should we have a separate one for guiding?
# set up the image glob string
# create reference directory if not exists
self.reference_dir = Config().paths.images / "autoguider_ref"
self.reference_dir.mkdir(parents=True, exist_ok=True)
# pulseGuide conversions
self.PIX2TIME = params["PIX2TIME"]
# guide directions
self.DIRECTIONS = {}
for direction in params["DIRECTIONS"]:
if params["DIRECTIONS"][direction] == "North":
self.DIRECTIONS[direction] = GuideDirections.guideNorth
elif params["DIRECTIONS"][direction] == "South":
self.DIRECTIONS[direction] = GuideDirections.guideSouth
elif params["DIRECTIONS"][direction] == "East":
self.DIRECTIONS[direction] = GuideDirections.guideEast
elif params["DIRECTIONS"][direction] == "West":
self.DIRECTIONS[direction] = GuideDirections.guideWest
else:
self.logger.report_device_issue(
device_type="Guider",
device_name=self.telescope.device_name,
message=f"Invalid guide direction {params['DIRECTIONS'][direction]} for {self.telescope.device_name} config",
)
# RA axis alignment along x or y
self.RA_AXIS = params["RA_AXIS"]
# PID loop coefficients
self.PID_COEFFS = params["PID_COEFFS"]
# minimum guide interval
self.MIN_GUIDE_INTERVAL = params.get("MIN_GUIDE_INTERVAL", 30.0)
# set up variables
# initialise the PID controllers for X and Y
self.PIDx: PID = PID.from_config_dict(self.PID_COEFFS["x"])
self.PIDy: PID = PID.from_config_dict(self.PID_COEFFS["y"])
self.PIDx.initialize_set_point(self.PID_COEFFS["set_x"])
self.PIDy.initialize_set_point(self.PID_COEFFS["set_y"])
# ag correction buffers - used for outlier rejection
self.BUFF_X: List[float] = []
self.BUFF_Y: List[float] = []
self.running: bool = False
[docs]
def create_tables(self) -> None:
"""
Create database tables for autoguider reference images and logging.
Creates three tables:
- autoguider_ref: Reference image metadata and validity periods
- autoguider_log: Detailed guiding corrections and statistics
- autoguider_info_log: General status and info messages
"""
db_command_0 = """CREATE TABLE IF NOT EXISTS autoguider_ref (
ref_id mediumint auto_increment primary key,
field varchar(100) not null,
camera varchar(20) not null,
ref_image varchar(100) not null,
filter varchar(20) not null,
exptime varchar(20) not null,
pierside int not null,
valid_from datetime not null,
valid_until datetime
);"""
self.database_manager.execute(db_command_0)
db_command_1 = """CREATE TABLE IF NOT EXISTS autoguider_log (
datetime timestamp default current_timestamp,
telescope_name varchar(50) not null,
night date not null,
reference varchar(150) not null,
comparison varchar(150) not null,
stabilised varchar(5) not null,
shift_x double not null,
shift_y double not null,
pre_pid_x double not null,
pre_pid_y double not null,
post_pid_x double not null,
post_pid_y double not null,
std_buff_x double not null,
std_buff_y double not null,
culled_max_shift_x varchar(5) not null,
culled_max_shift_y varchar(5) not null
);
"""
self.database_manager.execute(db_command_1)
db_command_2 = """CREATE TABLE IF NOT EXISTS autoguider_info_log (
message_id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
camera varchar(20) NOT NULL,
message varchar(500) NOT NULL
);
"""
self.database_manager.execute(db_command_2)
[docs]
def logShiftsToDb(self, qry_args: Tuple[str, ...]) -> None:
"""
Log autoguiding corrections and statistics to the database.
Parameters:
qry_args (tuple): Tuple containing guiding data in order:
night, reference, comparison, stabilised, shift_x, shift_y,
pre_pid_x, pre_pid_y, post_pid_x, post_pid_y, std_buff_x,
std_buff_y, culled_max_shift_x, culled_max_shift_y
"""
qry = """
INSERT INTO autoguider_log
(telescope_name, night, reference, comparison, stabilised, shift_x, shift_y,
pre_pid_x, pre_pid_y, post_pid_x, post_pid_y, std_buff_x,
std_buff_y, culled_max_shift_x, culled_max_shift_y)
VALUES
('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s',
'%s', '%s', '%s', '%s', '%s', '%s', '%s')
"""
self.database_manager.execute(qry % qry_args)
[docs]
def logMessageToDb(self, camera_name: str, message: str) -> None:
"""
Log status messages to the database.
Parameters:
camera_name (str): Name of the camera being autoguided.
message (str): Status or info message to log.
"""
qry = """
INSERT INTO autoguider_info_log
(camera, message)
VALUES
('%s', '%s')
"""
qry_args = (camera_name, message)
self.database_manager.execute(qry % qry_args)
[docs]
def guide(
self,
x: float,
y: float,
images_to_stabilise: int,
camera_name: str,
binning: int = 1,
gem: bool = False,
) -> Tuple[bool, float, float, float, float]:
"""
Apply telescope guiding corrections using PID control with outlier rejection.
Processes measured pointing errors through PID controllers, applies outlier
rejection during stable operation, and sends pulse guide commands to the telescope.
Handles declination scaling for RA corrections and German Equatorial Mount
pier side changes.
Parameters:
x (float): Guide correction needed in X direction (pixels).
y (float): Guide correction needed in Y direction (pixels).
images_to_stabilise (int): Images remaining in stabilization period.
Negative values indicate stable operation.
camera_name (str): Name of the camera for logging.
binning (int, optional): Image binning factor. Defaults to 1.
gem (bool, optional): Whether telescope is German Equatorial Mount. Defaults to False.
Returns:
tuple: (success, pidx, pidy, sigma_x, sigma_y) where:
- success (bool): Whether correction was applied
- pidx, pidy (float): Actual corrections sent to mount
- sigma_x, sigma_y (float): Buffer standard deviations
"""
if gem:
current_pierside = self.telescope.get("SideOfPier")
# get telescope declination to scale RA corrections
dec = self.telescope.get("Declination")
dec_rads = radians(dec)
cos_dec = cos(dec_rads)
# pop the earliest buffer value if > 30 measurements
while len(self.BUFF_X) > GUIDE_BUFFER_LENGTH:
self.BUFF_X.pop(0)
while len(self.BUFF_Y) > GUIDE_BUFFER_LENGTH:
self.BUFF_Y.pop(0)
assert len(self.BUFF_X) == len(self.BUFF_Y)
if images_to_stabilise < 0:
CURRENT_MAX_SHIFT = MAX_ERROR_PIXELS
# kill anything that is > sigma_buffer sigma buffer stats
if (
len(self.BUFF_X) < GUIDE_BUFFER_LENGTH
and len(self.BUFF_Y) < GUIDE_BUFFER_LENGTH
):
self.logMessageToDb(camera_name, "Filling AG stats buffer...")
sigma_x = 0.0
sigma_y = 0.0
else:
sigma_x = float(np.std(self.BUFF_X))
sigma_y = float(np.std(self.BUFF_Y))
if abs(x) > SIGMA_BUFFER * sigma_x or abs(y) > SIGMA_BUFFER * sigma_y:
self.logMessageToDb(
camera_name,
"Guide error > {} sigma * buffer errors, ignoring...".format(
SIGMA_BUFFER
),
)
# store the original values in the buffer, even if correction
# was too big, this will allow small outliers to be caught
self.BUFF_X.append(x)
self.BUFF_Y.append(y)
return True, 0.0, 0.0, sigma_x, sigma_y
else:
pass
else:
self.logMessageToDb(camera_name, "Ignoring AG buffer during stabilisation")
CURRENT_MAX_SHIFT = MAX_ERROR_STABIL_PIXELS
sigma_x = 0.0
sigma_y = 0.0
# update the PID controllers, run them in parallel
pidx = self.PIDx.update(x) * -1
pidy = self.PIDy.update(y) * -1
# check if we are stabilising and allow for the max shift
if images_to_stabilise > 0:
if pidx >= CURRENT_MAX_SHIFT:
pidx = CURRENT_MAX_SHIFT
elif pidx <= -CURRENT_MAX_SHIFT:
pidx = -CURRENT_MAX_SHIFT
if pidy >= CURRENT_MAX_SHIFT:
pidy = CURRENT_MAX_SHIFT
elif pidy <= -CURRENT_MAX_SHIFT:
pidy = -CURRENT_MAX_SHIFT
self.logMessageToDb(camera_name, "PID: {0:.2f} {1:.2f}".format(pidx, pidy))
# make another check that the post PID values are not > Max allowed
# using >= allows for the stabilising runs to get through
# abs() on -ve duration otherwise throws back an error
if pidy > 0 and pidy <= CURRENT_MAX_SHIFT and self.running:
guide_time_y = pidy * self.PIX2TIME["+y"] / binning
y_p_dir = self.DIRECTIONS["+y"]
if self.RA_AXIS == "y":
guide_time_y = guide_time_y / cos_dec
if gem is False:
pass # keep as is
elif current_pierside == PierSide.pierEast:
pass # keep as is
else:
if self.DIRECTIONS["+y"] == GuideDirections.guideWest:
y_p_dir = GuideDirections.guideEast
else:
y_p_dir = GuideDirections.guideWest
self.telescope.get("PulseGuide")(
Direction=y_p_dir, Duration=int(guide_time_y)
)
if pidy < 0 and pidy >= -CURRENT_MAX_SHIFT and self.running:
guide_time_y = abs(pidy * self.PIX2TIME["-y"] / binning)
y_n_dir = self.DIRECTIONS["-y"]
if self.RA_AXIS == "y":
guide_time_y = guide_time_y / cos_dec
if gem is False:
pass # keep as is
elif current_pierside == PierSide.pierEast:
pass # keep as is
else:
if self.DIRECTIONS["-y"] == GuideDirections.guideWest:
y_n_dir = GuideDirections.guideEast
else:
y_n_dir = GuideDirections.guideWest
self.telescope.get("PulseGuide")(
Direction=y_n_dir, Duration=int(guide_time_y)
)
start_time = time.time()
while self.telescope.get("IsPulseGuiding") and self.running:
if time.time() - start_time > IS_PULSE_GUIDING_TIMEOUT:
self.logger.warning(
f"Pulse guiding timed out after {IS_PULSE_GUIDING_TIMEOUT} seconds."
)
break
time.sleep(0.01)
if pidx > 0 and pidx <= CURRENT_MAX_SHIFT and self.running:
guide_time_x = pidx * self.PIX2TIME["+x"] / binning
x_p_dir = self.DIRECTIONS["+x"]
if self.RA_AXIS == "x":
guide_time_x = guide_time_x / cos_dec
if gem is False:
pass
elif current_pierside == PierSide.pierEast:
pass # keep as is
else:
if self.DIRECTIONS["+x"] == GuideDirections.guideWest:
x_p_dir = GuideDirections.guideEast
else:
x_p_dir = GuideDirections.guideWest
self.telescope.get("PulseGuide")(
Direction=x_p_dir, Duration=int(guide_time_x)
)
if pidx < 0 and pidx >= -CURRENT_MAX_SHIFT and self.running:
guide_time_x = abs(pidx * self.PIX2TIME["-x"] / binning)
x_n_dir = self.DIRECTIONS["-x"]
if self.RA_AXIS == "x":
guide_time_x = guide_time_x / cos_dec
if gem is False:
pass
elif current_pierside == PierSide.pierEast:
pass # keep as is
else:
if self.DIRECTIONS["-x"] == GuideDirections.guideWest:
x_n_dir = GuideDirections.guideEast
else:
x_n_dir = GuideDirections.guideWest
self.telescope.get("PulseGuide")(
Direction=x_n_dir, Duration=int(guide_time_x)
)
start_time = time.time()
while self.telescope.get("IsPulseGuiding") and self.running:
if time.time() - start_time > IS_PULSE_GUIDING_TIMEOUT:
self.logger.warning(
f"Pulse guiding timed out after {IS_PULSE_GUIDING_TIMEOUT} seconds."
)
break
time.sleep(0.01)
if self.running:
self.logMessageToDb(camera_name, "Guide correction Applied")
else:
self.logMessageToDb(
camera_name,
"Guide correction NOT Applied due to self.running=False",
)
# store the original values in the buffer
# only if we are not stabilising
if images_to_stabilise < 0:
self.BUFF_X.append(x)
self.BUFF_Y.append(y)
return True, pidx, pidy, sigma_x, sigma_y
[docs]
def getReferenceImage(
self,
field: str | None,
filt: str | None,
exptime: str | None,
camera: str,
pierside: int,
) -> Optional[str]:
"""
Retrieve the current reference image path for given observation parameters.
Parameters:
field (str): Target field name.
filt (str): Filter name.
exptime (str): Exposure time.
camera (str): Camera name.
pierside (int): Telescope pier side (1=West, 0=East, -1=Unknown).
Returns:
str | None: Path to reference image, or None if not found.
"""
if field is None or filt is None or exptime is None:
raise ValueError("Field, filter, and exptime must be provided")
tnow = datetime.now(UTC).isoformat().split(".")[0].replace("T", " ")
qry = """
SELECT ref_image
FROM autoguider_ref
WHERE field = '%s'
AND filter = '%s'
AND exptime = '%s'
AND valid_from <= '%s'
AND camera = '%s'
AND pierside = %d
AND (valid_until IS NULL OR valid_until > '%s')
ORDER BY valid_from DESC
LIMIT 1
"""
qry_args = (field, filt, exptime, tnow, camera, pierside, tnow)
result = self.database_manager.execute(qry % qry_args)
if not result:
ref_image = None
else:
ref_image = os.path.join(self.reference_dir, result[0][0])
return ref_image
[docs]
def clearReferenceImage(
self,
field: str | None,
filt: str | None,
exptime: str | None,
camera: str,
pierside: int,
) -> None:
"""
Clear reference image from the database for given observation parameters.
Sets valid_until to current time for matching reference images, effectively
invalidating them so a new reference will be created on next guiding run.
Parameters:
field (str): Target field name.
filt (str): Filter name.
exptime (str): Exposure time.
camera (str): Camera name.
pierside (int): Telescope pier side (1=West, 0=East, -1=Unknown).
"""
if field is None or filt is None or exptime is None:
self.logger.warning(
"Cannot clear reference image: field, filter, or exptime is None"
)
return
tnow = datetime.now(UTC).isoformat().split(".")[0].replace("T", " ")
qry = """
UPDATE autoguider_ref
SET valid_until = '%s'
WHERE field = '%s'
AND filter = '%s'
AND exptime = '%s'
AND camera = '%s'
AND pierside = %d
AND valid_until IS NULL
"""
qry_args = (tnow, field, filt, exptime, camera, pierside)
self.database_manager.execute(qry % qry_args)
self.logMessageToDb(
camera,
f"Reference cleared for field={field}, filter={filt}, pierside={pierside}",
)
[docs]
def setReferenceImage(
self,
field: str | None,
filt: str | None,
exptime: str | None,
ref_image: str,
camera: str,
pierside: int,
) -> None:
"""
Set a new reference image in the database and copy to reference directory.
Parameters:
field (str): Target field name.
filt (str): Filter name.
exptime (str): Exposure time.
ref_image (str): Path to image file to use as reference.
camera (str): Camera name.
pierside (int): Telescope pier side (1=West, 0=East, -1=Unknown).
"""
if field is None or filt is None or exptime is None:
raise ValueError("Field, filter, and exptime must be provided")
tnow = datetime.now(UTC).isoformat().split(".")[0].replace("T", " ")
qry = """
INSERT INTO autoguider_ref
(field, camera, ref_image,
filter, exptime, valid_from, pierside)
VALUES
('%s', '%s', '%s', '%s', '%s', '%s', %d)
"""
qry_args = (
field,
camera,
os.path.split(ref_image)[-1],
filt,
exptime,
tnow,
pierside,
)
self.database_manager.execute(qry % qry_args)
# copy the file to the autoguider_ref location
self.logger.info(f"Copying reference image {ref_image} to {self.reference_dir}")
copyfile(
ref_image, os.path.join(self.reference_dir, os.path.split(ref_image)[-1])
)
[docs]
def waitForImage(
self,
camera_name: str,
image_handler: ImageHandler,
) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
"""
Wait for new images to appear in the monitoring directory.
Parameters:
camera_name (str): Camera name for logging.
image_handler (ImageHandler): Image handler instance to monitor.
Returns:
tuple: (newest_image, newest_field, newest_filter, newest_exptime)
Returns (None, None, None, None) if self.running becomes False.
"""
start_timestamp = datetime.now(UTC)
orig_last_image_timestamp = image_handler.last_image_timestamp
while self.running:
# Check if a new image has appeared
last_image_timestamp = image_handler.last_image_timestamp
exptime = image_handler.header.get("EXPTIME")
current_timestamp = datetime.now(UTC)
# if no images yet, wait
if last_image_timestamp is None:
time.sleep(0.1)
continue
# check if we have waited long enough
if (current_timestamp - start_timestamp).total_seconds() < max(
self.MIN_GUIDE_INTERVAL, exptime
):
time.sleep(0.1)
continue
# check if a new image has appeared to guide on
if (current_timestamp - last_image_timestamp).total_seconds() < 1 and (
last_image_timestamp != orig_last_image_timestamp
):
newest_image = image_handler.last_image_path
try:
header = fits.getheader(newest_image)
newest_filter = str(header["FILTER"]).strip("'")
newest_field = header["OBJECT"]
newest_exptime = header["EXPTIME"]
except Exception as e:
self.logMessageToDb(
camera_name,
f"Problem accessing fits file {newest_image}, skipping... Error: {e}",
)
continue
return newest_image, newest_field, newest_filter, newest_exptime
else:
time.sleep(0.1)
continue
return None, None, None, None
[docs]
def guider_loop(
self,
camera_name: str,
image_handler: ImageHandler, # type: ignore
binning: int = 1,
) -> None:
"""
Main autoguiding loop using image_handler for new images.
Continuously monitors image_handler.last_image_path and last_image_timestamp,
processes new images, updates reference images, measures shifts, and applies guiding corrections.
"""
self.running = True
self.logger.info(f"Starting guider loop for {camera_name}")
try:
while self.running:
# Get telescope alignment mode
gem = (
self.telescope.get("AlignmentMode") == AlignmentModes.algGermanPolar
)
if gem:
self.logger.info("Telescope is in German equatorial mode")
telescope_pierside = self.telescope.get("SideOfPier")
# Wait for the first image
newest_image, current_field, current_filter, current_exptime = (
self.waitForImage(camera_name, image_handler)
)
if newest_image is None:
self.logger.warning("No image found to start guiding.")
return
# Reference image logic
ref_file = self.getReferenceImage(
current_field,
current_filter,
current_exptime,
camera_name,
telescope_pierside,
)
if not ref_file or not os.path.exists(ref_file):
self.setReferenceImage(
current_field,
current_filter,
current_exptime,
newest_image,
camera_name,
telescope_pierside,
)
ref_file = os.path.join(
self.reference_dir, os.path.basename(newest_image)
)
self.logMessageToDb(camera_name, f"Ref_File created: {ref_file}")
self.logMessageToDb(camera_name, f"Ref_File: {ref_file}")
donuts_ref = Donuts(
ref_file,
normalise=False,
subtract_bkg=False,
downweight_edges=False,
image_class=CustomImageClass,
)
images_to_stabilise = IMAGES_TO_STABILISE
stabilised = "n"
# Main guiding loop
while self.running:
(
check_file,
current_field,
current_filter,
current_exptime,
) = self.waitForImage(camera_name, image_handler)
if check_file is None:
continue
# Check pierside change
if gem:
current_pierside = self.telescope.get("SideOfPier")
if current_pierside != telescope_pierside:
self.logMessageToDb(
camera_name,
f"Pierside changed from {telescope_pierside} to {current_pierside}, resetting guider loop...",
)
self.logger.info(
f"Pierside changed from {telescope_pierside} to {current_pierside}, resetting guider loop..."
)
break
if self.running:
self.logMessageToDb(
camera_name,
f"REF: {ref_file} CHECK: {check_file} [{current_filter}]",
)
images_to_stabilise -= 1
# PID reset logic
if images_to_stabilise == 0:
self.logMessageToDb(
camera_name,
"Stabilisation complete, reseting PID loop...",
)
self.PIDx = PID.from_config_dict(self.PID_COEFFS["x"])
self.PIDy = PID.from_config_dict(self.PID_COEFFS["y"])
self.PIDx.initialize_set_point(self.PID_COEFFS["set_x"])
self.PIDy.initialize_set_point(self.PID_COEFFS["set_y"])
elif images_to_stabilise > 0:
self.logMessageToDb(
camera_name, "Stabilising using P=1.0, I=0.0, D=0.0"
)
self.PIDx = PID(1.0, 0.0, 0.0)
self.PIDy = PID(1.0, 0.0, 0.0)
self.PIDx.initialize_set_point(self.PID_COEFFS["set_x"])
self.PIDy.initialize_set_point(self.PID_COEFFS["set_y"])
# Load comparison image and measure shift
try:
h2 = fits.open(check_file)
del h2
except IOError:
self.logMessageToDb(
camera_name,
f"Problem opening CHECK: {check_file}...",
)
self.logMessageToDb(
camera_name, "Breaking back to look for new file..."
)
continue
# reset culled tags
culled_max_shift_x = "n"
culled_max_shift_y = "n"
# work out shift here
shift = donuts_ref.measure_shift(check_file)
shift_x = shift.x.value # type: ignore
shift_y = shift.y.value # type: ignore
self.logMessageToDb(
camera_name, f"x shift: {float(shift_x):.2f}"
)
self.logMessageToDb(
camera_name, f"y shift: {float(shift_y):.2f}"
)
# revoke stabilisation early if shift less than 2 pixels
if (
abs(shift_x) <= 2.0
and abs(shift_y) < 2.0
and images_to_stabilise > 0
):
images_to_stabilise = 1
# Check if shift greater than max allowed error in post pull in state
if images_to_stabilise < 0:
stabilised = "y"
if abs(shift_x) > MAX_ERROR_PIXELS:
self.logMessageToDb(
camera_name,
f"X shift > {MAX_ERROR_PIXELS}, applying no correction",
)
culled_max_shift_x = "y"
else:
pre_pid_x = shift_x
if abs(shift_y) > MAX_ERROR_PIXELS:
self.logMessageToDb(
camera_name,
f"Y shift > {MAX_ERROR_PIXELS}, applying no correction",
)
culled_max_shift_y = "y"
else:
pre_pid_y = shift_y
else:
self.logMessageToDb(
camera_name,
"Allowing field to stabilise, imposing new max error clip",
)
stabilised = "n"
if shift_x > MAX_ERROR_STABIL_PIXELS:
pre_pid_x = MAX_ERROR_STABIL_PIXELS
elif shift_x < -MAX_ERROR_STABIL_PIXELS:
pre_pid_x = -MAX_ERROR_STABIL_PIXELS
else:
pre_pid_x = shift_x
if shift_y > MAX_ERROR_STABIL_PIXELS:
pre_pid_y = MAX_ERROR_STABIL_PIXELS
elif shift_y < -MAX_ERROR_STABIL_PIXELS:
pre_pid_y = -MAX_ERROR_STABIL_PIXELS
else:
pre_pid_y = shift_y
# if either axis is off by > MAX error then stop everything, no point guiding
# in 1 axis, need to figure out the source of the problem and run again
if culled_max_shift_x == "y" or culled_max_shift_y == "y":
(
pre_pid_x,
pre_pid_y,
post_pid_x,
post_pid_y,
std_buff_x,
std_buff_y,
) = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
else:
if self.running:
(
applied,
post_pid_x,
post_pid_y,
std_buff_x,
std_buff_y,
) = self.guide(
pre_pid_x,
pre_pid_y,
images_to_stabilise,
camera_name,
binning,
gem,
)
else:
break
# Extract night date from directory path
night_path = image_handler.last_image_path.parent
night_date = os.path.basename(
night_path
) # Get just the date folder name
log_list = [
self.telescope.device_name,
night_date,
os.path.basename(ref_file),
str(check_file),
stabilised,
str(round(shift_x, 3)),
str(round(shift_y, 3)),
str(round(pre_pid_x, 3)),
str(round(pre_pid_y, 3)),
str(round(post_pid_x, 3)),
str(round(post_pid_y, 3)),
str(round(std_buff_x, 3)),
str(round(std_buff_y, 3)),
culled_max_shift_x,
culled_max_shift_y,
]
self.logShiftsToDb(tuple(log_list))
self.logger.info(f"Guider post_pid_x shift: {post_pid_x:.2f}")
self.logger.info(f"Guider post_pid_y shift: {post_pid_y:.2f}")
except Exception as e:
self.running = False
self.logger.report_device_issue(
device_type="Guider",
device_name=self.telescope.device_name,
message="Error in guide loop",
exception=e,
)
self.logger.info("Stopping guider loop.")
"""
PID loop controller
"""
[docs]
class PID:
"""
Discrete PID controller for autoguiding corrections.
Implements a digital PID control loop with configurable proportional, integral,
and derivative gains. Includes integrator clamping to prevent windup.
Based on: http://code.activestate.com/recipes/577231-discrete-pid-controller/
Parameters:
kp (float, optional): Proportional gain. Defaults to 0.5.
ki (float, optional): Integral gain. Defaults to 0.25.
kd (float, optional): Derivative gain. Defaults to 0.0.
derivator (float, optional): Initial derivative term. Defaults to 0.
integrator (float, optional): Initial integrator value. Defaults to 0.
integrator_max (float, optional): Maximum integrator value. Defaults to 500.
integrator_min (float, optional): Minimum integrator value. Defaults to -500.
"""
def __init__(
self,
kp: float = 0.5,
ki: float = 0.25,
kd: float = 0.0,
derivator: float = 0,
integrator: float = 0,
integrator_max: float = 500,
integrator_min: float = -500,
) -> None:
self.kp: float = kp
self.ki: float = ki
self.kd: float = kd
self.derivator: float = derivator
self.integrator: float = integrator
self.integrator_max: float = integrator_max
self.integrator_min: float = integrator_min
self.set_point: float = 0.0
self.error: float = 0.0
self.p_value: float = 0.0
self.d_value: float = 0.0
self.i_value: float = 0.0
[docs]
def update(self, current_value: float) -> float:
"""
Calculate PID output for given input and feedback.
Parameters:
current_value (float): Current process value (feedback).
Returns:
float: PID controller output.
"""
self.error = self.set_point - current_value
self.p_value = self.kp * self.error
self.d_value = self.kd * (self.error - self.derivator)
self.derivator = self.error
self.integrator = self.integrator + self.error
if self.integrator > self.integrator_max:
self.integrator = self.integrator_max
elif self.integrator < self.integrator_min:
self.integrator = self.integrator_min
self.i_value = self.integrator * self.ki
pid = self.p_value + self.i_value + self.d_value
return pid
[docs]
def initialize_set_point(self, set_point: float) -> None:
"""
Initialize the PID setpoint and reset integrator/derivator.
Parameters:
set_point (float): Desired target value.
"""
self.set_point = set_point
self.integrator = 0
self.derivator = 0
[docs]
@classmethod
def from_config_dict(cls, config: Dict[str, float]) -> "PID":
"""
Create a PID instance from a configuration dictionary.
Parameters:
config (dict): Configuration with keys 'p', 'i', 'd'
"""
return cls(
kp=config["p"],
ki=config["i"],
kd=config["d"],
)