################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
Conversion utilities for creating DataFrame from Python objects.
"""
from typing import Any, List, Mapping, Sequence, Union, Optional, TYPE_CHECKING
from pyflink.dataframe.context import get_or_create_table_environment
from pyflink.dataframe.dataframe import DataFrame
if TYPE_CHECKING:
from pyflink.table import Table
# Public API of this module: the names exported by ``import *``.
__all__ = [
    "from_dict",
    "from_records",
    "from_table",
    "from_pandas",
]
[docs]
def from_table(table: "Table") -> DataFrame:
    """
    Wrap an existing PyFlink Table in a DataFrame.

    The table is handed directly to the DataFrame constructor.

    Args:
        table: The PyFlink Table to wrap.

    Returns:
        A new DataFrame instance backed by ``table``.

    Example::

        >>> import pyflink.dataframe as pf
        >>> # t_env: an existing TableEnvironment
        >>> table = t_env.from_elements([(1, 'a')], ['id', 'name'])
        >>> df = pf.from_table(table)
    """
    wrapped = DataFrame(table)
    return wrapped
[docs]
def from_pandas(
    pdf: Any,
    schema: Optional[List[str]] = None,
) -> DataFrame:
    """
    Build a DataFrame from a Pandas DataFrame.

    The conversion itself is delegated to the shared table environment's
    ``from_pandas`` method.

    Args:
        pdf: The Pandas DataFrame to convert.
        schema: Optional list of column names. When None, the Pandas column
            names are used.

    Returns:
        A new DataFrame instance.

    Example::

        >>> import pandas as pd
        >>> import pyflink.dataframe as pf
        >>> pdf = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
        >>> df = pf.from_pandas(pdf)
    """
    table = get_or_create_table_environment().from_pandas(pdf, schema)
    return DataFrame(table)
[docs]
def from_dict(
    data: Mapping[str, Sequence[Any]],
    schema: Optional[List[str]] = None,
) -> DataFrame:
    """
    Create a DataFrame from a dictionary of sequences.

    Args:
        data: A dictionary where keys are column names and values are
            sequences of column values. All sequences must have the same
            length, including those not selected by ``schema``.
        schema: Optional list of column names. If provided (non-empty), only
            these columns will be used, in this order. If None or empty, all
            keys from data are used in the dictionary's iteration order.

    Returns:
        A new DataFrame.

    Raises:
        ValueError: If data is empty, if the sequences differ in length, or
            if a schema column is not a key of data.

    Example::

        >>> import pyflink.dataframe as pf
        >>> df = pf.from_dict({"a": [1, 2, 3], "b": ["x", "y", "z"]})
    """
    if not data:
        raise ValueError("data dictionary cannot be empty")

    # Validate that all columns (selected or not) have the same length.
    lengths = {key: len(values) for key, values in data.items()}
    if len(set(lengths.values())) > 1:
        raise ValueError(
            f"All columns must have the same length. Got lengths: {lengths}"
        )

    # A falsy schema (None or []) means "use every key".
    columns = schema if schema else list(data.keys())
    for col_name in columns:
        if col_name not in data:
            raise ValueError(f"Column '{col_name}' not found in data")

    # Transpose column-major input into row tuples. zip walks each column
    # positionally, so containers whose [] is label-based (e.g. a pandas
    # Series whose index is not 0..n-1) are still read in order.
    rows = list(zip(*(data[col] for col in columns)))

    # Create the environment only after the input has been validated.
    t_env = get_or_create_table_environment()
    table = t_env.from_elements(rows, columns)
    return DataFrame(table)
[docs]
def from_records(
    data: Sequence[Union[Sequence[Any], Mapping[str, Any]]],
    schema: Optional[List[str]] = None,
) -> DataFrame:
    """
    Create a DataFrame from a sequence of records.

    Args:
        data: A sequence of records. All records must be of the same kind,
            decided by the first record:
            - A sequence (tuple/list) of values
            - A dictionary with column names as keys
            Any iterable of records is accepted; it is materialized into a
            list before processing.
        schema: Column names. Required when data contains sequences, in
            which case every record must have exactly ``len(schema)``
            elements. When data contains dictionaries, schema selects and
            orders the columns; if None, the keys of the first record are
            used. Keys missing from a record become None, and keys not in
            the column list are dropped.

    Returns:
        A new DataFrame.

    Raises:
        ValueError: If data is empty, if schema is missing for sequence
            records, or if a sequence record's length differs from the
            schema's length.
        TypeError: If the first record is not a dict, list, or tuple, or if
            a later record is of a different kind than the first.

    Example::

        >>> import pyflink.dataframe as pf
        >>> # From list of dicts
        >>> df = pf.from_records([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}])
        >>> # From list of tuples with schema
        >>> df = pf.from_records([(1, "x"), (2, "y")], schema=["a", "b"])
    """
    # Materialize once so generators/iterators are accepted and the records
    # can be scanned more than once below.
    records = list(data)
    if not records:
        raise ValueError("data cannot be empty")

    # The kind of the first record decides how every record is interpreted.
    first_record = records[0]
    if isinstance(first_record, Mapping):
        # Data is list of dicts
        columns = schema if schema is not None else list(first_record.keys())
        rows = []
        for i, record in enumerate(records):
            # Reject mixed kinds up front instead of failing on .get().
            if not isinstance(record, Mapping):
                raise TypeError(
                    f"Record at index {i} has type {type(record)}, but the "
                    "first record is a dict. All records must be dicts."
                )
            rows.append(tuple(record.get(col) for col in columns))
    elif isinstance(first_record, (list, tuple)):
        # Data is list of sequences
        if schema is None:
            raise ValueError(
                "schema is required when data contains sequences (list/tuple). "
                "Please provide column names via schema parameter."
            )
        columns = schema
        expected_len = len(columns)
        for i, record in enumerate(records):
            # A dict mixed in here would otherwise be passed on unvalidated.
            if not isinstance(record, (list, tuple)):
                raise TypeError(
                    f"Record at index {i} has type {type(record)}, but the "
                    "first record is a list/tuple. All records must be "
                    "lists or tuples."
                )
            if len(record) != expected_len:
                raise ValueError(
                    f"Record at index {i} has {len(record)} elements, "
                    f"expected {expected_len} (length of schema)"
                )
        rows = records
    else:
        raise TypeError(
            f"Unsupported record type: {type(first_record)}. "
            "Expected dict, list, or tuple."
        )

    # Create the environment only after the input has been validated.
    t_env = get_or_create_table_environment()
    table = t_env.from_elements(rows, columns)
    return DataFrame(table)