execution module

Engines for executing functions.


DUMMY _Dummy

Sentinel that represents a missing value.


NoResult _NoResult

Sentinel that represents no result.


execute function

execute(
    tasks,
    size=None,
    keys=None,
    executor_cls=None,
    engine=None,
    engine_config=None,
    min_size=None,
    n_chunks=None,
    chunk_len=None,
    chunk_meta=None,
    distribute=None,
    warmup=None,
    in_chunk_order=None,
    cache_chunks=None,
    chunk_cache_dir=None,
    chunk_cache_save_kwargs=None,
    chunk_cache_load_kwargs=None,
    pre_clear_chunk_cache=None,
    post_clear_chunk_cache=None,
    release_chunk_cache=None,
    chunk_clear_cache=None,
    chunk_collect_garbage=None,
    chunk_delay=None,
    pre_execute_func=None,
    pre_execute_kwargs=None,
    pre_chunk_func=None,
    pre_chunk_kwargs=None,
    post_chunk_func=None,
    post_chunk_kwargs=None,
    post_execute_func=None,
    post_execute_kwargs=None,
    post_execute_on_sorted=None,
    filter_results=None,
    raise_no_results=None,
    merge_func=None,
    merge_kwargs=None,
    template_context=None,
    show_progress=None,
    pbar_kwargs=None,
    merge_to_engine_config=None,
    **kwargs
)

Execute functions and their arguments using Executor.

Keyword arguments **kwargs and engine_config are merged into engine_config if merge_to_engine_config is True; otherwise, **kwargs are passed directly to Executor.
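
A minimal sketch of calling execute() with plain task tuples. The import path, the (func, args, kwargs) task format, and the "serial" engine name are assumptions based on this reference:

from vectorbtpro.utils.execution import execute

def power(x, n=2):
    return x ** n

# Each task is assumed to be a (func, args, kwargs) tuple; results are returned
# in the same order as the tasks.
tasks = [(power, (i,), dict(n=3)) for i in range(5)]
results = execute(tasks, engine="serial")  # expected: [0, 1, 8, 27, 64]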


filter_out_no_results function

filter_out_no_results(
    objs,
    keys=_Missing.MISSING,
    raise_error=True
)

Filter objects and keys by removing NoResult objects.
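
A tiny hedged example; the import path is an assumption, and the function is assumed to return only the filtered objects when keys are not given:

from vectorbtpro.utils.execution import filter_out_no_results, NoResult

filtered = filter_out_no_results([1, NoResult, 2])  # expected: [1, 2]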


iterated function

iterated(
    *args,
    over_arg=None,
    executor_cls=None,
    engine=None,
    engine_config=None,
    min_size=None,
    n_chunks=None,
    chunk_len=None,
    chunk_meta=None,
    distribute=None,
    warmup=None,
    in_chunk_order=None,
    cache_chunks=None,
    chunk_cache_dir=None,
    chunk_cache_save_kwargs=None,
    chunk_cache_load_kwargs=None,
    pre_clear_chunk_cache=None,
    post_clear_chunk_cache=None,
    release_chunk_cache=None,
    chunk_clear_cache=None,
    chunk_collect_garbage=None,
    chunk_delay=None,
    pre_execute_func=None,
    pre_execute_kwargs=None,
    pre_chunk_func=None,
    pre_chunk_kwargs=None,
    post_chunk_func=None,
    post_chunk_kwargs=None,
    post_execute_func=None,
    post_execute_kwargs=None,
    post_execute_on_sorted=None,
    filter_results=None,
    raise_no_results=None,
    merge_func=None,
    merge_kwargs=None,
    template_context=None,
    show_progress=None,
    pbar_kwargs=None,
    merge_to_engine_config=None,
    **kwargs
)

Decorator that executes a function in iteration using Executor.

Returns a new function with the same signature as the passed one.

Use over_arg to specify which argument (position or name) should be iterated over. If it's None (default), uses the first argument.

Each option can be modified via the options attribute of the wrapper function or passed directly as a keyword argument with a leading underscore. If the range-like object is an iterator, you can also explicitly specify keys and size by passing them as _keys and _size respectively.

Keyword arguments **kwargs and engine_config are merged into engine_config if merge_to_engine_config is True; otherwise, **kwargs are passed directly to Executor.

If NoResult is returned, the current iteration is skipped and removed from the final index.
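
A hedged sketch of the decorator in use; the import path and the "serial" engine name are assumptions, while the underscore overrides and NoResult handling follow the description above:

from vectorbtpro.utils.execution import iterated, NoResult

@iterated(engine="serial")
def square(n):
    if n == 3:
        return NoResult  # this iteration is skipped and dropped from the final index
    return n ** 2

# The first argument is iterated over by default; options can be overridden per
# call with a leading underscore.
results = square(range(5), _show_progress=False)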


parse_iterable_and_keys function

parse_iterable_and_keys(
    iterable_like,
    keys=None
)

Parse the iterable and the keys from an iterable-like and a keys-like object respectively.

The object can be an integer, which will be interpreted as a total, or any iterable.

If the object is a dictionary, a Pandas Index, or a Pandas Series, keys will be set to its index. Otherwise, keys will be extracted using index_from_values(). Keys won't be extracted if the object is not a sequence, to avoid materializing it.
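
An illustrative sketch; the import path and the returned (iterable, keys) tuple shape are assumptions:

import pandas as pd
from vectorbtpro.utils.execution import parse_iterable_and_keys

iterable, keys = parse_iterable_and_keys(3)  # integer interpreted as a total
iterable, keys = parse_iterable_and_keys(pd.Series([1, 2], index=["x", "y"]))  # keys set to the index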


pass_kwargs_as_args function

pass_kwargs_as_args(
    func,
    args,
    kwargs
)

Helper function for pathos.pools.ParallelPool.


DaskEngine class

DaskEngine(
    compute_kwargs=None,
    **kwargs
)

Class for executing functions in parallel using Dask.

For defaults, see engines.dask in execution.

Note

Use multi-threading mainly on numeric code that releases the GIL (like NumPy, Pandas, Scikit-Learn, Numba).
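
For instance, the engine could be selected by name and configured with compute_kwargs, which are passed to dask.compute. This is a hedged sketch: the import path, the "dask" engine name, and the engine_config wiring are assumptions, while scheduler="threads" is a standard dask.compute option.

import numpy as np
from vectorbtpro.utils.execution import execute

def heavy(seed):
    rng = np.random.default_rng(seed)
    return rng.standard_normal(1_000_000).sum()  # NumPy releases the GIL here

tasks = [(heavy, (i,), {}) for i in range(4)]
results = execute(
    tasks,
    engine="dask",
    engine_config=dict(compute_kwargs=dict(scheduler="threads")),
)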

Superclasses

Inherited members


compute_kwargs property

Keyword arguments passed to dask.compute.


ExecutionEngine class

ExecutionEngine(
    **config
)

Abstract class for executing functions.

Superclasses

Inherited members

Subclasses


execute method

ExecutionEngine.execute(
    tasks,
    size=None,
    keys=None
)

Run an iterable of tuples, each consisting of a function, positional arguments, and keyword arguments.

Provide size in case tasks is a generator and the underlying engine needs it.
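
As an illustration (not the library's own code), a minimal custom engine might subclass ExecutionEngine and implement execute() over (func, args, kwargs) tuples; the import path is an assumption:

from vectorbtpro.utils.execution import ExecutionEngine

class MySerialEngine(ExecutionEngine):
    def execute(self, tasks, size=None, keys=None):
        # size and keys are only needed by engines that must know the total upfront
        return [func(*args, **kwargs) for func, args, kwargs in tasks]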


Executor class

Executor(
    engine=None,
    engine_config=None,
    min_size=None,
    n_chunks=None,
    chunk_len=None,
    chunk_meta=None,
    distribute=None,
    warmup=None,
    in_chunk_order=None,
    cache_chunks=None,
    chunk_cache_dir=None,
    chunk_cache_save_kwargs=None,
    chunk_cache_load_kwargs=None,
    pre_clear_chunk_cache=None,
    post_clear_chunk_cache=None,
    release_chunk_cache=None,
    chunk_clear_cache=None,
    chunk_collect_garbage=None,
    chunk_delay=None,
    pre_execute_func=None,
    pre_execute_kwargs=None,
    pre_chunk_func=None,
    pre_chunk_kwargs=None,
    post_chunk_func=None,
    post_chunk_kwargs=None,
    post_execute_func=None,
    post_execute_kwargs=None,
    post_execute_on_sorted=None,
    filter_results=None,
    raise_no_results=None,
    merge_func=None,
    merge_kwargs=None,
    template_context=None,
    show_progress=None,
    pbar_kwargs=None,
    **kwargs
)

Class responsible for executing functions.

Supported values for engine include the name of an engine in the settings, as well as a subclass or an instance of ExecutionEngine; see Executor.resolve_engine().

Can execute per chunk if chunk_meta is provided. Otherwise, if either n_chunks or chunk_len is set, passes them to yield_chunk_meta() to generate chunk_meta. The arguments n_chunks and chunk_len can also be set globally in the engine-specific settings. Set n_chunks or chunk_len to 'auto' to use the number of cores.

If distribute is "tasks", distributes tasks within each chunk. If indices in chunk_meta are perfectly sorted and tasks is an iterable, iterates over tasks to avoid converting it into a list. Otherwise, iterates over chunk_meta. If in_chunk_order is True, returns the results in the order they appear in chunk_meta. Otherwise, always returns them in the same order as in tasks.

If distribute is "chunks", distributes chunks. For this, executes tasks within each chunk serially using Executor.execute_serially(). Also, compresses each chunk such that each unique function, positional argument, and keyword argument is serialized only once.

If tasks is a custom template, substitutes it once chunk_meta is established. Use template_context as an additional context. All the resolved functions and arguments will be immediately passed to the executor.

If pre_chunk_func is not None, calls the function before processing a chunk. If it returns anything other than None, the returned object is appended to the results and the chunk won't be executed; this enables use cases such as caching. If post_chunk_func is not None, calls the function after processing the chunk; it should return either None to keep the old call results, or new ones to replace them. Any templates in pre_chunk_kwargs and post_chunk_kwargs are substituted and passed as keyword arguments. The following additional arguments are available in the contexts:

  • chunk_idx: the index of the current chunk
  • call_indices: the list of call indices in the chunk
  • chunk_cache: the list of call results returned from caching (only for pre_chunk_func)
  • call_results: the list of call results returned by executing the chunk (only for post_chunk_func)
  • chunk_executed: whether the chunk was executed, as opposed to being returned by pre_chunk_func (only for post_chunk_func)

Note

Both callbacks above are effective only when distribute is "tasks" and chunking is enabled.

If pre_execute_func is not None, calls the function before processing all tasks. It should return nothing (None). Any templates in pre_execute_kwargs are substituted and passed as keyword arguments. The following additional argument is available in the context: the number of chunks n_chunks.

If post_execute_func is not None, calls the function after processing all tasks. Any templates in post_execute_kwargs are substituted and passed as keyword arguments. It should return either None to keep the default results, or new ones to replace them. The following additional arguments are available in the context: the number of chunks n_chunks and the flattened list of results results. If post_execute_on_sorted is True, the callback runs after sorting the call indices.

Info

Chunks are processed sequentially, while functions within each chunk can be processed distributively.

For defaults, see execution.
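
A hedged sketch of the pieces above. The import paths, the "serial" engine name, and the Rep template class are assumptions; the callback receives whatever keyword arguments the substituted post_chunk_kwargs produce.

from vectorbtpro.utils.execution import Executor
from vectorbtpro.utils.template import Rep

def add(x, y):
    return x + y

def on_chunk_done(chunk_idx, call_results):
    # Returning None keeps the chunk's original results
    print(f"chunk {chunk_idx}: {len(call_results)} results")

tasks = [(add, (i, 10), {}) for i in range(4)]
executor = Executor(
    engine="serial",
    n_chunks=2,                # split the 4 tasks into 2 chunks
    distribute="tasks",        # chunk callbacks require this mode plus chunking
    post_chunk_func=on_chunk_done,
    # Templates are substituted from the chunk context before the call
    post_chunk_kwargs=dict(
        chunk_idx=Rep("chunk_idx"),
        call_results=Rep("call_results"),
    ),
)
results = executor.run(tasks)  # expected: [10, 11, 12, 13]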

Superclasses

Inherited members


build_serial_chunk class method

Executor.build_serial_chunk(
    tasks
)

Build a serial chunk.


cache_chunks property

Whether to cache chunks.


call_execute class method

Executor.call_execute(
    engine,
    tasks,
    size=None,
    keys=None
)

Call ExecutionEngine.execute().


call_post_chunk_func class method

Executor.call_post_chunk_func(
    chunk_idx,
    call_indices,
    call_results,
    cache_chunks=False,
    chunk_cache_dir=None,
    chunk_cache_save_kwargs=None,
    release_chunk_cache=False,
    chunk_clear_cache=False,
    chunk_collect_garbage=False,
    chunk_delay=None,
    post_chunk_func=None,
    post_chunk_kwargs=None,
    chunk_executed=True,
    template_context=None
)

Call Executor.post_chunk_func.


call_post_execute_func class method

Executor.call_post_execute_func(
    results,
    cache_chunks=False,
    chunk_cache_dir=None,
    chunk_cache_load_kwargs=None,
    post_clear_chunk_cache=True,
    release_chunk_cache=False,
    post_execute_func=None,
    post_execute_kwargs=None,
    template_context=None
)

Call Executor.post_execute_func.


call_pre_chunk_func class method

Executor.call_pre_chunk_func(
    chunk_idx,
    call_indices,
    cache_chunks=False,
    chunk_cache_dir=None,
    chunk_cache_load_kwargs=None,
    release_chunk_cache=False,
    pre_chunk_func=None,
    pre_chunk_kwargs=None,
    template_context=None
)

Call Executor.pre_chunk_func.


call_pre_execute_func class method

Executor.call_pre_execute_func(
    cache_chunks=False,
    chunk_cache_dir=None,
    pre_clear_chunk_cache=False,
    pre_execute_func=None,
    pre_execute_kwargs=None,
    template_context=None
)

Call Executor.pre_execute_func.


chunk_cache_dir property

Directory where to put chunk cache files.


chunk_cache_load_kwargs property

Keyword arguments passed to load() for chunk caching.


chunk_cache_save_kwargs property

Keyword arguments passed to save() for chunk caching.


chunk_clear_cache property

Whether to clear global cache after each chunk or every n chunks.


chunk_collect_garbage property

Whether to collect garbage after each chunk or every n chunks.


chunk_delay property

Number of seconds to sleep after each chunk.


chunk_len property

See yield_chunk_meta().


chunk_meta property

See yield_chunk_meta().


distribute property

Distribution mode.


engine property

Engine resolved with Executor.resolve_engine().


execute_serially static method

Executor.execute_serially(
    tasks,
    id_objs
)

Execute serially.


filter_results property

Whether to filter NoResult results.


get_engine_setting class method

Executor.get_engine_setting(
    *args,
    engine_name=None,
    **kwargs
)

HasSettings.get_setting() with sub_path=engine_name.


get_engine_settings class method

Executor.get_engine_settings(
    *args,
    engine_name=None,
    **kwargs
)

HasSettings.get_settings() with sub_path=engine_name.


has_engine_setting class method

Executor.has_engine_setting(
    *args,
    engine_name=None,
    **kwargs
)

HasSettings.has_setting() with sub_path=engine_name.


has_engine_settings class method

Executor.has_engine_settings(
    *args,
    engine_name=None,
    **kwargs
)

HasSettings.has_settings() with sub_path=engine_name.


in_chunk_order property

Whether to return the results in the order they appear in chunk_meta.

Otherwise, always returns them in the same order as in tasks.


merge_func property

Merging function.

Resolved using resolve_merge_func().


merge_kwargs property

Keyword arguments passed to the merging function.


merge_results class method

Executor.merge_results(
    results,
    keys=None,
    filter_results=False,
    raise_no_results=True,
    merge_func=None,
    merge_kwargs=None,
    template_context=None
)

Merge results using Executor.merge_func and Executor.merge_kwargs.
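
A hedged example; the import path and the call convention (merge_func applied to the list of results) are assumptions:

import pandas as pd
from vectorbtpro.utils.execution import Executor

parts = [pd.Series([1.0, 2.0]), pd.Series([3.0, 4.0])]
merged = Executor.merge_results(parts, merge_func=pd.concat)  # expected: one concatenated Series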


min_size property

See yield_chunk_meta().


n_chunks property

See yield_chunk_meta().


pbar_kwargs property

Keyword arguments passed to ProgressBar.


post_chunk_func property

Function to call after processing the chunk.

It should return either None to keep the old call results, or new ones to replace them.


post_chunk_kwargs property

Keyword arguments passed to Executor.post_chunk_func.


post_clear_chunk_cache property

Whether to remove the chunk cache directory after execution.


post_execute_func property

Function to call after processing all tasks.

Should return either None to keep the default results, or new ones to replace them.


post_execute_kwargs property

Keyword arguments passed to Executor.post_execute_func.


post_execute_on_sorted property

Whether to run Executor.post_execute_func after sorting the call indices.


pre_chunk_func property

Function to call before processing a chunk.

If it returns anything other than None, the returned object will be appended to the results and the chunk won't be executed. This enables use cases such as caching.


pre_chunk_kwargs property

Keyword arguments passed to Executor.pre_chunk_func.


pre_clear_chunk_cache property

Whether to remove the chunk cache directory before execution.


pre_execute_func property

Function to call before processing all tasks.


pre_execute_kwargs property

Keyword arguments passed to Executor.pre_execute_func.


raise_no_results property

Whether to raise NoResultsException if there are no results; otherwise, returns NoResult.

Has an effect only if Executor.filter_results is True. Regardless of this setting, it is passed to the merging function if the merging function is pre-configured.


release_chunk_cache property

Whether to replace chunk cache with dummy objects once the chunk has been executed and then load all cache at once after all chunks have been executed.


resolve_engine class method

Executor.resolve_engine(
    engine,
    show_progress=None,
    pbar_kwargs=None,
    **engine_config
)

Resolve engine and its name in settings.


resolve_engine_setting class method

Executor.resolve_engine_setting(
    *args,
    engine_name=None,
    **kwargs
)

HasSettings.resolve_setting() with sub_path=engine_name.


run method

Executor.run(
    tasks,
    size=None,
    keys=None
)

Execute functions and their arguments.


set_engine_settings class method

Executor.set_engine_settings(
    *args,
    engine_name=None,
    **kwargs
)

HasSettings.set_settings() with sub_path=engine_name.


show_progress property

Whether to show progress bar when iterating over chunks.

If Executor.engine accepts show_progress and there's no key show_progress in Executor.engine_config, then passes it to the engine as well.


template_context property

Context used to substitute templates.


warmup property

Whether to call the first item of tasks once before distribution.


MpireEngine class

MpireEngine(
    init_kwargs=None,
    apply_kwargs=None,
    **kwargs
)

Class for executing functions using WorkerPool from mpire.

For defaults, see engines.mpire in execution.

Superclasses

Inherited members


apply_kwargs property

Keyword arguments passed to WorkerPool.async_apply.


init_kwargs property

Keyword arguments used to initialize WorkerPool.


NoResultsException class

NoResultsException(
    *args,
    **kwargs
)

Gets raised when there are no results.

Superclasses

  • builtins.BaseException
  • builtins.Exception

PathosEngine class

PathosEngine(
    pool_type=None,
    init_kwargs=None,
    timeout=None,
    check_delay=None,
    show_progress=None,
    pbar_kwargs=None,
    join_pool=None,
    **kwargs
)

Class for executing functions using pathos.

For defaults, see engines.pathos in execution.

Superclasses

Inherited members


check_delay property

Number of seconds to sleep between checks.


init_kwargs property

Keyword arguments used to initialize the pool.


join_pool property

Whether to join the pool.


pbar_kwargs property

Keyword arguments passed to ProgressBar.


pool_type property

Pool type.


show_progress property

Whether to show the progress bar.


timeout property

Timeout.


ProcessPoolEngine class

ProcessPoolEngine(
    init_kwargs=None,
    timeout=None,
    **kwargs
)

Class for executing functions using ProcessPoolExecutor from concurrent.futures.

For defaults, see engines.processpool in execution.

Superclasses

Inherited members


init_kwargs property

Keyword arguments used to initialize ProcessPoolExecutor.


timeout property

Timeout.


RayEngine class

RayEngine(
    restart=None,
    reuse_refs=None,
    del_refs=None,
    shutdown=None,
    init_kwargs=None,
    remote_kwargs=None,
    **kwargs
)

Class for executing functions in parallel using Ray.

For defaults, see engines.ray in execution.

Note

Ray spawns multiple processes as opposed to threads, so any argument and keyword argument must first be put into an object store to be shared. Make sure that the computation with func takes a considerable amount of time compared to this copying operation; otherwise, there will be little to no speedup.

Superclasses

Inherited members


del_refs property

Whether to explicitly delete the result object references.


get_ray_refs class method

RayEngine.get_ray_refs(
    tasks,
    reuse_refs=True,
    remote_kwargs=None
)

Get result references by putting each argument and keyword argument into the object store and invoking the remote decorator on each function using Ray.

If reuse_refs is True, will generate one reference per unique object id.


init_kwargs property

Keyword arguments passed to ray.init.


remote_kwargs property

Keyword arguments passed to ray.remote.


restart property

Whether to terminate the Ray runtime and initialize a new one.


reuse_refs property

Whether to re-use function and object references, such that each unique object will be copied only once.


shutdown property

Whether to terminate the Ray runtime upon job end.


SerialEngine class

SerialEngine(
    show_progress=None,
    pbar_kwargs=None,
    clear_cache=None,
    collect_garbage=None,
    delay=None,
    **kwargs
)

Class for executing functions sequentially.

For defaults, see engines.serial in execution.

Superclasses

Inherited members


clear_cache property

Whether to clear vectorbt's cache after each iteration.

If an integer n, do it once every n tasks.


collect_garbage property

Whether to collect garbage after each iteration.

If an integer n, do it once every n tasks.


delay property

Number of seconds to sleep after each call.


pbar_kwargs property

Keyword arguments passed to ProgressBar.


show_progress property

Whether to show the progress bar.


Task class

Task(
    func,
    *args,
    **kwargs
)

Class that represents an executable task.
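
A tiny usage sketch; the import path is an assumption:

from vectorbtpro.utils.execution import Task

task = Task(divmod, 7, 3)
task.execute()  # (2, 1)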

Superclasses

Inherited members


args class variable

Positional arguments.


execute method

Task.execute()

Execute the task.


func class variable

Function.


kwargs class variable

Keyword arguments.


ThreadPoolEngine class

ThreadPoolEngine(
    init_kwargs=None,
    timeout=None,
    **kwargs
)

Class for executing functions using ThreadPoolExecutor from concurrent.futures.

For defaults, see engines.threadpool in execution.

Superclasses

Inherited members


init_kwargs property

Keyword arguments used to initialize ThreadPoolExecutor.


timeout property

Timeout.