Source code for pyflink.dataframe.context

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""
Global context for DataFrame operations.

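This module holds the process-wide TableEnvironment shared by DataFrame
operations, together with a configuration object whose settings are applied to
that environment.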
"""

from typing import Dict, Optional, TYPE_CHECKING

if TYPE_CHECKING:
    from pyflink.table import StreamTableEnvironment

# Process-wide TableEnvironment shared by DataFrame operations. It stays None
# until set_table_environment() stores one (directly, or on behalf of
# get_or_create_table_environment()).
_global_t_env: Optional["StreamTableEnvironment"] = None


class DataFrameConfig:
    """
    Key-value configuration for DataFrame operations.

    Every pair passed to :meth:`set` is remembered in a local buffer. When a
    global TableEnvironment is present the pair is also written to that
    environment's config right away; otherwise the buffered pairs are written
    later, when :meth:`_apply_to` is called with an environment.

    Example::

        >>> import pyflink.dataframe as pf
        >>> pf.config.set("parallelism.default", "4")
        >>> pf.config.get("parallelism.default")
        '4'
    """

    def __init__(self):
        # Insertion-ordered record of every pair passed to set().
        self._buffer: Dict[str, str] = {}

    def set(self, key: str, value: str) -> "DataFrameConfig":
        """
        Record a configuration pair and forward it to the active environment.

        Args:
            key: The configuration key.
            value: The configuration value.

        Returns:
            self, so calls can be chained.
        """
        # Always buffer first so the pair can be re-applied to an environment
        # that is installed later.
        self._buffer[key] = value
        active_env = _global_t_env
        if active_env is not None:
            active_env.get_config().set(key, value)
        return self

    def get(self, key: str, default: Optional[str] = None) -> Optional[str]:
        """
        Look up a configuration value.

        When a global TableEnvironment is set and its configuration contains
        ``key``, the value is read from there. In every other case the local
        buffer is consulted.

        Args:
            key: The configuration key.
            default: Value to return when the key is not found.

        Returns:
            The configuration value, or ``default``.
        """
        active_env = _global_t_env
        if active_env is not None:
            configuration = active_env.get_config().get_configuration()
            if configuration.contains_key(key):
                return configuration.get_string(key, default)
        # No environment, or the environment does not know the key.
        return self._buffer.get(key, default)

    def _apply_to(self, t_env: "StreamTableEnvironment") -> None:
        """Write every buffered pair into the config of ``t_env``."""
        for name, setting in self._buffer.items():
            t_env.get_config().set(name, setting)

    def __repr__(self) -> str:
        pairs = [f"{k!r}: {v!r}" for k, v in self._buffer.items()]
        items = ", ".join(pairs)
        return f"DataFrameConfig({{{items}}})"
# Module-level DataFrameConfig instance. set_table_environment() applies its
# buffered settings to every non-None environment it is given.
config = DataFrameConfig()
def set_table_environment(t_env: Optional["StreamTableEnvironment"]) -> None:
    """
    Install (or clear) the global TableEnvironment for DataFrame operations.

    Handy in tests, where one TableEnvironment should be reused across many
    DataFrame creations. When a non-None environment is installed, the
    settings buffered in the module-level ``config`` are applied to it.

    Args:
        t_env: The TableEnvironment to use globally. Pass None to reset.

    Example::

        >>> import pyflink.dataframe as pf
        >>> # In test setup
        >>> pf.set_table_environment(self.t_env)
        >>> # Now from_records, from_dict will use this t_env
        >>> df = pf.from_records([(1, 2)], schema=["a", "b"])
        >>> # In test teardown
        >>> pf.set_table_environment(None)
    """
    global _global_t_env
    _global_t_env = t_env
    if t_env is None:
        # Resetting: there is no environment to push buffered settings into.
        return
    config._apply_to(t_env)
def get_table_environment() -> Optional["StreamTableEnvironment"]:
    """
    Return the global TableEnvironment as it currently stands.

    Returns:
        The TableEnvironment stored at module level, or None when none is set.
    """
    current_env = _global_t_env
    return current_env
def get_or_create_table_environment() -> "StreamTableEnvironment":
    """
    Fetch the global TableEnvironment, building and registering one on demand.

    A newly built environment is stored through ``set_table_environment``,
    which also applies the buffered DataFrame config to it.

    Returns:
        The existing global TableEnvironment, or the newly created one.
    """
    table_env = get_table_environment()
    if table_env is None:
        # Imported lazily so pyflink's runtime modules are only loaded when an
        # environment actually has to be created.
        from pyflink.datastream import StreamExecutionEnvironment
        from pyflink.table import StreamTableEnvironment

        exec_env = StreamExecutionEnvironment.get_execution_environment()
        table_env = StreamTableEnvironment.create(exec_env)
        set_table_environment(table_env)
    return table_env