astra.scheduler#

Classes

Action(device_name, action_type, ...[, ...])

A scheduled action for a device.

ActionStatus(value)

Status of a scheduled action.

Schedule(actions)

A list of scheduled actions.

ScheduleManager(schedule_path, ...[, ...])

class astra.scheduler.ActionStatus(value)[source]#

Bases: Enum

Status of a scheduled action.

PENDING = 'pending'#
RUNNING = 'running'#
FINISHED = 'finished'#
FAILED = 'failed'#
class astra.scheduler.Action(device_name: str, action_type: str, action_value: BaseActionConfig, start_time: datetime, end_time: datetime, completed: bool = False, status: ActionStatus = ActionStatus.PENDING)[source]#

Bases: object

A scheduled action for a device.

Examples#

>>> from astra.scheduler import Action
>>> from astra.action_configs import ObjectActionConfig
>>> from datetime import datetime, UTC
>>> action = Action(
...     device_name="test_camera",
...     action_type="object",
...     action_value=ObjectActionConfig(
...         object="M42",
...         exptime=10.0,
...     ),
...     start_time=datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC),
...     end_time=datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC),
... )
>>> action.validate()
device_name: str#
action_type: str#
action_value: BaseActionConfig#
start_time: datetime#
end_time: datetime#
completed: bool = False#
status: ActionStatus = 'pending'#
get(key: str, default=None)[source]#
property duration#
validate()[source]#
update_times(time_factor: float, new_start_time: datetime | None = None) Action[source]#

Shift the start and end times to the present day, scaled by time_factor.
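The docstring does not say exactly how the time factor is applied. The following is a minimal sketch of one plausible reading, assuming the action is moved to begin at `new_start_time` (or at the current UTC time when none is given) and its duration is divided by `time_factor`. The `rebase_window` helper is hypothetical and is not part of astra.

```python
from datetime import datetime, timedelta, timezone


def rebase_window(start, end, time_factor, new_start_time=None):
    """Hypothetical helper: move a (start, end) window and compress its duration."""
    if time_factor <= 0:
        raise ValueError("time_factor must be positive")
    # Assumption: fall back to the current UTC time when no new start is given.
    new_start = new_start_time or datetime.now(timezone.utc)
    # Assumption: the original duration is divided by time_factor.
    new_end = new_start + (end - start) / time_factor
    return new_start, new_end


start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
end = start + timedelta(minutes=30)
anchor = datetime(2030, 6, 1, 0, 0, tzinfo=timezone.utc)

new_start, new_end = rebase_window(start, end, time_factor=10.0, new_start_time=anchor)
print(new_end - new_start)  # 0:03:00
```

Under this reading, a 30-minute action with a factor of 10 occupies three minutes, which is convenient for dry runs of a full night's schedule.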

summary_string(verbose=False) str[source]#
set_status(status: ActionStatus | str)[source]#
to_dict(iso: bool = False) dict[source]#

Return a JSON-serializable dict representation of this Action.

iso: if True, format datetimes as ISO strings (for JSONL); otherwise keep datetime objects (for DataFrame).
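To illustrate why the `iso` switch matters, here is a small standard-library sketch. The `to_plain_dict` function is a hypothetical stand-in, not astra's implementation; it only shows the difference between keeping `datetime` objects and converting them to ISO 8601 strings before JSON encoding.

```python
import json
from datetime import datetime, timezone


def to_plain_dict(record, iso=False):
    """Hypothetical stand-in: copy record, optionally turning datetimes into strings."""
    return {
        key: value.isoformat() if iso and isinstance(value, datetime) else value
        for key, value in record.items()
    }


record = {
    "device_name": "test_camera",
    "start_time": datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
}

as_objects = to_plain_dict(record)            # datetimes kept, e.g. for a DataFrame
as_strings = to_plain_dict(record, iso=True)  # datetimes become ISO 8601 strings
line = json.dumps(as_strings)                 # json.dumps(as_objects) would raise TypeError
```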

class astra.scheduler.Schedule(actions: List[Action])[source]#

Bases: list[Action]

A list of scheduled actions.

Examples#

>>> from astra.scheduler import Schedule, Action
>>> from astra.action_configs import ObjectActionConfig, OpenActionConfig
>>> from datetime import datetime, UTC
>>> actions = [
...     Action(
...         device_name="test_camera",
...         action_type="open",
...         action_value=OpenActionConfig(),
...         start_time=datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC),
...         end_time=datetime(2024, 1, 1, 12, 30, 0, tzinfo=UTC),
...     ),
...     Action(
...         device_name="test_camera",
...         action_type="object",
...         action_value=ObjectActionConfig(
...             object="M42",
...             exptime=10.0,
...         ),
...         start_time=datetime(2024, 1, 1, 12, 30, 0, tzinfo=UTC),
...         end_time=datetime(2024, 1, 1, 12, 35, 0, tzinfo=UTC),
...     ),
... ]
>>> schedule = Schedule(actions)
>>> schedule.validate()
ACTION_CONFIGS: Dict[str, Type[BaseActionConfig]] = {
    'autofocus': <class 'astra.action_configs.AutofocusConfig'>,
    'calibrate_guiding': <class 'astra.action_configs.CalibrateGuidingActionConfig'>,
    'calibration': <class 'astra.action_configs.CalibrationActionConfig'>,
    'close': <class 'astra.action_configs.CloseActionConfig'>,
    'complete_headers': <class 'astra.action_configs.CompleteHeadersActionConfig'>,
    'cool_camera': <class 'astra.action_configs.CoolCameraActionConfig'>,
    'flats': <class 'astra.action_configs.FlatsActionConfig'>,
    'object': <class 'astra.action_configs.ObjectActionConfig'>,
    'open': <class 'astra.action_configs.OpenActionConfig'>,
    'pointing_model': <class 'astra.action_configs.PointingModelActionConfig'>,
}#
classmethod from_file(filename: str | Path) Schedule[source]#

Read a schedule file and return a Schedule instance with parsed schedule data.

Parameters:

filename – Path to the schedule file

classmethod from_dataframe(df: DataFrame, observatory_config: object | None = None) Schedule[source]#

Construct a Schedule instance from a pandas DataFrame.

Parameters:
  • df – DataFrame with schedule data

  • observatory_config – Optional observatory configuration

update_times(time_factor: float)[source]#

Shift the start and end times of all actions in this Schedule to the present day, scaled by time_factor. Modifies the schedule in place.

validate(filterwheel_names: dict[str, list[str]] | None = None, observatory_location: EarthLocation | None = None, min_altitude: float = 0.0)[source]#

Validate the schedule actions.

Parameters:
  • filterwheel_names – Optional dict mapping filterwheel names to their filter lists for validation, e.g. {"fw1": ["Clear", "Red", "Green"]}

  • observatory_location – Optional EarthLocation for visibility checks

  • min_altitude – Minimum altitude in degrees for visibility checks (default: 0°)

Raises:

ValueError – If any action fails validation.
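The sketch below shows the shape of the `filterwheel_names` mapping and the raise-on-failure pattern described above. The `check_filter` function is hypothetical; the actual checks performed by `validate()` are not documented here, so treat this as an illustration of the idea only.

```python
def check_filter(filterwheel_names, wheel, filter_name):
    """Hypothetical check: raise ValueError if filter_name is not on the named wheel."""
    available = filterwheel_names.get(wheel)
    if available is None:
        raise ValueError(f"unknown filterwheel: {wheel!r}")
    if filter_name not in available:
        raise ValueError(f"{filter_name!r} not in {wheel!r} filters {available}")


filterwheel_names = {"fw1": ["Clear", "Red", "Green"]}
check_filter(filterwheel_names, "fw1", "Red")  # passes silently

try:
    check_filter(filterwheel_names, "fw1", "Blue")
except ValueError as exc:
    error_message = str(exc)  # callers can log or surface the reason
```

Callers of `validate()` would wrap the call in the same kind of `try`/`except ValueError` block.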

get_by_device(device_name: str) List[Action][source]#
sort_by_start_time()[source]#

Return a new Schedule sorted by start_time.

to_dataframe() DataFrame[source]#
save_to_csv(filename: str | Path)[source]#

Write the schedule to a CSV file.

is_completed() bool[source]#
reset_completion()[source]#
to_jsonl_string() str[source]#

Convert the schedule to a JSONL string.
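For readers unfamiliar with JSONL, the format is one JSON object per line. The sketch below uses only the standard library; the field names are taken from the Action attributes, but the exact layout astra writes is an assumption.

```python
import json

rows = [
    {"device_name": "test_camera", "action_type": "open",
     "start_time": "2024-01-01T12:00:00+00:00"},
    {"device_name": "test_camera", "action_type": "object",
     "start_time": "2024-01-01T12:30:00+00:00"},
]

# One compact JSON object per line, newline-separated.
jsonl = "\n".join(json.dumps(row) for row in rows)

# Reading it back: parse each non-empty line independently.
parsed = [json.loads(line) for line in jsonl.splitlines() if line.strip()]
```

Because each line is parsed on its own, a JSONL file can be appended to or streamed without loading the whole schedule.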

to_one_line_string() str[source]#

Convert the schedule to a single line string.

save_to_jsonl(filename: str | Path)[source]#

Write the schedule to a JSONL file.

classmethod convert_action_value_string(action_value_str)[source]#

Read an old schedule CSV file and convert it to a Schedule instance.

class astra.scheduler.ScheduleManager(schedule_path: str | Path, truncate_factor: float | None, logger: ObservatoryLogger, device_manager=None)[source]#

Bases: object

get_filterwheel_names() dict[str, list[str]][source]#

Get available filter names from all filterwheels.

Returns:

Dict mapping filterwheel device names to their filter lists. Empty dict if device_manager is not available or no filterwheels exist.

get_observatory_location() EarthLocation | None[source]#

Get observatory location from telescope device.

Returns:

EarthLocation object if telescope device is available, None otherwise.

get_schedule() Schedule[source]#
read() Schedule | None[source]#

Read and process the observatory schedule from a CSV file.

Loads the schedule CSV file and converts it to a pandas DataFrame with proper datetime parsing. Automatically reloads the schedule if the file has been modified since the last read. Supports schedule truncation for development and testing purposes.

Returns:

Schedule or None

The schedule data with properly parsed 'start_time' and 'end_time' values, or None if an error occurs during reading.

Features:

  • Automatic file modification detection and reload

  • Datetime parsing for start_time and end_time columns

  • Optional schedule truncation for development (via truncate_factor)

  • Error handling with logging and error source tracking

File Format:

  • CSV file with columns including start_time, end_time, device_name, action_type, and action_value

  • Datetime columns should be in ISO format compatible with pandas

Note

  • Schedule is sorted by start_time after loading

  • Truncation moves schedule to current time for testing

  • File modification time is tracked to enable automatic reloading
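The file format and sort step described above can be sketched as follows. This uses the standard-library `csv` module rather than pandas to stay dependency-free, and the `{}` in the action_value column is a placeholder, since the real encoding of that column is not documented here. The `load_rows` function is hypothetical.

```python
import csv
import io
from datetime import datetime

CSV_TEXT = """device_name,action_type,action_value,start_time,end_time
test_camera,object,{},2024-01-01T12:30:00+00:00,2024-01-01T12:35:00+00:00
test_camera,open,{},2024-01-01T12:00:00+00:00,2024-01-01T12:30:00+00:00
"""


def load_rows(handle):
    """Parse rows, convert the two time columns, and sort by start_time."""
    rows = []
    for row in csv.DictReader(handle):
        row["start_time"] = datetime.fromisoformat(row["start_time"])
        row["end_time"] = datetime.fromisoformat(row["end_time"])
        rows.append(row)
    return sorted(rows, key=lambda r: r["start_time"])


rows = load_rows(io.StringIO(CSV_TEXT))
print([r["action_type"] for r in rows])  # ['open', 'object']
```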

reload_if_updated() bool[source]#

Reload the schedule if the schedule file has been modified.

Checks if the schedule file has been updated since it was last read. If the file has been modified, it reloads the schedule and updates the internal state accordingly.

Process:

  1. Checks if the schedule file modification time is greater than the last recorded modification time.

  2. If updated, calls the read() method to reload the schedule.

  3. Updates the internal schedule state.

Note

  • Does not interrupt a currently running schedule.

  • Logs a warning if attempting to update while running.

  • Useful for dynamic schedule updates during operation.

get_mtime() float[source]#

Get the modification timestamp of the schedule file.

Retrieves the last modification time of the schedule CSV file to enable automatic detection of schedule updates. Returns 0 if the file doesn’t exist, which can be used to detect when no schedule is available.

Returns:

float

The Unix timestamp of the schedule file’s last modification time, or 0.0 if the file does not exist.

Note

  • Used by read() to detect file changes

  • Enables automatic schedule reloading during operation

  • Returns 0 for non-existent files to simplify logic
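The modification-time check can be sketched with the standard library as follows. Both functions are illustrative stand-ins for the behaviour described above, not astra's code, and the file name is hypothetical.

```python
import os
import tempfile


def get_mtime(path):
    """Return the file's modification time, or 0.0 if it does not exist."""
    try:
        return os.path.getmtime(path)
    except OSError:
        return 0.0


def is_updated(path, last_mtime):
    """True if the file changed after the recorded timestamp."""
    return get_mtime(path) > last_mtime


with tempfile.TemporaryDirectory() as tmp:
    schedule_file = os.path.join(tmp, "schedule.csv")  # hypothetical file name
    missing = get_mtime(schedule_file)            # 0.0: nothing written yet
    with open(schedule_file, "w") as fh:
        fh.write("device_name,action_type\n")
    changed = is_updated(schedule_file, missing)  # True: the file now exists
```

Returning 0.0 for a missing file means a newly created schedule always compares as newer, so no separate existence check is needed.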

is_schedule_updated() bool[source]#

Return True if the schedule file has been modified since last read.

stop_schedule(thread_manager) None[source]#

Stop the currently running schedule execution thread.

Safely stops the schedule execution by setting the schedule_running flag to False and waiting for the schedule thread to complete. This ensures that any ongoing actions can finish cleanly before the schedule stops.

Process:

  1. Sets schedule_running flag to False (signals thread to stop)

  2. Finds the schedule thread in the threads list

  3. Waits for the thread to complete using join()

  4. Logs the stopping action

Thread Safety:

  • Uses thread.join() to ensure clean shutdown

  • Schedule thread checks schedule_running flag regularly

  • Ongoing actions are allowed to complete before stopping

Note

  • If no schedule is running, logs a warning and returns

  • Essential for emergency stops and robotic switch operations

  • Used when weather becomes unsafe or errors occur
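The flag-and-join pattern described above can be sketched with the standard `threading` module. The `Runner` class is a hypothetical stand-in for whatever object owns the schedule thread; only the `schedule_running` flag name comes from the text above.

```python
import threading
import time


class Runner:
    """Hypothetical stand-in for the object that owns the schedule thread."""

    def __init__(self):
        self.schedule_running = False
        self.thread = None
        self.steps = 0

    def _loop(self):
        # The worker re-checks the flag between short units of work.
        while self.schedule_running:
            self.steps += 1
            time.sleep(0.01)

    def start(self):
        self.schedule_running = True
        self.thread = threading.Thread(target=self._loop, daemon=True)
        self.thread.start()

    def stop(self):
        if self.thread is None or not self.thread.is_alive():
            return False  # nothing to stop
        self.schedule_running = False  # signal the loop to exit
        self.thread.join()             # wait for the current step to finish
        return True


runner = Runner()
runner.start()
time.sleep(0.05)
stopped = runner.stop()
```

The loop is never interrupted mid-step: the flag is only read at the top of each iteration, so the step in progress finishes before `join()` returns.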

get_completion_status() list[source]#
get_completed_percentage() float[source]#