API reference¶
.. py:module:: asynclit
asynclit: background tasks for rerun-driven UIs.
asynclit provides a small polling-friendly API for running sync/async callables on a dedicated background asyncio event loop. It is designed for Streamlit-style apps where the main script reruns frequently and must not block.
.. py:class:: RetryPolicy(max_attempts=1, retry_on=(<class ‘Exception’>,), base_delay=0.0, max_delay=60.0, multiplier=2.0, jitter=0.0, max_elapsed=None, retry_if=None) :module: asynclit :canonical: asynclit.retry.RetryPolicy
Bases: :py:class:object
Retry configuration for exception-based retries.
This policy is used by asynclit.run(..., retry=...) and
TaskManager.submit(..., retry=...).
Notes:
Retries are exception-based only (no result predicates).
Jitter is optional; set
jitter=0.0for deterministic tests.
.. py:attribute:: RetryPolicy.base_delay :module: asynclit :type: float :value: 0.0
.. py:method:: RetryPolicy.delay_for_attempt(attempt_index) :module: asynclit
Compute the delay before the next retry attempt.
:type attempt_index: :sphinx_autodoc_typehints_type:`\:py\:class\:\`int\``
:param attempt_index: 0 for a retry after the first failure, 1 after the second, etc.
:rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`float\``
:returns: Delay in seconds (>= 0), including optional jitter and max delay cap.
.. py:method:: RetryPolicy.exceeded_elapsed(started_at) :module: asynclit
Whether the max elapsed time budget has been exceeded.
:type started_at: :sphinx_autodoc_typehints_type:`\:py\:class\:\`float\``
:param started_at: Start time returned by `start_time()`.
:rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`bool\``
.. py:attribute:: RetryPolicy.jitter :module: asynclit :type: float :value: 0.0
.. py:attribute:: RetryPolicy.max_attempts :module: asynclit :type: int :value: 1
.. py:attribute:: RetryPolicy.max_delay :module: asynclit :type: float :value: 60.0
.. py:attribute:: RetryPolicy.max_elapsed :module: asynclit :type: float | None :value: None
.. py:attribute:: RetryPolicy.multiplier :module: asynclit :type: float :value: 2.0
.. py:attribute:: RetryPolicy.retry_if :module: asynclit :type: ~typing.Callable[[BaseException], bool] | None :value: None
.. py:attribute:: RetryPolicy.retry_on :module: asynclit :type: ~typing.Tuple[~typing.Type[BaseException], …] :value: (<class ‘Exception’>,)
.. py:method:: RetryPolicy.should_retry(exc) :module: asynclit
Decide whether the given exception is eligible for retry.
:rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`bool\``
:returns: `True` if `exc` matches `retry_on` and `retry_if` (if provided).
.. py:method:: RetryPolicy.start_time() :module: asynclit
:rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`float\``
.. py:class:: ScheduledTask(job_id, latest_task_key=None) :module: asynclit :canonical: asynclit.scheduler.ScheduledTask
Bases: :py:class:object
Metadata returned by scheduling helpers.
.. attribute:: job_id
APScheduler job id.
.. attribute:: latest_task_key
Optional manager alias key (`global:{name}`) when `latest_task_name` is used.
.. py:attribute:: ScheduledTask.job_id :module: asynclit :type: str
.. py:attribute:: ScheduledTask.latest_task_key :module: asynclit :type: str | None :value: None
.. py:exception:: SchedulerUnavailable :module: asynclit :canonical: asynclit.scheduler.SchedulerUnavailable
Bases: :py:class:RuntimeError
Raised when scheduling helpers are used without APScheduler installed.
.. py:class:: Task(task_id) :module: asynclit :canonical: asynclit.task.Task
Bases: :py:class:~typing.Generic\ [:py:obj:~asynclit.task.T]
A poll-friendly handle for work running on the asynclit worker loop.
Task is designed for synchronous, rerun-driven UIs (e.g. Streamlit):
submit work, keep the handle somewhere stable (session state), and on each
rerun poll done / status and read result when ready.
Notes:
Results are bridged via a
concurrent.futures.Future, so polling does not require an event loop on the caller thread.Progress is available only when the submitted callable is async and declares a
queueorprogress_queueparameter (seeasynclit.run).
.. py:method:: Task.cancel() :module: asynclit
Request cancellation.
Behavior:
- If the task is already terminal (`DONE`, `ERROR`, `CANCELLED`), returns `False`.
- If the underlying asyncio task is running, schedules `asyncio.Task.cancel()`
on the worker loop and returns `True`.
- If the task is still pending (not yet bound on the worker), cancels the
result future and returns whether it was cancelled.
:rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`bool\``
:returns: `True` if a cancellation request was scheduled/performed, otherwise `False`.
.. py:property:: Task.done :module: asynclit :type: bool
Whether the task has completed (success, error, or cancellation).
.. py:property:: Task.error :module: asynclit :type: BaseException | None
The captured exception if `status` is `ERROR`, otherwise `None`.
.. py:property:: Task.progress :module: asynclit :type: ~typing.List[~typing.Any]
Drain progress values (non-blocking).
This returns any values currently available on the sync side of the Janus
queue, plus any tail buffered when the queue was closed (so late polls can
still observe final progress).
:returns: A list of values in FIFO order. Empty when no progress is available.
.. py:property:: Task.result :module: asynclit :type: ~asynclit.task.T
Return the result value.
:raises RuntimeError: If the task is not complete yet.
:raises BaseException: Re-raises the underlying exception if the task failed.
:raises concurrent.futures.CancelledError: If the task was cancelled.
.. py:property:: Task.status :module: asynclit :type: ~asynclit.task.TaskStatus
Current lifecycle status.
.. py:class:: TaskManager(*, max_completed=256) :module: asynclit :canonical: asynclit.manager.TaskManager
Bases: :py:class:object
Submits work to the asynclit worker, tracks tasks, and trims completed entries.
The manager acts as an in-memory registry keyed by task id (and optional
global:{name} aliases). For rerun-driven UIs, keep a manager around when
you create many tasks over time and periodically call cleanup().
.. py:method:: TaskManager.cleanup() :module: asynclit
Remove oldest completed tasks if the registry exceeds `max_completed`.
:rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`int\``
:returns: Number of removed entries.
.. py:method:: TaskManager.get(task_id) :module: asynclit
Get a task by id, or `None` if missing.
:rtype: :sphinx_autodoc_typehints_type:`\:py\:data\:\`\~typing.Optional\`\\ \\\[\:py\:class\:\`\~asynclit.task.Task\`\\ \\\[\:py\:data\:\`\~typing.Any\`\]\]`
.. py:method:: TaskManager.register_global(task, name) :module: asynclit
Alias a task for shared lookup.
After registering, `get(f"global:{name}")` returns the task.
:rtype: :sphinx_autodoc_typehints_type:`\:py\:obj\:\`None\``
.. py:method:: TaskManager.submit(func, /, *args, retry=None, **kwargs) :module: asynclit
Submit `func` to the asynclit worker.
:type func: :sphinx_autodoc_typehints_type:`\:py\:data\:\`\~typing.Callable\`\\ \\\[\:py\:data\:\`...\<Ellipsis\>\`\, \:py\:class\:\`\~typing.TypeVar\`\\ \\\(\`\`T\`\`\)\]`
:param func: Sync or async callable to execute.
:type \*args: :sphinx_autodoc_typehints_type:`\:py\:data\:\`\~typing.Any\``
:param \*args: Positional arguments passed to `func` (after progress queue injection, if any).
:type retry: :sphinx_autodoc_typehints_type:`\:py\:data\:\`\~typing.Optional\`\\ \\\[\:py\:class\:\`\~asynclit.retry.RetryPolicy\`\]`
:param retry: Optional `RetryPolicy` for exception-based retries.
:type \*\*kwargs: :sphinx_autodoc_typehints_type:`\:py\:data\:\`\~typing.Any\``
:param \*\*kwargs: Keyword arguments passed to `func`.
:rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`\~asynclit.task.Task\`\\ \\\[\:py\:class\:\`\~typing.TypeVar\`\\ \\\(\`\`T\`\`\)\]`
:returns: A `Task[T]` handle that can be polled from the caller thread.
.. py:class:: TaskStatus(value) :module: asynclit :canonical: asynclit.task.TaskStatus
Bases: :py:class:~enum.Enum
.. py:attribute:: TaskStatus.CANCELLED :module: asynclit :value: ‘cancelled’
.. py:attribute:: TaskStatus.DONE :module: asynclit :value: ‘done’
.. py:attribute:: TaskStatus.ERROR :module: asynclit :value: ‘error’
.. py:attribute:: TaskStatus.PENDING :module: asynclit :value: ‘pending’
.. py:attribute:: TaskStatus.RUNNING :module: asynclit :value: ‘running’
.. py:function:: get_default_manager() :module: asynclit
Return the process-wide default TaskManager (lazy singleton).
:rtype: :sphinx_autodoc_typehints_type:\:py\:class\:\~asynclit.manager.TaskManager``
.. py:function:: get_default_scheduler() :module: asynclit
Return a process-wide AsyncIOScheduler bound to the asynclit worker loop.
Requires installing the optional extra: asynclit[scheduler].
:rtype: :sphinx_autodoc_typehints_type:\_AsyncIOScheduler
.. py:function:: run(func, /, *args, manager=None, retry=None, **kwargs) :module: asynclit
Run func on the asynclit worker thread and return a pollable Task.
Async callables run on the dedicated worker event loop. Sync callables run via
asyncer.asyncify (thread pool). If the async callable declares a progress_queue
or queue parameter, a janus.Queue is created and injected to stream progress
values back to the caller thread (drain via Task.progress).
:type func: :sphinx_autodoc_typehints_type:\:py\:data\:\~typing.Callable`\ \[:py:data:`…<Ellipsis>`, :py:class:`~typing.TypeVar`\ \(``T``)] :param func: Sync or async callable. :type \*args: :sphinx_autodoc_typehints_type::py:data:`~typing.Any` :param \*args: Positional args forfunc (after progress queue injection, if any). :type manager: :sphinx_autodoc_typehints_type::py:data:`~typing.Optional`\ \[:py:class:`~asynclit.manager.TaskManager`] :param manager: OptionalTaskManager to submit into (defaults to the process-wide manager). :type retry: :sphinx_autodoc_typehints_type::py:data:`~typing.Optional`\ \[:py:class:`~asynclit.retry.RetryPolicy`] :param retry: OptionalRetryPolicy for exception-based retries. :type \*\*kwargs: :sphinx_autodoc_typehints_type::py:data:`~typing.Any` :param \*\*kwargs: Keyword args forfunc`.
:rtype: :sphinx_autodoc_typehints_type:\:py\:class\:\~asynclit.task.Task`\ \[:py:class:`~typing.TypeVar`\ \(``T``)] :returns: ATask[T]` handle.
.. py:function:: schedule_cron(func, *, cron, args=(), kwargs=None, manager=None, scheduler=None, job_id=None, replace_existing=True, latest_task_name=None) :module: asynclit
Schedule periodic execution of func using a cron expression.
The cron expression uses APScheduler’s CronTrigger.from_crontab format.
:rtype: :sphinx_autodoc_typehints_type:ScheduledTask
.. py:function:: schedule_interval(func, *, seconds, args=(), kwargs=None, manager=None, scheduler=None, job_id=None, replace_existing=True, latest_task_name=None) :module: asynclit
Schedule periodic execution of func using an interval trigger.
Each tick submits an asynclit Task via the provided manager.
If latest_task_name is set, the latest Task is stored as a manager global alias
under key global:{latest_task_name}.
:rtype: :sphinx_autodoc_typehints_type:ScheduledTask
.. py:function:: session_tasks(session_state, key=’asynclit_tasks’) :module: asynclit
Return a task registry dict stored on session_state[key].
This is a convenience for Streamlit apps that want a stable dict for named tasks.
:type session_state: :sphinx_autodoc_typehints_type:\:py\:data\:\~typing.Any` :param session_state:st.session_state or a compatible mapping-like object. :type key: :sphinx_autodoc_typehints_type::py:class:`str``
:param key: Storage key used within the session state.
:rtype: :sphinx_autodoc_typehints_type:\:py\:class\:\~typing.Dict`\ \[:py:class:`str`, :py:data:`~typing.Any`]`
:returns: A mutable dict that persists across reruns.
.. py:function:: shutdown_scheduler(scheduler=None, *, wait=False) :module: asynclit
Shutdown the scheduler (on the worker loop thread).
:type scheduler: :sphinx_autodoc_typehints_type:Optional\[\'\_AsyncIOScheduler\'\]
:param scheduler: Scheduler instance to shutdown (defaults to the singleton if created).
:type wait: :sphinx_autodoc_typehints_type:bool
:param wait: Whether to wait for running jobs (passed to APScheduler).
:rtype: :sphinx_autodoc_typehints_type:None
.. py:function:: start_scheduler(scheduler=None) :module: asynclit
Start the scheduler (on the worker loop thread) and return it.
:type scheduler: :sphinx_autodoc_typehints_type:Optional\[\'\_AsyncIOScheduler\'\]
:param scheduler: Scheduler instance to start (defaults to the singleton).
:rtype: :sphinx_autodoc_typehints_type:\_AsyncIOScheduler