Skip to content

🚦  Concurrency limiter for your Streamlit app

Submitted by Karen Javadyan

Summary

This decorator limit function execution concurrency with max_concurrency param.

Functions

concurrency_limiter

Decorator that limits function concurrent execution in Stremalit app.

Parameters:

Name Type Description Default
max_concurrency int

The number of allowed instances of the decorated function to be run simultaneously Defaults to 1.

1
show_spinner bool

If True, a spinner will be shown while waiting for the function to be executed.

True
Source code in src/streamlit_extras/concurrency_limiter/__init__.py
@extra
def concurrency_limiter(func=None, max_concurrency: int = 1, show_spinner: bool = True):
    """Decorator that limits function concurrent execution in Stremalit app.

    Args:
        max_concurrency (int): The number of allowed instances of the decorated function to be run simultaneously
             Defaults to 1.
        show_spinner (bool): If True, a spinner will be shown while waiting for the function to be executed.
    """

    if func is None:
        return partial(
            concurrency_limiter,
            max_concurrency=max_concurrency,
            show_spinner=show_spinner,
        )

    function_key = _make_function_key(func, max_concurrency)

    with SEMAPHORES_LOCK:
        if function_key not in CONCURRENCY_MAP:
            CONCURRENCY_MAP[function_key] = FuncConcurrencyInfo(
                semaphore=Semaphore(max_concurrency),
                condition=Condition(),
            )

    @wraps(func)
    def wrapper(*args, **kwargs):
        func_info = CONCURRENCY_MAP[function_key]
        acquired = False

        COUNTERS.update({function_key: 1})

        try:
            with func_info.condition:
                while not (acquired := func_info.semaphore.acquire(blocking=False)):
                    if show_spinner:
                        num_of_instances = COUNTERS[function_key] - max_concurrency
                        text = f"""Function {func.__name__} has approximately {num_of_instances} instances waiting..."""
                        with st.spinner(text):
                            func_info.condition.wait()
                    else:
                        func_info.condition.wait()

            return func(*args, **kwargs)
        finally:
            COUNTERS.update({function_key: -1})
            with func_info.condition:
                if acquired:
                    func_info.semaphore.release()
                func_info.condition.notify_all()

    return wrapper

Import:

from streamlit_extras.concurrency_limiter import concurrency_limiter # (1)!
  1. You should add this to the top of your .py file πŸ› 

Examples

example

def example():
    @concurrency_limiter(max_concurrency=1)
    def heavy_computation():
        st.write("Heavy computation")
        progress_text = "Operation in progress. Please wait."
        my_bar = st.progress(0, text=progress_text)

        for percent_complete in range(100):
            time.sleep(0.15)
            my_bar.progress(percent_complete + 1, text=progress_text)
        st.write("END OF Heavy computation")
        return 42

    my_button = st.button("Run heavy computation")

    if my_button:
        heavy_computation()
Output (beta)