API reference

.. py:module:: asynclit

asynclit: background tasks for rerun-driven UIs.

asynclit provides a small polling-friendly API for running sync/async callables on a dedicated background asyncio event loop. It is designed for Streamlit-style apps where the main script reruns frequently and must not block.

.. py:class:: RetryPolicy(max_attempts=1, retry_on=(<class 'Exception'>,), base_delay=0.0, max_delay=60.0, multiplier=2.0, jitter=0.0, max_elapsed=None, retry_if=None) :module: asynclit :canonical: asynclit.retry.RetryPolicy

Bases: :py:class:`object`

Retry configuration for exception-based retries.

This policy is used by `asynclit.run(..., retry=...)` and `TaskManager.submit(..., retry=...)`.

Notes:

  • Retries are exception-based only (no result predicates).

  • Jitter is optional; set `jitter=0.0` for deterministic tests.
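The "exception-based only" note can be illustrated with a minimal retry loop. This is a sketch, not asynclit's implementation: `call_with_retries` and `flaky` are hypothetical names, and it assumes that `max_attempts` counts the first call as well as the retries.

```python
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    func: Callable[[], T],
    max_attempts: int = 1,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
) -> T:
    """Hypothetical helper: call func, retrying only when it raises."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        try:
            # The returned value is never inspected: any result ends the loop.
            return func()
        except retry_on:
            # Assumption: max_attempts includes the first call.
            if attempt == max_attempts:
                raise
    raise AssertionError("unreachable")


calls = []


def flaky() -> str:
    """Fail on the first two calls, then succeed."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("transient")
    return "ok"


outcome = call_with_retries(flaky, max_attempts=3)
```

With `max_attempts=1` the same helper makes a single call and lets the exception propagate, which matches a default of "no retries" under the stated assumption.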

.. py:attribute:: RetryPolicy.base_delay :module: asynclit :type: float :value: 0.0

.. py:method:: RetryPolicy.delay_for_attempt(attempt_index) :module: asynclit

  Compute the delay before the next retry attempt.

  :type attempt_index: :sphinx_autodoc_typehints_type:`\:py\:class\:\`int\``
  :param attempt_index: 0 for a retry after the first failure, 1 after the second, etc.

  :rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`float\``
  :returns: Delay in seconds (>= 0), including optional jitter and max delay cap.
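The exact formula is not stated here. The sketch below assumes exponential backoff (`base_delay * multiplier ** attempt_index`), a cap at `max_delay`, and jitter treated as a random fraction of the delay. `sketch_delay` and its `rng` parameter are hypothetical and only show how the documented attributes could combine.

```python
import random
from typing import Optional


def sketch_delay(
    attempt_index: int,
    base_delay: float = 0.0,
    max_delay: float = 60.0,
    multiplier: float = 2.0,
    jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Hypothetical delay computation (not asynclit's actual code)."""
    # Assumption: exponential growth from base_delay, capped at max_delay.
    delay = min(max_delay, base_delay * (multiplier ** attempt_index))
    if jitter > 0.0:
        # Assumption: jitter adds a uniform random fraction of the delay.
        rng = rng or random.Random()
        delay += rng.uniform(0.0, jitter * delay)
        # Re-apply the cap so jitter cannot exceed max_delay.
        delay = min(max_delay, delay)
    return max(0.0, delay)


# With jitter disabled the sequence is deterministic.
delays = [sketch_delay(i, base_delay=1.0, max_delay=5.0) for i in range(4)]
# delays == [1.0, 2.0, 4.0, 5.0]
```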

.. py:method:: RetryPolicy.exceeded_elapsed(started_at) :module: asynclit

  Whether the max elapsed time budget has been exceeded.

  :type started_at: :sphinx_autodoc_typehints_type:`\:py\:class\:\`float\``
  :param started_at: Start time returned by `start_time()`.

  :rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`bool\``
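A minimal sketch of an elapsed-time budget check. It assumes a monotonic clock and a `>=` comparison, neither of which is confirmed by this reference. `sketch_exceeded` and its `now` parameter are hypothetical.

```python
import time
from typing import Optional


def sketch_exceeded(
    started_at: float,
    max_elapsed: Optional[float],
    now: Optional[float] = None,
) -> bool:
    """Hypothetical budget check (not asynclit's actual code)."""
    if max_elapsed is None:
        # No budget configured, so it can never be exceeded.
        return False
    # Assumption: times come from a monotonic clock.
    current = time.monotonic() if now is None else now
    return (current - started_at) >= max_elapsed


started = time.monotonic()
still_within_budget = not sketch_exceeded(started, max_elapsed=60.0)
```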

.. py:attribute:: RetryPolicy.jitter :module: asynclit :type: float :value: 0.0

.. py:attribute:: RetryPolicy.max_attempts :module: asynclit :type: int :value: 1

.. py:attribute:: RetryPolicy.max_delay :module: asynclit :type: float :value: 60.0

.. py:attribute:: RetryPolicy.max_elapsed :module: asynclit :type: float | None :value: None

.. py:attribute:: RetryPolicy.multiplier :module: asynclit :type: float :value: 2.0

.. py:attribute:: RetryPolicy.retry_if :module: asynclit :type: ~typing.Callable[[BaseException], bool] | None :value: None

.. py:attribute:: RetryPolicy.retry_on :module: asynclit :type: ~typing.Tuple[~typing.Type[BaseException], ...] :value: (<class 'Exception'>,)

.. py:method:: RetryPolicy.should_retry(exc) :module: asynclit

  Decide whether the given exception is eligible for retry.

  :rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`bool\``
  :returns: `True` if `exc` matches `retry_on` and `retry_if` (if provided).
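The rule above (match `retry_on`, then apply `retry_if` when provided) can be sketched with an `isinstance` check followed by the optional predicate. `sketch_should_retry` is a hypothetical stand-in, not the library method.

```python
from typing import Callable, Optional, Tuple, Type


def sketch_should_retry(
    exc: BaseException,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    retry_if: Optional[Callable[[BaseException], bool]] = None,
) -> bool:
    """Hypothetical eligibility check mirroring the documented rule."""
    # The exception type must match retry_on first.
    if not isinstance(exc, retry_on):
        return False
    # The predicate, when given, can only narrow the match further.
    return True if retry_if is None else bool(retry_if(exc))


# TimeoutError is a subclass of OSError, so it matches retry_on=(OSError,).
timeout_retries = sketch_should_retry(TimeoutError(), retry_on=(OSError,))
# A predicate can reject an exception whose type would otherwise match.
permanent = sketch_should_retry(
    ValueError("permanent"),
    retry_if=lambda e: "transient" in str(e),
)
```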

.. py:method:: RetryPolicy.start_time() :module: asynclit

  :rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`float\``

.. py:class:: ScheduledTask(job_id, latest_task_key=None) :module: asynclit :canonical: asynclit.scheduler.ScheduledTask

Bases: :py:class:`object`

Metadata returned by scheduling helpers.

.. attribute:: job_id

  APScheduler job id.

.. attribute:: latest_task_key

  Optional manager alias key (`global:{name}`) when `latest_task_name` is used.

.. py:attribute:: ScheduledTask.job_id :module: asynclit :type: str

.. py:attribute:: ScheduledTask.latest_task_key :module: asynclit :type: str | None :value: None

.. py:exception:: SchedulerUnavailable :module: asynclit :canonical: asynclit.scheduler.SchedulerUnavailable

Bases: :py:class:`RuntimeError`

Raised when scheduling helpers are used without APScheduler installed.

.. py:class:: Task(task_id) :module: asynclit :canonical: asynclit.task.Task

Bases: :py:class:`~typing.Generic`\ [:py:obj:`~asynclit.task.T`]

A poll-friendly handle for work running on the asynclit worker loop.

`Task` is designed for synchronous, rerun-driven UIs (e.g. Streamlit): submit work, keep the handle somewhere stable (session state), and on each rerun poll `done` / `status` and read `result` when ready.

Notes:

  • Results are bridged via a `concurrent.futures.Future`, so polling does not require an event loop on the caller thread.

  • Progress is available only when the submitted callable is async and declares a `queue` or `progress_queue` parameter (see `asynclit.run`).
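The bridging idea in the notes above can be sketched with the standard library alone: a coroutine runs on an event loop in a background thread, and the caller polls a `concurrent.futures.Future` without an event loop of its own. This is not asynclit's implementation. `start_worker_loop` and `slow_double` are hypothetical, and `asyncio.run_coroutine_threadsafe` stands in for whatever the library does internally.

```python
import asyncio
import threading
import time


def start_worker_loop():
    """Start an event loop on a background thread (hypothetical helper)."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


async def slow_double(x: int) -> int:
    await asyncio.sleep(0.05)
    return x * 2


loop, thread = start_worker_loop()

# The returned object is a concurrent.futures.Future, usable from any thread.
future = asyncio.run_coroutine_threadsafe(slow_double(21), loop)

# Poll from the caller thread; a rerun-driven UI would check once per rerun.
while not future.done():
    time.sleep(0.01)

answer = future.result()

# Stop the loop and release its resources.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```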

.. py:method:: Task.cancel() :module: asynclit

  Request cancellation.

  Behavior:

  - If the task is already terminal (`DONE`, `ERROR`, `CANCELLED`), returns `False`.
  - If the underlying asyncio task is running, schedules `asyncio.Task.cancel()`
    on the worker loop and returns `True`.
  - If the task is still pending (not yet bound on the worker), cancels the
    result future and returns whether it was cancelled.

  :rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`bool\``
  :returns: `True` if a cancellation request was scheduled/performed, otherwise `False`.
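The pending and terminal cases follow the semantics of `concurrent.futures.Future.cancel()`, which the sketch below demonstrates on bare futures. It does not cover the running case, where asynclit schedules `asyncio.Task.cancel()` on the worker loop.

```python
from concurrent.futures import CancelledError, Future

# A future that has not started can still be cancelled.
pending: Future = Future()
cancelled_pending = pending.cancel()  # True

# A future that already holds a result cannot be cancelled.
finished: Future = Future()
finished.set_result("value")
cancelled_finished = finished.cancel()  # False

# Reading the result of a cancelled future raises CancelledError.
try:
    pending.result(timeout=0)
    saw_cancelled_error = False
except CancelledError:
    saw_cancelled_error = True
```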

.. py:property:: Task.done :module: asynclit :type: bool

  Whether the task has completed (success, error, or cancellation).

.. py:property:: Task.error :module: asynclit :type: BaseException | None

  The captured exception if `status` is `ERROR`, otherwise `None`.

.. py:property:: Task.progress :module: asynclit :type: ~typing.List[~typing.Any]

  Drain progress values (non-blocking).

  This returns any values currently available on the sync side of the Janus
  queue, plus any tail buffered when the queue was closed (so late polls can
  still observe final progress).

  :returns: A list of values in FIFO order. Empty when no progress is available.
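A non-blocking drain can be sketched with the standard `queue.Queue`, used here as a stand-in for the sync side of the Janus queue. `drain` is a hypothetical helper. The tail buffering described above is not modeled.

```python
import queue
from typing import Any, List


def drain(q: "queue.Queue[Any]") -> List[Any]:
    """Return everything currently queued, in FIFO order, without blocking."""
    items: List[Any] = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            # Nothing left right now; return what was collected.
            return items


progress_queue: "queue.Queue[Any]" = queue.Queue()
for step in (1, 2, 3):
    progress_queue.put(step)

first_poll = drain(progress_queue)   # [1, 2, 3]
second_poll = drain(progress_queue)  # []
```

Because each poll removes what it returns, a caller that wants a running history has to accumulate the lists itself.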

.. py:property:: Task.result :module: asynclit :type: ~asynclit.task.T

  Return the result value.

  :raises RuntimeError: If the task is not complete yet.
  :raises BaseException: Re-raises the underlying exception if the task failed.
  :raises concurrent.futures.CancelledError: If the task was cancelled.

.. py:property:: Task.status :module: asynclit :type: ~asynclit.task.TaskStatus

  Current lifecycle status.

.. py:class:: TaskManager(*, max_completed=256) :module: asynclit :canonical: asynclit.manager.TaskManager

Bases: :py:class:`object`

Submits work to the asynclit worker, tracks tasks, and trims completed entries.

The manager acts as an in-memory registry keyed by task id (and optional `global:{name}` aliases). For rerun-driven UIs, keep a manager around when you create many tasks over time and periodically call `cleanup()`.

.. py:method:: TaskManager.cleanup() :module: asynclit

  Remove oldest completed tasks if the registry exceeds `max_completed`.

  :rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`int\``
  :returns: Number of removed entries.
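A sketch of oldest-first trimming over a plain dict. It assumes that the limit applies to the number of completed entries and that insertion order reflects age; the actual bookkeeping in `TaskManager` may differ. `trim_completed` is a hypothetical helper and the registry maps task ids to a simple done flag.

```python
from typing import Dict


def trim_completed(registry: Dict[str, bool], max_completed: int) -> int:
    """Drop the oldest completed entries beyond max_completed.

    registry maps task id -> done flag; dicts preserve insertion order,
    so earlier keys are treated as older (an assumption of this sketch).
    """
    completed = [task_id for task_id, done in registry.items() if done]
    excess = max(0, len(completed) - max_completed)
    for task_id in completed[:excess]:
        del registry[task_id]
    return excess


registry = {"a": True, "b": False, "c": True, "d": True}
removed = trim_completed(registry, max_completed=1)
# removed == 2; the unfinished task "b" and the newest completed task "d" remain.
```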

.. py:method:: TaskManager.get(task_id) :module: asynclit

  Get a task by id, or `None` if missing.


  :rtype: :sphinx_autodoc_typehints_type:`\:py\:data\:\`\~typing.Optional\`\\ \\\[\:py\:class\:\`\~asynclit.task.Task\`\\ \\\[\:py\:data\:\`\~typing.Any\`\]\]`

.. py:method:: TaskManager.register_global(task, name) :module: asynclit

  Alias a task for shared lookup.

  After registering, `get(f"global:{name}")` returns the task.


  :rtype: :sphinx_autodoc_typehints_type:`\:py\:obj\:\`None\``

.. py:method:: TaskManager.submit(func, /, *args, retry=None, **kwargs) :module: asynclit

  Submit `func` to the asynclit worker.

  :type func: :sphinx_autodoc_typehints_type:`\:py\:data\:\`\~typing.Callable\`\\ \\\[\:py\:data\:\`...\<Ellipsis\>\`\, \:py\:class\:\`\~typing.TypeVar\`\\ \\\(\`\`T\`\`\)\]`
  :param func: Sync or async callable to execute.
  :type \*args: :sphinx_autodoc_typehints_type:`\:py\:data\:\`\~typing.Any\``
  :param \*args: Positional arguments passed to `func` (after progress queue injection, if any).
  :type retry: :sphinx_autodoc_typehints_type:`\:py\:data\:\`\~typing.Optional\`\\ \\\[\:py\:class\:\`\~asynclit.retry.RetryPolicy\`\]`
  :param retry: Optional `RetryPolicy` for exception-based retries.
  :type \*\*kwargs: :sphinx_autodoc_typehints_type:`\:py\:data\:\`\~typing.Any\``
  :param \*\*kwargs: Keyword arguments passed to `func`.

  :rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`\~asynclit.task.Task\`\\ \\\[\:py\:class\:\`\~typing.TypeVar\`\\ \\\(\`\`T\`\`\)\]`
  :returns: A `Task[T]` handle that can be polled from the caller thread.

.. py:class:: TaskStatus(value) :module: asynclit :canonical: asynclit.task.TaskStatus

Bases: :py:class:`~enum.Enum`

.. py:attribute:: TaskStatus.CANCELLED :module: asynclit :value: 'cancelled'

.. py:attribute:: TaskStatus.DONE :module: asynclit :value: 'done'

.. py:attribute:: TaskStatus.ERROR :module: asynclit :value: 'error'

.. py:attribute:: TaskStatus.PENDING :module: asynclit :value: 'pending'

.. py:attribute:: TaskStatus.RUNNING :module: asynclit :value: 'running'
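The status values can be mirrored in a stand-in enum to show how a caller might tell terminal states from in-flight ones. `SketchStatus` and `is_terminal` are hypothetical; the terminal set (`DONE`, `ERROR`, `CANCELLED`) is taken from the `Task.cancel()` description.

```python
from enum import Enum


class SketchStatus(Enum):
    """Stand-in with the same member values as the documented enum."""

    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    ERROR = "error"
    CANCELLED = "cancelled"


TERMINAL = frozenset(
    {SketchStatus.DONE, SketchStatus.ERROR, SketchStatus.CANCELLED}
)


def is_terminal(status: SketchStatus) -> bool:
    """True once no further state change is expected."""
    return status in TERMINAL


# Enum members can be looked up by value, matching the (value) signature.
looked_up = SketchStatus("done")
```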

.. py:function:: get_default_manager() :module: asynclit

Return the process-wide default `TaskManager` (lazy singleton).

:rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`\~asynclit.manager.TaskManager\``

.. py:function:: get_default_scheduler() :module: asynclit

Return a process-wide `AsyncIOScheduler` bound to the asynclit worker loop.

Requires installing the optional extra: `asynclit[scheduler]`.

:rtype: :sphinx_autodoc_typehints_type:`\_AsyncIOScheduler`

.. py:function:: run(func, /, *args, manager=None, retry=None, **kwargs) :module: asynclit

Run `func` on the asynclit worker thread and return a pollable `Task`.

Async callables run on the dedicated worker event loop. Sync callables run via `asyncer.asyncify` (thread pool). If the async callable declares a `progress_queue` or `queue` parameter, a `janus.Queue` is created and injected to stream progress values back to the caller thread (drain via `Task.progress`).

:type func: :sphinx_autodoc_typehints_type:`\:py\:data\:\`\~typing.Callable\`\\ \\\[\:py\:data\:\`...\<Ellipsis\>\`\, \:py\:class\:\`\~typing.TypeVar\`\\ \\\(\`\`T\`\`\)\]`
:param func: Sync or async callable.
:type \*args: :sphinx_autodoc_typehints_type:`\:py\:data\:\`\~typing.Any\``
:param \*args: Positional args for `func` (after progress queue injection, if any).
:type manager: :sphinx_autodoc_typehints_type:`\:py\:data\:\`\~typing.Optional\`\\ \\\[\:py\:class\:\`\~asynclit.manager.TaskManager\`\]`
:param manager: Optional `TaskManager` to submit into (defaults to the process-wide manager).
:type retry: :sphinx_autodoc_typehints_type:`\:py\:data\:\`\~typing.Optional\`\\ \\\[\:py\:class\:\`\~asynclit.retry.RetryPolicy\`\]`
:param retry: Optional `RetryPolicy` for exception-based retries.
:type \*\*kwargs: :sphinx_autodoc_typehints_type:`\:py\:data\:\`\~typing.Any\``
:param \*\*kwargs: Keyword args for `func`.

:rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`\~asynclit.task.Task\`\\ \\\[\:py\:class\:\`\~typing.TypeVar\`\\ \\\(\`\`T\`\`\)\]`
:returns: A `Task[T]` handle.
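How a progress-queue parameter might be detected can be sketched with `inspect`. This is an assumption about the mechanism, not the library's code. `wants_progress_queue` is hypothetical, and the order in which the two names are checked is arbitrary.

```python
import inspect
from typing import Any, Callable, Optional


def wants_progress_queue(func: Callable[..., Any]) -> Optional[str]:
    """Return the queue parameter name an async callable declares, if any."""
    # Only async callables are eligible for progress streaming.
    if not inspect.iscoroutinefunction(func):
        return None
    params = inspect.signature(func).parameters
    # Assumption: progress_queue is checked before queue.
    for name in ("progress_queue", "queue"):
        if name in params:
            return name
    return None


async def with_progress(progress_queue, n):
    return n


async def without_progress(n):
    return n


def sync_with_queue(queue, n):
    return n


detected = wants_progress_queue(with_progress)  # "progress_queue"
```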

.. py:function:: schedule_cron(func, *, cron, args=(), kwargs=None, manager=None, scheduler=None, job_id=None, replace_existing=True, latest_task_name=None) :module: asynclit

Schedule periodic execution of `func` using a cron expression.

The cron expression uses APScheduler's `CronTrigger.from_crontab` format.

:rtype: :sphinx_autodoc_typehints_type:`ScheduledTask`
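`CronTrigger.from_crontab` takes a five-field crontab string (minute, hour, day of month, month, day of week). The standard-library sketch below only splits and labels the fields to show the expected shape. `split_crontab` is a hypothetical helper and does not validate field contents.

```python
from typing import Dict

FIELD_NAMES = ("minute", "hour", "day", "month", "day_of_week")


def split_crontab(expr: str) -> Dict[str, str]:
    """Label the five whitespace-separated fields of a crontab expression."""
    values = expr.split()
    if len(values) != len(FIELD_NAMES):
        raise ValueError(f"expected 5 fields, got {len(values)}")
    return dict(zip(FIELD_NAMES, values))


# Every 5 minutes, between 09:00 and 17:59, Monday through Friday.
fields = split_crontab("*/5 9-17 * * mon-fri")
```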

.. py:function:: schedule_interval(func, *, seconds, args=(), kwargs=None, manager=None, scheduler=None, job_id=None, replace_existing=True, latest_task_name=None) :module: asynclit

Schedule periodic execution of `func` using an interval trigger.

Each tick submits an asynclit `Task` via the provided manager.

If `latest_task_name` is set, the latest `Task` is stored as a manager global alias under key `global:{latest_task_name}`.

:rtype: :sphinx_autodoc_typehints_type:`ScheduledTask`
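The "latest task" alias can be pictured as a key that each tick overwrites. The sketch simulates three ticks with a plain dict and placeholder strings instead of real `Task` handles. `alias_key` and the name `refresh` are hypothetical.

```python
from typing import Dict


def alias_key(name: str) -> str:
    """Build the documented alias key format."""
    return f"global:{name}"


registry: Dict[str, str] = {}

for tick in range(3):
    handle = f"task-{tick}"  # placeholder for a real Task handle
    registry[handle] = handle
    # Each tick replaces the alias, so it always points at the newest task.
    registry[alias_key("refresh")] = handle

latest = registry[alias_key("refresh")]  # "task-2"
```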

.. py:function:: session_tasks(session_state, key='asynclit_tasks') :module: asynclit

Return a task registry dict stored on `session_state[key]`.

This is a convenience for Streamlit apps that want a stable dict for named tasks.

:type session_state: :sphinx_autodoc_typehints_type:`\:py\:data\:\`\~typing.Any\``
:param session_state: `st.session_state` or a compatible mapping-like object.
:type key: :sphinx_autodoc_typehints_type:`\:py\:class\:\`str\``
:param key: Storage key used within the session state.

:rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`\~typing.Dict\`\\ \\\[\:py\:class\:\`str\`\, \:py\:data\:\`\~typing.Any\`\]`
:returns: A mutable dict that persists across reruns.
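The get-or-create behavior described above can be sketched against any mutable mapping. `sketch_session_tasks` is a hypothetical stand-in, and a plain dict plays the role of `st.session_state`.

```python
from typing import Any, Dict, MutableMapping


def sketch_session_tasks(
    session_state: MutableMapping[str, Any],
    key: str = "asynclit_tasks",
) -> Dict[str, Any]:
    """Return the dict stored under key, creating it on first use."""
    if key not in session_state:
        session_state[key] = {}
    return session_state[key]


state: Dict[str, Any] = {}  # stand-in for st.session_state

# First "rerun": the registry is created and a named task is stored.
tasks = sketch_session_tasks(state)
tasks["report"] = "task-handle"  # placeholder for a real Task

# Later "rerun": the same dict comes back with its contents intact.
tasks_again = sketch_session_tasks(state)
```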

.. py:function:: shutdown_scheduler(scheduler=None, *, wait=False) :module: asynclit

Shut down the scheduler (on the worker loop thread).

:type scheduler: :sphinx_autodoc_typehints_type:`Optional\[\'\_AsyncIOScheduler\'\]`
:param scheduler: Scheduler instance to shut down (defaults to the singleton if created).
:type wait: :sphinx_autodoc_typehints_type:`bool`
:param wait: Whether to wait for running jobs (passed to APScheduler).

:rtype: :sphinx_autodoc_typehints_type:`None`

.. py:function:: start_scheduler(scheduler=None) :module: asynclit

Start the scheduler (on the worker loop thread) and return it.

:type scheduler: :sphinx_autodoc_typehints_type:`Optional\[\'\_AsyncIOScheduler\'\]`
:param scheduler: Scheduler instance to start (defaults to the singleton).

:rtype: :sphinx_autodoc_typehints_type:`\_AsyncIOScheduler`