################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
SQL entry point for the DataFrame API.
Allows users to write a SQL SELECT statement that references DataFrames in
the calling scope (or passed explicitly), returning a new DataFrame.
"""
import inspect
import logging
import warnings
from types import FrameType
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional
from pyflink.dataframe.context import (
get_or_create_table_environment,
get_table_environment,
)
from pyflink.dataframe.dataframe import DataFrame
if TYPE_CHECKING:
from pyflink.table import StreamTableEnvironment
# Public API of this module: only the `sql` entry point.
__all__ = ["sql"]
# Module logger; the helpers below emit debug/info/warning records through it.
_logger = logging.getLogger(__name__)
def _parse_view_name(t_env: "StreamTableEnvironment", name: str) -> None:
t_env._j_tenv.getParser().parseIdentifier(name)
def _is_valid_ident(t_env: "StreamTableEnvironment", name: str) -> bool:
    """Return whether ``t_env``'s parser accepts ``name`` as a view name.

    Lenient counterpart of ``_validate_view_name``: any exception from the
    parse is swallowed and reported as ``False`` rather than raised, which
    suits the best-effort auto-bind path.
    """
    try:
        _parse_view_name(t_env, name)
    except Exception:  # noqa: BLE001
        return False
    else:
        return True
def _validate_view_name(t_env: "StreamTableEnvironment", name: str) -> None:
    """Raise ``ValueError`` unless ``t_env``'s parser accepts ``name``.

    Strict counterpart of ``_is_valid_ident``; ``sql`` uses it to check
    explicit binding names. The parser's own exception is chained as the
    ``__cause__`` so its details remain visible in the traceback.

    Raises:
        ValueError: ``name`` failed to parse as a SQL identifier.
    """
    try:
        _parse_view_name(t_env, name)
    except Exception as parse_error:  # noqa: BLE001
        message = f"'{name}' is not a valid SQL view name."
        raise ValueError(message) from parse_error
def _temp_exists(t_env: "StreamTableEnvironment", name: str) -> bool:
return name in (
set(t_env.list_temporary_tables()) | set(t_env.list_temporary_views())
)
def _resolve_explicit_bindings(
    bindings: Mapping[str, DataFrame],
) -> Dict[str, DataFrame]:
    """Check that every explicit binding value is a DataFrame.

    Explicit bindings are a contract with the user ("expose this
    DataFrame under this name"), so nothing is skipped or substituted:
    the first value that is not a ``DataFrame`` aborts the call.

    Args:
        bindings: alias -> value pairs as passed to ``sql(**bindings)``.

    Returns:
        A new ``dict`` with the same alias -> DataFrame pairs, in the same
        order.

    Raises:
        ValueError: a binding value is not a ``DataFrame``.
    """
    for name, value in bindings.items():
        if isinstance(value, DataFrame):
            continue
        raise ValueError(
            f"binding '{name}' must be a DataFrame, "
            f"got {type(value).__name__}"
        )
    return dict(bindings.items())
def _resolve_table_environment(
    binding_views: Mapping[str, DataFrame],
    auto_candidates: Mapping[str, DataFrame],
) -> "StreamTableEnvironment":
    """Pick the TableEnvironment the query will run in.

    Resolution order:

    1. Explicit bindings: the environment of the first binding. Every
       binding must share that exact environment object.
    2. Auto-bind candidates: the environment of the first candidate (in
       mapping order) whose name its own environment's parser accepts as
       a view name.
    3. The global environment, created on demand by
       ``get_or_create_table_environment``.

    Raises:
        ValueError: explicit bindings belong to different
            TableEnvironments.
    """
    if binding_views:
        anchor_name, anchor_df = next(iter(binding_views.items()))
        anchor_env = anchor_df._table._t_env
        _logger.debug(
            "resolved TableEnvironment from explicit binding '%s'", anchor_name
        )
        for alias, view_df in binding_views.items():
            if view_df._table._t_env is anchor_env:
                continue
            raise ValueError(
                f"binding '{alias}' belongs to a different "
                f"TableEnvironment; all explicitly bound DataFrames "
                f"must share the same TableEnvironment."
            )
        return anchor_env

    for alias, view_df in auto_candidates.items():
        candidate_env = view_df._table._t_env
        if not _is_valid_ident(candidate_env, alias):
            continue
        _logger.debug(
            "resolved TableEnvironment from auto-bound DataFrame '%s'",
            alias,
        )
        return candidate_env

    # Probe first only to tell "reused" from "freshly created" in the log.
    had_global_env = get_table_environment() is not None
    global_env = get_or_create_table_environment()
    _logger.debug(
        "resolved TableEnvironment from configured global environment"
        if had_global_env
        else "created new TableEnvironment for SQL query"
    )
    return global_env
def _resolve_auto_bind(
    candidates: Mapping[str, DataFrame],
    bindings: Mapping[str, DataFrame],
    t_env: "StreamTableEnvironment",
) -> Dict[str, DataFrame]:
    """Filter caller-scope DataFrames and decide which to register.

    Auto-bind is best-effort: an unusable candidate is skipped, never
    raised on. Candidates are examined in mapping order and each one lands
    in exactly one of these buckets:

    Dropped silently:
      - the name is already an explicit binding (bindings win);
      - the name cannot be registered (it is not a valid SQL view name, or
        it collides with an existing temp table/view), but the same
        ``DataFrame`` object is already reachable from SQL via a binding
        alias (the user has already routed around the issue).

    Dropped with an INFO log record only:
      - the DataFrame belongs to a different TableEnvironment than
        ``t_env``.

    Dropped with a ``UserWarning``, so the user knows their DataFrame
    won't be visible in SQL:
      - the name is not a valid SQL view name;
      - the name collides with an existing temp table/view.

    Args:
        candidates: name -> DataFrame pairs found in the caller's scope.
        bindings: the explicit bindings; their names take precedence, and
            their DataFrame objects count as "already reachable".
        t_env: the environment the query will run in.

    Returns:
        The name -> DataFrame pairs to register as temporary views.
    """
    bound_df_ids = {id(df) for df in bindings.values()}
    # Names of temporary tables/views that already exist in ``t_env``
    # (the same check ``_temp_exists`` performs). Nothing is registered
    # while this loop runs, so the set cannot change. Fetch it at most
    # once, and only when a candidate actually reaches the collision
    # check, instead of re-listing both catalogs for every candidate
    # (presumably a JVM round-trip per listing, as ``t_env`` wraps
    # ``_j_tenv``).
    existing_temp_names: Optional[set] = None
    result: Dict[str, DataFrame] = {}
    for name, value in candidates.items():
        if name in bindings:
            # Explicit binding wins; auto-bind ignores this name.
            continue
        # Beyond this point, an invalid name or a collision is a silent
        # skip if the DataFrame is otherwise reachable via a binding
        # alias, and a warning otherwise.
        df_is_aliased = id(value) in bound_df_ids
        if not _is_valid_ident(t_env, name):
            if df_is_aliased:
                continue
            warnings.warn(
                f"Skipping auto-bind of '{name}': not a valid SQL view "
                f"name. To use this DataFrame in your query, pass "
                f"it under a valid view name via bindings "
                f"(e.g. `pf.sql(..., my_df={name})`).",
                stacklevel=3,  # warn → _resolve_auto_bind → sql → user
            )
            continue
        if value._table._t_env is not t_env:
            _logger.info(
                "skipping auto-bind of '%s': DataFrame belongs to a "
                "different TableEnvironment",
                name,
            )
            continue
        if existing_temp_names is None:
            existing_temp_names = set(t_env.list_temporary_tables()) | set(
                t_env.list_temporary_views()
            )
        if name in existing_temp_names:
            if df_is_aliased:
                continue
            warnings.warn(
                f"Skipping auto-bind of '{name}': a temporary table/view "
                f"with this name already exists. To use this DataFrame in "
                f"your query, either rename your Python variable, or pass "
                f"it under a different name via bindings "
                f"(e.g. `pf.sql(..., my_df={name})`).",
                stacklevel=3,
            )
            continue
        result[name] = value
    return result
def _execute_with_temp_views(
    t_env: "StreamTableEnvironment",
    to_register: Mapping[str, DataFrame],
    query: str,
) -> DataFrame:
    """Register ``to_register`` as temporary views, run ``query``, drop them.

    Views are created in mapping order. Whatever the outcome (a
    registration failure part-way through, a query error, or success),
    every view that was actually created is dropped again in the
    ``finally`` clause, in creation order.

    Drop failures are logged at WARNING and swallowed: letting a cleanup
    error mask the query's real outcome would be a debugging trap.

    NOTE(review): the views are already gone when the caller receives the
    returned DataFrame, so this relies on ``sql_query`` resolving view
    references at parse time. Confirm this against the Flink planner.

    Returns:
        A ``DataFrame`` wrapping the table returned by
        ``t_env.sql_query(query)``.
    """
    created: List[str] = []
    try:
        for view_name, view_df in to_register.items():
            t_env.create_temporary_view(view_name, view_df._table)
            created.append(view_name)
            _logger.debug("registered temp view '%s'", view_name)
        return DataFrame(t_env.sql_query(query))
    finally:
        for view_name in created:
            try:
                t_env.drop_temporary_view(view_name)
                _logger.debug("dropped temp view '%s'", view_name)
            except Exception as drop_error:  # noqa: BLE001
                _logger.warning(
                    "drop_temporary_view(%s) failed: %s", view_name, drop_error
                )
[docs]
def sql(
    query: str,
    *,
    auto_bind: bool = True,
    **bindings: DataFrame,
) -> DataFrame:
    """
    Run a SQL SELECT query against DataFrames in scope, returning a DataFrame.

    DataFrames become visible to the query under SQL names in two
    independent ways:

    - **Explicit bindings** (``**bindings``): each keyword name is
      registered as a temporary view for its DataFrame value. These are
      user contracts, so every problem is reported as ``ValueError``: a
      value that is not a ``DataFrame``, a name that is not a valid SQL
      view name, a name that collides with an existing temporary
      table/view, or bound DataFrames from different TableEnvironments.
    - **Auto-bind** (``auto_bind=True``, the default): every ``DataFrame``
      found in the caller's globals and locals (locals shadowing globals)
      is registered under its Python variable name. Best-effort: invalid
      view names and collisions with existing temporary tables/views warn
      and skip; DataFrames from a different TableEnvironment are logged
      and skipped.

    The TableEnvironment comes from the explicit bindings if there are
    any, otherwise from the first auto-bind candidate with a valid view
    name, otherwise from the global environment (created on demand). All
    registered views are dropped again before returning; drop failures
    are only logged.

    Args:
        query: A SQL SELECT statement. Other statement kinds (INSERT, DDL)
            are not supported.
        auto_bind: If True (default), auto-detect ``DataFrame`` instances
            in the caller's globals and locals, as described above.
        **bindings: Explicit alias -> DataFrame mappings. Always
            registered, even when ``auto_bind`` is False, and they take
            precedence over auto-bind on a name collision. ``query`` and
            ``auto_bind`` are named parameters, so they cannot be used as
            binding names.

    Returns:
        A new DataFrame backed by the query result.

    Raises:
        ValueError: any problem with an explicit binding (see above).

    Note:
        This function is **not safe** under concurrent calls against the
        same TableEnvironment: register / sql_query / drop is not an
        atomic sequence. Don't call it from multiple threads sharing one
        t_env.

    Example::

        >>> import pyflink.dataframe as pf
        >>> df1 = pf.from_dict({"a": [1, 2, 3], "b": ["x", "y", "z"]})
        >>> df2 = pf.from_dict({"a": [1, 2, 3], "c": ["p", "q", "r"]})
        >>> pf.sql("SELECT * FROM df1 JOIN df2 ON df1.a = df2.a").show()
    """
    binding_views = _resolve_explicit_bindings(bindings)

    auto_candidates: Dict[str, DataFrame] = {}
    if auto_bind:
        # `sql` is a plain function with no decorators, so the user's
        # frame is exactly one f_back. If a decorator is ever added,
        # this walk must be updated in lockstep — test_frame_back_level
        # pins the invariant.
        here = inspect.currentframe()
        caller: Optional[FrameType] = None if here is None else here.f_back
        try:
            if caller is not None:
                # Later keys win, so caller locals shadow caller globals.
                visible = {**caller.f_globals, **caller.f_locals}
                auto_candidates = {
                    ident: obj
                    for ident, obj in visible.items()
                    if isinstance(obj, DataFrame)
                }
        finally:
            # Drop frame references promptly to avoid keeping the
            # caller's frame (and everything it references) alive.
            del here, caller

    t_env = _resolve_table_environment(binding_views, auto_candidates)

    for alias in binding_views:
        _validate_view_name(t_env, alias)
        if not _temp_exists(t_env, alias):
            continue
        raise ValueError(
            f"binding '{alias}' conflicts with an existing temporary "
            f"table/view of the same name. Use a different binding "
            f"name, or drop the existing '{alias}' first."
        )

    # Called directly from this frame so the warnings' stacklevel=3 in
    # _resolve_auto_bind still points at the user's call site.
    auto_views: Dict[str, DataFrame] = (
        _resolve_auto_bind(auto_candidates, binding_views, t_env)
        if auto_bind
        else {}
    )

    # The two maps are disjoint (auto-bind skips every bound name);
    # bindings are unpacked last to make their precedence obvious.
    to_register: Dict[str, DataFrame] = {**auto_views, **binding_views}
    return _execute_with_temp_views(t_env, to_register, query)