Source code for pyflink.dataframe.context
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
Global context for DataFrame operations.
This module provides global configuration for DataFrame operations.
"""
from typing import Dict, Optional, TYPE_CHECKING
if TYPE_CHECKING:
from pyflink.table import StreamTableEnvironment
# Module-level TableEnvironment shared by the DataFrame helpers. It stays None
# until set_table_environment() (also called by
# get_or_create_table_environment()) stores one.
_global_t_env: Optional["StreamTableEnvironment"] = None
[docs]
class DataFrameConfig:
    """
    Key-value settings for DataFrame operations.

    Every value passed to :meth:`set` is recorded in a local buffer. If a
    global TableEnvironment is installed, the value is also written into that
    environment's ``TableConfig`` right away. Otherwise it stays in the buffer
    until an environment is installed, at which point the buffer is replayed
    through :meth:`_apply_to`.

    Example::

        >>> import pyflink.dataframe as pf
        >>> pf.config.set("parallelism.default", "4").get("parallelism.default")
        '4'
    """

    def __init__(self):
        # Every key/value passed to set(), replayed onto newly installed
        # TableEnvironments by _apply_to().
        self._buffer: Dict[str, str] = {}

    def set(self, key: str, value: str) -> "DataFrameConfig":
        """
        Record ``key = value`` and forward it to the global environment, if any.

        Args:
            key: The configuration key.
            value: The configuration value.

        Returns:
            This ``DataFrameConfig`` instance, so calls can be chained.
        """
        self._buffer[key] = value
        env = _global_t_env
        if env is None:
            return self
        env.get_config().set(key, value)
        return self

    def get(self, key: str, default: Optional[str] = None) -> Optional[str]:
        """
        Look up a configuration value.

        If a global TableEnvironment is installed and its configuration holds
        ``key``, that value wins. In every other case the local buffer is
        consulted.

        Args:
            key: The configuration key.
            default: Value returned when ``key`` is found in neither place.

        Returns:
            The stored value, or ``default``.
        """
        env = _global_t_env
        if env is not None:
            flink_conf = env.get_config().get_configuration()
            if flink_conf.contains_key(key):
                return flink_conf.get_string(key, default)
        return self._buffer.get(key, default)

    def _apply_to(self, t_env: "StreamTableEnvironment") -> None:
        """Write every buffered entry into ``t_env``'s TableConfig."""
        for entry_key in self._buffer:
            t_env.get_config().set(entry_key, self._buffer[entry_key])

    def __repr__(self) -> str:
        pairs = [f"{k!r}: {v!r}" for k, v in self._buffer.items()]
        return "DataFrameConfig({" + ", ".join(pairs) + "})"
# Module-level shared DataFrameConfig. set_table_environment() replays its
# buffered entries onto each TableEnvironment it installs.
config = DataFrameConfig()
[docs]
def set_table_environment(t_env: Optional["StreamTableEnvironment"]) -> None:
    """
    Install (or clear) the global TableEnvironment used for DataFrame operations.

    Installing an environment immediately replays every entry buffered in the
    module-level ``config`` onto it. Passing ``None`` only drops the global
    reference; the buffered entries are kept. This is handy in tests that
    want several DataFrame creations to share one TableEnvironment.

    Args:
        t_env: The TableEnvironment to share globally, or ``None`` to reset.

    Example::

        >>> import pyflink.dataframe as pf
        >>> # In test setup
        >>> pf.set_table_environment(self.t_env)
        >>> # from_records / from_dict now use this t_env
        >>> df = pf.from_records([(1, 2)], schema=["a", "b"])
        >>> # In test teardown
        >>> pf.set_table_environment(None)
    """
    global _global_t_env
    _global_t_env = t_env
    if t_env is None:
        return
    config._apply_to(t_env)
[docs]
def get_table_environment() -> Optional["StreamTableEnvironment"]:
    """
    Return the currently installed global TableEnvironment, if any.

    Returns:
        The environment stored by ``set_table_environment``, or ``None``
        when no environment is installed.
    """
    current = _global_t_env
    return current
[docs]
def get_or_create_table_environment() -> "StreamTableEnvironment":
    """
    Return the global TableEnvironment, building and installing one on first use.

    When no environment is installed, a StreamTableEnvironment is created on
    top of ``StreamExecutionEnvironment.get_execution_environment()``. It is
    then installed via ``set_table_environment``, which also applies any
    buffered DataFrame config to it.

    Returns:
        The installed (possibly freshly created) StreamTableEnvironment.
    """
    existing = get_table_environment()
    if existing is None:
        # Deferred imports: only needed when an environment must be built.
        from pyflink.datastream import StreamExecutionEnvironment
        from pyflink.table import StreamTableEnvironment

        stream_env = StreamExecutionEnvironment.get_execution_environment()
        existing = StreamTableEnvironment.create(stream_env)
        set_table_environment(existing)
    return existing