astra.scheduler#
Classes
|
A scheduled action for a device. |
|
Status of a scheduled action. |
|
A list of scheduled actions. |
|
- class astra.scheduler.ActionStatus(value)[source]#
Bases:
EnumStatus of a scheduled action.
- PENDING = 'pending'#
- RUNNING = 'running'#
- FINISHED = 'finished'#
- FAILED = 'failed'#
- class astra.scheduler.Action(device_name: str, action_type: str, action_value: BaseActionConfig, start_time: datetime, end_time: datetime, completed: bool = False, status: ActionStatus = ActionStatus.PENDING)[source]#
Bases:
objectA scheduled action for a device.
Examples#
>>> from astra.scheduler import Action >>> from astra.action_configs import ObjectActionConfig >>> from datetime import datetime, UTC >>> action = Action( ... device_name="test_camera", ... action_type="object", ... action_value=ObjectActionConfig( ... object="M42", ... exptime=10.0, ... ), ... start_time=datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC), ... end_time=datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC), ... ) >>> action.validate()
- action_value: BaseActionConfig#
- status: ActionStatus = 'pending'#
- property duration#
- update_times(time_factor: float, new_start_time: datetime | None = None) Action[source]#
Update the start and end times to present day factored by the time factor.
- set_status(status: ActionStatus | str)[source]#
- class astra.scheduler.Schedule(actions: List[Action])[source]#
-
A list of scheduled actions.
Examples#
>>> from astra.scheduler import Schedule, Action >>> from astra.action_configs import ObjectActionConfig, OpenActionConfig >>> from datetime import datetime, UTC >>> actions = [ ... Action( ... device_name="test_camera", ... action_type="open", ... action_value=OpenActionConfig(), ... start_time=datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC), ... end_time=datetime(2024, 1, 1, 12, 30, 0, tzinfo=UTC), ... ), ... Action( ... device_name="test_camera", ... action_type="object", ... action_value=ObjectActionConfig( ... object="M42", ... exptime=10.0, ... ), ... start_time=datetime(2024, 1, 1, 12, 30, 0, tzinfo=UTC), ... end_time=datetime(2024, 1, 1, 12, 35, 0, tzinfo=UTC), ... ), ... ] >>> schedule = Schedule(actions) >>> schedule.validate()
- ACTION_CONFIGS: Dict[str, Type[BaseActionConfig]] = {'autofocus': <class 'astra.action_configs.AutofocusConfig'>, 'calibrate_guiding': <class 'astra.action_configs.CalibrateGuidingActionConfig'>, 'calibration': <class 'astra.action_configs.CalibrationActionConfig'>, 'close': <class 'astra.action_configs.CloseActionConfig'>, 'complete_headers': <class 'astra.action_configs.CompleteHeadersActionConfig'>, 'cool_camera': <class 'astra.action_configs.CoolCameraActionConfig'>, 'flats': <class 'astra.action_configs.FlatsActionConfig'>, 'object': <class 'astra.action_configs.ObjectActionConfig'>, 'open': <class 'astra.action_configs.OpenActionConfig'>, 'pointing_model': <class 'astra.action_configs.PointingModelActionConfig'>}#
- classmethod from_file(filename: str | Path) Schedule[source]#
Read a schedule file and return a Schedule instance with parsed schedule data.
- Parameters:
filename – Path to the schedule file
- classmethod from_dataframe(df: DataFrame, observatory_config: object | None = None) Schedule[source]#
Construct a Schedule instance from a pandas DataFrame.
- Parameters:
df – DataFrame with schedule data
observatory_config – Optional observatory configuration
- update_times(time_factor: float)[source]#
Update the start and end times of all actions in this Schedule to present day, factored by the time factor. Modifies the schedule in-place.
- validate(filterwheel_names: dict[str, list[str]] | None = None, observatory_location: EarthLocation | None = None, min_altitude: float = 0.0)[source]#
Validate the schedule actions.
- Parameters:
filterwheel_names – Optional dict mapping filterwheel names to their filter lists for validation. e.g., {“fw1”: [“Clear”, “Red”, “Green”]}
observatory_location – Optional EarthLocation for visibility checks
min_altitude – Minimum altitude in degrees for visibility checks (default: 0°)
- Raises:
ValueError – If any action fails validation.
- class astra.scheduler.ScheduleManager(schedule_path: str | Path, truncate_factor: float | None, logger: ObservatoryLogger, device_manager=None)[source]#
Bases:
object- get_filterwheel_names() dict[str, list[str]][source]#
Get available filter names from all filterwheels.
- Returns:
Dict mapping filterwheel device names to their filter lists. Empty dict if device_manager is not available or no filterwheels exist.
- get_observatory_location() EarthLocation | None[source]#
Get observatory location from telescope device.
- Returns:
EarthLocation object if telescope device is available, None otherwise.
- read() Schedule | None[source]#
Read and process the observatory schedule from CSV file.
Loads the schedule CSV file and converts it to a pandas DataFrame with proper datetime parsing. Automatically reloads the schedule if the file has been modified since the last read. Supports schedule truncation for development and testing purposes.
- Returns:
pd.DataFrame or None –
- A DataFrame containing the schedule data with
properly parsed ‘start_time’ and ‘end_time’ columns, or None if an error occurs during reading.
Features: - Automatic file modification detection and reload - Datetime parsing for start_time and end_time columns - Optional schedule truncation for development (via truncate_factor) - Error handling with logging and error source tracking
File Format: - CSV file with columns including start_time, end_time, device_name,
action_type, and action_value
Datetime columns should be in ISO format compatible with pandas
Note
Schedule is sorted by start_time after loading
Truncation moves schedule to current time for testing
File modification time is tracked to enable automatic reloading
- reload_if_updated() bool[source]#
Reload the schedule if the schedule file has been modified.
Checks if the schedule file has been updated since it was last read. If the file has been modified, it reloads the schedule and updates the internal state accordingly.
Process: 1. Checks if the schedule file modification time is greater than
the last recorded modification time.
If updated, calls the read() method to reload the schedule.
Updates the internal schedule state.
Note
Does not interrupt a currently running schedule.
Logs a warning if attempting to update while running.
Useful for dynamic schedule updates during operation.
- get_mtime() float[source]#
Get the modification timestamp of the schedule file.
Retrieves the last modification time of the schedule CSV file to enable automatic detection of schedule updates. Returns 0 if the file doesn’t exist, which can be used to detect when no schedule is available.
- Returns:
float –
- The Unix timestamp of the schedule file’s last modification
time, or 0.0 if the file does not exist.
Note
Used by read_schedule() to detect file changes
Enables automatic schedule reloading during operation
Returns 0 for non-existent files to simplify logic
- is_schedule_updated() bool[source]#
Return True if the schedule file has been modified since last read.
- stop_schedule(thread_manager) None[source]#
Stop the currently running schedule execution thread.
Safely stops the schedule execution by setting the schedule_running flag to False and waiting for the schedule thread to complete. This ensures that any ongoing actions can finish cleanly before the schedule stops.
Process: 1. Sets schedule_running flag to False (signals thread to stop) 2. Finds the schedule thread in the threads list 3. Waits for the thread to complete using join() 4. Logs the stopping action
Thread Safety: - Uses thread.join() to ensure clean shutdown - Schedule thread checks schedule_running flag regularly - Ongoing actions are allowed to complete before stopping
Note
If no schedule is running, logs a warning and returns
Essential for emergency stops and robotic switch operations
Used when weather becomes unsafe or errors occur