Specification¶
This is the package specification (design + behavior notes).
asynclit — package specification¶
1. Overview¶
asynclit is an async task layer aimed at Streamlit (and similar environments with a synchronous UI thread and rerun-driven updates). It provides:
Background execution of sync and async callables
Non-blocking UI updates via polling (
task.done,task.result, …)Progress streaming through a Janus queue
Task lifecycle management: status, result, error, cancel
Optional hooks for session-scoped task registries and scheduler integration
The library does not replace Streamlit’s execution model; it complements it by moving work off the main thread while keeping the UI pattern simple.
2. Goals¶
Preserve Streamlit’s rerun model (no mandatory long-lived blocking calls on the script thread).
Provide a small, obvious API for offloading work.
Support sync and async functions without forcing callers to manage an event loop.
Avoid requiring users to understand asyncio beyond optional progress/cancel semantics.
3. Core architecture¶
3.1 Async worker¶
A dedicated daemon thread hosts a single asyncio event loop for the process (lazy-started on first use).
Work is submitted with
asyncio.run_coroutine_threadsafefrom other threads.
3.2 Task manager¶
TaskManagerregisters tasks by id, exposessubmit(),get(), andcleanup()(trim oldest completed entries when a configurable cap is exceeded).asynclit.run(..., manager=None)uses a process-wide default manager fromget_default_manager()whenmanageris omitted.register_global(task, name)stores an alias keyed asglobal:{name}for shared lookup viaget().
3.3 Janus queue (progress)¶
When an async callable opts in (see §8), a
janus.Queueis created on the worker loop.The worker pushes progress on
queue.async_q; the UI thread reads by drainingtask.progress(sync side + any tail buffered when the queue is closed).
3.4 Scheduler (optional)¶
APScheduler is an optional dependency (
asynclit[scheduler]).asynclit provides scheduling helpers that create an
AsyncIOSchedulerbound to the worker loop and schedule jobs that submit asynclit tasks.
4. Dependencies¶
Component |
Role |
|---|---|
asyncer |
|
janus |
Sync/async queue for progress streaming. |
asyncio |
Worker event loop (stdlib). |
concurrent.futures |
Result bridge for cross-thread polling ( |
APScheduler |
Optional; not required at runtime for core features. |
Streamlit |
Optional; only needed for apps using |
5. Public API¶
5.1 Run task¶
task = asynclit.run(func, /, *args, manager=None, **kwargs)
manager: optionalTaskManager; must not collide with keyword arguments intended forfunc(reserved keyword forrunonly).
5.2 Task surface¶
task.id
task.done
task.result # raises if not complete
task.status # TaskStatus enum
task.error # set when status is ERROR
task.cancel() # returns bool
task.progress # list: non-blocking drain of pending progress values
5.3 Session and managers¶
asynclit.session_tasks(session_state, key="asynclit_tasks")
asynclit.get_default_manager()
asynclit.TaskManager(max_completed=256)
6. Task lifecycle¶
TaskStatus: PENDING → RUNNING → DONE | ERROR | CANCELLED
State |
Meaning |
|---|---|
PENDING |
Submitted; worker coroutine may not have bound the asyncio task yet. |
RUNNING |
Worker asyncio task is bound; callable is executing. |
DONE |
Finished successfully; result available (subject to |
ERROR |
Exception captured; exposed via |
CANCELLED |
User cancellation or cancelled future before completion. |
7. Internal design (conceptual)¶
7.1 Task object¶
id: str— unique task identifier.Result bridge —
concurrent.futures.Futurefor cross-thread completion.Progress — optional Janus queue handle while running; internal tail buffer for values drained at close so late polls still observe progress.
Status —
TaskStatus.Worker reference — bound
asyncio.Taskand loop for cancellation.
7.2 Task manager¶
submit(func, /, *args, **kwargs)→Taskget(task_id)→Task | Nonecleanup()→ count removed (completed-task cap)
8. Progress streaming¶
Progress is supported only for async callables that declare a parameter named queue or progress_queue.
If that parameter is the first parameter, the injected queue is passed positionally and remaining
run(...)positional arguments map to the rest of the signature.Otherwise the queue is passed as the corresponding keyword argument.
Worker side:
async def job(queue, steps: int) -> int:
for i in range(steps):
await queue.async_q.put(i)
return steps
UI side: read task.progress on each rerun (or poll in a loop); values are drained from the sync side of the Janus queue.
9. Session handling¶
Callers store
Taskreferences inst.session_state(or any mutable mapping).session_tasks(session_state, key=...)returns a dict stored atsession_state[key]for named tasks.TaskManager.register_global/get("global:{name}")support optional shared aliases inside a manager.
9.1 Rerun-driven UIs and polling¶
In Streamlit-style rerun models, the script run that submits the task will usually finish before the background work completes. The intended usage is to:
Store the
Task(for example in session state)On subsequent reruns, check
task.done/task.statusRead
task.resultonly once it is complete
10. Error handling¶
Exceptions from the user callable are stored on the task and
Future.set_exception.task.errorholds the exception whentask.status == TaskStatus.ERROR.Accessing
task.resultwhen complete but failed re-raises the stored exception.
10.1 Retries¶
Retries are opt-in and exception-based.
A
RetryPolicycan be provided per submission (for example viaasynclit.run(..., retry=...)).Only raised exceptions are eligible for retry; returned values are not inspected.
Cancellation stops any ongoing attempt and prevents further retries.
11. Cancellation¶
Pending (not yet bound to a worker asyncio task):
concurrent.futures.Future.cancel()and statusCANCELLED.Running:
asyncio.Task.cancel()scheduled on the worker loop; completion ends inCANCELLEDwith the result future cancelled if still pending.
12. Performance considerations¶
Single global worker event loop per process (simple, predictable).
Thread-safe manager and task state where required.
Bounded registry:
cleanup()/ automatic trim viamax_completedto limit memory from finished tasks.
13. Example usage¶
import streamlit as st
import asynclit
task = asynclit.run(fetch_data)
if task.done:
st.write(task.result)
else:
st.write("Loading…")