Skip to content

๐Ÿšฆย ย Concurrency limiter

Submitted by Karen Javadyan

Summary

This decorator limit function execution concurrency with max_concurrency param.

Functions

concurrency_limiter

Decorator that limits function concurrent execution in Stremalit app.

Parameters:

Name Type Description Default
max_concurrency int

The number of allowed instances of the decorated function to be run simultaneously. Defaults to 1.

1
show_spinner bool

If True, a spinner will be shown while waiting for the function to be executed.

True

Returns:

Name Type Description
Callable F | Callable[[F], F]

The decorated function with concurrency limiting applied.

Source code in src/streamlit_extras/concurrency_limiter/__init__.py
@extra
def concurrency_limiter(
    func: F | None = None, max_concurrency: int = 1, show_spinner: bool = True
) -> F | Callable[[F], F]:
    """Decorator that limits function concurrent execution in Stremalit app.

    Args:
        max_concurrency (int): The number of allowed instances of the decorated function
            to be run simultaneously. Defaults to 1.
        show_spinner (bool): If True, a spinner will be shown while waiting for the
            function to be executed.

    Returns:
        Callable: The decorated function with concurrency limiting applied.
    """

    if func is None:
        return partial(  # type: ignore[return-value]
            concurrency_limiter,
            max_concurrency=max_concurrency,
            show_spinner=show_spinner,
        )

    function_key = _make_function_key(func, max_concurrency)

    with SEMAPHORES_LOCK:
        if function_key not in CONCURRENCY_MAP:
            CONCURRENCY_MAP[function_key] = FuncConcurrencyInfo(
                semaphore=Semaphore(max_concurrency),
                condition=Condition(),
            )

    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        func_info = CONCURRENCY_MAP[function_key]
        acquired = False

        COUNTERS.update({function_key: 1})

        try:
            with func_info.condition:
                while not (acquired := func_info.semaphore.acquire(blocking=False)):
                    if show_spinner:
                        num_of_instances = COUNTERS[function_key] - max_concurrency
                        text = f"""Function {func.__name__} has approximately {num_of_instances} instances waiting..."""
                        with st.spinner(text):
                            func_info.condition.wait()
                    else:
                        func_info.condition.wait()

            return func(*args, **kwargs)
        finally:
            COUNTERS.update({function_key: -1})
            with func_info.condition:
                if acquired:
                    func_info.semaphore.release()
                func_info.condition.notify_all()

    return wrapper

Import:

from streamlit_extras.concurrency_limiter import concurrency_limiter # (1)!
  1. You should add this to the top of your .py file ๐Ÿ› 

Examples

example

def example() -> None:
    @concurrency_limiter(max_concurrency=1)  # type: ignore[arg-type]
    def heavy_computation() -> int:
        st.write("Heavy computation")
        progress_text = "Operation in progress. Please wait."
        my_bar = st.progress(0, text=progress_text)

        for percent_complete in range(100):
            time.sleep(0.15)
            my_bar.progress(percent_complete + 1, text=progress_text)
        st.write("END OF Heavy computation")
        return 42

    my_button = st.button("Run heavy computation")

    if my_button:
        heavy_computation()