################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
I/O utilities for reading external data sources into DataFrame.
"""
import uuid
from datetime import datetime
from typing import Any, Dict, List, Literal, Optional, Tuple, Union, TYPE_CHECKING
from pyflink.dataframe.dataframe import DataFrame
if TYPE_CHECKING:
from pyflink.dataframe.datatype import DataType
# Public API of this module: the names exported by a star-import.
# NOTE(review): only ``read_parquet``, ``read_kafka`` and ``read_paimon`` are
# visible in this part of the file; the remaining readers are presumably
# defined further down — confirm.
__all__ = [
    "read_parquet",
    "read_kafka",
    "read_odps",
    "read_paimon",
    "read_hologres",
    "read_sls",
    "read_custom",
]
# ---------------------------------------------------------------------------
# Closed value domains for connector options.
#
# Sets are used where only membership matters; tuples keep a fixed order.
# NOTE(review): apart from ``_PAIMON_SCAN_MODES`` (checked by ``read_paimon``),
# the consumers of these constants are outside this part of the file —
# presumably the corresponding ODPS / SLS / Paimon / Hologres readers and
# writers; confirm before changing any value.
# ---------------------------------------------------------------------------

# --- ODPS -------------------------------------------------------------------
# Largest signed 64-bit integer (2**63 - 1).
_ODPS_LONG_MAX_VALUE = 2**63 - 1
_ODPS_COMPRESS_ALGORITHMS = {
    "RAW",
    "ZLIB",
    "SNAPPY",
}
# The empty string is itself a member of this set.
_ODPS_COMPRESS_CODECS = {
    "",
    "ZSTD",
    "LZ4_FRAME",
}
_ODPS_MODIFIED_TABLE_OPERATIONS = {
    "NONE",
    "SKIP",
}
_ODPS_CACHE_VALUES = {
    "ALL",
}
_ODPS_SINK_OPERATIONS = {
    "insert",
    "upsert",
}

# --- SLS --------------------------------------------------------------------
_SLS_STARTUP_MODES = (
    "timestamp",
    "latest",
    "earliest",
    "consumer_group",
)

# --- Paimon -----------------------------------------------------------------
_PAIMON_FILE_FORMATS = {
    "orc",
    "parquet",
    "avro",
    "lance",
}
_PAIMON_CHANGELOG_PRODUCERS = {
    "none",
    "input",
    "full-compaction",
    "lookup",
}
_PAIMON_MERGE_ENGINES = {
    "deduplicate",
    "partial-update",
    "aggregation",
}
# Accepted values of the ``mode`` argument of ``read_paimon``.
_PAIMON_SCAN_MODES = {
    "default",
    "latest-full",
    "latest",
    "compacted-full",
    "from-timestamp",
    "from-snapshot",
}
_PAIMON_WRITE_MODES = {
    "change-log",
    "append-only",
}
_PAIMON_DELETE_STRATEGIES = {
    "NONE",
    "IGNORE_DELETE",
    "NON_PK_FIELD_TO_NULL",
    "DELETE_ROW_ON_PK",
    "CHANGELOG_STANDARD",
}

# --- Hologres ---------------------------------------------------------------
_HOLOGRES_BINLOG_READ_MODES = (
    "AUTO",
    "JDBC",
    "HOLOHUB",
)
_HOLOGRES_BINLOG_CHANGE_LOG_MODES = (
    "ALL",
    "UPSERT",
    "ALL_AS_APPEND_ONLY",
)
_HOLOGRES_BINLOG_STARTUP_MODES = (
    "INITIAL",
    "EARLIEST_OFFSET",
    "TIMESTAMP",
    "LATEST_OFFSET",
)
_HOLOGRES_PARTITION_BINLOG_MODES = (
    "DISABLE",
    "STATIC",
    "DYNAMIC",
)
_HOLOGRES_SSL_MODES = (
    "disable",
    "require",
    "verify-ca",
    "verify-full",
)
_HOLOGRES_WRITE_MODES = (
    "AUTO",
    "INSERT",
    "COPY_STREAM",
    "COPY_BULK_LOAD",
    "COPY_BULK_LOAD_ON_CONFLICT",
)
_HOLOGRES_ON_CONFLICT_ACTIONS = (
    "INSERT_OR_IGNORE",
    "INSERT_OR_UPDATE",
    "INSERT_OR_REPLACE",
)
_HOLOGRES_DELETE_STRATEGIES = (
    "IGNORE_DELETE",
    "NON_PK_FIELD_TO_NULL",
    "DELETE_ROW_ON_PK",
    "CHANGELOG_STANDARD",
)
_HOLOGRES_COPY_FORMATS = (
    "binary",
    "text",
    "binaryrow",
)
# "None" here is the literal string, not Python's ``None``.
_HOLOGRES_CACHE_TYPES = (
    "None",
    "LRU",
)
[docs]
def read_parquet(
    path: str,
    *,
    schema: Optional[Dict[str, "DataType"]] = None,
    columns: Optional[List[str]] = None,
    monitor_interval: Optional[str] = None,
    path_regex_pattern: Optional[str] = None,
) -> DataFrame:
    """
    Read a Parquet file or directory into a DataFrame.

    Uses Flink's FileSystem Connector with Parquet Format under the hood.

    Args:
        path: Path to a Parquet file, directory, or glob pattern.
            Supports local paths and any Flink-supported file system
            (HDFS, OSS, S3, etc.).
        schema: Dict of {column_name: DataType} specifying the schema.
            This parameter is required.
        columns: Optional list of column names to read (projection pushdown).
            If None, all columns are read.
        monitor_interval: Optional interval for continuously monitoring the
            directory for new files (e.g., '10s', '1min').
            If None, the path is scanned once (bounded source).
            If set, the source becomes unbounded (streaming).
        path_regex_pattern: Optional regex pattern to filter files.
            The pattern is matched against each file's absolute path. Only
            files whose path matches the pattern will be read. This is useful
            when reading from a directory that contains mixed file types or
            when you want to select a subset of files based on naming
            conventions.

    Returns:
        A new DataFrame.

    Raises:
        ValueError: If ``schema`` is not provided. The check runs before the
            table environment is touched, so an invalid call has no side
            effects.

    Example::

        >>> import pyflink.dataframe as pf
        >>>
        >>> # Read with explicit schema
        >>> df = pf.read_parquet("/path/to/data.parquet", schema={
        ...     "id": pf.DataType.int64(),
        ...     "name": pf.DataType.string(),
        ... })
        >>>
        >>> # Read only specific columns
        >>> df = pf.read_parquet("/path/to/data.parquet", schema={
        ...     "id": pf.DataType.int64(),
        ...     "name": pf.DataType.string(),
        ...     "value": pf.DataType.float64(),
        ... }, columns=["id", "name"])
        >>>
        >>> # Continuously monitor directory for new files
        >>> df = pf.read_parquet("/path/to/dir", schema={
        ...     "id": pf.DataType.int64(),
        ...     "name": pf.DataType.string(),
        ... }, monitor_interval="10s")
        >>>
        >>> # Filter files by regex pattern (matched against absolute path)
        >>> df = pf.read_parquet("/path/to/dir", schema={
        ...     "id": pf.DataType.int64(),
        ...     "name": pf.DataType.string(),
        ... }, path_regex_pattern="/path/to/dir/part-.*\\.parquet")
        >>>
        >>> # Only read files from specific date partitions
        >>> df = pf.read_parquet("/path/to/dir", schema={
        ...     "id": pf.DataType.int64(),
        ...     "name": pf.DataType.string(),
        ... }, path_regex_pattern=".*/dt=2024-01-0[1-3]/.*")
    """
    # Fail fast on a missing schema: nothing below can succeed without it, so
    # reject the call before the table environment is obtained and the
    # connector/format helpers are invoked.
    if schema is None:
        raise ValueError(
            "schema is required for read_parquet. "
            "Please provide a dict of {column_name: DataType}.\n"
            "Example:\n"
            "  pf.read_parquet('/path/to/data.parquet', schema={\n"
            "      'id': pf.DataType.int64(),\n"
            "      'name': pf.DataType.string(),\n"
            "  })"
        )

    from pyflink.table import TableDescriptor, FormatDescriptor, Schema
    from pyflink.table.expressions import col
    from pyflink.dataframe.util.artifacts import (
        add_built_in_connector,
        add_built_in_format,
    )
    from pyflink.dataframe.context import get_or_create_table_environment

    t_env = get_or_create_table_environment()
    add_built_in_connector(t_env, "filesystem")
    add_built_in_format(t_env, "parquet")

    # Translate the user-facing {name: DataType} mapping into a Table API schema.
    schema_builder = Schema.new_builder()
    for name, dtype in schema.items():
        schema_builder.column(name, dtype._table_type)

    builder = (
        TableDescriptor.for_connector("filesystem")
        .option("path", path)
        .format(FormatDescriptor.for_format("parquet").build())
        .schema(schema_builder.build())
    )

    # Optional source options are only emitted when explicitly requested.
    if monitor_interval is not None:
        builder = builder.option("source.monitor-interval", monitor_interval)
    if path_regex_pattern is not None:
        builder = builder.option("source.path.regex-pattern", path_regex_pattern)

    table = t_env.from_descriptor(builder.build())

    # Column projection is expressed as a plain select on the source table.
    if columns is not None:
        table = table.select(*[col(c) for c in columns])
    return DataFrame(table)
[docs]
def read_kafka(
    bootstrap_servers: str,
    *,
    schema: Optional[Dict[str, "DataType"]] = None,
    columns: Optional[List[str]] = None,
    # General options
    properties: Optional[Dict[str, str]] = None,
    format: str = "json",
    format_options: Optional[Dict[str, str]] = None,
    key_format: Optional[str] = None,
    key_format_options: Optional[Dict[str, str]] = None,
    key_fields: Optional[Union[str, List[str]]] = None,
    key_fields_prefix: Optional[str] = None,
    value_format: Optional[str] = None,
    value_format_options: Optional[Dict[str, str]] = None,
    value_fields_include: Literal["ALL", "EXCEPT_KEY"] = "ALL",
    # Source-specific options
    topic: Optional[Union[str, List[str]]] = None,
    topic_pattern: Optional[str] = None,
    group_id: Optional[str] = None,
    startup_mode: Literal[
        "earliest-offset",
        "latest-offset",
        "group-offsets",
        "timestamp",
        "specific-offsets",
    ] = "group-offsets",
    startup_specific_offsets: Optional[Union[str, Dict[int, int]]] = None,
    startup_timestamp_millis: Optional[int] = None,
    topic_partition_discovery_interval: str = "5 min",
    header_filter: Optional[str] = None,
    check_duplicated_group_id: bool = False,
    bounded_mode: Literal[
        "unbounded", "group-offsets", "latest-offset", "timestamp", "specific-offsets"
    ] = "unbounded",
    bounded_timestamp_millis: Optional[int] = None,
    bounded_specific_offsets: Optional[Union[str, Dict[int, int]]] = None,
) -> DataFrame:
    """
    Read data from one or more Kafka topics into a DataFrame.

    Uses Flink's Kafka SQL Connector under the hood. This creates a streaming
    source that continuously reads from Kafka. For bounded (batch) reads, use
    the ``bounded_mode`` parameter.

    All argument validation happens before the table environment is touched,
    so an invalid call raises ``ValueError`` without side effects.

    Args:
        bootstrap_servers: Comma-separated list of Kafka broker addresses
            (e.g., ``"localhost:9092"`` or ``"broker1:9092,broker2:9092"``).
        schema: Dict of ``{column_name: DataType}`` specifying the schema of
            the data in Kafka messages. This parameter is required.
        columns: Optional list of column names to read (projection pushdown).
            If None, all columns from the schema are read.
        properties: Extra Kafka consumer properties. Each key in this dict is
            directly passed to Kafka.
            Example: ``{"allow.auto.create.topics": "false"}``
            Note:
            - Property names should be valid Kafka consumer config keys.
            - ``key.deserializer`` and ``value.deserializer`` cannot be
              set, because Kafka connector overrides them.
        format: The format used to deserialize the value part of
            Kafka messages (equivalent to ``value_format``). Supported options:
            ``"csv"``, ``"json"``, ``"avro"``, ``"debezium-json"``,
            ``"canal-json"``, ``"maxwell-json"``, ``"avro-confluent"``,
            ``"raw"``.
            Ignored (together with ``format_options``) when ``value_format``
            is set.
        format_options: Additional options for the value format used by
            ``format`` as a dict.
            Keys are format-specific options without the format prefix
            (e.g., ``{"map-null-key.mode": "FAIL"}`` for JSON format).
        key_format: The format used to deserialize the key part
            of Kafka messages.
            If a key format is defined, the ``key_fields`` option is required
            as well. Otherwise the Kafka records will have an empty key.
        key_format_options: Additional options for the key format, as a dict.
            Same structure as ``format_options``.
        key_fields: List of column names that form the message key (e.g.
            ``["field1", "field2"]``). A plain string is also accepted and is
            passed to the connector unchanged (a single column name or a
            semicolon-separated list such as ``"field1;field2"``).
        key_fields_prefix: Prefix for key field column names to avoid name
            clashes with value fields. For example, with prefix ``"k_"``,
            a key field ``user_id`` becomes ``k_user_id`` in the schema.
            This prefix is only for table-column disambiguation; it is removed
            when parsing/serializing Kafka key fields.
            When this option is set, ``value_fields_include`` must be set to
            ``"EXCEPT_KEY"``.
        value_format: The format used to deserialize the value part of Kafka
            messages.
            Only one of ``format`` or ``value_format`` is required.
            If both are configured, ``value_format`` takes precedence: only
            the ``value.format`` connector option is emitted and ``format`` /
            ``format_options`` are not forwarded.
        value_format_options: Extra options for ``value_format``.
        value_fields_include: Whether key fields are included in value fields
            during value parsing.
            - ``"ALL"`` (default): all columns are treated as value fields.
            - ``"EXCEPT_KEY"``: fields in ``key_fields`` are excluded from
              value fields.
        topic: Topic name(s) to read. It can be a single topic, a list of
            topics, or a semicolon-separated topic string
            (for example, ``"topic-1;topic-2"``).
            Exactly one of ``topic`` and ``topic_pattern`` can be set.
        topic_pattern: Regular expression for matching topic names.
            All topics matching this pattern are consumed at runtime.
            Exactly one of ``topic`` and ``topic_pattern`` can be set.
        group_id: Consumer group id.
            Required whenever ``startup_mode`` or ``bounded_mode`` is
            ``"group-offsets"``. Because ``"group-offsets"`` is the default
            ``startup_mode``, a call that sets neither ``group_id`` nor
            another ``startup_mode`` is rejected.
            If a new group id is used for the first time, configure
            ``auto.offset.reset`` to ``earliest`` or ``latest`` in
            ``properties`` to control initial startup position.
        startup_mode: Startup position mode for Kafka consumer. Values:
            - ``"group-offsets"`` (default): Start from committed offsets in
              the consumer group (requires ``group_id``).
            - ``"earliest-offset"``: Start from the earliest available offset.
            - ``"latest-offset"``: Start from the latest offset.
            - ``"timestamp"``: Start from a user-specified timestamp
              (requires ``startup_timestamp_millis``).
            - ``"specific-offsets"``: Start from specified offsets per
              partition (requires ``startup_specific_offsets``).
            Note: This option is effective for stateless startup. When
            restarting from checkpoint/savepoint, restored state progress has
            higher priority.
        startup_specific_offsets: Per-partition offsets to start reading from.
            Used when ``startup_mode="specific-offsets"``. Can be a string
            like ``"partition:0,offset:42;partition:1,offset:300"`` or a dict
            mapping partition IDs to offsets like ``{0: 42, 1: 300}``.
        startup_timestamp_millis: Epoch timestamp in milliseconds to start
            reading from. Used when ``startup_mode="timestamp"``.
        topic_partition_discovery_interval: Interval for discovering new
            partitions and topics dynamically (e.g., ``"30 s"``, ``"5 min"``).
            Default is 5 minutes.
            Note:
            - Set this to a non-positive interval to disable discovery.
            - In ``topic_pattern`` mode, this discovers both:
              new partitions in existing matched topics and
              newly created topics that match the pattern.
        header_filter: Logical expression for filtering Kafka records by
            headers.
            Syntax:
            - Use ``key:value`` for one header condition.
            - Use ``&`` and ``|`` to combine conditions.
            - Use ``!`` for negation.
            Example: ``"depart:toy|depart:book&!env:test"`` keeps records
            with ``depart=toy`` or ``depart=book``, and without ``env=test``.
            Notes:
            - Parentheses are not supported.
            - Operators are evaluated from left to right.
            - Header values are converted to UTF-8 strings before comparison.
        check_duplicated_group_id: Whether to check if the consumer group ID
            already exists on broker. Default is ``False``.
            - ``True``: check duplicate group id before job starts. If found,
              fail fast to avoid consumer-group conflicts.
            - ``False``: start directly without conflict check.
        bounded_mode: Bounded mode for Kafka consumer
            (connector ``scan.bounded.mode``). One of:
            - ``"unbounded"`` (default): Do not stop consuming.
            - ``"latest-offset"``: bounded by latest offsets. This is evaluated
              at the start of consumption from a given partition.
            - ``"group-offsets"``: bounded by committed offsets in ZooKeeper /
              Kafka brokers of a specific consumer group. This is evaluated at
              the start of consumption from a given partition
              (requires ``group_id``).
            - ``"timestamp"``: bounded by a user-supplied timestamp
              (requires ``bounded_timestamp_millis``).
            - ``"specific-offsets"``: bounded by user-supplied specific offsets
              for each partition (requires ``bounded_specific_offsets``).
        bounded_timestamp_millis: Epoch timestamp in milliseconds for the
            bounded end position. Used when ``bounded_mode="timestamp"``.
        bounded_specific_offsets: Per-partition offsets for the bounded end
            position. Used when ``bounded_mode="specific-offsets"``.
            Same format as ``startup_specific_offsets``.

    Returns:
        A new DataFrame representing the Kafka source.

    Raises:
        ValueError: If ``schema`` is missing; if neither or both of ``topic``
            and ``topic_pattern`` are given; if ``key_format`` is set without
            a non-empty ``key_fields``; if ``key_fields_prefix`` is set while
            ``value_fields_include`` is not ``"EXCEPT_KEY"``; if a format
            option dict has a non-string key; or if the startup/bounded mode
            and its companion arguments (``group_id``, ``*_timestamp_millis``,
            ``*_specific_offsets``) are inconsistent.

    Example::

        >>> import pyflink.dataframe as pf
        >>>
        >>> # Simple JSON consumption from a single topic
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="user_events",
        ...     schema={
        ...         "user_id": pf.DataType.int64(),
        ...         "event": pf.DataType.string(),
        ...         "ts": pf.DataType.timestamp(3),
        ...     },
        ...     group_id="my-consumer-group",
        ... )
        >>>
        >>> # Read from earliest offset with explicit consumer group
        >>> df = pf.read_kafka(
        ...     "broker1:9092,broker2:9092",
        ...     topic="orders",
        ...     schema={
        ...         "order_id": pf.DataType.int64(),
        ...         "amount": pf.DataType.float64(),
        ...     },
        ...     group_id="my-consumer-group",
        ...     startup_mode="earliest-offset",
        ... )
        >>>
        >>> # Read multiple topics
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic=["topic_a", "topic_b"],
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "data": pf.DataType.string(),
        ...     },
        ...     group_id="my-consumer-group",
        ... )
        >>>
        >>> # Subscribe to topics by pattern
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic_pattern="events-.*",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "payload": pf.DataType.string(),
        ...     },
        ...     group_id="my-consumer-group",
        ... )
        >>>
        >>> # Bounded read (batch mode) — read up to the latest offset
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="orders",
        ...     schema={
        ...         "order_id": pf.DataType.int64(),
        ...         "amount": pf.DataType.float64(),
        ...     },
        ...     startup_mode="earliest-offset",
        ...     bounded_mode="latest-offset",
        ... )
        >>>
        >>> # Read with key+value formats and Avro
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="keyed_events",
        ...     schema={
        ...         "user_id": pf.DataType.int64(),
        ...         "event": pf.DataType.string(),
        ...     },
        ...     group_id="my-consumer-group",
        ...     format="avro",
        ...     key_format="json",
        ...     key_fields=["user_id"],
        ... )
        >>>
        >>> # Start from a specific timestamp
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="events",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "data": pf.DataType.string(),
        ...     },
        ...     startup_mode="timestamp",
        ...     startup_timestamp_millis=1700000000000,
        ... )
        >>>
        >>> # Start from specific offsets per partition
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="events",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "data": pf.DataType.string(),
        ...     },
        ...     startup_mode="specific-offsets",
        ...     startup_specific_offsets={0: 42, 1: 300},
        ... )
        >>>
        >>> # JSON format with custom parse options
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="events",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "data": pf.DataType.string(),
        ...     },
        ...     group_id="my-consumer-group",
        ...     format="json",
        ...     format_options={"ignore-parse-errors": "true"},
        ... )
        >>>
        >>> # With Kafka security (SASL/SSL)
        >>> # When using Aliyun Kafka, download the truststore file from the Kafka
        >>> # console and upload it as an additional dependency file.
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="secure_events",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "data": pf.DataType.string(),
        ...     },
        ...     group_id="my-consumer-group",
        ...     properties={
        ...         "ssl.truststore.location": "/flink/usrlib/only.4096.client.truststore.jks",
        ...         "ssl.truststore.password": "KafkaOnsClient",
        ...         "ssl.endpoint.identification.algorithm": "",
        ...         "sasl.jaas.config": (
        ...             "org.apache.flink.kafka.shaded.org.apache.kafka.common.security"
        ...             ".plain.PlainLoginModule required "
        ...             'username="<username>" '
        ...             'password="<password>";'
        ...         ),
        ...         "sasl.mechanism": "PLAIN",
        ...         "security.protocol": "SASL_SSL",
        ...     },
        ... )
        >>>
        >>> # Read with column projection
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="events",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "name": pf.DataType.string(),
        ...         "value": pf.DataType.float64(),
        ...     },
        ...     group_id="my-consumer-group",
        ...     columns=["id", "name"],
        ... )
        >>>
        >>> # CDC with Debezium format
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="cdc_events",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "name": pf.DataType.string(),
        ...     },
        ...     group_id="my-consumer-group",
        ...     format="debezium-json",
        ... )
    """

    def _format_specific_offsets(
        offsets: Union[str, Dict[int, int]],
    ) -> str:
        """Render per-partition offsets in the connector's string syntax."""
        # A string is passed through untouched; it is expected to already be
        # in ``partition:P,offset:O;...`` form.
        if isinstance(offsets, str):
            return offsets
        return ";".join(f"partition:{p},offset:{o}" for p, o in offsets.items())

    def _validate_keys_are_strings(d: Dict, name: str) -> None:
        """Reject option dicts whose keys are not plain strings."""
        for k in d:
            if not isinstance(k, str):
                raise ValueError(
                    f"Invalid key '{k}' in {name}. Option keys must be strings."
                )

    def _validate_scan_mode_options(
        mode_name: str,
        mode_value: str,
        timestamp_millis: Optional[int],
        specific_offsets: Optional[Union[str, Dict[int, int]]],
    ) -> None:
        """Check that a startup/bounded mode agrees with its companion args."""
        timestamp_option = f"{mode_name}_timestamp_millis"
        specific_offsets_option = f"{mode_name}_specific_offsets"
        if mode_value == "group-offsets" and group_id is None:
            raise ValueError(
                f"'group_id' must be specified when {mode_name}_mode is "
                "'group-offsets'."
            )
        if mode_value == "timestamp" and timestamp_millis is None:
            raise ValueError(
                f"'{timestamp_option}' must be specified when "
                f"{mode_name}_mode is 'timestamp'."
            )
        if mode_value != "timestamp" and timestamp_millis is not None:
            raise ValueError(
                f"'{timestamp_option}' must not be specified unless "
                f"{mode_name}_mode is 'timestamp'."
            )
        if mode_value == "specific-offsets" and specific_offsets is None:
            raise ValueError(
                f"'{specific_offsets_option}' must be specified when "
                f"{mode_name}_mode is 'specific-offsets'."
            )
        if mode_value != "specific-offsets" and specific_offsets is not None:
            raise ValueError(
                f"'{specific_offsets_option}' must not be specified unless "
                f"{mode_name}_mode is 'specific-offsets'."
            )

    # ------------------------------------------------------------------
    # Validation. Everything here runs before any PyFlink import or table
    # environment access, so a bad call fails fast and without side effects.
    # ------------------------------------------------------------------
    if schema is None:
        raise ValueError(
            "schema is required for read_kafka. "
            "Please provide a dict of {column_name: DataType}.\n"
            "Example:\n"
            "  pf.read_kafka('localhost:9092', topic='my_topic', schema={\n"
            "      'id': pf.DataType.int64(),\n"
            "      'name': pf.DataType.string(),\n"
            "  })"
        )

    # Exactly one of topic / topic_pattern.
    if topic is None and topic_pattern is None:
        raise ValueError("Either 'topic' or 'topic_pattern' must be specified.")
    if topic is not None and topic_pattern is not None:
        raise ValueError(
            "Only one of 'topic' or 'topic_pattern' can be specified, not both."
        )

    # Key/value option dependencies. An empty key_fields is treated like a
    # missing one: it cannot describe a message key.
    if key_format is not None and not key_fields:
        raise ValueError("'key_fields' must be specified when 'key_format' is defined.")
    if key_fields_prefix is not None and value_fields_include != "EXCEPT_KEY":
        raise ValueError(
            "'value_fields_include' must be 'EXCEPT_KEY' when "
            "'key_fields_prefix' is set."
        )

    if format_options is not None:
        _validate_keys_are_strings(format_options, "format_options")
    if key_format_options is not None:
        _validate_keys_are_strings(key_format_options, "key_format_options")
    if value_format_options is not None:
        _validate_keys_are_strings(value_format_options, "value_format_options")

    _validate_scan_mode_options(
        mode_name="startup",
        mode_value=startup_mode,
        timestamp_millis=startup_timestamp_millis,
        specific_offsets=startup_specific_offsets,
    )
    _validate_scan_mode_options(
        mode_name="bounded",
        mode_value=bounded_mode,
        timestamp_millis=bounded_timestamp_millis,
        specific_offsets=bounded_specific_offsets,
    )

    # ------------------------------------------------------------------
    # Descriptor construction.
    # ------------------------------------------------------------------
    from pyflink.common.config_options import ConfigOptions
    from pyflink.table import TableDescriptor, FormatDescriptor, Schema
    from pyflink.table.expressions import col
    from pyflink.dataframe.util.artifacts import (
        add_built_in_connector,
        add_built_in_format,
    )
    from pyflink.dataframe.context import get_or_create_table_environment

    def _build_format_descriptor(format_name, options):
        """Build a FormatDescriptor for ``format_name`` with stringified options."""
        descriptor_builder = FormatDescriptor.for_format(format_name)
        for option_key, option_value in (options or {}).items():
            descriptor_builder = descriptor_builder.option(
                option_key, _to_str(option_value)
            )
        return descriptor_builder.build()

    # ``value_format`` wins over ``format`` (as documented). Only one of the
    # two connector options is emitted so they can never disagree.
    effective_value_format = format if value_format is None else value_format

    t_env = get_or_create_table_environment()
    add_built_in_connector(t_env, "kafka")
    # Only request the formats that are actually used; never pass None.
    add_built_in_format(t_env, effective_value_format)
    if key_format is not None:
        add_built_in_format(t_env, key_format)

    builder = TableDescriptor.for_connector("kafka").option(
        "properties.bootstrap.servers", bootstrap_servers
    )

    # Value format: either the plain ``format`` option or ``value.format``.
    if value_format is None:
        builder = builder.format(_build_format_descriptor(format, format_options))
    else:
        value_format_opt = (
            ConfigOptions.key("value.format").string_type().no_default_value()
        )
        builder = builder.format(
            _build_format_descriptor(value_format, value_format_options),
            format_option=value_format_opt,
        )

    # Schema
    schema_builder = Schema.new_builder()
    for name, dtype in schema.items():
        schema_builder.column(name, dtype._table_type)
    builder = builder.schema(schema_builder.build())

    # Additional Kafka properties are forwarded under the ``properties.`` prefix.
    if properties:
        for key, value in properties.items():
            builder = builder.option(f"properties.{key}", _to_str(value))

    # Key format
    if key_format is not None:
        key_format_opt = (
            ConfigOptions.key("key.format").string_type().no_default_value()
        )
        builder = builder.format(
            _build_format_descriptor(key_format, key_format_options),
            format_option=key_format_opt,
        )
    if key_fields is not None:
        # A plain string must not be iterated character by character; pass it
        # through as-is, exactly like a string ``topic``.
        if isinstance(key_fields, str):
            key_fields_str = key_fields
        else:
            key_fields_str = ";".join(key_fields)
        builder = builder.option("key.fields", key_fields_str)
    if key_fields_prefix is not None:
        builder = builder.option("key.fields-prefix", key_fields_prefix)
    if value_fields_include is not None:
        builder = builder.option("value.fields-include", value_fields_include)

    # Source-specific options
    if topic is not None:
        if isinstance(topic, (list, tuple)):
            topic_str = ";".join(topic)
        else:
            topic_str = topic
        builder = builder.option("topic", topic_str)
    else:
        builder = builder.option("topic-pattern", topic_pattern)
    if group_id is not None:
        builder = builder.option("properties.group.id", group_id)
    if startup_mode is not None:
        builder = builder.option("scan.startup.mode", startup_mode)
    if startup_specific_offsets is not None:
        builder = builder.option(
            "scan.startup.specific-offsets",
            _format_specific_offsets(startup_specific_offsets),
        )
    if startup_timestamp_millis is not None:
        builder = builder.option(
            "scan.startup.timestamp-millis",
            str(startup_timestamp_millis),
        )
    if topic_partition_discovery_interval is not None:
        builder = builder.option(
            "scan.topic-partition-discovery.interval",
            topic_partition_discovery_interval,
        )
    if header_filter is not None:
        builder = builder.option("scan.header-filter", header_filter)
    # Python's True/False are rendered as the lowercase "true"/"false".
    builder = builder.option(
        "scan.check.duplicated.group.id", str(check_duplicated_group_id).lower()
    )

    # Bounded mode
    if bounded_mode is not None:
        builder = builder.option("scan.bounded.mode", bounded_mode)
    if bounded_timestamp_millis is not None:
        builder = builder.option(
            "scan.bounded.timestamp-millis",
            str(bounded_timestamp_millis),
        )
    if bounded_specific_offsets is not None:
        builder = builder.option(
            "scan.bounded.specific-offsets",
            _format_specific_offsets(bounded_specific_offsets),
        )

    # Kafka's KafkaDynamicTableFactory rejects anonymous sources (those created
    # via ``t_env.from_descriptor`` without a registered name), so we register
    # the descriptor as a named temporary table and resolve it back.
    # The full UUID is used so that repeated calls in one session cannot
    # collide on the temporary table name.
    _kafka_table_name = f"_pf_df_kafka_{uuid.uuid4().hex}"
    t_env.create_temporary_table(_kafka_table_name, builder.build())
    table = t_env.from_path(_kafka_table_name)

    # Column projection
    if columns is not None:
        table = table.select(*[col(c) for c in columns])
    return DataFrame(table)
[docs]
def read_paimon(
path: Optional[str] = None,
*,
schema: Optional[Dict[str, "DataType"]] = None,
primary_key: Optional[Union[str, List[str]]] = None,
columns: Optional[List[str]] = None,
auto_create: bool = False,
file_format: Literal["orc", "parquet", "avro", "lance"] = "parquet",
bucket: int = 1,
bucket_key: Optional[Union[str, List[str]]] = None,
changelog_producer: Literal["none", "input", "full-compaction", "lookup"] = "none",
full_compaction_delta_commits: Optional[int] = None,
lookup_cache_max_memory_size: str = "256 MB",
merge_engine: Literal[
"deduplicate", "partial-update", "aggregation"
] = "deduplicate",
partial_update_ignore_delete: bool = False,
ignore_delete: bool = False,
partition_default_name: str = "__DEFAULT_PARTITION__",
partition_expiration_check_interval: str = "1h",
partition_expiration_time: Optional[str] = None,
partition_timestamp_formatter: Optional[str] = None,
partition_timestamp_pattern: Optional[str] = None,
bounded_watermark: Optional[int] = None,
mode: Literal[
"default",
"latest-full",
"latest",
"compacted-full",
"from-timestamp",
"from-snapshot",
] = "default",
snapshot_id: Optional[int] = None,
timestamp_millis: Optional[int] = None,
snapshot_num_retained_max: int = 2147483647,
snapshot_num_retained_min: int = 10,
snapshot_time_retained: str = "1h",
infer_parallelism: bool = True,
parallelism: Optional[int] = None,
extra_options: Optional[Dict[str, Any]] = None,
) -> DataFrame:
"""
Read a Paimon table into a DataFrame.
Uses the Paimon SQL connector under the hood. The function creates a
temporary connector descriptor, so ``schema`` is required even when the
Paimon files already exist at ``path``.
Args:
path: File-system path of the Paimon table.
schema: Dict of ``{column_name: DataType}`` specifying the table
schema. This parameter is required.
primary_key: Optional primary key column name or list of column
names.
columns: Optional list of column names to read. If None, all columns
from the schema are read.
auto_create: Whether to create Paimon table files while constructing
the temporary table if the target path has no existing Paimon
table. Default is False.
file_format: Format of Paimon data files. One of ``"orc"``,
``"parquet"``, ``"avro"``, or ``"lance"``. Default is
``"parquet"``.
bucket: Bucket number for the file store.
bucket_key: Column name or list of column names defining the Paimon
distribution policy. Data is assigned to each bucket according to
the hash value of these fields. When unset, Paimon uses the
primary key, or the full row if no primary key exists.
changelog_producer: Whether and how to double-write to changelog
files. Changelog files keep data-change details and can be read
directly during stream reads. ``"none"`` produces no changelog
file, ``"input"`` writes input changes when flushing the memory
table, ``"full-compaction"`` generates changelog files during full
compaction, and ``"lookup"`` produces changelog by lookup before
committing snapshots.
full_compaction_delta_commits: Maximum number of committed snapshots
between two full compactions.
lookup_cache_max_memory_size: Memory size used by lookup cache and the
lookup changelog producer cache. Default is ``"256 MB"``.
merge_engine: Merge engine for tables with primary keys.
``"deduplicate"`` de-duplicates and keeps the last row,
``"partial-update"`` updates non-null fields, and
``"aggregation"`` aggregates rows with the same primary key.
partial_update_ignore_delete: Whether to ignore delete records in
partial-update mode.
ignore_delete: Whether to ignore delete records.
partition_default_name: Default partition name used when a dynamic
partition column value is null or an empty string.
partition_expiration_check_interval: Interval for checking expired
partitions.
partition_expiration_time: Partition retention time. When unset,
partitions never expire.
partition_timestamp_formatter: Formatter for converting partition
time strings to timestamps.
partition_timestamp_pattern: Pattern for extracting a time string from
partition values.
bounded_watermark: Stop producing source records when source watermark
exceeds this value.
mode: Source scanning behavior. ``"default"`` lets Paimon infer the
actual startup mode: ``timestamp_millis`` selects
``"from-timestamp"``, ``snapshot_id`` selects
``"from-snapshot"``, and otherwise it behaves as
``"latest-full"``. ``"latest-full"`` produces the latest snapshot
first and then reads changes in streaming mode. ``"latest"`` reads
latest changes without an initial snapshot in streaming mode.
``"compacted-full"`` starts from the latest compacted snapshot.
``"from-timestamp"`` and ``"from-snapshot"`` start from the
configured timestamp or snapshot. The deprecated ``"full"`` mode
is intentionally not accepted.
snapshot_id: Optional snapshot id used by ``mode="from-snapshot"``.
timestamp_millis: Optional timestamp in milliseconds used by
``mode="from-timestamp"``.
snapshot_num_retained_max: Maximum number of completed snapshots to
retain.
snapshot_num_retained_min: Minimum number of completed snapshots to
retain.
snapshot_time_retained: Maximum time to retain completed snapshots.
infer_parallelism: Whether to infer scan source parallelism from the
bucket count.
parallelism: Custom scan source parallelism. If unset, the planner
derives statement parallelism while considering global
configuration.
extra_options: Additional connector options forwarded
through to the underlying Paimon connector. This is for options
not exposed as named parameters of ``read_paimon``. If a key
matches an option generated by a named parameter of `read_paimon`,
the value in ``extra_options`` takes precedence. ``"connector"`` is
reserved and must not be supplied.
Returns:
A new DataFrame representing the Paimon source.
Example::
>>> import pyflink.dataframe as pf
>>>
>>> order_schema = {
... "order_id": pf.DataType.int64(),
... "user_id": pf.DataType.int64(),
... "status": pf.DataType.string(),
... "event_ts": pf.DataType.timestamp(3),
... }
>>>
>>> # Read the latest snapshot with a primary key and projection
>>> orders = pf.read_paimon(
... "oss://bucket/warehouse/db.db/orders",
... schema=order_schema,
... primary_key="order_id",
... columns=["order_id", "status", "event_ts"],
... )
>>>
>>> # Stream changes from a specific snapshot
>>> changes = pf.read_paimon(
... "oss://bucket/warehouse/db.db/orders",
... schema=order_schema,
... primary_key="order_id",
... changelog_producer="lookup",
... mode="from-snapshot",
... snapshot_id=42,
... )
>>>
>>> # Stream from a timestamp in milliseconds with explicit parallelism
>>> changes = pf.read_paimon(
... "oss://bucket/warehouse/db.db/orders",
... schema=order_schema,
... primary_key="order_id",
... mode="from-timestamp",
... timestamp_millis=1710000000000,
... bounded_watermark=1710003600000,
... infer_parallelism=False,
... parallelism=4,
... )
>>>
>>> # Read a table with bucket distribution
>>> events = pf.read_paimon(
... "oss://bucket/warehouse/db.db/events",
... schema={
... "dt": pf.DataType.string(),
... "user_id": pf.DataType.int64(),
... "payload": pf.DataType.string(),
... },
... bucket=16,
... bucket_key=["dt", "user_id"],
... extra_options={
... "snapshot.time-retained": "12h",
... },
... )
"""
def _validate_paimon_scan_options(
    *,
    mode: str,
    snapshot_id: Optional[int],
    timestamp_millis: Optional[int],
) -> None:
    """Check that the scan position arguments agree with the scan mode.

    Args:
        mode: Requested Paimon scan mode.
        snapshot_id: Snapshot to start from, or None when not supplied.
        timestamp_millis: Start timestamp in milliseconds, or None when not
            supplied.

    Raises:
        ValueError: If both positions are supplied, if the mode requires a
            position that is missing, or if a position is supplied for a
            mode other than ``"default"`` or its matching ``from-*`` mode.
    """
    has_snapshot = snapshot_id is not None
    has_timestamp = timestamp_millis is not None

    # Rules are evaluated top to bottom; the first violated rule decides
    # which message the caller sees.
    rules = (
        (
            has_snapshot and has_timestamp,
            "'snapshot_id' and 'timestamp_millis' must not both be set.",
        ),
        (
            mode == "from-snapshot" and not has_snapshot,
            "'snapshot_id' must be specified when mode is 'from-snapshot'.",
        ),
        (
            mode == "from-timestamp" and not has_timestamp,
            "'timestamp_millis' must be specified when mode is 'from-timestamp'.",
        ),
        (
            has_snapshot and mode not in ("default", "from-snapshot"),
            "'snapshot_id' must not be specified unless mode is "
            "'default' or 'from-snapshot'.",
        ),
        (
            has_timestamp and mode not in ("default", "from-timestamp"),
            "'timestamp_millis' must not be specified unless mode is "
            "'default' or 'from-timestamp'.",
        ),
    )
    for violated, message in rules:
        if violated:
            raise ValueError(message)
from pyflink.table import TableDescriptor, Schema
from pyflink.table.expressions import col
from pyflink.dataframe.context import get_or_create_table_environment
from pyflink.dataframe.util.artifacts import add_built_in_connector
if schema is None:
raise ValueError(
"schema is required for read_paimon. "
"Please provide a dict of {column_name: DataType}.\n"
"Example:\n"
" pf.read_paimon('/path/to/paimon-table', schema={\n"
" 'id': pf.DataType.int64(),\n"
" 'name': pf.DataType.string(),\n"
" })"
)
_validate_paimon_common_options(
file_format=file_format,
changelog_producer=changelog_producer,
merge_engine=merge_engine,
)
_validate_allowed_value(mode, "mode", _PAIMON_SCAN_MODES)
_validate_paimon_scan_options(
mode=mode,
snapshot_id=snapshot_id,
timestamp_millis=timestamp_millis,
)
primary_key_columns = _normalize_primary_key(
primary_key,
schema,
)
t_env = get_or_create_table_environment()
add_built_in_connector(t_env, "paimon")
schema_builder = Schema.new_builder()
primary_key_set = set(primary_key_columns)
for name, dtype in schema.items():
table_type = dtype._table_type
if name in primary_key_set:
table_type = table_type.not_null()
schema_builder.column(name, table_type)
if primary_key_columns:
schema_builder.primary_key(*primary_key_columns)
paimon_options = _paimon_common_options(
path=path,
auto_create=auto_create,
file_format=file_format,
bucket=bucket,
bucket_key=bucket_key,
changelog_producer=changelog_producer,
full_compaction_delta_commits=full_compaction_delta_commits,
lookup_cache_max_memory_size=lookup_cache_max_memory_size,
merge_engine=merge_engine,
partial_update_ignore_delete=partial_update_ignore_delete,
ignore_delete=ignore_delete,
partition_default_name=partition_default_name,
partition_expiration_check_interval=partition_expiration_check_interval,
partition_expiration_time=partition_expiration_time,
partition_timestamp_formatter=partition_timestamp_formatter,
partition_timestamp_pattern=partition_timestamp_pattern,
snapshot_num_retained_max=snapshot_num_retained_max,
snapshot_num_retained_min=snapshot_num_retained_min,
snapshot_time_retained=snapshot_time_retained,
)
paimon_options.extend(
[
("scan.bounded.watermark", bounded_watermark),
("scan.mode", mode),
("scan.snapshot-id", snapshot_id),
("scan.timestamp-millis", timestamp_millis),
("scan.infer-parallelism", infer_parallelism),
("scan.parallelism", parallelism),
]
)
builder = TableDescriptor.for_connector("paimon").schema(schema_builder.build())
builder = _add_connector_options(builder, paimon_options)
builder = _add_extra_options(builder, extra_options)
table = t_env.from_descriptor(builder.build())
if columns is not None:
table = table.select(*[col(c) for c in columns])
return DataFrame(table)
[docs]
def read_odps(
    endpoint: str,
    *,
    schema: Optional[Dict[str, "DataType"]] = None,
    columns: Optional[List[str]] = None,
    # General options
    tunnel_endpoint: Optional[str] = None,
    project: str,
    schema_name: Optional[str] = None,
    table_name: str,
    access_id: str,
    access_key: str,
    partition: Optional[Union[str, Tuple[str, Any], List[Tuple[str, Any]]]] = None,
    compress_algorithm: Literal["RAW", "ZLIB", "SNAPPY"] = "SNAPPY",
    quota_name: Optional[str] = None,
    # Source table options
    max_partition_count: int = 100,
    use_arrow: bool = False,
    split_size: str = "256mb",
    compress_codec: Literal["", "ZSTD", "LZ4_FRAME"] = "",
    dynamic_load_balance: bool = False,
    # Incremental source options
    start_partition: Optional[
        Union[str, Tuple[str, Any], List[Tuple[str, Any]]]
    ] = None,
    subscribe_interval_in_sec: int = 30,
    modified_table_operation: Literal["NONE", "SKIP"] = "NONE",
    # Dimension table options
    cache: Literal["ALL"] = "ALL",
    cache_size: int = 100000,
    cache_ttl_ms: int = _ODPS_LONG_MAX_VALUE,
    cache_reload_time_blacklist: Optional[str] = None,
    max_load_retries: int = 10,
) -> DataFrame:
    """
    Read a MaxCompute (ODPS) table into a DataFrame.

    Uses the ODPS SQL connector under the hood. The connector options are
    exposed with Pythonic snake_case names while preserving the order of the
    ODPS documentation.

    Args:
        endpoint: MaxCompute endpoint.
        schema: Dict of ``{column_name: DataType}`` specifying the DataFrame
            schema. This parameter is required.
        columns: Optional list of column names to read. If None, all columns
            from ``schema`` are read.
        tunnel_endpoint: MaxCompute Tunnel endpoint. If unset, MaxCompute
            allocates tunnel connections through Server Load Balancer (SLB).
        project: MaxCompute project name.
        schema_name: Required only when the MaxCompute schema feature is
            enabled. Set this to the table's schema name.
        table_name: MaxCompute table name.
        access_id: AccessKey ID used to access MaxCompute.
        access_key: AccessKey secret used to access MaxCompute.
        partition: Partition name in the MaxCompute table. Not required for
            non-partitioned tables or incremental sources. Accepts a
            ``(column, value)`` tuple, a list of such tuples, or an already
            formatted string such as ``"ds=20260428"``. Must not be combined
            with ``start_partition``.

            Examples:

            - To read partition column ``dt`` with value ``20220901``, pass
              ``("dt", "20220901")``.
            - To read source partitions whose ``dt`` values start with
              ``202209``, pass ``("dt", "202209*")``.
            - To read all source partitions for column ``dt``, pass
              ``("dt", "*")``.
            - For a three-level partition table ``dt``/``hh``/``mm``, to
              read ``dt=20220901,hh=08`` and any ``mm`` value, pass
              ``[("dt", "20220901"), ("hh", "08")]`` or
              ``[("dt", "20220901"), ("hh", "08"), ("mm", "*")]``.
        compress_algorithm: Compression algorithm for MaxCompute Tunnel.
            Valid values are ``"RAW"`` (no compression), ``"ZLIB"``, and
            ``"SNAPPY"``. ``"SNAPPY"`` improves throughput by approximately
            50% compared to ``"ZLIB"`` in test scenarios.
        quota_name: Quota name for exclusive MaxCompute Tunnel resource
            groups. If specified, remove ``tunnel_endpoint``; otherwise the
            tunnel specified by ``tunnel_endpoint`` takes precedence.
        max_partition_count: Maximum number of partitions to read from.
            Reading from too many partitions can overload MaxCompute and slow
            job startup. Increase this only when your workload requires it.
        use_arrow: Read data using the Arrow format, which calls the
            MaxCompute storage API. Batch deployments only.
        split_size: Amount of data pulled per split when using the Arrow
            format. Batch deployments only.
        compress_codec: Compression algorithm when reading with the Arrow
            format. Valid values are ``""`` (none), ``"ZSTD"``, and
            ``"LZ4_FRAME"``. Specifying a codec improves throughput over no
            compression. Batch deployments only.
        dynamic_load_balance: Enable dynamic shard allocation to improve
            processing performance and reduce overall read time. This may
            cause data skew because different operators read inconsistent
            amounts of data. Batch deployments only.
        start_partition: Start partition for incremental reads. Setting this
            enables incremental source mode. It is mutually exclusive with
            ``partition``: supplying both raises ``ValueError`` instead of
            silently dropping ``partition``. For multi-level partitioned
            tables, configure partition column values in descending order by
            level. For example, for a three-level partitioned table with
            partition key columns ``dt``, ``hh``, and ``mm``, to start from
            ``dt=20220901``, ``hh=08``, and ``mm=10``, set this parameter to
            ``"dt=20220901,hh=08,mm=10"`` or
            ``[("dt", "20220901"), ("hh", "08"), ("mm", "10")]``.
            For a single partition key, you can also use
            ``("dt", "20220901")``.
        subscribe_interval_in_sec: Polling interval in seconds.
        modified_table_operation: Action when a partition is modified during
            reading. ``"NONE"`` requires updating ``start_partition`` to skip
            the unavailable partition and restarting without state;
            ``"SKIP"`` automatically skips the unavailable partition when
            resuming. In either mode, data already read from the modified
            partition is retained and unread data is discarded.
        cache: Cache policy. Must be set to ``"ALL"``. All dimension table
            data is loaded into cache before the deployment runs, lookups
            search the cache only, and the cache reloads after entries expire.
        cache_size: Maximum rows to cache. Large caches consume significant
            JVM heap memory and slow startup and cache refresh. Increase this
            only when your workload requires it.
        cache_ttl_ms: Cache timeout in milliseconds.
        cache_reload_time_blacklist: Time periods during which the cache is
            not refreshed. Use this during peak traffic periods to prevent
            deployment instability from cache refreshes.
        max_load_retries: Maximum retries for the initial cache load on
            deployment startup. If retries are exhausted, the deployment
            fails.

    Returns:
        A new DataFrame representing the ODPS source.

    Raises:
        ValueError: If ``schema`` is None, or if ``partition`` and
            ``start_partition`` are both still set after normalization. The
            option validators invoked here (for ``compress_algorithm``,
            ``compress_codec``, ``modified_table_operation``, ``cache`` and
            the common connection options) are expected to reject invalid
            values as well; see those helpers for the exact conditions.

    Example::

        >>> import pyflink.dataframe as pf
        >>> df = pf.read_odps(
        ...     "http://service.cn-hangzhou.maxcompute.aliyun.com/api",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "payload": pf.DataType.string(),
        ...     },
        ...     project="my_project",
        ...     table_name="events",
        ...     access_id="${secret_values.ak_id}",
        ...     access_key="${secret_values.ak_secret}",
        ...     partition="ds=20260428",
        ... )
    """
    from pyflink.table import TableDescriptor, Schema
    from pyflink.table.expressions import col
    from pyflink.dataframe.util.artifacts import add_built_in_connector
    from pyflink.dataframe.context import get_or_create_table_environment

    if schema is None:
        raise ValueError(
            "schema is required for read_odps. "
            "Please provide a dict of {column_name: DataType}."
        )

    _validate_allowed_value(
        compress_algorithm,
        "compress_algorithm",
        _ODPS_COMPRESS_ALGORITHMS,
    )
    _validate_allowed_value(
        compress_codec,
        "compress_codec",
        _ODPS_COMPRESS_CODECS,
    )
    _validate_allowed_value(
        modified_table_operation,
        "modified_table_operation",
        _ODPS_MODIFIED_TABLE_OPERATIONS,
    )
    _validate_allowed_value(cache, "cache", _ODPS_CACHE_VALUES)

    # Normalize both partition arguments first so the mutual-exclusion check
    # below operates on the values that would actually reach the connector.
    partition = _format_odps_partition(partition)
    start_partition = _format_odps_partition(
        start_partition,
        name="start_partition",
    )
    if partition is not None and start_partition is not None:
        raise ValueError(
            "'partition' and 'start_partition' cannot be specified for "
            "ODPS source at the same time."
        )

    _validate_odps_common_options(
        endpoint=endpoint,
        project=project,
        table_name=table_name,
        access_id=access_id,
        access_key=access_key,
        tunnel_endpoint=tunnel_endpoint,
        quota_name=quota_name,
    )

    # Option keys use the connector's camelCase names. Unset values are passed
    # as None; presumably `_add_connector_options` omits those entries --
    # confirm against that helper.
    options = [
        ("endpoint", endpoint),
        ("tunnelEndpoint", tunnel_endpoint),
        ("project", project),
        ("schemaName", schema_name),
        ("tableName", table_name),
        ("accessId", access_id),
        ("accessKey", access_key),
        ("partition", partition),
        ("compressAlgorithm", compress_algorithm),
        ("quotaName", quota_name),
        ("maxPartitionCount", max_partition_count),
        ("useArrow", use_arrow),
        ("splitSize", split_size),
        # The "" default means "no codec"; it is turned into None here.
        ("compressCodec", compress_codec or None),
        ("dynamicLoadBalance", dynamic_load_balance),
        ("startPartition", start_partition),
        ("subscribeIntervalInSec", subscribe_interval_in_sec),
        ("modifiedTableOperation", modified_table_operation),
        ("cache", cache),
        ("cacheSize", cache_size),
        ("cacheTTLMs", cache_ttl_ms),
        ("cacheReloadTimeBlackList", cache_reload_time_blacklist),
        ("maxLoadRetries", max_load_retries),
    ]

    t_env = get_or_create_table_environment()
    add_built_in_connector(t_env, "odps")

    builder = TableDescriptor.for_connector("odps")
    schema_builder = Schema.new_builder()
    for name, dtype in schema.items():
        schema_builder.column(name, dtype._table_type)
    builder = builder.schema(schema_builder.build())
    builder = _add_connector_options(builder, options)

    table = t_env.from_descriptor(builder.build())
    if columns is not None:
        # Project only after the table exists so the declared schema still
        # describes every column passed in ``schema``.
        table = table.select(*[col(c) for c in columns])
    return DataFrame(table)
[docs]
def read_sls(
    endpoint: str,
    *,
    project: str,
    logstore: str,
    access_id: Optional[str] = None,
    access_key: Optional[str] = None,
    schema: Optional[Dict[str, "DataType"]] = None,
    columns: Optional[List[str]] = None,
    enable_new_source: bool = True,
    shard_discovery_interval_ms: int = 60000,
    startup_mode: Literal[
        "timestamp", "latest", "earliest", "consumer_group"
    ] = "timestamp",
    start_time: Optional[Union[str, datetime]] = None,
    stop_time: Optional[Union[str, datetime]] = None,
    time_zone: Optional[str] = None,
    consumer_group: Optional[str] = None,
    max_retries: int = 3,
    batch_get_size: int = 100,
    exit_after_finish: bool = False,
    processor: Optional[str] = None,
    extra_options: Optional[Dict[str, str]] = None,
) -> DataFrame:
    """Read an SLS (Aliyun Log Service) logstore into a DataFrame.

    Wraps the Ververica SLS source connector. Connector options are exposed
    under snake_case names and follow the order of the "WITH parameters"
    table in the SLS documentation.

    Args:
        endpoint: SLS private-network service endpoint address.
        project: Name of the SLS project.
        logstore: Name of the SLS LogStore (or MetricStore).
        access_id: Alibaba Cloud account AccessKey ID. When ``access_id``
            and ``access_key`` are both ``None`` (the default), the SLS
            connector falls back to the VVP STS token.
        access_key: Alibaba Cloud account AccessKey Secret. The STS fallback
            described for ``access_id`` applies here as well.
        schema: Dict of ``{column_name: DataType}`` describing the source
            schema. Required.
        columns: Optional list of column names to read. With ``None``, every
            column declared in ``schema`` is read.
        enable_new_source: ``True`` (the default) selects the FLIP-27 SLS
            source, which adapts automatically to shard changes; ``False``
            selects the legacy source implementation.
        shard_discovery_interval_ms: Polling interval in milliseconds for
            shard-change discovery when ``enable_new_source`` is ``True``.
            A non-positive value disables shard discovery.
        startup_mode: Position at which consumption begins.

            - ``"timestamp"`` (default): begin at ``start_time``; when that
              is unset the SLS connector uses the current time.
            - ``"latest"``: skip to the most recent log group.
            - ``"earliest"``: begin at the oldest available log group.
            - ``"consumer_group"``: resume from the offset recorded for
              ``consumer_group``, which is mandatory in this mode.
        start_time: Consumption start time; only accepted together with
            ``startup_mode='timestamp'``. Either a naive ``datetime``
            (formatted as ``"yyyy-MM-dd HH:mm:ss"``) or a string already in
            that format. A ``datetime`` carrying ``tzinfo`` is rejected; use
            ``time_zone`` to state the timezone explicitly.
        stop_time: Consumption end time, intended for historical replay
            only. Takes the same forms as ``start_time``.
        time_zone: Timezone in which the SLS connector interprets the
            ``start_time`` / ``stop_time`` strings (e.g. ``"Asia/Shanghai"``,
            ``"UTC"``). When unset, the connector falls back to the default
            timezone of the JobManager JVM.
        consumer_group: Name of the consumer group that records consumption
            progress. Mandatory when ``startup_mode='consumer_group'``. In
            every other startup mode it is still forwarded to the connector
            but serves commit tracking only -- it does NOT move the starting
            position.
        max_retries: Number of retries after a failed SLS read.
        batch_get_size: Number of LogGroups fetched per request. Must lie in
            [1, 1000]; the range is checked here before the connector is
            configured.
        exit_after_finish: When ``True``, the Flink job exits once the source
            has finished consuming.
        processor: SLS consumer processor (SPL) that pre-filters data before
            ingest.
        extra_options: Additional connector options forwarded to the
            underlying SLS connector, meant for options without a named
            parameter on ``read_sls`` such as ``regionId``, ``signVersion``,
            ``shardModDivisor`` or ``shardModRemainder``. When a key equals
            an option produced by a named parameter of ``read_sls``, the
            ``extra_options`` value wins. ``"connector"`` is reserved and
            must not be supplied.

    Returns:
        A DataFrame backed by the SLS source.

    Raises:
        ValueError: If ``schema`` is None, ``startup_mode`` is invalid,
            ``batch_get_size`` is out of range, ``start_time`` is combined
            with a non-timestamp ``startup_mode``, ``consumer_group`` is
            missing when ``startup_mode='consumer_group'``, ``start_time`` /
            ``stop_time`` is an aware ``datetime`` (with ``tzinfo``), or
            ``extra_options`` contains a ``connector`` key.

    Examples:
        Minimal source (timestamp mode, defaults)::

            import pyflink.dataframe as pf

            df = pf.read_sls(
                "cn-hangzhou-intranet.log.aliyuncs.com",
                project="my-project",
                logstore="my-logstore",
                access_id=ALIYUN_AK_ID,
                access_key=ALIYUN_AK_SECRET,
                schema={
                    "__timestamp__": pf.DataType.bigint(),
                    "level": pf.DataType.string(),
                    "message": pf.DataType.string(),
                }
            )

        Consumer-group mode (resume from recorded offset)::

            df = pf.read_sls(
                "cn-hangzhou-intranet.log.aliyuncs.com",
                project="my-project",
                logstore="my-logstore",
                access_id=ALIYUN_AK_ID,
                access_key=ALIYUN_AK_SECRET,
                schema={"level": pf.DataType.string()},
                startup_mode="consumer_group",
                consumer_group="my-analytics-job"
            )

        Historical replay with ``datetime`` bounds::

            from datetime import datetime

            df = pf.read_sls(
                "cn-hangzhou-intranet.log.aliyuncs.com",
                project="my-project",
                logstore="my-logstore",
                access_id=ALIYUN_AK_ID,
                access_key=ALIYUN_AK_SECRET,
                schema={"message": pf.DataType.string()},
                start_time=datetime(2026, 1, 15, 0, 0, 0),
                stop_time=datetime(2026, 1, 16, 0, 0, 0),
                time_zone="Asia/Shanghai",
                exit_after_finish=True
            )

        Forward-compat: passing a non-named option (and overriding a named
        one) via ``extra_options``::

            df = pf.read_sls(
                "cn-hangzhou-intranet.log.aliyuncs.com",
                project="my-project",
                logstore="my-logstore",
                access_id=ALIYUN_AK_ID,
                access_key=ALIYUN_AK_SECRET,
                schema={"message": pf.DataType.string()},
                extra_options={
                    "regionId": "cn-hangzhou",
                    "compressType": "lz4",
                    "batchGetSize": "256",  # overrides default 100
                }
            )
    """
    from pyflink.dataframe.context import get_or_create_table_environment
    from pyflink.dataframe.util.artifacts import add_built_in_connector
    from pyflink.table import Schema, TableDescriptor
    from pyflink.table.expressions import col

    # Argument validation. The order is kept stable because it decides which
    # error a caller sees when several arguments are wrong at once.
    _validate_allowed_value(startup_mode, "startup_mode", _SLS_STARTUP_MODES)
    if not 1 <= batch_get_size <= 1000:
        raise ValueError(f"batch_get_size must be in [1, 1000], got: {batch_get_size}")
    timestamp_mode = startup_mode == "timestamp"
    if not timestamp_mode and start_time is not None:
        raise ValueError("start_time is only valid when startup_mode='timestamp'")
    if startup_mode == "consumer_group" and not consumer_group:
        raise ValueError(
            "consumer_group is required when startup_mode='consumer_group'"
        )
    if schema is None:
        raise ValueError("'schema' must be specified for SLS source.")

    # Options that are always sent; non-string values are stringified here.
    connector_options = [
        ("endpoint", endpoint),
        ("project", project),
        ("logstore", logstore),
        ("enableNewSource", str(enable_new_source)),
        ("shardDiscoveryIntervalMs", str(shard_discovery_interval_ms)),
        ("startupMode", startup_mode),
        ("maxRetries", str(max_retries)),
        ("batchGetSize", str(batch_get_size)),
        ("exitAfterFinish", str(exit_after_finish)),
    ]

    def _add_if_set(key, value):
        # Forward an optional setting only when the caller provided it.
        if value is not None:
            connector_options.append((key, value))

    _add_if_set("accessId", access_id)
    _add_if_set("accessKey", access_key)
    # Time bounds are formatted only when present, start before stop.
    if start_time is not None:
        connector_options.append(("startTime", _format_time(start_time)))
    if stop_time is not None:
        connector_options.append(("stopTime", _format_time(stop_time)))
    _add_if_set("timeZone", time_zone)
    _add_if_set("consumerGroup", consumer_group)
    _add_if_set("processor", processor)

    t_env = get_or_create_table_environment()
    add_built_in_connector(t_env, "sls")

    descriptor = TableDescriptor.for_connector("sls")
    schema_builder = Schema.new_builder()
    for column_name, data_type in schema.items():
        schema_builder.column(column_name, data_type._table_type)
    descriptor = descriptor.schema(schema_builder.build())

    # Named options first, then extra_options, so the latter can override.
    descriptor = _add_connector_options(descriptor, connector_options)
    descriptor = _add_extra_options(descriptor, extra_options)

    table = t_env.from_descriptor(descriptor.build())
    if columns is None:
        return DataFrame(table)
    projection = [col(column_name) for column_name in columns]
    return DataFrame(table.select(*projection))
[docs]
def read_hologres(
endpoint: str,
*,
db_name: str,
table_name: str,
username: str,
password: str,
schema: Optional[Dict[str, "DataType"]] = None,
primary_key: Optional[Union[str, List[str]]] = None,
columns: Optional[List[str]] = None,
connection_pool_size: int = 5,
connection_pool_name: str = "default",
connection_fixed: Optional[bool] = None,
connection_max_idle_ms: int = 60000,
connection_ssl_mode: Literal[
"disable", "require", "verify-ca", "verify-full"
] = "disable",
connection_ssl_root_cert_location: Optional[str] = None,
retry_count: int = 10,
retry_sleep_step_ms: int = 5000,
meta_cache_ttl_ms: int = 600000,
serverless_computing: bool = False,
binlog: bool = True,
binlog_read_mode: Literal["AUTO", "JDBC", "HOLOHUB"] = "AUTO",
binlog_change_log_mode: Literal[
"ALL", "UPSERT", "ALL_AS_APPEND_ONLY"
] = "ALL",
binlog_startup_mode: Literal[
"INITIAL", "EARLIEST_OFFSET", "TIMESTAMP", "LATEST_OFFSET"
] = "INITIAL",
binlog_batch_size: int = 512,
binlog_request_timeout_ms: int = 300000,
binlog_project_columns: Optional[bool] = None,
binlog_compression: Optional[bool] = None,
partition_binlog_mode: Literal[
"DISABLE", "STATIC", "DYNAMIC"
] = "DISABLE",
partition_binlog_lateness_timeout_minutes: int = 60,
partition_values_to_read: Optional[Union[str, List[str]]] = None,
start_time: Optional[Union[str, datetime]] = None,
fetch_size: int = 512,
scan_timeout_seconds: int = 600,
filter_push_down: bool = False,
binlog_filter_push_down: bool = False,
prefer_physical_column_over_metadata_column: bool = False,
lookup_batch_size: int = 256,
lookup_timeout_ms: int = 0,
lookup_column_table: bool = False,
lookup_insert_if_not_exists: bool = False,
lookup_async: bool = True,
lookup_filter_push_down: bool = False,
cache: Literal["None", "LRU"] = "None",
cache_size: int = 100000,
cache_ttl_ms: Optional[int] = None,
cache_empty: bool = True,
async_: bool = False,
extra_options: Optional[Dict[str, Any]] = None,
) -> DataFrame:
"""Read a Hologres table into a DataFrame.
Wraps the Ververica Hologres source connector. Hologres source supports
both bulk scans and binlog streaming; the consumption mode is controlled by
``binlog`` and the binlog-related parameters. The connector options are
exposed with Pythonic snake_case names, in the order the Hologres
documentation's "WITH parameters" table presents them.
Args:
endpoint: Hologres service endpoint (use the VPC endpoint for VVR).
db_name: Hologres database name. Optionally suffixed with a compute
group (e.g. ``"my_db/my_group"``).
table_name: Hologres table name. Use ``"schema.tableName"`` to read
from a non-public schema.
username: Hologres account username (or AccessKey ID).
password: Hologres account password (or AccessKey Secret).
schema: Dict of ``{column_name: DataType}`` specifying the source
schema. Required.
primary_key: Optional primary key column name or list of column
names. Set this when the upstream Hologres table has a primary
key — the streaming change-log behavior depends on it.
columns: Optional list of column names to read. If ``None``, all
columns from ``schema`` are read.
connection_pool_size: JDBC connection pool size. Applies to both
sink and dim-table workloads.
connection_pool_name: Connection pool name; tables sharing the same
name within a Flink TaskManager share connections.
connection_fixed: When ``True``, enable Hologres lightweight
(fixed) connection mode. When ``None`` (the default), the
connector picks a value based on the Hologres engine version.
connection_max_idle_ms: Idle time (milliseconds) before a JDBC
connection in the pool is released.
connection_ssl_mode: SSL transport mode. One of ``"disable"``,
``"require"``, ``"verify-ca"``, or ``"verify-full"``. The
verify-* modes require ``connection_ssl_root_cert_location``.
connection_ssl_root_cert_location: Path to the CA certificate file.
Files must be uploaded to VVP under ``/flink/usrlib/<name>``.
retry_count: Number of retries on connection failures.
retry_sleep_step_ms: Linear back-off step (milliseconds) added per
retry attempt.
meta_cache_ttl_ms: TTL (milliseconds) of the local Hologres
``TableSchema`` cache.
serverless_computing: When ``True``, run scans on Hologres
serverless compute (only available for batch reads).
binlog: When ``True`` (the default), consume binlog changes; when
``False``, run a bulk scan only.
binlog_read_mode: Binlog read transport. ``"AUTO"`` lets the
connector pick between JDBC and HoloHub.
binlog_change_log_mode: How binlog records are mapped to Flink
change-log rows. ``"ALL"`` (the default) emits +I/-U/+U/-D;
``"UPSERT"`` emits +I/+U/-D and requires a ``primary_key``;
``"ALL_AS_APPEND_ONLY"`` treats every event as +I.
binlog_startup_mode: Where the binlog consumer starts.
``"INITIAL"`` first snapshots, then tails the binlog;
``"EARLIEST_OFFSET"`` and ``"LATEST_OFFSET"`` start at the
binlog ends; ``"TIMESTAMP"`` starts at ``start_time``
(required in this mode). Setting ``start_time`` alone is
equivalent — the connector promotes the startup mode to
TIMESTAMP whenever ``start_time`` is provided.
binlog_batch_size: Number of rows read per binlog batch.
binlog_request_timeout_ms: Binlog request timeout (milliseconds).
binlog_project_columns: When ``True``, request only the declared
columns from the binlog stream. ``None`` (default) lets the
server pick.
binlog_compression: When ``True``, enable LZ4 compression on the
binlog transport. ``None`` (default) lets the server pick.
partition_binlog_mode: How to consume binlog for partitioned
tables. ``"DISABLE"`` (default) treats partition tables as
ordinary tables. ``"STATIC"`` reads a fixed set of partitions
(combine with ``partition_values_to_read``). ``"DYNAMIC"``
adapts to new partitions automatically.
partition_binlog_lateness_timeout_minutes: Maximum lateness, in
minutes, tolerated when ``partition_binlog_mode='DYNAMIC'``.
partition_values_to_read: Partition values to consume when
``partition_binlog_mode='STATIC'``. Accepts a list of strings
(e.g. ``["ds=20240101", "ds=20240102"]``) or a comma-separated
string (e.g. ``"ds=20240101,ds=20240102"``). Wildcards / regex
are not supported.
start_time: Binlog consumption start time. When set, the Hologres
connector forces TIMESTAMP startup and ignores
``binlog_startup_mode``, so it's fine to leave that argument
at its default. Accepts a naive ``datetime`` without ``tzinfo``
(formatted as ``"yyyy-MM-dd HH:mm:ss"``) or a string already
in that format. The Hologres connector parses this string in
the JobManager JVM's default timezone, so the wall-clock
value you pass must already be expressed in that zone.
fetch_size: JDBC fetch size for bulk scans.
scan_timeout_seconds: Bulk-scan request timeout, in seconds.
Default is 600 (10 minutes), matching the Hologres
connector's own default for native SQL DDL.
filter_push_down: When ``True``, push filters down during bulk
scans. Mutually exclusive with ``binlog_filter_push_down``.
binlog_filter_push_down: When ``True``, push filters down on the
binlog stream.
prefer_physical_column_over_metadata_column: When ``True``, prefer
a physical column over a connector-defined metadata column of
the same name. Effective only in binlog mode.
lookup_batch_size: Maximum number of rows per dim-table point-query
batch. Effective only when the DataFrame is used as a dim
table in a downstream lookup join.
lookup_timeout_ms: Per-lookup timeout (milliseconds). ``0`` means
no timeout.
lookup_column_table: When ``True``, allow using a column-store
Hologres table as a dim table. Performance is typically poor
and the connector logs a warning.
lookup_insert_if_not_exists: When ``True``, insert the current
join key into Hologres if the dim-table lookup misses.
lookup_async: When ``True`` (default), enable asynchronous dim-table
lookups. Async lookup is enabled when either ``lookup_async``
or ``async_`` is ``True``.
lookup_filter_push_down: When ``True``, push dim-table filters
down to Hologres (only column-vs-constant comparisons are
pushed down). Requires VVR 11.4+.
cache: Dim-table cache policy. ``"None"`` (the default) disables
caching; ``"LRU"`` enables an in-memory LRU cache.
cache_size: Maximum number of rows held by the LRU cache.
Default is 100000, matching the Hologres connector's own
default for native SQL DDL.
cache_ttl_ms: Cache entry TTL (milliseconds). ``None`` (the
default) lets the connector pick based on ``cache``.
cache_empty: When ``True`` (default), also cache empty lookup
results. Disable when the upstream Hologres table is
constantly receiving new rows that joins should pick up.
async_: Specifies whether to return results asynchronously.
Note, async lookup is controlled by both this and ``lookup_async``.
To disable async lookup, you need to set both to ``False``.
extra_options: Additional connector options forwarded
through to the underlying Hologres connector. This is for
options not exposed as named parameters of ``read_hologres``,
e.g., ``connection.direct.enabled``,
``connection.max-alive-ms``. If a key matches an option
generated by a named parameter of ``read_hologres``, the
value in ``extra_options`` takes precedence. ``"connector"``
is reserved and must not be supplied.
Returns:
A DataFrame backed by the Hologres source.
Raises:
ValueError: If ``schema`` is missing; if any enum value is
invalid (including ``cache``); if
``binlog_startup_mode='TIMESTAMP'`` without ``start_time``;
if ``start_time`` is an aware ``datetime`` (with
``tzinfo``); if ``binlog_change_log_mode='UPSERT'`` without
``primary_key``; if ``connection_ssl_mode`` is ``"verify-ca"``
or ``"verify-full"`` without
``connection_ssl_root_cert_location``; if
``lookup_filter_push_down`` is enabled together with
either ``filter_push_down`` or ``binlog_filter_push_down``,
or if ``filter_push_down`` and ``binlog_filter_push_down``
are both enabled; if ``partition_values_to_read`` is set
outside STATIC mode; or if ``extra_options`` contains a
``"connector"`` key.
Examples:
Streaming binlog source (INITIAL mode, defaults)::
import pyflink.dataframe as pf
df = pf.read_hologres(
"hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
db_name="my_db",
table_name="public.orders",
username=ALIYUN_AK_ID,
password=ALIYUN_AK_SECRET,
schema={
"order_id": pf.DataType.bigint(),
"buyer": pf.DataType.string(),
"amount": pf.DataType.decimal(20, 4),
},
primary_key="order_id",
)
Bulk scan only (no binlog), with column projection::
df = pf.read_hologres(
"hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
db_name="my_db",
table_name="public.user_profiles",
username=ALIYUN_AK_ID,
password=ALIYUN_AK_SECRET,
schema={
"user_id": pf.DataType.bigint(),
"city": pf.DataType.string(),
"country": pf.DataType.string(),
},
primary_key="user_id",
binlog=False,
columns=["user_id", "city"],
filter_push_down=True,
)
Time-bounded binlog replay (TIMESTAMP startup)::
from datetime import datetime
df = pf.read_hologres(
"hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
db_name="my_db",
table_name="public.events",
username=ALIYUN_AK_ID,
password=ALIYUN_AK_SECRET,
schema={"event_id": pf.DataType.bigint()},
binlog_startup_mode="TIMESTAMP",
start_time=datetime(2026, 1, 15, 0, 0, 0),
binlog_change_log_mode="ALL",
)
Static-partitioned binlog (consume only two partitions)::
df = pf.read_hologres(
"hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
db_name="my_db",
table_name="public.daily_logs",
username=ALIYUN_AK_ID,
password=ALIYUN_AK_SECRET,
schema={"day": pf.DataType.string(), "msg": pf.DataType.string()},
partition_binlog_mode="STATIC",
partition_values_to_read=["2026-01-15", "2026-01-16"],
)
Dim-table source for a downstream lookup join (LRU cache,
async lookup tuned)::
dim = pf.read_hologres(
"hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
db_name="my_db",
table_name="public.user_profiles",
username=ALIYUN_AK_ID,
password=ALIYUN_AK_SECRET,
schema={
"user_id": pf.DataType.bigint(),
"country": pf.DataType.string(),
},
primary_key="user_id",
binlog=False,
cache="LRU",
cache_size=50000,
cache_ttl_ms=600000,
lookup_async=True,
lookup_batch_size=512,
)
Forward-compat: tune a non-named option (and override a named
one) via ``extra_options``::
df = pf.read_hologres(
"hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
db_name="my_db",
table_name="public.orders",
username=ALIYUN_AK_ID,
password=ALIYUN_AK_SECRET,
schema={"order_id": pf.DataType.bigint()},
fetch_size=512,
extra_options={
"hologres.server.version": "3.1",
"source.scan.fetch-size": "8192", # overrides default
},
)
"""
from pyflink.dataframe.context import get_or_create_table_environment
from pyflink.dataframe.util.artifacts import add_built_in_connector
from pyflink.table import TableDescriptor, Schema
from pyflink.table.expressions import col
_validate_read_hologres_options(
binlog_read_mode=binlog_read_mode,
binlog_change_log_mode=binlog_change_log_mode,
binlog_startup_mode=binlog_startup_mode,
partition_binlog_mode=partition_binlog_mode,
connection_ssl_mode=connection_ssl_mode,
connection_ssl_root_cert_location=connection_ssl_root_cert_location,
start_time=start_time,
filter_push_down=filter_push_down,
binlog_filter_push_down=binlog_filter_push_down,
lookup_filter_push_down=lookup_filter_push_down,
partition_values_to_read=partition_values_to_read,
cache=cache,
primary_key=primary_key,
)
if schema is None:
raise ValueError("'schema' must be specified for Hologres source.")
def _format_partition_values(value):
if value is None:
return None
if isinstance(value, str):
# An empty string would be joined back into [""] on the Java
# side and trip HologresUtils.getPartitionsToRead with an
# opaque "<empty> is not a valid partition value" runtime
# error; reject up-front.
if not value:
raise ValueError(
"partition_values_to_read must not be empty."
)
return value
if isinstance(value, list):
if not value:
raise ValueError(
"partition_values_to_read must not be empty."
)
for item in value:
if not isinstance(item, str) or not item:
raise ValueError(
"partition_values_to_read entries must be non-empty strings."
)
return ",".join(value)
raise TypeError(
"'partition_values_to_read' must be a string or list of strings."
)
options = [
("endpoint", endpoint),
("dbname", db_name),
("tablename", table_name),
("username", username),
("password", password),
("connection.pool.size", str(connection_pool_size)),
("connection.pool.name", connection_pool_name),
("connection.max-idle-ms", str(connection_max_idle_ms)),
("connection.ssl.mode", connection_ssl_mode),
("retry-count", str(retry_count)),
("retry-sleep-step-ms", str(retry_sleep_step_ms)),
("meta-cache-ttl-ms", str(meta_cache_ttl_ms)),
("serverless-computing.enabled", str(serverless_computing)),
("source.binlog", str(binlog)),
("source.binlog.read-mode", binlog_read_mode),
("source.binlog.change-log-mode", binlog_change_log_mode),
("source.binlog.startup-mode", binlog_startup_mode),
("source.binlog.batch-size", str(binlog_batch_size)),
("source.binlog.request-timeout-ms", str(binlog_request_timeout_ms)),
("source.binlog.partition-binlog-mode", partition_binlog_mode),
(
"source.binlog.partition-binlog-lateness-timeout-minutes",
str(partition_binlog_lateness_timeout_minutes),
),
("source.scan.fetch-size", str(fetch_size)),
("source.scan.timeout-seconds", str(scan_timeout_seconds)),
("source.scan.filter-push-down.enabled", str(filter_push_down)),
(
"source.binlog.filter-push-down.enabled",
str(binlog_filter_push_down),
),
(
"scan.prefer.physical-column.over.metadata-column",
str(prefer_physical_column_over_metadata_column),
),
("lookup.read.batch-size", str(lookup_batch_size)),
("lookup.read.timeout-ms", str(lookup_timeout_ms)),
("lookup.read.column-table.enabled", str(lookup_column_table)),
("lookup.insert-if-not-exists", str(lookup_insert_if_not_exists)),
("lookup.async", str(lookup_async)),
("lookup.filter-push-down.enabled", str(lookup_filter_push_down)),
("cache", cache),
("cacheSize", str(cache_size)),
("cacheEmpty", str(cache_empty)),
("async", str(async_)),
]
if connection_fixed is not None:
options.append(("connection.fixed.enabled", str(connection_fixed)))
if connection_ssl_root_cert_location is not None:
options.append(
(
"connection.ssl.root-cert.location",
connection_ssl_root_cert_location,
)
)
if binlog_project_columns is not None:
options.append(
("source.binlog.project-columns.enabled", str(binlog_project_columns))
)
if binlog_compression is not None:
options.append(
("source.binlog.compression.enabled", str(binlog_compression))
)
formatted_partition_values = _format_partition_values(partition_values_to_read)
if formatted_partition_values is not None:
options.append(
("source.binlog.partition-values-to-read", formatted_partition_values)
)
if start_time is not None:
options.append(("startTime", _format_time(start_time)))
if cache_ttl_ms is not None:
options.append(("cacheTTLMs", str(cache_ttl_ms)))
primary_key_columns = _normalize_primary_key(primary_key, schema)
t_env = get_or_create_table_environment()
add_built_in_connector(t_env, "hologres")
schema_builder = Schema.new_builder()
primary_key_set = set(primary_key_columns)
for name, dtype in schema.items():
table_type = dtype._table_type
if name in primary_key_set:
table_type = table_type.not_null()
schema_builder.column(name, table_type)
if primary_key_columns:
schema_builder.primary_key(*primary_key_columns)
builder = TableDescriptor.for_connector("hologres").schema(
schema_builder.build()
)
builder = _add_connector_options(builder, options)
builder = _add_extra_options(builder, extra_options)
table = t_env.from_descriptor(builder.build())
if columns is not None:
table = table.select(*[col(c) for c in columns])
return DataFrame(table)
[docs]
def read_custom(
    connector: str,
    *,
    schema: Optional[Dict[str, "DataType"]] = None,
    primary_key: Optional[Union[str, List[str]]] = None,
    columns: Optional[List[str]] = None,
    options: Optional[Dict[str, Any]] = None,
) -> DataFrame:
    """
    Build a DataFrame on top of an arbitrary Flink SQL connector.

    Use this generic entrypoint when no dedicated ``read_*`` helper
    (e.g. ``read_parquet``, ``read_kafka``, ``read_odps``) covers the
    connector. It is meant for SQL connectors that Flink can discover
    through its standard factory mechanism.

    Args:
        connector: Factory identifier of the connector, i.e. the value
            that would appear in SQL DDL as ``WITH ('connector' = '...')``
            (for example ``"datagen"``, ``"filesystem"``, or the name of
            a custom connector registered on the platform).
        schema: Dict of ``{column_name: DataType}`` describing the source
            schema. Required, even though the parameter defaults to
            ``None``.
        primary_key: Optional primary key column name, or list of column
            names. Every listed column is declared ``NOT NULL`` and the
            set is registered as the primary key of the table schema.
            Only set this for connectors that expect a primary key
            constraint.
        columns: Optional list of column names to keep. When given, it is
            applied as a ``select`` on the created table; when ``None``
            all columns of ``schema`` are returned.
        options: Optional dict of connector/table options. Keys are
            forwarded unchanged (same names as in ``WITH (...)``) and
            values are converted with ``str``. The key ``"connector"`` is
            reserved; use the ``connector`` argument instead.

    Returns:
        A new DataFrame backed by the configured source.

    Raises:
        ValueError: If ``connector`` is not a non-empty string, if
            ``schema`` is ``None``, if an option key is not a non-empty
            string or is ``"connector"``, or if ``primary_key`` is an
            empty list, contains an invalid name, or names a column that
            is missing from ``schema``.
        TypeError: If ``options`` is not a dict, or ``primary_key`` is
            neither a string nor a list.

    Example::

        >>> import pyflink.dataframe as pf
        >>>
        >>> # Connector JAR registered under the factory identifier
        >>> # ``my-custom``.
        >>> df = pf.read_custom(
        ...     "my-custom",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "name": pf.DataType.string(),
        ...     },
        ...     options={
        ...         "host": "example.com",
        ...         "port": "9000",
        ...     },
        ... )
        >>>
        >>> # With a primary key, a format and format options.
        >>> df = pf.read_custom(
        ...     "my-custom",
        ...     schema={"id": pf.DataType.int64()},
        ...     primary_key="id",
        ...     options={
        ...         "endpoint": "example.com:9000",
        ...         "format": "json",
        ...         "json.ignore-parse-errors": "true",
        ...     },
        ... )
    """
    from pyflink.table import TableDescriptor, Schema
    from pyflink.table.expressions import col
    from pyflink.dataframe.context import get_or_create_table_environment
    from pyflink.dataframe.util.artifacts import add_built_in_connector

    # --- argument validation -------------------------------------------
    if not (isinstance(connector, str) and connector):
        raise ValueError(
            "'connector' must be a non-empty string identifying the "
            "connector (the same value used as 'connector' in SQL DDL)."
        )
    if schema is None:
        raise ValueError(
            "schema is required for read_custom. "
            "Please provide a dict of {column_name: DataType}.\n"
            "Example:\n"
            " pf.read_custom('my_connector', schema={\n"
            " 'id': pf.DataType.int64(),\n"
            " 'name': pf.DataType.string(),\n"
            " })"
        )

    option_items = _collect_custom_options(options)
    key_columns = _normalize_primary_key(primary_key, schema)

    # --- environment ---------------------------------------------------
    table_env = get_or_create_table_environment()
    # Auto-load the jar when ``connector`` is a built-in connector that has
    # no dedicated read_* helper. Per the original note, the helper is
    # expected to silently ignore unknown / user-defined connector names.
    add_built_in_connector(table_env, connector)

    # --- table descriptor ----------------------------------------------
    descriptor = TableDescriptor.for_connector(connector)

    flink_schema = Schema.new_builder()
    key_lookup = set(key_columns)
    for column_name, data_type in schema.items():
        column_type = data_type._table_type
        # Primary key columns must be declared NOT NULL.
        flink_schema.column(
            column_name,
            column_type.not_null() if column_name in key_lookup else column_type,
        )
    if key_columns:
        flink_schema.primary_key(*key_columns)
    descriptor = descriptor.schema(flink_schema.build())

    for option_key, option_value in option_items.items():
        descriptor = descriptor.option(option_key, str(option_value))

    # --- materialize and project ---------------------------------------
    source_table = table_env.from_descriptor(descriptor.build())
    if columns is None:
        return DataFrame(source_table)
    projection = [col(column_name) for column_name in columns]
    return DataFrame(source_table.select(*projection))
def _validate_odps_common_options(
*,
endpoint: str,
project: str,
table_name: str,
access_id: str,
access_key: str,
tunnel_endpoint: Optional[str],
quota_name: Optional[str],
) -> None:
for name, value in (
("endpoint", endpoint),
("project", project),
("table_name", table_name),
("access_id", access_id),
("access_key", access_key),
):
if value is None or value == "":
raise ValueError(f"'{name}' must be specified for ODPS.")
if tunnel_endpoint is not None and quota_name is not None:
raise ValueError(
"Only one of 'tunnel_endpoint' or 'quota_name' can be "
"specified. Remove 'tunnel_endpoint' when using an ODPS "
"exclusive data-transfer quota."
)
def _format_odps_partition(
partition,
*,
allow_dynamic: bool = False,
name: str = "partition",
):
def _format_odps_fixed_partition(partition):
parts = []
for item in partition:
if not isinstance(item, tuple) or len(item) != 2:
raise ValueError(
f"'{name}' entries must be "
"(partition_column, partition_value) tuples."
)
partition_column, partition_value = item
if not isinstance(partition_column, str) or not partition_column:
raise ValueError(
f"'{name}' partition column names must be " "non-empty strings."
)
if partition_value is None:
raise ValueError(f"'{name}' partition values must not be None.")
parts.append(f"{partition_column}={_to_str(partition_value)}")
return ",".join(parts)
if partition is None or isinstance(partition, str):
return partition
if isinstance(partition, tuple):
return _format_odps_fixed_partition([partition])
if not isinstance(partition, list):
raise TypeError(
f"'{name}' must be a string, a "
"(partition_column, partition_value) tuple, or a list."
)
if not partition:
raise ValueError(f"'{name}' list must not be empty.")
if all(isinstance(item, str) for item in partition):
if not allow_dynamic:
raise ValueError(
f"'{name}' list entries must be "
"(partition_column, partition_value) tuples."
)
for partition_column in partition:
if not partition_column:
raise ValueError(
f"'{name}' dynamic partition column names must be "
"non-empty strings."
)
return ",".join(partition)
return _format_odps_fixed_partition(partition)
def _format_paimon_column_list(
value: Optional[Union[str, List[str]]],
name: str,
) -> Optional[str]:
if value is None:
return None
if isinstance(value, str):
columns = [column.strip() for column in value.split(",")]
elif isinstance(value, (list, tuple)):
if not value:
raise ValueError(f"'{name}' list must not be empty.")
columns = []
for column in value:
if not isinstance(column, str):
raise ValueError(f"'{name}' entries must be non-empty strings.")
columns.append(column.strip())
else:
raise TypeError(f"'{name}' must be a string or a list of column names.")
seen = set()
for column in columns:
if not column:
raise ValueError(f"'{name}' entries must be non-empty strings.")
if column in seen:
raise ValueError(f"'{name}' must not contain duplicate column names.")
seen.add(column)
return ",".join(columns)
def _validate_paimon_common_options(
    *,
    file_format: str,
    changelog_producer: str,
    merge_engine: str,
    write_mode: Optional[str] = None,
) -> None:
    """Validate the enumerated Paimon options against their allowed sets.

    ``file_format``, ``changelog_producer`` and ``merge_engine`` are
    always checked, in that order; ``write_mode`` is checked last and only
    when it is not ``None``.

    Raises:
        ValueError: Propagated from ``_validate_allowed_value`` for the
            first option that is ``None`` or not an allowed value.
    """
    always_checked = (
        (file_format, "file_format", _PAIMON_FILE_FORMATS),
        (changelog_producer, "changelog_producer", _PAIMON_CHANGELOG_PRODUCERS),
        (merge_engine, "merge_engine", _PAIMON_MERGE_ENGINES),
    )
    for candidate, label, allowed in always_checked:
        _validate_allowed_value(candidate, label, allowed)

    if write_mode is None:
        return
    _validate_allowed_value(write_mode, "write_mode", _PAIMON_WRITE_MODES)
def _validate_read_hologres_options(
    *,
    binlog_read_mode,
    binlog_change_log_mode,
    binlog_startup_mode,
    partition_binlog_mode,
    connection_ssl_mode,
    connection_ssl_root_cert_location,
    start_time,
    filter_push_down,
    binlog_filter_push_down,
    lookup_filter_push_down,
    partition_values_to_read,
    cache,
    primary_key,
) -> None:
    """Validate the option combination passed to the Hologres reader.

    Checks run in this order and stop at the first failure:

    1. Enumerated options (``binlog_read_mode``,
       ``binlog_change_log_mode``, ``binlog_startup_mode``,
       ``partition_binlog_mode``, ``connection_ssl_mode``, ``cache``)
       must be one of their allowed values.
    2. The SSL mode / root certificate pair must be consistent
       (``_validate_hologres_ssl_pair``).
    3. ``binlog_startup_mode='TIMESTAMP'`` requires ``start_time``.
    4. ``binlog_change_log_mode='UPSERT'`` requires a truthy
       ``primary_key``.
    5. At most one of ``filter_push_down``, ``binlog_filter_push_down``
       and ``lookup_filter_push_down`` may be enabled.
    6. ``partition_values_to_read`` is only allowed with
       ``partition_binlog_mode='STATIC'``.

    Raises:
        ValueError: If any of the checks above fails.
    """
    enumerated = (
        (binlog_read_mode, "binlog_read_mode", _HOLOGRES_BINLOG_READ_MODES),
        (
            binlog_change_log_mode,
            "binlog_change_log_mode",
            _HOLOGRES_BINLOG_CHANGE_LOG_MODES,
        ),
        (
            binlog_startup_mode,
            "binlog_startup_mode",
            _HOLOGRES_BINLOG_STARTUP_MODES,
        ),
        (
            partition_binlog_mode,
            "partition_binlog_mode",
            _HOLOGRES_PARTITION_BINLOG_MODES,
        ),
        (connection_ssl_mode, "connection_ssl_mode", _HOLOGRES_SSL_MODES),
        (cache, "cache", _HOLOGRES_CACHE_TYPES),
    )
    for candidate, label, allowed in enumerated:
        _validate_allowed_value(candidate, label, allowed)

    _validate_hologres_ssl_pair(
        connection_ssl_mode, connection_ssl_root_cert_location
    )

    # NOTE: the reverse case (start_time given without
    # binlog_startup_mode='TIMESTAMP') is deliberately allowed. According
    # to the note carried by this code, the Hologres connector lets
    # startTime take precedence and forces TIMESTAMP startup mode when it
    # is set (see HologresBinlogInputSplit).
    if binlog_startup_mode == "TIMESTAMP":
        if start_time is None:
            raise ValueError(
                "start_time is required when binlog_startup_mode='TIMESTAMP'."
            )

    if binlog_change_log_mode == "UPSERT":
        if not primary_key:
            raise ValueError(
                "binlog_change_log_mode='UPSERT' requires primary_key to be set; "
                "Flink rejects an upsert changelog source without a primary key."
            )

    if filter_push_down and binlog_filter_push_down:
        raise ValueError(
            "filter_push_down and binlog_filter_push_down cannot both be True."
        )

    # Per the original comment, HologresTableSource#verifyPushdownConfigs
    # throws when lookup filter-push-down is combined with either
    # source-side filter-push-down; fail here with a clearer message.
    if lookup_filter_push_down:
        if filter_push_down:
            raise ValueError(
                "lookup_filter_push_down and filter_push_down cannot both be True."
            )
        if binlog_filter_push_down:
            raise ValueError(
                "lookup_filter_push_down and binlog_filter_push_down "
                "cannot both be True."
            )

    if partition_values_to_read is None:
        return
    if partition_binlog_mode != "STATIC":
        raise ValueError(
            "partition_values_to_read is only valid when "
            "partition_binlog_mode='STATIC'."
        )
def _validate_hologres_ssl_pair(
connection_ssl_mode, connection_ssl_root_cert_location
) -> None:
if (
connection_ssl_mode in ("verify-ca", "verify-full")
and connection_ssl_root_cert_location is None
):
raise ValueError(
f"connection_ssl_mode={connection_ssl_mode!r} requires "
"connection_ssl_root_cert_location to be set "
"(the Hologres JDBC driver rejects verify-* modes without a "
"root certificate path)."
)
def _paimon_common_options(
    *,
    path: Optional[str],
    auto_create: bool,
    file_format: str,
    bucket: int,
    bucket_key: Optional[Union[str, List[str]]],
    changelog_producer: str,
    full_compaction_delta_commits: Optional[int],
    lookup_cache_max_memory_size: str,
    merge_engine: str,
    partial_update_ignore_delete: bool,
    ignore_delete: bool,
    partition_default_name: str,
    partition_expiration_check_interval: str,
    partition_expiration_time: Optional[str],
    partition_timestamp_formatter: Optional[str],
    partition_timestamp_pattern: Optional[str],
    snapshot_num_retained_max: int,
    snapshot_num_retained_min: int,
    snapshot_time_retained: str,
    write_mode: Optional[str] = None,
) -> List[Tuple[str, Any]]:
    """Map the shared Paimon keyword arguments to connector option pairs.

    Values are passed through unconverted (``None`` entries included),
    except ``bucket_key``, which is normalized with
    ``_format_paimon_column_list``. ``write-mode`` is appended only when
    ``write_mode`` is not ``None``.

    Returns:
        An ordered list of ``(option_key, value)`` tuples.

    Raises:
        TypeError, ValueError: Propagated from
            ``_format_paimon_column_list`` for an invalid ``bucket_key``.
    """
    # Keys are unique, so an insertion-ordered dict yields the same
    # ordered pairs as an explicit list of tuples.
    by_key: Dict[str, Any] = {
        "path": path,
        "auto-create": auto_create,
        "file.format": file_format,
        "bucket": bucket,
        "bucket-key": _format_paimon_column_list(bucket_key, "bucket_key"),
        "changelog-producer": changelog_producer,
        "full-compaction.delta-commits": full_compaction_delta_commits,
        "lookup.cache-max-memory-size": lookup_cache_max_memory_size,
        "merge-engine": merge_engine,
        "partial-update.ignore-delete": partial_update_ignore_delete,
        "ignore-delete": ignore_delete,
        "partition.default-name": partition_default_name,
        "partition.expiration-check-interval": partition_expiration_check_interval,
        "partition.expiration-time": partition_expiration_time,
        "partition.timestamp-formatter": partition_timestamp_formatter,
        "partition.timestamp-pattern": partition_timestamp_pattern,
        "snapshot.num-retained.max": snapshot_num_retained_max,
        "snapshot.num-retained.min": snapshot_num_retained_min,
        "snapshot.time-retained": snapshot_time_retained,
    }
    if write_mode is not None:
        by_key["write-mode"] = write_mode
    return list(by_key.items())
def _validate_allowed_value(value: str, name: str, allowed) -> None:
if value is None:
raise ValueError(f"'{name}' must not be None.")
if value not in allowed:
raise ValueError(
f"Invalid {name} '{value}'. Must be one of " f"{sorted(allowed)}."
)
def _add_connector_options(builder, options):
for key, value in options:
if value is not None:
builder = builder.option(key, str(value))
return builder
def _add_extra_options(builder, extra_options):
if extra_options is None:
return builder
if not isinstance(extra_options, dict):
raise TypeError("'extra_options' must be a dict of option key/value pairs.")
for key, value in extra_options.items():
if isinstance(key, str) and key.lower() == "connector":
raise ValueError(
"'extra_options' must not override the 'connector' option."
)
builder = builder.option(key, str(value))
return builder
def _to_str(value: Any) -> str:
if isinstance(value, bool):
return str(value).lower()
return str(value)
def _format_time(value: Union[str, datetime], format: str = "%Y-%m-%d %H:%M:%S") -> str:
"""Format a time value.
``datetime`` is formatted according to the given ``format``. ``str`` is returned
unchanged.
Aware ``datetime`` (with ``tzinfo``) is rejected, because ``strftime``
would silently drop the ``tzinfo``.
"""
if isinstance(value, datetime):
if value.tzinfo is not None:
raise ValueError(
"datetime with tzinfo is not supported: "
"pass a naive datetime instead."
)
return value.strftime(format)
return value
def _collect_custom_options(
options: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
"""Validate and copy connector options."""
if options is None:
return {}
if not isinstance(options, dict):
raise TypeError("'options' must be a dict of {option_key: option_value}.")
connector_options: Dict[str, Any] = {}
for key, value in options.items():
if not isinstance(key, str) or not key:
raise ValueError(
f"Invalid key '{key}' in options. "
"Option keys must be non-empty strings."
)
if key == "connector":
raise ValueError(
"Connector identifier must be specified via the "
"'connector' argument, not as options['connector']."
)
connector_options[key] = value
return connector_options
def _normalize_primary_key(
primary_key: Optional[Union[str, List[str]]],
column_names,
) -> List[str]:
if primary_key is None:
return []
if isinstance(primary_key, str):
primary_key_columns = [primary_key]
elif isinstance(primary_key, list):
primary_key_columns = primary_key
else:
raise TypeError(
"'primary_key' must be a string column name or a list of " "column names."
)
if not primary_key_columns:
raise ValueError("'primary_key' list must not be empty.")
column_name_set = set(column_names)
for column in primary_key_columns:
if not isinstance(column, str) or not column:
raise ValueError("'primary_key' column names must be non-empty strings.")
if column not in column_name_set:
raise ValueError(
f"Primary key column '{column}' is not present in " "the schema."
)
return primary_key_columns