
❄️  Snowflake Utilities

Submitted by Zachary Blackwood

Summary

Utilities for Streamlit-in-Snowflake

Functions

get_table

Get a Snowpark table for use in building a query.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table_name` | `str` | Name of the table to retrieve | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Table` | A cached Snowpark `Table` object that can be used for querying. The result is cached so that metadata is not re-fetched from the database. |

Source code in src/streamlit_extras/snowflake/connection.py
@extra
@st.cache_resource
def get_table(table_name: str) -> sp.Table:
    """
    Get a Snowpark table for use in building a query.

    Args:
        table_name (str): Name of the table to retrieve

    Returns:
        sp.Table: A cached Snowpark Table object that can be used for querying.
            The result is cached so that metadata is not re-fetched from the database.
    """
    return _get_session().table(table_name)

Import:

from streamlit_extras.snowflake import get_table  # Add this to the top of your .py file 🛠
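
For instance, the cached handle can be reused across reruns while queries are composed lazily. A minimal sketch, assuming a hypothetical table `my_db.my_schema.orders`:

orders = get_table("my_db.my_schema.orders")  # handle is cached by st.cache_resource
recent = orders.select("order_id", "amount").limit(5)  # builds a query lazily; no data is fetched yet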

run_snowpark

Convert a Snowpark DataFrame to a pandas DataFrame and cache the result.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `DataFrame` | The Snowpark DataFrame to convert | *required* |
| `ttl` | `timedelta \| int \| None` | Time-to-live for the cache. Defaults to 2 hours. Set to `None` to use the default cache invalidation. | `timedelta(hours=2)` |
| `lowercase_columns` | `bool` | Whether to convert column names to lowercase. Defaults to `True`. | `True` |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | The converted pandas DataFrame with cached results |

Source code in src/streamlit_extras/snowflake/connection.py
@extra
def run_snowpark(
    df: sp.DataFrame,
    ttl: timedelta | int | None = timedelta(hours=2),
    lowercase_columns: bool = True,
) -> pd.DataFrame:
    """
    Convert a Snowpark DataFrame to a pandas DataFrame and cache the result.

    Args:
        df (sp.DataFrame): The Snowpark DataFrame to convert
        ttl (timedelta | int | None): Time-to-live for the cache. Defaults to 2 hours.
            Set to None to use the default cache invalidation.
        lowercase_columns (bool): Whether to convert column names to lowercase. Defaults to True.

    Returns:
        pd.DataFrame: The converted pandas DataFrame with cached results
    """

    @st.cache_data(ttl=ttl)
    def _run_snowpark(
        _df: sp.DataFrame, query: str, lowercase_columns: bool
    ) -> pd.DataFrame:
        # The leading underscore on `_df` tells st.cache_data not to hash the
        # unhashable Snowpark DataFrame; the generated SQL text in `query`
        # serves as the cache key instead.
        _ = query
        df = _df.to_pandas()

        if lowercase_columns:
            df.columns = df.columns.str.lower()

        return df

    # Pull the SQL that Snowpark generated for this DataFrame to key the cache
    query = df._plan.queries[0].sql

    return _run_snowpark(df, query, lowercase_columns=lowercase_columns)

Import:

from streamlit_extras.snowflake import run_snowpark  # Add this to the top of your .py file 🛠
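
The cache window and column casing can be tuned per call. A minimal sketch, assuming `orders_df` is a Snowpark DataFrame (for example, one built with get_table above):

from datetime import timedelta

# Cache the pandas result for 30 minutes and keep Snowflake's uppercase column names
pandas_df = run_snowpark(orders_df, ttl=timedelta(minutes=30), lowercase_columns=False)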

run_sql

Execute a SQL query and cache the results.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `query` | `str` | The SQL query to execute | *required* |
| `ttl` | `timedelta \| int \| None` | Time-to-live for the cache. Defaults to 2 hours. Set to `None` to use the default cache invalidation. | `timedelta(hours=2)` |
| `lowercase_columns` | `bool` | Whether to convert column names to lowercase. Defaults to `True`. | `True` |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | The query results as a pandas DataFrame |

Source code in src/streamlit_extras/snowflake/connection.py
@extra
def run_sql(
    query: str,
    ttl: timedelta | int | None = timedelta(hours=2),
    lowercase_columns: bool = True,
):
    """
    Execute a SQL query and cache the results.

    Args:
        query (str): The SQL query to execute
        ttl (timedelta | int | None): Time-to-live for the cache. Defaults to 2 hours.
            Set to None to use the default cache invalidation.
        lowercase_columns (bool): Whether to convert column names to lowercase. Defaults to True.

    Returns:
        pd.DataFrame: The query results as a pandas DataFrame
    """

    @st.cache_data(ttl=ttl)
    def _run_sql(query: str) -> pd.DataFrame:
        # Results are cached per query string for the given ttl
        return _get_session().sql(query).to_pandas()

    df = _run_sql(query)

    if lowercase_columns:
        df.columns = df.columns.str.lower()

    return df

Import:

from streamlit_extras.snowflake import run_sql  # Add this to the top of your .py file 🛠
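
Since ttl also accepts an int (seconds) or None, the cache window can be tuned per query. A minimal sketch:

# Cache this result for 10 minutes (600 seconds)
wh = run_sql("select current_warehouse() as wh", ttl=600)

# ttl=None falls back to Streamlit's default cache invalidation (no time-based expiry)
role = run_sql("select current_role() as role", ttl=None)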

Examples

snowpark_example

def snowpark_example():
    from snowflake.snowpark.functions import col

    df = (
        get_table("snowflake.information_schema.tables")
        .select("table_name", "table_schema", "created")
        .where(col("table_type") == "VIEW")
        .limit(10)
    )

    st.dataframe(run_snowpark(df))

sql_example

def sql_example():
    df = run_sql("""
    select
        table_name,
        table_schema,
        created
    from snowflake.information_schema.tables
    where table_type = 'VIEW'
    limit 10
    """)
    st.dataframe(df)