betty.asyncio module

Provide asynchronous programming utilities.

async betty.asyncio.gather(*coroutines: Coroutine[Any, None, betty.asyncio.T]) tuple[betty.asyncio.T, ...][source]

Gather multiple coroutines.

This is like Python’s own asyncio.gather, but with improved error handling.

Parameters:

coroutines (typing.Coroutine[typing.Any, None, typing.TypeVar(T)])

Return type:

tuple[typing.TypeVar(T), ...]

betty.asyncio.sync(f: Callable[[P], Awaitable[betty.asyncio.T]]) Callable[[P], betty.asyncio.T][source]

Decorate an asynchronous callable to become synchronous.

Parameters:

f (typing.Callable[[typing.ParamSpec(P, bound= None)], typing.Awaitable[typing.TypeVar(T)]])

Return type:

typing.Callable[[typing.ParamSpec(P, bound= None)], typing.TypeVar(T)]

betty.asyncio.wait(f: Awaitable[betty.asyncio.T]) betty.asyncio.T[source]

Wait for an awaitable, either in a new event loop or another thread.

Parameters:

f (typing.Awaitable[typing.TypeVar(T)])

Return type:

typing.TypeVar(T)

betty.asyncio.wait_to_thread(f: Awaitable[betty.asyncio.T]) betty.asyncio.T[source]

Wait for an awaitable in another thread.

Parameters:

f (typing.Awaitable[typing.TypeVar(T)])

Return type:

typing.TypeVar(T)