Source code for pyflink.dataframe.convert

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""
Conversion utilities for creating DataFrame from Python objects.
"""

from typing import Any, List, Mapping, Sequence, Union, Optional, TYPE_CHECKING

from pyflink.dataframe.context import get_or_create_table_environment
from pyflink.dataframe.dataframe import DataFrame

if TYPE_CHECKING:
    from pyflink.table import Table

__all__ = ["from_dict", "from_records", "from_table", "from_pandas"]


def from_table(table: "Table") -> DataFrame:
    """
    Wrap an existing PyFlink Table in a DataFrame.

    Args:
        table: The PyFlink Table to wrap.

    Returns:
        A new DataFrame instance constructed from ``table``.

    Example::

        >>> import pyflink.dataframe as pf
        >>> table = t_env.from_elements([(1, 'a')], ['id', 'name'])
        >>> df = pf.from_table(table)
    """
    wrapped = DataFrame(table)
    return wrapped
def from_pandas(
    pdf: Any,
    schema: Optional[List[str]] = None,
) -> DataFrame:
    """
    Build a DataFrame out of a Pandas DataFrame.

    The conversion itself is delegated to the ``from_pandas`` method of the
    table environment returned by ``get_or_create_table_environment()``.

    Args:
        pdf: The Pandas DataFrame to convert.
        schema: Optional list of column names, forwarded unchanged to the
            table environment. If None, uses pandas column names.

    Returns:
        A new DataFrame instance wrapping the converted table.

    Example::

        >>> import pandas as pd
        >>> import pyflink.dataframe as pf
        >>> pdf = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
        >>> df = pf.from_pandas(pdf)
    """
    env = get_or_create_table_environment()
    return DataFrame(env.from_pandas(pdf, schema))
def from_dict(
    data: Mapping[str, Sequence[Any]],
    schema: Optional[List[str]] = None,
) -> DataFrame:
    """
    Create a DataFrame from a dictionary of sequences.

    All input validation runs before the table environment is requested, so
    invalid input fails fast without touching the environment.

    Args:
        data: A dictionary where keys are column names and values are
            sequences of column values. All sequences must have the same
            length (this is checked for every key, including keys that
            ``schema`` does not select).
        schema: Optional list of column names. If provided, only these
            columns will be used, in this order. If None or empty, uses all
            keys from data.

    Returns:
        A new DataFrame.

    Raises:
        ValueError: If ``data`` is empty, if the columns do not all have the
            same length, or if ``schema`` names a column missing from
            ``data``.

    Example::

        >>> import pyflink.dataframe as pf
        >>> df = pf.from_dict({"a": [1, 2, 3], "b": ["x", "y", "z"]})
    """
    if not data:
        raise ValueError("data dictionary cannot be empty")

    # Validate that all columns have the same length
    lengths = {key: len(values) for key, values in data.items()}
    if len(set(lengths.values())) > 1:
        raise ValueError(
            f"All columns must have the same length. Got lengths: {lengths}"
        )

    columns = schema if schema else list(data.keys())

    # Validate schema columns exist in data
    for col_name in columns:
        if col_name not in data:
            raise ValueError(f"Column '{col_name}' not found in data")

    # Transpose the selected columns into row tuples. The lengths were
    # validated above, so zip() cannot silently truncate a longer column.
    rows = list(zip(*(data[col] for col in columns)))

    # Request the table environment only once the input is known to be valid.
    t_env = get_or_create_table_environment()
    table = t_env.from_elements(rows, columns)
    return DataFrame(table)
def from_records(
    data: Sequence[Union[Sequence[Any], Mapping[str, Any]]],
    schema: Optional[List[str]] = None,
) -> DataFrame:
    """
    Create a DataFrame from a sequence of records.

    The kind of the first record (mapping vs. list/tuple) decides how the
    whole input is interpreted. All validation runs before the table
    environment is requested, so invalid input fails fast.

    Args:
        data: A sequence of records. Each record can be:
            - A sequence (tuple/list) of values
            - A dictionary with column names as keys
            All records must be of the same kind as the first one.
        schema: Column names. Required when data contains sequences.
            Optional when data contains dictionaries: if given, it selects
            and orders the columns, and a key missing from a record yields
            None for that column; if omitted, the keys of the first record
            are used.

    Returns:
        A new DataFrame.

    Raises:
        ValueError: If ``data`` is empty, if ``schema`` is missing for
            sequence records, or if a sequence record's length differs from
            the length of ``schema``.
        TypeError: If the first record is not a dict, list, or tuple, or if
            a later record mixes mappings with sequences.

    Example::

        >>> import pyflink.dataframe as pf
        >>> # From list of dicts
        >>> df = pf.from_records([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}])
        >>> # From list of tuples with schema
        >>> df = pf.from_records([(1, "x"), (2, "y")], schema=["a", "b"])
    """
    if not data:
        raise ValueError("data cannot be empty")

    # Check the type of the first record
    first_record = data[0]

    if isinstance(first_record, Mapping):
        # Data is list of dicts
        if schema is None:
            # Infer schema from keys of the first record
            schema = list(first_record.keys())

        # Convert dicts to tuples, rejecting non-mapping records up front
        # instead of failing later with an AttributeError on ``.get``.
        rows = []
        for i, record in enumerate(data):
            if not isinstance(record, Mapping):
                raise TypeError(
                    f"Record at index {i} has type {type(record)}, "
                    "expected a dict like the first record"
                )
            rows.append(tuple(record.get(col) for col in schema))
    elif isinstance(first_record, (list, tuple)):
        # Data is list of sequences
        if schema is None:
            raise ValueError(
                "schema is required when data contains sequences (list/tuple). "
                "Please provide column names via schema parameter."
            )

        # Validate each row has the same length as schema
        expected_len = len(schema)
        for i, record in enumerate(data):
            # A mapping with the right number of keys would otherwise pass
            # the length check and be handed on as if it were a row.
            if isinstance(record, Mapping):
                raise TypeError(
                    f"Record at index {i} has type {type(record)}, "
                    "expected a list or tuple like the first record"
                )
            if len(record) != expected_len:
                raise ValueError(
                    f"Record at index {i} has {len(record)} elements, "
                    f"expected {expected_len} (length of schema)"
                )
        rows = data
    else:
        raise TypeError(
            f"Unsupported record type: {type(first_record)}. "
            "Expected dict, list, or tuple."
        )

    # Request the table environment only once the input is known to be valid.
    t_env = get_or_create_table_environment()
    table = t_env.from_elements(rows, schema)
    return DataFrame(table)