# Source code for pyflink.dataframe.sql

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""
SQL entry point for the DataFrame API.

Allows users to write a SQL SELECT statement that references DataFrames in
the calling scope (or passed explicitly), returning a new DataFrame.
"""

import inspect
import logging
import warnings
from types import FrameType
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional

from pyflink.dataframe.context import (
    get_or_create_table_environment,
    get_table_environment,
)
from pyflink.dataframe.dataframe import DataFrame

if TYPE_CHECKING:
    from pyflink.table import StreamTableEnvironment

__all__ = ["sql"]

_logger = logging.getLogger(__name__)


def _parse_view_name(t_env: "StreamTableEnvironment", name: str) -> None:
    """Hand ``name`` to the environment's SQL parser as an identifier.

    Nothing is returned; the parse result is discarded. Any exception the
    parser raises propagates unchanged to the caller.

    Args:
        t_env: Environment whose underlying Java parser is used.
        name: Candidate view name.
    """
    parser = t_env._j_tenv.getParser()
    parser.parseIdentifier(name)


def _is_valid_ident(t_env: "StreamTableEnvironment", name: str) -> bool:
    """Report whether ``name`` parses as a SQL identifier in ``t_env``.

    Returns:
        ``True`` when ``_parse_view_name`` completes, ``False`` when it
        raises any ``Exception``. This predicate itself never raises for
        ``Exception`` subclasses.
    """
    try:
        _parse_view_name(t_env, name)
    except Exception:  # noqa: BLE001
        # Any parser failure is treated as "not a valid identifier".
        return False
    else:
        return True


def _validate_view_name(t_env: "StreamTableEnvironment", name: str) -> None:
    """Raise if ``name`` cannot be used as a SQL view name in ``t_env``.

    Raises:
        ValueError: ``_parse_view_name`` raised for ``name``; the original
            exception is attached as ``__cause__``.
    """
    try:
        _parse_view_name(t_env, name)
    except Exception as cause:  # noqa: BLE001
        message = f"'{name}' is not a valid SQL view name."
        raise ValueError(message) from cause


def _temp_exists(t_env: "StreamTableEnvironment", name: str) -> bool:
    """Tell whether ``name`` is already taken by a temporary table or view.

    Returns:
        ``True`` when ``name`` is listed by either
        ``t_env.list_temporary_tables()`` or ``t_env.list_temporary_views()``.
    """
    taken = set(t_env.list_temporary_tables())
    taken.update(t_env.list_temporary_views())
    return name in taken


def _resolve_explicit_bindings(
    bindings: Mapping[str, DataFrame],
) -> Dict[str, DataFrame]:
    """Check explicit bindings and copy them into a fresh name -> DataFrame dict.

    An explicit binding is a promise made by the caller ("expose this
    DataFrame under this name"), so nothing is skipped or substituted:
    the first value that is not a ``DataFrame`` aborts the whole call.

    Args:
        bindings: Alias -> value pairs as supplied by the caller.

    Returns:
        A new dict holding the same pairs in the same order.

    Raises:
        ValueError: a value is not a ``DataFrame``.
    """
    for alias, candidate in bindings.items():
        if not isinstance(candidate, DataFrame):
            actual = type(candidate).__name__
            raise ValueError(
                f"binding '{alias}' must be a DataFrame, got {actual}"
            )
    return dict(bindings.items())


def _resolve_table_environment(
    binding_views: Mapping[str, DataFrame],
    auto_candidates: Mapping[str, DataFrame],
) -> "StreamTableEnvironment":
    """Pick the TableEnvironment the query will run against.

    Sources are tried in this order:

    1. explicit bindings - the environment of the first binding, which
       every other binding must share;
    2. auto-bind candidates - the environment of the first candidate whose
       name is a valid identifier in its own environment;
    3. the global environment returned by
       ``get_or_create_table_environment()``.

    Raises:
        ValueError: explicit bindings do not all share one environment.
    """
    if binding_views:
        anchor_name, anchor_df = next(iter(binding_views.items()))
        shared_env = anchor_df._table._t_env
        _logger.debug(
            "resolved TableEnvironment from explicit binding '%s'", anchor_name
        )
        # Identity comparison: two distinct environment objects never mix.
        for alias, bound_df in binding_views.items():
            if bound_df._table._t_env is shared_env:
                continue
            raise ValueError(
                f"binding '{alias}' belongs to a different "
                f"TableEnvironment; all explicitly bound DataFrames "
                f"must share the same TableEnvironment."
            )
        return shared_env

    for var_name, candidate in auto_candidates.items():
        candidate_env = candidate._table._t_env
        if not _is_valid_ident(candidate_env, var_name):
            continue
        _logger.debug(
            "resolved TableEnvironment from auto-bound DataFrame '%s'",
            var_name,
        )
        return candidate_env

    # Probe first so the log can say whether an environment already existed.
    already_configured = get_table_environment() is not None
    global_env = get_or_create_table_environment()
    _logger.debug(
        "resolved TableEnvironment from configured global environment"
        if already_configured
        else "created new TableEnvironment for SQL query"
    )
    return global_env


def _resolve_auto_bind(
    candidates: Mapping[str, DataFrame],
    bindings: Mapping[str, DataFrame],
    t_env: "StreamTableEnvironment",
) -> Dict[str, DataFrame]:
    """Select the caller-scope DataFrames that should be registered as views.

    Auto-bind is best-effort; problems lead to a skip, not an exception
    raised by this function. Per candidate, checks run in this order:

    1. Name is also an explicit binding: skipped silently (bindings win).
    2. Name is not a valid SQL view name: skipped with a warning.
    3. DataFrame belongs to another TableEnvironment: skipped, logged at
       INFO level only.
    4. Name collides with an existing temporary table/view: skipped with
       a warning.

    The warnings in (2) and (4) are suppressed when the very same
    ``DataFrame`` object is already exposed through an explicit binding,
    because the user can already reach it from SQL.

    Args:
        candidates: Variable name -> DataFrame found in the caller's scope.
        bindings: Explicit alias -> DataFrame mappings.
        t_env: Environment the query will run against.

    Returns:
        Name -> DataFrame pairs that are safe to register.
    """
    aliased_ids = {id(bound) for bound in bindings.values()}
    accepted: Dict[str, DataFrame] = {}

    for name, value in candidates.items():
        if name in bindings:
            # The explicit binding owns this name.
            continue

        if not _is_valid_ident(t_env, name):
            complaint = (
                f"Skipping auto-bind of '{name}': not a valid SQL view "
                f"name. To use this DataFrame in your query, pass "
                f"it under a valid view name via bindings "
                f"(e.g. `pf.sql(..., my_df={name})`)."
            )
        elif value._table._t_env is not t_env:
            _logger.info(
                "skipping auto-bind of '%s': DataFrame belongs to a "
                "different TableEnvironment",
                name,
            )
            continue
        elif _temp_exists(t_env, name):
            complaint = (
                f"Skipping auto-bind of '{name}': a temporary table/view "
                f"with this name already exists. To use this DataFrame in "
                f"your query, either rename your Python variable, or pass "
                f"it under a different name via bindings "
                f"(e.g. `pf.sql(..., my_df={name})`)."
            )
        else:
            accepted[name] = value
            continue

        # Unregistrable candidate: stay quiet when the DataFrame is already
        # reachable through a binding alias, otherwise tell the user.
        if id(value) not in aliased_ids:
            # warn must be called from this frame so that stacklevel=3
            # lands on the user: warn -> _resolve_auto_bind -> sql -> user
            warnings.warn(complaint, stacklevel=3)

    return accepted


def _execute_with_temp_views(
    t_env: "StreamTableEnvironment",
    to_register: Mapping[str, DataFrame],
    query: str,
) -> DataFrame:
    """Register the given views, run the query, drop the views in finally.

    Only views that were successfully created are dropped, so a failure
    part-way through registration still cleans up what was registered.

    Drop failures are logged but never propagated — letting a cleanup
    error mask the actual query outcome would be a debugging trap.

    Args:
        t_env: Environment in which views are registered and the query runs.
        to_register: View name -> DataFrame pairs to expose to the query.
        query: SQL text passed to ``t_env.sql_query``.

    Returns:
        A ``DataFrame`` wrapping the result of ``t_env.sql_query(query)``.

    Raises:
        Exception: whatever ``create_temporary_view`` or ``sql_query``
            raises; cleanup never replaces that exception.
    """
    registered: List[str] = []
    try:
        for name, df in to_register.items():
            t_env.create_temporary_view(name, df._table)
            registered.append(name)
            _logger.debug("registered temp view '%s'", name)

        result = t_env.sql_query(query)
        return DataFrame(result)
    finally:
        for name in registered:
            try:
                dropped = t_env.drop_temporary_view(name)
            except Exception as e:  # noqa: BLE001
                _logger.warning(
                    "drop_temporary_view(%s) failed: %s", name, e
                )
            else:
                # drop_temporary_view reports via its boolean result whether
                # a view was actually removed. Only an explicit False is
                # treated as "not dropped", so an environment that returns
                # nothing keeps the previous logging behaviour.
                if dropped is False:
                    _logger.warning(
                        "drop_temporary_view(%s) removed nothing: "
                        "view no longer exists",
                        name,
                    )
                else:
                    _logger.debug("dropped temp view '%s'", name)


def sql(
    query: str,
    *,
    auto_bind: bool = True,
    **bindings: DataFrame,
) -> DataFrame:
    """
    Run a SQL SELECT query against DataFrames in scope, returning a DataFrame.

    There are two independent ways DataFrames get bound to SQL names:

    - **Explicit bindings** via ``**bindings`` kwargs. These are user
      contracts: any problem (bad view name, non-DataFrame value, collision
      with an existing temporary table/view) is reported as ``ValueError``.
    - **Auto-bind** (when ``auto_bind=True``, the default): scans the
      caller's globals and locals for ``DataFrame`` instances and registers
      them under their Python variable name. Best-effort: collisions warn
      and skip, invalid view names warn and skip,
      different-TableEnvironment DataFrames log and skip.

    Args:
        query: A SQL SELECT statement. Other statement kinds (INSERT, DDL)
            are not supported.
        auto_bind: If True (default), auto-detect ``DataFrame`` instances
            in the caller's globals and locals. See above.
        **bindings: Explicit alias -> DataFrame mappings. Always registered,
            even when ``auto_bind`` is False. Take precedence over auto-bind
            on name collision.

    Returns:
        A new DataFrame backed by the query result.

    Raises:
        ValueError: an explicit binding is not a ``DataFrame``, is not a
            valid SQL view name, collides with an existing temporary
            table/view, or the explicit bindings do not all share one
            TableEnvironment.

    Note:
        This function is **not safe** under concurrent calls against the
        same TableEnvironment: register / sql_query / drop is not an atomic
        sequence. Don't call it from multiple threads sharing one t_env.

    Example::

        >>> import pyflink.dataframe as pf
        >>> df1 = pf.from_dict({"a": [1, 2, 3], "b": ["x", "y", "z"]})
        >>> df2 = pf.from_dict({"a": [1, 2, 3], "c": ["p", "q", "r"]})
        >>> pf.sql("SELECT * FROM df1 JOIN df2 ON df1.a = df2.a").show()
    """
    binding_views = _resolve_explicit_bindings(bindings)

    auto_candidates: Dict[str, DataFrame] = {}
    if auto_bind:
        # `sql` is a plain function with no decorators, so the user's
        # frame is exactly one f_back. If a decorator is ever added,
        # this walk must be updated in lockstep — test_frame_back_level
        # pins the invariant.
        frame = inspect.currentframe()
        caller: Optional[FrameType] = frame.f_back if frame is not None else None
        try:
            if caller is not None:
                # Locals are applied after globals so they shadow globals
                # of the same name.
                scope: Dict[str, object] = {}
                scope.update(caller.f_globals)
                scope.update(caller.f_locals)
                auto_candidates = {
                    name: value
                    for name, value in scope.items()
                    if isinstance(value, DataFrame)
                }
        finally:
            # Drop the frame references as soon as the scan is done so this
            # call does not keep the caller's frame alive.
            del frame, caller

    t_env = _resolve_table_environment(binding_views, auto_candidates)

    # Explicit bindings are hard contracts: a bad name or a collision raises.
    for name in binding_views:
        _validate_view_name(t_env, name)
        if _temp_exists(t_env, name):
            raise ValueError(
                f"binding '{name}' conflicts with an existing temporary "
                f"table/view of the same name. Use a different binding "
                f"name, or drop the existing '{name}' first."
            )

    auto_views: Dict[str, DataFrame] = {}
    if auto_bind:
        auto_views = _resolve_auto_bind(auto_candidates, binding_views, t_env)

    # _resolve_auto_bind already excluded any name present in bindings,
    # so the union is disjoint. Bindings listed second to make precedence
    # obvious to readers.
    to_register: Dict[str, DataFrame] = {**auto_views, **binding_views}
    return _execute_with_temp_views(t_env, to_register, query)