Cron-like task scheduler with overlap prevention and interval support.
pip install philiprehberger-task-schedulerCron-like task scheduler with overlap prevention and interval support.
pip install philiprehberger-task-scheduler
from philiprehberger_task_scheduler import Scheduler
scheduler = Scheduler()
@scheduler.cron("*/5 * * * *") # every 5 minutes
def check_health():
ping_server()
scheduler.start() # blocks
from philiprehberger_task_scheduler import Scheduler
scheduler = Scheduler()
@scheduler.interval(seconds=30)
def poll_queue():
process_messages()
@scheduler.interval(minutes=5, overlap=False)
def sync_data():
pull_latest_data()
from philiprehberger_task_scheduler import Scheduler
scheduler = Scheduler()
@scheduler.once(delay=10) # run once after 10 seconds
def startup_task():
warm_cache()
from philiprehberger_task_scheduler import Scheduler
scheduler = Scheduler()
scheduler.start(background=True)
# ... your app continues running ...
scheduler.stop()
from philiprehberger_task_scheduler import Scheduler
scheduler = Scheduler()
scheduler.add("my-job", fn=my_function, cron="0 * * * *")
scheduler.add("poller", fn=poll, interval_seconds=60)
scheduler.remove("my-job")
from philiprehberger_task_scheduler import Scheduler
scheduler = Scheduler(history_limit=50)
scheduler.add("job", fn=my_function, interval_seconds=60)
scheduler.start(background=True)
# Get all history (newest first)
for record in scheduler.history:
print(f"{record.job_name}: {record.status.value} in {record.duration_seconds:.2f}s")
# Get history for a specific job
for record in scheduler.get_job_history("job"):
if record.error:
print(f"Error: {record.error}")
from philiprehberger_task_scheduler import Scheduler
scheduler = Scheduler()
scheduler.add("job", fn=my_function, cron="0 * * * *")
for name, next_time in scheduler.next_runs():
print(f"{name}: next at {next_time}")
from philiprehberger_task_scheduler import Scheduler
scheduler = Scheduler()
@scheduler.interval(seconds=60, name="fetch-data")
def fetch_data():
download_latest()
@scheduler.interval(seconds=60, name="process-data", depends_on="fetch-data")
def process_data():
transform_and_store()
from philiprehberger_task_scheduler import Scheduler
scheduler = Scheduler()
scheduler.start(background=True)
# Wait for running tasks to finish before stopping
scheduler.stop(wait=True)
# Or set a timeout (seconds)
scheduler.stop(wait=True, timeout=10)
from philiprehberger_task_scheduler import Scheduler, MissedJobPolicy
scheduler = Scheduler()
@scheduler.cron("0 * * * *", missed_policy=MissedJobPolicy.RUN_ONCE)
def hourly_sync():
sync_data()
@scheduler.cron("*/5 * * * *", missed_policy=MissedJobPolicy.RUN_ALL)
def critical_check():
check_systems()
from philiprehberger_task_scheduler import Scheduler
scheduler = Scheduler()
scheduler.add("my-job", fn=my_function, interval_seconds=60)
scheduler.pause("my-job") # job won't run during ticks
scheduler.resume("my-job") # job runs again
from philiprehberger_task_scheduler import Scheduler
scheduler = Scheduler()
scheduler.add("job1", fn=fn1, interval_seconds=60)
scheduler.add("job2", fn=fn2, cron="0 * * * *")
print(scheduler.job_count) # 2
print(scheduler.is_running) # False
scheduler.clear() # remove all jobs
print(scheduler.job_count) # 0
Standard 5-field cron expressions:
minute (0-59)
hour (0-23)
day of month (1-31)
month (1-12)
day of week (0-6, Mon-Sun)
Supports: *, ranges (1-5), lists (1,3,5), steps (*/5).
| Function / Class | Description |
|---|---|
Scheduler(history_limit) | Task scheduler with cron, interval, and one-shot scheduling |
Scheduler.cron(expression, overlap, name, depends_on, missed_policy) | Decorator to schedule with a cron expression |
Scheduler.interval(seconds, minutes, hours, overlap, name, depends_on, missed_policy) | Decorator to schedule at a fixed interval |
Scheduler.once(delay, name) | Decorator to schedule a one-shot task |
Scheduler.add(name, fn, cron, interval_seconds, overlap, depends_on, missed_policy) | Programmatically add a job |
Scheduler.remove(name) | Remove a job by name |
Scheduler.pause(name) | Pause a job without removing it |
Scheduler.resume(name) | Resume a paused job |
Scheduler.clear() | Remove all registered jobs |
Scheduler.job_count | Number of registered jobs |
Scheduler.is_running | Whether the scheduler is currently running |
Scheduler.start(background) | Start the scheduler (blocks unless background=True) |
Scheduler.stop(wait, timeout) | Stop the scheduler with optional graceful shutdown |
Scheduler.next_runs() | Get the next run time for each job |
Scheduler.history | Execution history (newest first) |
Scheduler.get_job_history(name) | Execution history for a specific job |
Job | A scheduled job with name, function, schedule config, and next_run property |
ExecutionRecord | Record of a job execution with status, duration, and error |
ExecutionStatus | Enum: SUCCESS, FAILED |
MissedJobPolicy | Enum: SKIP, RUN_ONCE, RUN_ALL |
pip install -e .
python -m pytest tests/ -v
If you find this project useful: