Skip to content

schedule_ module

Utilities for scheduling jobs.


AsyncJob class

AsyncJob(
    interval,
    scheduler=None
)

Async CustomJob.

Superclasses


async_run method

AsyncJob.async_run()

Async CustomJob.run.


AsyncScheduler class

AsyncScheduler()

Async CustomScheduler.

Superclasses


async_run_all method

AsyncScheduler.async_run_all(
    delay_seconds=0
)

Async CustomScheduler.run_all.


async_run_pending method

AsyncScheduler.async_run_pending()

Async CustomScheduler.run_pending.


every method

AsyncScheduler.every(
    interval=1
)

Schedule a new periodic job of type AsyncJob.


CancelledError class

CancelledError(
    *args,
    **kwargs
)

Thrown for the operation to be cancelled.

Superclasses

  • asyncio.exceptions.CancelledError
  • builtins.BaseException

CustomJob class

CustomJob(
    interval,
    scheduler=None
)

A periodic job as used by :class:Scheduler.

:param interval: A quantity of a certain time unit :param scheduler: The :class:Scheduler <Scheduler> instance that this job will register itself with once it has been fully configured in :meth:Job.do().

Every job runs at a given fixed time interval that is defined by:

  • a :meth:time unit <Job.second>
  • a quantity of time units defined by interval

A job is usually created and returned by :meth:Scheduler.every method, which also defines its interval.

Superclasses

  • schedule.Job

Subclasses


force_missed_run property


modulo property


zero_offset property


CustomScheduler class

CustomScheduler()

Objects instantiated by the :class:Scheduler <Scheduler> are factories to create jobs, keep record of scheduled jobs and handle their execution.

Superclasses

  • schedule.Scheduler

Subclasses


ScheduleManager class

ScheduleManager(
    scheduler=None
)

Class that manages CustomScheduler.


async_start method

ScheduleManager.async_start(
    sleep=1,
    clear_after=False
)

Async run pending jobs in a loop.


async_task property

Current async task.


async_task_running property

Whether the async task is running.


clear_jobs method

ScheduleManager.clear_jobs(
    tags=None
)

Delete scheduled jobs with the given tags, or all jobs if tag is omitted.


done_callback method

ScheduleManager.done_callback(
    async_task
)

Callback run when the async task is finished.


every method

ScheduleManager.every(
    *args,
    to=None,
    zero_offset=False,
    force_missed_run=False,
    tags=None
)

Create a new job that runs every interval units of time.

*args can include at most four different arguments: interval, unit, start_day, and at, in the strict order:

See the package schedule for more details.

Usage

>>> from vectorbtpro import *

>>> def job_func(message="I'm working..."):
...     print(message)

>>> my_manager = vbt.ScheduleManager()

>>> # add jobs
>>> my_manager.every().do(job_func, message="Hello")
Every 1 second do job_func(message='Hello') (last run: [never], next run: 2021-03-18 19:06:47)

>>> my_manager.every(10, 'minutes').do(job_func)
Every 10 minutes do job_func() (last run: [never], next run: 2021-03-18 19:16:46)

>>> my_manager.every(10, 'minutes', ':00', zero_offset=True).do(job_func)
Every 10 minutes at 00:00:00 do job_func() (last run: [never], next run: 2022-08-18 16:10:00)

>>> my_manager.every('hour').do(job_func)
Every 1 hour do job_func() (last run: [never], next run: 2021-03-18 20:06:46)

>>> my_manager.every('hour', '00:00').do(job_func)
Every 1 hour at 00:00:00 do job_func() (last run: [never], next run: 2021-03-18 20:00:00)

>>> my_manager.every(4, 'hours', '00:00').do(job_func)
Every 4 hours at 00:00:00 do job_func() (last run: [never], next run: 2021-03-19 00:00:00)

>>> my_manager.every('10:30').do(job_func)
Every 1 day at 10:30:00 do job_func() (last run: [never], next run: 2021-03-19 10:30:00)

>>> my_manager.every('hour', '00:00').do(job_func)
Every 1 hour at 00:00:00 do job_func() (last run: [never], next run: 2021-03-19 10:30:00)

>>> my_manager.every(4, 'hour', '00:00').do(job_func)
Every 4 hours at 00:00:00 do job_func() (last run: [never], next run: 2021-03-19 10:30:00)

>>> my_manager.every('day', '10:30').do(job_func)
Every 1 day at 10:30:00 do job_func() (last run: [never], next run: 2021-03-19 10:30:00)

>>> my_manager.every('day', time(9, 30, tzinfo="utc")).do(job_func)
Every 1 day at 10:30:00 do job_func() (last run: [never], next run: 2021-03-19 10:30:00)

>>> my_manager.every('monday').do(job_func)
Every 1 week do job_func() (last run: [never], next run: 2021-03-22 19:06:46)

>>> my_manager.every('wednesday', '13:15').do(job_func)
Every 1 week at 13:15:00 do job_func() (last run: [never], next run: 2021-03-24 13:15:00)

>>> my_manager.every('minute', ':17').do(job_func)
Every 1 minute at 00:00:17 do job_func() (last run: [never], next run: 2021-03-18 19:07:17)

>>> my_manager.start()

You can still use the chained approach as done by schedule:

>>> my_manager.every().minute.at(':17').do(job_func)
Every 1 minute at 00:00:17 do job_func() (last run: [never], next run: 2021-03-18 19:07:17)

scheduler property

Scheduler.


start method

ScheduleManager.start(
    sleep=1,
    clear_after=False
)

Run pending jobs in a loop.


start_in_background method

ScheduleManager.start_in_background(
    **kwargs
)

Run ScheduleManager.async_start() in the background.


stop method

ScheduleManager.stop()

Stop the async task.


units class variable


weekdays class variable