General function for running functions asynchronously

General function for running functions asynchronously#

We use the this following snippet for running any function asynchronously:

import asyncio
import nest_asyncio
import functools
import contextvars
from typing import Callable, Any, TypeVar

T = TypeVar("T")


async def run_in_threadpool(func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    """Mock Sync function for Async call.Any
    Code from https://github.com/encode/starlette/blob/master/starlette/concurrency.py
    """
    loop = asyncio.get_event_loop_policy().new_event_loop()
    nest_asyncio.apply(loop)
    asyncio.set_event_loop(loop)
    # Ensure we run in the same context
    child = functools.partial(func, *args, **kwargs)
    context = contextvars.copy_context()
    func = context.run
    args = (child,)
    return await loop.run_in_executor(None, func, *args)