# Source code for pyflink.dataframe.io

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
I/O utilities for reading external data sources into DataFrame.
"""

import uuid
from datetime import datetime
from typing import Any, Dict, List, Literal, Optional, Tuple, Union, TYPE_CHECKING

from pyflink.dataframe.dataframe import DataFrame

if TYPE_CHECKING:
    from pyflink.dataframe.datatype import DataType

# Public reader entry points of this module (what ``from ... import *`` exposes).
__all__ = [
    "read_parquet",
    "read_kafka",
    "read_odps",
    "read_paimon",
    "read_hologres",
    "read_sls",
    "read_custom",
]


# -----------------------------------------------------------------------------
# Allowed-value tables for connector options.
#
# _PAIMON_SCAN_MODES is checked by read_paimon through _validate_allowed_value.
# The remaining tables are presumably consumed by the other connector helpers
# of this module (TODO confirm before changing). The container types (set vs.
# tuple) and element order are intentionally left as-is, because validators or
# error messages elsewhere may depend on them.
# -----------------------------------------------------------------------------

# ODPS connector.
# 9223372036854775807 == 2**63 - 1 (Java's Long.MAX_VALUE); presumably used as
# an "unbounded" sentinel for an ODPS option -- TODO confirm at the use site.
_ODPS_LONG_MAX_VALUE = 9223372036854775807
_ODPS_COMPRESS_ALGORITHMS = {"RAW", "ZLIB", "SNAPPY"}
# The empty string is an accepted codec value (presumably "no codec").
_ODPS_COMPRESS_CODECS = {"", "ZSTD", "LZ4_FRAME"}
_ODPS_MODIFIED_TABLE_OPERATIONS = {"NONE", "SKIP"}
_ODPS_CACHE_VALUES = {"ALL"}
_ODPS_SINK_OPERATIONS = {"insert", "upsert"}

# SLS connector startup modes.
_SLS_STARTUP_MODES = ("timestamp", "latest", "earliest", "consumer_group")

# Paimon connector.
_PAIMON_FILE_FORMATS = {"orc", "parquet", "avro", "lance"}
_PAIMON_CHANGELOG_PRODUCERS = {"none", "input", "full-compaction", "lookup"}
_PAIMON_MERGE_ENGINES = {"deduplicate", "partial-update", "aggregation"}
# Values accepted for read_paimon(mode=...); the deprecated "full" mode is
# deliberately absent.
_PAIMON_SCAN_MODES = {
    "default",
    "latest-full",
    "latest",
    "compacted-full",
    "from-timestamp",
    "from-snapshot",
}
_PAIMON_WRITE_MODES = {"change-log", "append-only"}
_PAIMON_DELETE_STRATEGIES = {
    "NONE",
    "IGNORE_DELETE",
    "NON_PK_FIELD_TO_NULL",
    "DELETE_ROW_ON_PK",
    "CHANGELOG_STANDARD",
}

# Hologres connector (binlog source, JDBC connection, sink and lookup cache).
_HOLOGRES_BINLOG_READ_MODES = ("AUTO", "JDBC", "HOLOHUB")
_HOLOGRES_BINLOG_CHANGE_LOG_MODES = ("ALL", "UPSERT", "ALL_AS_APPEND_ONLY")
_HOLOGRES_BINLOG_STARTUP_MODES = (
    "INITIAL",
    "EARLIEST_OFFSET",
    "TIMESTAMP",
    "LATEST_OFFSET",
)
_HOLOGRES_PARTITION_BINLOG_MODES = ("DISABLE", "STATIC", "DYNAMIC")
_HOLOGRES_SSL_MODES = ("disable", "require", "verify-ca", "verify-full")
_HOLOGRES_WRITE_MODES = (
    "AUTO",
    "INSERT",
    "COPY_STREAM",
    "COPY_BULK_LOAD",
    "COPY_BULK_LOAD_ON_CONFLICT",
)
_HOLOGRES_ON_CONFLICT_ACTIONS = (
    "INSERT_OR_IGNORE",
    "INSERT_OR_UPDATE",
    "INSERT_OR_REPLACE",
)
# Unlike _PAIMON_DELETE_STRATEGIES, this table has no "NONE" entry.
_HOLOGRES_DELETE_STRATEGIES = (
    "IGNORE_DELETE",
    "NON_PK_FIELD_TO_NULL",
    "DELETE_ROW_ON_PK",
    "CHANGELOG_STANDARD",
)
_HOLOGRES_COPY_FORMATS = ("binary", "text", "binaryrow")
# Note: "None" here is the string "None", not the Python None singleton.
_HOLOGRES_CACHE_TYPES = ("None", "LRU")


def read_parquet(
    path: str,
    *,
    schema: Optional[Dict[str, "DataType"]] = None,
    columns: Optional[List[str]] = None,
    monitor_interval: Optional[str] = None,
    path_regex_pattern: Optional[str] = None,
) -> DataFrame:
    """
    Read a Parquet file or directory into a DataFrame.

    Uses Flink's FileSystem Connector with Parquet Format under the hood.

    Args:
        path: Path to a Parquet file, directory, or glob pattern. Supports
            local paths and any Flink-supported file system (HDFS, OSS, S3,
            etc.).
        schema: Dict of {column_name: DataType} specifying the schema.
            This parameter is required.
        columns: Optional list of column names to read (projection pushdown).
            If None, all columns are read.
        monitor_interval: Optional interval for continuously monitoring the
            directory for new files (e.g., '10s', '1min'). If None, the path
            is scanned once (bounded source). If set, the source becomes
            unbounded (streaming).
        path_regex_pattern: Optional regex pattern to filter files. The
            pattern is matched against each file's absolute path. Only files
            whose path matches the pattern will be read. This is useful when
            reading from a directory that contains mixed file types or when
            you want to select a subset of files based on naming conventions.

    Returns:
        A new DataFrame.

    Raises:
        ValueError: If ``schema`` is not provided. The check happens before
            the table environment is obtained and before the filesystem
            connector / parquet format are registered, so a rejected call has
            no side effects.

    Example::

        >>> import pyflink.dataframe as pf
        >>>
        >>> # Read with explicit schema
        >>> df = pf.read_parquet("/path/to/data.parquet", schema={
        ...     "id": pf.DataType.int64(),
        ...     "name": pf.DataType.string(),
        ... })
        >>>
        >>> # Read only specific columns
        >>> df = pf.read_parquet("/path/to/data.parquet", schema={
        ...     "id": pf.DataType.int64(),
        ...     "name": pf.DataType.string(),
        ...     "value": pf.DataType.float64(),
        ... }, columns=["id", "name"])
        >>>
        >>> # Continuously monitor directory for new files
        >>> df = pf.read_parquet("/path/to/dir", schema={
        ...     "id": pf.DataType.int64(),
        ...     "name": pf.DataType.string(),
        ... }, monitor_interval="10s")
        >>>
        >>> # Filter files by regex pattern (matched against absolute path)
        >>> df = pf.read_parquet("/path/to/dir", schema={
        ...     "id": pf.DataType.int64(),
        ...     "name": pf.DataType.string(),
        ... }, path_regex_pattern="/path/to/dir/part-.*\\.parquet")
        >>>
        >>> # Only read files from specific date partitions
        >>> df = pf.read_parquet("/path/to/dir", schema={
        ...     "id": pf.DataType.int64(),
        ...     "name": pf.DataType.string(),
        ... }, path_regex_pattern=".*/dt=2024-01-0[1-3]/.*")
    """
    from pyflink.table import TableDescriptor, FormatDescriptor, Schema
    from pyflink.table.expressions import col
    from pyflink.dataframe.util.artifacts import (
        add_built_in_connector,
        add_built_in_format,
    )
    from pyflink.dataframe.context import get_or_create_table_environment

    # Validate arguments before creating/obtaining the table environment and
    # registering artifacts, so an invalid call leaves no side effects.
    if schema is None:
        raise ValueError(
            "schema is required for read_parquet. "
            "Please provide a dict of {column_name: DataType}.\n"
            "Example:\n"
            " pf.read_parquet('/path/to/data.parquet', schema={\n"
            " 'id': pf.DataType.int64(),\n"
            " 'name': pf.DataType.string(),\n"
            " })"
        )

    t_env = get_or_create_table_environment()
    add_built_in_connector(t_env, "filesystem")
    add_built_in_format(t_env, "parquet")

    builder = (
        TableDescriptor.for_connector("filesystem")
        .option("path", path)
        .format(FormatDescriptor.for_format("parquet").build())
    )

    schema_builder = Schema.new_builder()
    for name, dtype in schema.items():
        schema_builder.column(name, dtype._table_type)
    builder = builder.schema(schema_builder.build())

    # Optional source options are only emitted when explicitly requested.
    if monitor_interval is not None:
        builder = builder.option("source.monitor-interval", monitor_interval)
    if path_regex_pattern is not None:
        builder = builder.option("source.path.regex-pattern", path_regex_pattern)

    table = t_env.from_descriptor(builder.build())

    # Projection is applied on top of the full declared schema.
    if columns is not None:
        table = table.select(*[col(c) for c in columns])

    return DataFrame(table)
def read_kafka(
    bootstrap_servers: str,
    *,
    schema: Optional[Dict[str, "DataType"]] = None,
    columns: Optional[List[str]] = None,
    # General options
    properties: Optional[Dict[str, str]] = None,
    format: str = "json",
    format_options: Optional[Dict[str, str]] = None,
    key_format: Optional[str] = None,
    key_format_options: Optional[Dict[str, str]] = None,
    key_fields: Optional[List[str]] = None,
    key_fields_prefix: Optional[str] = None,
    value_format: Optional[str] = None,
    value_format_options: Optional[Dict[str, str]] = None,
    value_fields_include: Literal["ALL", "EXCEPT_KEY"] = "ALL",
    # Source-specific options
    topic: Optional[Union[str, List[str]]] = None,
    topic_pattern: Optional[str] = None,
    group_id: Optional[str] = None,
    startup_mode: Literal[
        "earliest-offset",
        "latest-offset",
        "group-offsets",
        "timestamp",
        "specific-offsets",
    ] = "group-offsets",
    startup_specific_offsets: Optional[Union[str, Dict[int, int]]] = None,
    startup_timestamp_millis: Optional[int] = None,
    topic_partition_discovery_interval: str = "5 min",
    header_filter: Optional[str] = None,
    check_duplicated_group_id: bool = False,
    bounded_mode: Literal[
        "unbounded", "group-offsets", "latest-offset", "timestamp", "specific-offsets"
    ] = "unbounded",
    bounded_timestamp_millis: Optional[int] = None,
    bounded_specific_offsets: Optional[Union[str, Dict[int, int]]] = None,
) -> DataFrame:
    """
    Read data from one or more Kafka topics into a DataFrame.

    Uses Flink's Kafka SQL Connector under the hood. This creates a streaming
    source that continuously reads from Kafka. For bounded (batch) reads, use
    the ``bounded_mode`` parameter.

    Args:
        bootstrap_servers: Comma-separated list of Kafka broker addresses
            (e.g., ``"localhost:9092"`` or ``"broker1:9092,broker2:9092"``).
        schema: Dict of ``{column_name: DataType}`` specifying the schema of
            the data in Kafka messages. This parameter is required.
        columns: Optional list of column names to read (projection pushdown).
            If None, all columns from the schema are read.
        properties: Extra Kafka consumer properties. Each entry is forwarded
            to the connector as ``properties.<key>``, i.e. passed through to
            the Kafka client.
            Example: ``{"allow.auto.create.topics": "false"}``

            Note:
                - Property names should be valid Kafka consumer config keys.
                - ``key.deserializer`` and ``value.deserializer`` cannot be
                  set, because the Kafka connector overrides them.
        format: The format used to deserialize the value part of Kafka
            messages (equivalent to ``value_format``). Supported options:
            ``"csv"``, ``"json"``, ``"avro"``, ``"debezium-json"``,
            ``"canal-json"``, ``"maxwell-json"``, ``"avro-confluent"``,
            ``"raw"``. Ignored when ``value_format`` is set.
        format_options: Additional options for the value format used by
            ``format`` as a dict. Keys are format-specific options without
            the format prefix (e.g., ``{"map-null-key.mode": "FAIL"}`` for
            JSON format). Ignored when ``value_format`` is set; use
            ``value_format_options`` in that case.
        key_format: The format used to deserialize the key part of Kafka
            messages. If a key format is defined, the ``key_fields`` option
            is required as well. Otherwise the Kafka records will have an
            empty key.
        key_format_options: Additional options for the key format, as a
            dict. Same structure as ``format_options``.
        key_fields: List of column names that form the message key
            (e.g. ``["field1", "field2"]``).
        key_fields_prefix: Prefix for key field column names to avoid name
            clashes with value fields. For example, with prefix ``"k_"``, a
            key field ``user_id`` becomes ``k_user_id`` in the schema. This
            prefix is only for table-column disambiguation; it is removed
            when parsing/serializing Kafka key fields. When this option is
            set, ``value_fields_include`` must be set to ``"EXCEPT_KEY"``.
        value_format: The format used to deserialize the value part of Kafka
            messages. Only one of ``format`` or ``value_format`` is required.
            If ``value_format`` is set it takes precedence: only the
            ``value.format`` connector option is emitted, and ``format`` /
            ``format_options`` are ignored.
        value_format_options: Extra options for ``value_format``. Only used
            when ``value_format`` is set.
        value_fields_include: Whether key fields are included in value fields
            during value parsing.

            - ``"ALL"`` (default): all columns are treated as value fields.
            - ``"EXCEPT_KEY"``: fields in ``key_fields`` are excluded from
              value fields.
        topic: Topic name(s) to read. It can be a single topic, a list of
            topics, or a semicolon-separated topic string (for example,
            ``"topic-1;topic-2"``). Exactly one of ``topic`` and
            ``topic_pattern`` can be set.
        topic_pattern: Regular expression for matching topic names. All
            topics matching this pattern are consumed at runtime. Exactly one
            of ``topic`` and ``topic_pattern`` can be set.
        group_id: Consumer group id. Required when ``startup_mode`` (the
            default) or ``bounded_mode`` is ``"group-offsets"``. If a new
            group id is used for the first time, configure
            ``auto.offset.reset`` to ``earliest`` or ``latest`` in
            ``properties`` to control the initial startup position.
        startup_mode: Startup position mode for the Kafka consumer. Values:

            - ``"group-offsets"`` (default): Start from committed offsets in
              the consumer group (requires ``group_id``).
            - ``"earliest-offset"``: Start from the earliest available offset.
            - ``"latest-offset"``: Start from the latest offset.
            - ``"timestamp"``: Start from a user-specified timestamp
              (requires ``startup_timestamp_millis``).
            - ``"specific-offsets"``: Start from specified offsets per
              partition (requires ``startup_specific_offsets``).

            Note:
                This option is effective for stateless startup. When
                restarting from checkpoint/savepoint, restored state progress
                has higher priority.
        startup_specific_offsets: Per-partition offsets to start reading
            from. Used when ``startup_mode="specific-offsets"``. Can be a
            string like ``"partition:0,offset:42;partition:1,offset:300"`` or
            a dict mapping partition IDs to offsets like ``{0: 42, 1: 300}``.
        startup_timestamp_millis: Epoch timestamp in milliseconds to start
            reading from. Used when ``startup_mode="timestamp"``.
        topic_partition_discovery_interval: Interval for discovering new
            partitions and topics dynamically (e.g., ``"30 s"``,
            ``"5 min"``). Default is 5 minutes.

            Note:
                - Set this to a non-positive interval to disable discovery.
                - In ``topic_pattern`` mode, this discovers both: new
                  partitions in existing matched topics and newly created
                  topics that match the pattern.
        header_filter: Logical expression for filtering Kafka records by
            headers. Syntax:

            - Use ``key:value`` for one header condition.
            - Use ``&`` and ``|`` to combine conditions.
            - Use ``!`` for negation.

            Example: ``"depart:toy|depart:book&!env:test"`` keeps records
            with ``depart=toy`` or ``depart=book``, and without
            ``env=test``.

            Notes:
                - Parentheses are not supported.
                - Operators are evaluated from left to right.
                - Header values are converted to UTF-8 strings before
                  comparison.
        check_duplicated_group_id: Whether to check if the consumer group ID
            already exists on the broker. Default is ``False``.

            - ``True``: check duplicate group id before the job starts. If
              found, fail fast to avoid consumer-group conflicts.
            - ``False``: start directly without conflict check.
        bounded_mode: Bounded mode for the Kafka consumer (connector
            ``scan.bounded.mode``). One of:

            - ``"unbounded"`` (default): Do not stop consuming.
            - ``"latest-offset"``: bounded by latest offsets. This is
              evaluated at the start of consumption from a given partition.
            - ``"group-offsets"``: bounded by committed offsets in
              ZooKeeper / Kafka brokers of a specific consumer group
              (requires ``group_id``). This is evaluated at the start of
              consumption from a given partition.
            - ``"timestamp"``: bounded by a user-supplied timestamp
              (requires ``bounded_timestamp_millis``).
            - ``"specific-offsets"``: bounded by user-supplied specific
              offsets for each partition (requires
              ``bounded_specific_offsets``).
        bounded_timestamp_millis: Epoch timestamp in milliseconds for the
            bounded end position. Used when ``bounded_mode="timestamp"``.
        bounded_specific_offsets: Per-partition offsets for the bounded end
            position. Used when ``bounded_mode="specific-offsets"``. Same
            format as ``startup_specific_offsets``.

    Returns:
        A new DataFrame representing the Kafka source.

    Raises:
        ValueError: If ``schema`` is missing, neither or both of ``topic`` /
            ``topic_pattern`` are given, key/value options are inconsistent,
            an options dict has non-string keys, or a startup/bounded mode is
            combined with missing or superfluous mode-specific arguments
            (including ``"group-offsets"`` without ``group_id``).

    Example::

        >>> import pyflink.dataframe as pf
        >>>
        >>> # Simple JSON consumption from a single topic
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="user_events",
        ...     schema={
        ...         "user_id": pf.DataType.int64(),
        ...         "event": pf.DataType.string(),
        ...         "ts": pf.DataType.timestamp(3),
        ...     },
        ...     startup_mode="earliest-offset",
        ... )
        >>>
        >>> # Read from earliest offset with explicit consumer group
        >>> df = pf.read_kafka(
        ...     "broker1:9092,broker2:9092",
        ...     topic="orders",
        ...     schema={
        ...         "order_id": pf.DataType.int64(),
        ...         "amount": pf.DataType.float64(),
        ...     },
        ...     group_id="my-consumer-group",
        ...     startup_mode="earliest-offset",
        ... )
        >>>
        >>> # Resume from the committed offsets of a consumer group
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="orders",
        ...     schema={
        ...         "order_id": pf.DataType.int64(),
        ...         "amount": pf.DataType.float64(),
        ...     },
        ...     group_id="my-consumer-group",
        ...     properties={"auto.offset.reset": "earliest"},
        ... )
        >>>
        >>> # Read multiple topics
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic=["topic_a", "topic_b"],
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "data": pf.DataType.string(),
        ...     },
        ...     startup_mode="earliest-offset",
        ... )
        >>>
        >>> # Subscribe to topics by pattern
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic_pattern="events-.*",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "payload": pf.DataType.string(),
        ...     },
        ...     startup_mode="latest-offset",
        ... )
        >>>
        >>> # Bounded read (batch mode) - read up to the latest offset
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="orders",
        ...     schema={
        ...         "order_id": pf.DataType.int64(),
        ...         "amount": pf.DataType.float64(),
        ...     },
        ...     startup_mode="earliest-offset",
        ...     bounded_mode="latest-offset",
        ... )
        >>>
        >>> # Read with key+value formats and Avro
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="keyed_events",
        ...     schema={
        ...         "user_id": pf.DataType.int64(),
        ...         "event": pf.DataType.string(),
        ...     },
        ...     format="avro",
        ...     key_format="json",
        ...     key_fields=["user_id"],
        ...     startup_mode="earliest-offset",
        ... )
        >>>
        >>> # Start from a specific timestamp
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="events",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "data": pf.DataType.string(),
        ...     },
        ...     startup_mode="timestamp",
        ...     startup_timestamp_millis=1700000000000,
        ... )
        >>>
        >>> # Start from specific offsets per partition
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="events",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "data": pf.DataType.string(),
        ...     },
        ...     startup_mode="specific-offsets",
        ...     startup_specific_offsets={0: 42, 1: 300},
        ... )
        >>>
        >>> # JSON format with custom parse options
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="events",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "data": pf.DataType.string(),
        ...     },
        ...     format="json",
        ...     format_options={"ignore-parse-errors": "true"},
        ...     startup_mode="earliest-offset",
        ... )
        >>>
        >>> # With Kafka security (SASL/SSL)
        >>> # When using Aliyun Kafka, download the truststore file from the Kafka
        >>> # console and upload it as an additional dependency file.
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="secure_events",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "data": pf.DataType.string(),
        ...     },
        ...     startup_mode="earliest-offset",
        ...     properties={
        ...         "ssl.truststore.location": "/flink/usrlib/only.4096.client.truststore.jks",
        ...         "ssl.truststore.password": "KafkaOnsClient",
        ...         "ssl.endpoint.identification.algorithm": "",
        ...         "sasl.jaas.config": (
        ...             "org.apache.flink.kafka.shaded.org.apache.kafka.common.security"
        ...             ".plain.PlainLoginModule required "
        ...             'username="<username>" '
        ...             'password="<password>";'
        ...         ),
        ...         "sasl.mechanism": "PLAIN",
        ...         "security.protocol": "SASL_SSL",
        ...     },
        ... )
        >>>
        >>> # Read with column projection
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="events",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "name": pf.DataType.string(),
        ...         "value": pf.DataType.float64(),
        ...     },
        ...     columns=["id", "name"],
        ...     startup_mode="earliest-offset",
        ... )
        >>>
        >>> # CDC with Debezium format
        >>> df = pf.read_kafka(
        ...     "localhost:9092",
        ...     topic="cdc_events",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "name": pf.DataType.string(),
        ...     },
        ...     format="debezium-json",
        ...     startup_mode="earliest-offset",
        ... )
    """
    from pyflink.common.config_options import ConfigOptions
    from pyflink.table import TableDescriptor, FormatDescriptor, Schema
    from pyflink.table.expressions import col
    from pyflink.dataframe.util.artifacts import (
        add_built_in_connector,
        add_built_in_format,
    )
    from pyflink.dataframe.context import get_or_create_table_environment

    def _format_specific_offsets(
        offsets: Union[str, Dict[int, int]],
    ) -> str:
        # Strings are passed through unchanged; dicts are rendered as
        # "partition:<p>,offset:<o>" entries joined with ";".
        if isinstance(offsets, str):
            return offsets
        return ";".join(f"partition:{p},offset:{o}" for p, o in offsets.items())

    def _validate_keys_are_strings(d: Dict, name: str) -> None:
        # Option dicts are forwarded as connector options, whose keys must be str.
        for k in d.keys():
            if not isinstance(k, str):
                raise ValueError(
                    f"Invalid key '{k}' in {name}. "
                    "Option keys must be strings."
                )

    def _validate_scan_mode_options(
        mode_name: str,
        mode_value: str,
        timestamp_millis: Optional[int],
        specific_offsets: Optional[Union[str, Dict[int, int]]],
    ) -> None:
        # Shared checks for startup_mode / bounded_mode: each mode requires its
        # own companion argument and rejects the companions of other modes.
        timestamp_option = f"{mode_name}_timestamp_millis"
        specific_offsets_option = f"{mode_name}_specific_offsets"
        if mode_value == "group-offsets" and group_id is None:
            raise ValueError(
                f"'group_id' must be specified when {mode_name}_mode is "
                "'group-offsets'."
            )
        if mode_value == "timestamp" and timestamp_millis is None:
            raise ValueError(
                f"'{timestamp_option}' must be specified when "
                f"{mode_name}_mode is 'timestamp'."
            )
        if mode_value != "timestamp" and timestamp_millis is not None:
            raise ValueError(
                f"'{timestamp_option}' must not be specified unless "
                f"{mode_name}_mode is 'timestamp'."
            )
        if mode_value == "specific-offsets" and specific_offsets is None:
            raise ValueError(
                f"'{specific_offsets_option}' must be specified when "
                f"{mode_name}_mode is 'specific-offsets'."
            )
        if mode_value != "specific-offsets" and specific_offsets is not None:
            raise ValueError(
                f"'{specific_offsets_option}' must not be specified unless "
                f"{mode_name}_mode is 'specific-offsets'."
            )

    # ---- Argument validation (no environment side effects yet) ----
    if schema is None:
        raise ValueError(
            "schema is required for read_kafka. "
            "Please provide a dict of {column_name: DataType}.\n"
            "Example:\n"
            " pf.read_kafka('localhost:9092', topic='my_topic', schema={\n"
            " 'id': pf.DataType.int64(),\n"
            " 'name': pf.DataType.string(),\n"
            " })"
        )

    if topic is None and topic_pattern is None:
        raise ValueError("Either 'topic' or 'topic_pattern' must be specified.")
    if topic is not None and topic_pattern is not None:
        raise ValueError(
            "Only one of 'topic' or 'topic_pattern' can be specified, "
            "not both."
        )

    if key_format is not None and key_fields is None:
        raise ValueError("'key_fields' must be specified when 'key_format' is defined.")
    if key_fields_prefix is not None and value_fields_include != "EXCEPT_KEY":
        raise ValueError(
            "'value_fields_include' must be 'EXCEPT_KEY' when "
            "'key_fields_prefix' is set."
        )

    if format_options is not None:
        _validate_keys_are_strings(format_options, "format_options")
    if key_format_options is not None:
        _validate_keys_are_strings(key_format_options, "key_format_options")
    if value_format_options is not None:
        _validate_keys_are_strings(value_format_options, "value_format_options")

    _validate_scan_mode_options(
        mode_name="startup",
        mode_value=startup_mode,
        timestamp_millis=startup_timestamp_millis,
        specific_offsets=startup_specific_offsets,
    )
    _validate_scan_mode_options(
        mode_name="bounded",
        mode_value=bounded_mode,
        timestamp_millis=bounded_timestamp_millis,
        specific_offsets=bounded_specific_offsets,
    )

    # ---- Environment and artifacts ----
    t_env = get_or_create_table_environment()
    add_built_in_connector(t_env, "kafka")
    # Register only the formats the descriptor will actually reference: an
    # explicit value_format supersedes `format` for the value part.
    add_built_in_format(t_env, format if value_format is None else value_format)
    if key_format is not None:
        add_built_in_format(t_env, key_format)

    builder = TableDescriptor.for_connector("kafka").option(
        "properties.bootstrap.servers", bootstrap_servers
    )

    # Value format. Emit exactly one of the 'format' / 'value.format'
    # options. `format` always has a value (default "json"), so sending both
    # would leave the choice to the connector, which may resolve 'format'
    # first and silently ignore value_format.
    if value_format is None:
        format_builder = FormatDescriptor.for_format(format)
        if format_options:
            for key, value in format_options.items():
                format_builder = format_builder.option(key, _to_str(value))
        builder = builder.format(format_builder.build())
    else:
        value_format_opt = (
            ConfigOptions.key("value.format").string_type().no_default_value()
        )
        value_format_builder = FormatDescriptor.for_format(value_format)
        if value_format_options:
            for key, value in value_format_options.items():
                value_format_builder = value_format_builder.option(key, _to_str(value))
        builder = builder.format(
            value_format_builder.build(),
            format_option=value_format_opt,
        )

    # Schema
    schema_builder = Schema.new_builder()
    for name, dtype in schema.items():
        schema_builder.column(name, dtype._table_type)
    builder = builder.schema(schema_builder.build())

    # Additional Kafka client properties (prefixed for the connector).
    if properties:
        for key, value in properties.items():
            builder = builder.option(f"properties.{key}", _to_str(value))

    # Key format
    if key_format is not None:
        key_format_opt = (
            ConfigOptions.key("key.format").string_type().no_default_value()
        )
        key_format_builder = FormatDescriptor.for_format(key_format)
        if key_format_options:
            for key, value in key_format_options.items():
                key_format_builder = key_format_builder.option(key, _to_str(value))
        builder = builder.format(
            key_format_builder.build(),
            format_option=key_format_opt,
        )
    if key_fields is not None:
        builder = builder.option("key.fields", ";".join(key_fields))
    if key_fields_prefix is not None:
        builder = builder.option("key.fields-prefix", key_fields_prefix)

    if value_fields_include is not None:
        builder = builder.option("value.fields-include", value_fields_include)

    # Source-specific options
    if topic is not None:
        if isinstance(topic, (list, tuple)):
            topic_str = ";".join(topic)
        else:
            topic_str = topic
        builder = builder.option("topic", topic_str)
    else:
        builder = builder.option("topic-pattern", topic_pattern)

    if group_id is not None:
        builder = builder.option("properties.group.id", group_id)

    if startup_mode is not None:
        builder = builder.option("scan.startup.mode", startup_mode)
    if startup_specific_offsets is not None:
        builder = builder.option(
            "scan.startup.specific-offsets",
            _format_specific_offsets(startup_specific_offsets),
        )
    if startup_timestamp_millis is not None:
        builder = builder.option(
            "scan.startup.timestamp-millis",
            str(startup_timestamp_millis),
        )

    if topic_partition_discovery_interval is not None:
        builder = builder.option(
            "scan.topic-partition-discovery.interval",
            topic_partition_discovery_interval,
        )
    if header_filter is not None:
        builder = builder.option("scan.header-filter", header_filter)
    builder = builder.option(
        "scan.check.duplicated.group.id", str(check_duplicated_group_id).lower()
    )

    # Bounded mode
    if bounded_mode is not None:
        builder = builder.option("scan.bounded.mode", bounded_mode)
    if bounded_timestamp_millis is not None:
        builder = builder.option(
            "scan.bounded.timestamp-millis",
            str(bounded_timestamp_millis),
        )
    if bounded_specific_offsets is not None:
        builder = builder.option(
            "scan.bounded.specific-offsets",
            _format_specific_offsets(bounded_specific_offsets),
        )

    # Kafka's KafkaDynamicTableFactory rejects anonymous sources (those created
    # via ``t_env.from_descriptor`` without a registered name), so we register
    # the descriptor as a named temporary table and resolve it back.
    _kafka_table_name = f"_pf_df_kafka_{uuid.uuid4().hex[:8]}"
    t_env.create_temporary_table(_kafka_table_name, builder.build())
    table = t_env.from_path(_kafka_table_name)

    # Column projection
    if columns is not None:
        table = table.select(*[col(c) for c in columns])

    return DataFrame(table)
def read_paimon(
    path: Optional[str] = None,
    *,
    schema: Optional[Dict[str, "DataType"]] = None,
    primary_key: Optional[Union[str, List[str]]] = None,
    columns: Optional[List[str]] = None,
    auto_create: bool = False,
    file_format: Literal["orc", "parquet", "avro", "lance"] = "parquet",
    bucket: int = 1,
    bucket_key: Optional[Union[str, List[str]]] = None,
    changelog_producer: Literal["none", "input", "full-compaction", "lookup"] = "none",
    full_compaction_delta_commits: Optional[int] = None,
    lookup_cache_max_memory_size: str = "256 MB",
    merge_engine: Literal[
        "deduplicate", "partial-update", "aggregation"
    ] = "deduplicate",
    partial_update_ignore_delete: bool = False,
    ignore_delete: bool = False,
    partition_default_name: str = "__DEFAULT_PARTITION__",
    partition_expiration_check_interval: str = "1h",
    partition_expiration_time: Optional[str] = None,
    partition_timestamp_formatter: Optional[str] = None,
    partition_timestamp_pattern: Optional[str] = None,
    bounded_watermark: Optional[int] = None,
    mode: Literal[
        "default",
        "latest-full",
        "latest",
        "compacted-full",
        "from-timestamp",
        "from-snapshot",
    ] = "default",
    snapshot_id: Optional[int] = None,
    timestamp_millis: Optional[int] = None,
    snapshot_num_retained_max: int = 2147483647,
    snapshot_num_retained_min: int = 10,
    snapshot_time_retained: str = "1h",
    infer_parallelism: bool = True,
    parallelism: Optional[int] = None,
    extra_options: Optional[Dict[str, Any]] = None,
) -> DataFrame:
    """
    Read a Paimon table into a DataFrame.

    The Paimon SQL connector is used under the hood. A temporary connector
    descriptor is built for every call, which is why ``schema`` must be
    supplied even when Paimon files already exist at ``path``.

    Args:
        path: File-system path of the Paimon table.
        schema: Dict of ``{column_name: DataType}`` describing the table
            schema. Required.
        primary_key: Optional primary key column name, or list of names.
            Primary key columns are declared NOT NULL in the schema.
        columns: Optional list of column names to project. If None, every
            column of ``schema`` is returned.
        auto_create: Whether to create Paimon table files while building the
            temporary table when no Paimon table exists at the target path.
            Default is False.
        file_format: Data file format: ``"orc"``, ``"parquet"``, ``"avro"``
            or ``"lance"``. Default is ``"parquet"``.
        bucket: Bucket number of the file store.
        bucket_key: Column name or list of column names defining the Paimon
            distribution policy; rows are bucketed by the hash of these
            fields. When unset, Paimon uses the primary key, or the whole row
            when there is no primary key.
        changelog_producer: Whether and how changelog files are double-written.
            Changelog files keep data-change details and can be consumed
            directly by streaming reads. ``"none"`` writes no changelog file,
            ``"input"`` writes the input changes when the memory table is
            flushed, ``"full-compaction"`` produces changelog files during full
            compaction, and ``"lookup"`` produces changelog via lookup before
            snapshots are committed.
        full_compaction_delta_commits: Maximum number of committed snapshots
            between two full compactions.
        lookup_cache_max_memory_size: Memory budget shared by the lookup cache
            and the lookup changelog-producer cache. Default is ``"256 MB"``.
        merge_engine: Merge engine for primary-key tables. ``"deduplicate"``
            keeps the last row, ``"partial-update"`` updates non-null fields,
            and ``"aggregation"`` aggregates rows sharing a primary key.
        partial_update_ignore_delete: Whether delete records are ignored in
            partial-update mode.
        ignore_delete: Whether delete records are ignored.
        partition_default_name: Partition name used when a dynamic partition
            column is null or an empty string.
        partition_expiration_check_interval: How often expired partitions are
            checked.
        partition_expiration_time: Partition retention time. When unset,
            partitions never expire.
        partition_timestamp_formatter: Formatter that turns partition time
            strings into timestamps.
        partition_timestamp_pattern: Pattern that extracts a time string from
            partition values.
        bounded_watermark: Source records stop being produced once the source
            watermark exceeds this value.
        mode: Source scanning behavior.

            - ``"default"``: Paimon infers the startup mode. ``timestamp_millis``
              selects ``"from-timestamp"``, ``snapshot_id`` selects
              ``"from-snapshot"``, otherwise it behaves like ``"latest-full"``.
            - ``"latest-full"``: emit the latest snapshot first, then stream
              subsequent changes.
            - ``"latest"``: stream the latest changes without an initial
              snapshot.
            - ``"compacted-full"``: start from the latest compacted snapshot.
            - ``"from-timestamp"`` / ``"from-snapshot"``: start from the
              configured timestamp or snapshot.

            The deprecated ``"full"`` mode is intentionally rejected.
        snapshot_id: Snapshot id, used with ``mode="from-snapshot"`` (or
            ``"default"``).
        timestamp_millis: Timestamp in milliseconds, used with
            ``mode="from-timestamp"`` (or ``"default"``).
        snapshot_num_retained_max: Maximum number of completed snapshots kept.
        snapshot_num_retained_min: Minimum number of completed snapshots kept.
        snapshot_time_retained: Maximum retention time of completed snapshots.
        infer_parallelism: Whether the scan source parallelism is inferred
            from the bucket count.
        parallelism: Explicit scan source parallelism. When unset, the planner
            derives the statement parallelism, taking global configuration
            into account.
        extra_options: Additional options forwarded to the Paimon connector,
            for settings not exposed as named parameters of ``read_paimon``.
            On a key clash with an option generated from a named parameter,
            the ``extra_options`` value wins. ``"connector"`` is reserved and
            must not be supplied.

    Returns:
        A new DataFrame representing the Paimon source.

    Raises:
        ValueError: If ``schema`` is missing, or ``snapshot_id`` /
            ``timestamp_millis`` are missing, both set, or combined with an
            incompatible ``mode``.

    Example::

        >>> import pyflink.dataframe as pf
        >>>
        >>> order_schema = {
        ...     "order_id": pf.DataType.int64(),
        ...     "user_id": pf.DataType.int64(),
        ...     "status": pf.DataType.string(),
        ...     "event_ts": pf.DataType.timestamp(3),
        ... }
        >>>
        >>> # Read the latest snapshot with a primary key and projection
        >>> orders = pf.read_paimon(
        ...     "oss://bucket/warehouse/db.db/orders",
        ...     schema=order_schema,
        ...     primary_key="order_id",
        ...     columns=["order_id", "status", "event_ts"],
        ... )
        >>>
        >>> # Stream changes from a specific snapshot
        >>> changes = pf.read_paimon(
        ...     "oss://bucket/warehouse/db.db/orders",
        ...     schema=order_schema,
        ...     primary_key="order_id",
        ...     changelog_producer="lookup",
        ...     mode="from-snapshot",
        ...     snapshot_id=42,
        ... )
        >>>
        >>> # Stream from a timestamp in milliseconds with explicit parallelism
        >>> changes = pf.read_paimon(
        ...     "oss://bucket/warehouse/db.db/orders",
        ...     schema=order_schema,
        ...     primary_key="order_id",
        ...     mode="from-timestamp",
        ...     timestamp_millis=1710000000000,
        ...     bounded_watermark=1710003600000,
        ...     infer_parallelism=False,
        ...     parallelism=4,
        ... )
        >>>
        >>> # Read a table with bucket distribution
        >>> events = pf.read_paimon(
        ...     "oss://bucket/warehouse/db.db/events",
        ...     schema={
        ...         "dt": pf.DataType.string(),
        ...         "user_id": pf.DataType.int64(),
        ...         "payload": pf.DataType.string(),
        ...     },
        ...     bucket=16,
        ...     bucket_key=["dt", "user_id"],
        ...     extra_options={
        ...         "snapshot.time-retained": "12h",
        ...     },
        ... )
    """
    from pyflink.table import TableDescriptor, Schema
    from pyflink.table.expressions import col
    from pyflink.dataframe.context import get_or_create_table_environment
    from pyflink.dataframe.util.artifacts import add_built_in_connector

    if schema is None:
        raise ValueError(
            "schema is required for read_paimon. "
            "Please provide a dict of {column_name: DataType}.\n"
            "Example:\n"
            " pf.read_paimon('/path/to/paimon-table', schema={\n"
            " 'id': pf.DataType.int64(),\n"
            " 'name': pf.DataType.string(),\n"
            " })"
        )

    _validate_paimon_common_options(
        file_format=file_format,
        changelog_producer=changelog_producer,
        merge_engine=merge_engine,
    )
    _validate_allowed_value(mode, "mode", _PAIMON_SCAN_MODES)

    # Scan-position rules, checked in order; the first violated rule is
    # reported. Every condition is a side-effect-free comparison.
    scan_rules = (
        (
            snapshot_id is not None and timestamp_millis is not None,
            "'snapshot_id' and 'timestamp_millis' must not both be set.",
        ),
        (
            mode == "from-snapshot" and snapshot_id is None,
            "'snapshot_id' must be specified when mode is 'from-snapshot'.",
        ),
        (
            mode == "from-timestamp" and timestamp_millis is None,
            "'timestamp_millis' must be specified when mode is "
            "'from-timestamp'.",
        ),
        (
            snapshot_id is not None and mode not in ("default", "from-snapshot"),
            "'snapshot_id' must not be specified unless mode is "
            "'default' or 'from-snapshot'.",
        ),
        (
            timestamp_millis is not None
            and mode not in ("default", "from-timestamp"),
            "'timestamp_millis' must not be specified unless mode is "
            "'default' or 'from-timestamp'.",
        ),
    )
    for violated, message in scan_rules:
        if violated:
            raise ValueError(message)

    pk_columns = _normalize_primary_key(
        primary_key,
        schema,
    )

    t_env = get_or_create_table_environment()
    add_built_in_connector(t_env, "paimon")

    # Declare every column; primary key columns are forced to NOT NULL.
    schema_builder = Schema.new_builder()
    pk_lookup = set(pk_columns)
    for column_name, data_type in schema.items():
        flink_type = data_type._table_type
        schema_builder.column(
            column_name,
            flink_type.not_null() if column_name in pk_lookup else flink_type,
        )
    if pk_columns:
        schema_builder.primary_key(*pk_columns)

    connector_options = _paimon_common_options(
        path=path,
        auto_create=auto_create,
        file_format=file_format,
        bucket=bucket,
        bucket_key=bucket_key,
        changelog_producer=changelog_producer,
        full_compaction_delta_commits=full_compaction_delta_commits,
        lookup_cache_max_memory_size=lookup_cache_max_memory_size,
        merge_engine=merge_engine,
        partial_update_ignore_delete=partial_update_ignore_delete,
        ignore_delete=ignore_delete,
        partition_default_name=partition_default_name,
        partition_expiration_check_interval=partition_expiration_check_interval,
        partition_expiration_time=partition_expiration_time,
        partition_timestamp_formatter=partition_timestamp_formatter,
        partition_timestamp_pattern=partition_timestamp_pattern,
        snapshot_num_retained_max=snapshot_num_retained_max,
        snapshot_num_retained_min=snapshot_num_retained_min,
        snapshot_time_retained=snapshot_time_retained,
    )
    # Source-only options, appended after the shared table options.
    scan_options = [
        ("scan.bounded.watermark", bounded_watermark),
        ("scan.mode", mode),
        ("scan.snapshot-id", snapshot_id),
        ("scan.timestamp-millis", timestamp_millis),
        ("scan.infer-parallelism", infer_parallelism),
        ("scan.parallelism", parallelism),
    ]
    connector_options.extend(scan_options)

    # extra_options are applied last so they override generated options.
    descriptor = _add_extra_options(
        _add_connector_options(
            TableDescriptor.for_connector("paimon").schema(schema_builder.build()),
            connector_options,
        ),
        extra_options,
    )

    source_table = t_env.from_descriptor(descriptor.build())
    if columns is not None:
        source_table = source_table.select(*(col(name) for name in columns))
    return DataFrame(source_table)
def read_odps(
    endpoint: str,
    *,
    schema: Optional[Dict[str, "DataType"]] = None,
    columns: Optional[List[str]] = None,
    # General options
    tunnel_endpoint: Optional[str] = None,
    project: str,
    schema_name: Optional[str] = None,
    table_name: str,
    access_id: str,
    access_key: str,
    partition: Optional[Union[str, Tuple[str, Any], List[Tuple[str, Any]]]] = None,
    compress_algorithm: Literal["RAW", "ZLIB", "SNAPPY"] = "SNAPPY",
    quota_name: Optional[str] = None,
    # Source table options
    max_partition_count: int = 100,
    use_arrow: bool = False,
    split_size: str = "256mb",
    compress_codec: Literal["", "ZSTD", "LZ4_FRAME"] = "",
    dynamic_load_balance: bool = False,
    # Incremental source options
    start_partition: Optional[
        Union[str, Tuple[str, Any], List[Tuple[str, Any]]]
    ] = None,
    subscribe_interval_in_sec: int = 30,
    modified_table_operation: Literal["NONE", "SKIP"] = "NONE",
    # Dimension table options
    cache: Literal["ALL"] = "ALL",
    cache_size: int = 100000,
    cache_ttl_ms: int = _ODPS_LONG_MAX_VALUE,
    cache_reload_time_blacklist: Optional[str] = None,
    max_load_retries: int = 10,
) -> DataFrame:
    """
    Read a MaxCompute (ODPS) table into a DataFrame.

    Uses the ODPS SQL connector under the hood. The connector options are
    exposed with Pythonic snake_case names, in the same order as the ODPS
    documentation.

    Args:
        endpoint: MaxCompute endpoint.
        schema: Dict of ``{column_name: DataType}`` specifying the DataFrame
            schema. This parameter is required.
        columns: Optional list of column names to read. If None, all columns
            from ``schema`` are read.
        tunnel_endpoint: MaxCompute Tunnel endpoint. If unset, MaxCompute
            allocates tunnel connections through Server Load Balancer (SLB).
        project: MaxCompute project name.
        schema_name: Required only when the MaxCompute schema feature is
            enabled. Set this to the table's schema name.
        table_name: MaxCompute table name.
        access_id: AccessKey ID used to access MaxCompute.
        access_key: AccessKey secret used to access MaxCompute.
        partition: Partition to read from the MaxCompute table. Not required
            for non-partitioned tables or incremental sources. Must not be
            combined with ``start_partition``. Examples:

            - To read partition column ``dt`` with value ``20220901``, pass
              ``("dt", "20220901")``.
            - To read source partitions whose ``dt`` values start with
              ``202209``, pass ``("dt", "202209*")``.
            - To read all source partitions for column ``dt``, pass
              ``("dt", "*")``.
            - For a three-level partition table ``dt``/``hh``/``mm``, to read
              ``dt=20220901,hh=08`` and any ``mm`` value, pass
              ``[("dt", "20220901"), ("hh", "08")]`` or
              ``[("dt", "20220901"), ("hh", "08"), ("mm", "*")]``.
        compress_algorithm: Compression algorithm for MaxCompute Tunnel.
            Valid values are ``"RAW"`` (no compression), ``"ZLIB"``, and
            ``"SNAPPY"``. ``"SNAPPY"`` improves throughput by approximately
            50% compared to ``"ZLIB"`` in test scenarios.
        quota_name: Quota name for exclusive MaxCompute Tunnel resource
            groups. When using a quota, leave ``tunnel_endpoint`` unset.
            Otherwise, the tunnel specified by ``tunnel_endpoint`` takes
            precedence.
        max_partition_count: Maximum number of partitions to read from.
            Reading from too many partitions can overload MaxCompute and slow
            job startup. Increase this only when your workload requires it.
        use_arrow: Read data using the Arrow format, which calls the
            MaxCompute storage API. Batch deployments only.
        split_size: Amount of data pulled per split when using the Arrow
            format. Batch deployments only.
        compress_codec: Compression algorithm when reading with the Arrow
            format. Valid values are ``""`` (none), ``"ZSTD"``, and
            ``"LZ4_FRAME"``. Specifying a codec improves throughput over no
            compression. Batch deployments only.
        dynamic_load_balance: Enable dynamic shard allocation to improve
            processing performance and reduce overall read time. This may
            cause data skew because different operators read inconsistent
            amounts of data. Batch deployments only.
        start_partition: Start partition for incremental reads. Setting this
            enables incremental source mode. It is mutually exclusive with
            ``partition``: specifying both raises ``ValueError``.

            For multi-level partitioned tables, configure partition column
            values in descending order by level. For example, for a
            three-level partitioned table with partition key columns ``dt``,
            ``hh``, and ``mm``, to start from ``dt=20220901``, ``hh=08``, and
            ``mm=10``, set this parameter to ``"dt=20220901,hh=08,mm=10"`` or
            ``[("dt", "20220901"), ("hh", "08"), ("mm", "10")]``. For a single
            partition key, you can also use ``("dt", "20220901")``.
        subscribe_interval_in_sec: Polling interval in seconds.
        modified_table_operation: Action when a partition is modified during
            reading. With ``"NONE"``, you must update ``start_partition`` to
            skip the unavailable partition and then restart without state.
            With ``"SKIP"``, the unavailable partition is skipped
            automatically when the job resumes. In either mode, data already
            read from the modified partition is retained and unread data is
            discarded.
        cache: Cache policy. Must be set to ``"ALL"``. All dimension table
            data is loaded into cache before the deployment runs, lookups
            search the cache only, and the cache reloads after entries
            expire.
        cache_size: Maximum rows to cache. Large caches consume significant
            JVM heap memory and slow startup and cache refresh. Increase this
            only when your workload requires it.
        cache_ttl_ms: Cache timeout in milliseconds. Defaults to the maximum
            signed 64-bit integer (``9223372036854775807``).
        cache_reload_time_blacklist: Time periods during which the cache is
            not refreshed. Use this during peak traffic periods to prevent
            deployment instability from cache refreshes.
        max_load_retries: Maximum retries for the initial cache load on
            deployment startup. If retries are exhausted, the deployment
            fails.

    Returns:
        A new DataFrame representing the ODPS source.

    Raises:
        ValueError: In any of these cases:

            - ``schema`` is None.
            - ``compress_algorithm``, ``compress_codec``,
              ``modified_table_operation`` or ``cache`` is not one of its
              allowed values.
            - Both ``partition`` and ``start_partition`` are specified.

            Partition specs and connection options are additionally checked
            by shared validation helpers, which may also reject malformed
            input.

    Example::

        >>> import pyflink.dataframe as pf
        >>> df = pf.read_odps(
        ...     "http://service.cn-hangzhou.maxcompute.aliyun.com/api",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "payload": pf.DataType.string(),
        ...     },
        ...     project="my_project",
        ...     table_name="events",
        ...     access_id="${secret_values.ak_id}",
        ...     access_key="${secret_values.ak_secret}",
        ...     partition="ds=20260428",
        ... )
    """
    from pyflink.table import TableDescriptor, Schema
    from pyflink.table.expressions import col
    from pyflink.dataframe.util.artifacts import add_built_in_connector
    from pyflink.dataframe.context import get_or_create_table_environment

    if schema is None:
        raise ValueError(
            "schema is required for read_odps. "
            "Please provide a dict of {column_name: DataType}."
        )

    # Reject unsupported enum values before a table environment is created.
    _validate_allowed_value(
        compress_algorithm,
        "compress_algorithm",
        _ODPS_COMPRESS_ALGORITHMS,
    )
    _validate_allowed_value(
        compress_codec,
        "compress_codec",
        _ODPS_COMPRESS_CODECS,
    )
    _validate_allowed_value(
        modified_table_operation,
        "modified_table_operation",
        _ODPS_MODIFIED_TABLE_OPERATIONS,
    )
    _validate_allowed_value(cache, "cache", _ODPS_CACHE_VALUES)

    # Normalize tuple/list partition specs into the connector's string form.
    # NOTE(review): the mutual-exclusion check below assumes the helper maps
    # an unset spec to None. Confirm against _format_odps_partition.
    partition = _format_odps_partition(partition)
    start_partition = _format_odps_partition(
        start_partition,
        name="start_partition",
    )
    if partition is not None and start_partition is not None:
        raise ValueError(
            "'partition' and 'start_partition' cannot be specified for "
            "ODPS source at the same time."
        )

    _validate_odps_common_options(
        endpoint=endpoint,
        project=project,
        table_name=table_name,
        access_id=access_id,
        access_key=access_key,
        tunnel_endpoint=tunnel_endpoint,
        quota_name=quota_name,
    )

    # (connector option key, value) pairs, in ODPS documentation order.
    options = [
        ("endpoint", endpoint),
        ("tunnelEndpoint", tunnel_endpoint),
        ("project", project),
        ("schemaName", schema_name),
        ("tableName", table_name),
        ("accessId", access_id),
        ("accessKey", access_key),
        ("partition", partition),
        ("compressAlgorithm", compress_algorithm),
        ("quotaName", quota_name),
        ("maxPartitionCount", max_partition_count),
        ("useArrow", use_arrow),
        ("splitSize", split_size),
        # "" means "no compression". It is mapped to None, presumably so that
        # _add_connector_options omits the option (confirm).
        ("compressCodec", compress_codec or None),
        ("dynamicLoadBalance", dynamic_load_balance),
        ("startPartition", start_partition),
        ("subscribeIntervalInSec", subscribe_interval_in_sec),
        ("modifiedTableOperation", modified_table_operation),
        ("cache", cache),
        ("cacheSize", cache_size),
        ("cacheTTLMs", cache_ttl_ms),
        ("cacheReloadTimeBlackList", cache_reload_time_blacklist),
        ("maxLoadRetries", max_load_retries),
    ]

    t_env = get_or_create_table_environment()
    add_built_in_connector(t_env, "odps")

    builder = TableDescriptor.for_connector("odps")
    schema_builder = Schema.new_builder()
    for name, dtype in schema.items():
        schema_builder.column(name, dtype._table_type)
    builder = builder.schema(schema_builder.build())
    builder = _add_connector_options(builder, options)

    table = t_env.from_descriptor(builder.build())
    if columns is not None:
        table = table.select(*[col(c) for c in columns])
    return DataFrame(table)
def read_sls(
    endpoint: str,
    *,
    project: str,
    logstore: str,
    access_id: Optional[str] = None,
    access_key: Optional[str] = None,
    schema: Optional[Dict[str, "DataType"]] = None,
    columns: Optional[List[str]] = None,
    enable_new_source: bool = True,
    shard_discovery_interval_ms: int = 60000,
    startup_mode: Literal[
        "timestamp", "latest", "earliest", "consumer_group"
    ] = "timestamp",
    start_time: Optional[Union[str, datetime]] = None,
    stop_time: Optional[Union[str, datetime]] = None,
    time_zone: Optional[str] = None,
    consumer_group: Optional[str] = None,
    max_retries: int = 3,
    batch_get_size: int = 100,
    exit_after_finish: bool = False,
    processor: Optional[str] = None,
    extra_options: Optional[Dict[str, str]] = None,
) -> DataFrame:
    """Load an Aliyun Log Service (SLS) logstore as a DataFrame.

    Thin wrapper around the Ververica SLS source connector. Every connector
    option is surfaced as a snake_case keyword argument. The arguments follow
    the order of the "WITH parameters" table in the SLS documentation.

    Args:
        endpoint: SLS service endpoint (private network).
        project: SLS project that owns the logstore.
        logstore: Name of the LogStore (or MetricStore) to read.
        access_id: AccessKey ID of the Alibaba Cloud account. When both
            ``access_id`` and ``access_key`` are left as ``None`` (default),
            neither is sent to the connector, which then falls back to the
            VVP STS token.
        access_key: AccessKey Secret of the Alibaba Cloud account. See
            ``access_id`` for the STS fallback.
        schema: Mapping of ``{column_name: DataType}`` describing the source
            columns. Mandatory.
        columns: Names of the columns to project. ``None`` keeps every column
            declared in ``schema``.
        enable_new_source: ``True`` (default) selects the FLIP-27 SLS source,
            which adapts to shard changes automatically. ``False`` selects
            the legacy implementation.
        shard_discovery_interval_ms: How often, in milliseconds, to poll for
            shard changes when ``enable_new_source`` is ``True``. Zero or a
            negative value turns shard discovery off.
        startup_mode: Initial consumption position.

            - ``"timestamp"`` (default): begin at ``start_time``, or at the
              current time when ``start_time`` is not given.
            - ``"latest"``: begin at the newest log group.
            - ``"earliest"``: begin at the oldest available log group.
            - ``"consumer_group"``: resume from the offset stored for
              ``consumer_group``, which must then be provided.
        start_time: Where consumption begins. Only allowed with
            ``startup_mode='timestamp'``. Pass either a naive ``datetime``
            (rendered as ``"yyyy-MM-dd HH:mm:ss"``) or a string already in
            that format. A ``datetime`` carrying ``tzinfo`` is refused; pass
            the zone through ``time_zone`` instead.
        stop_time: Where consumption ends, for replaying history. Accepts the
            same forms as ``start_time``.
        time_zone: Zone (e.g. ``"Asia/Shanghai"``, ``"UTC"``) that the
            connector uses to interpret ``start_time`` / ``stop_time``
            strings. If omitted, the connector uses the JobManager JVM's
            default zone.
        consumer_group: Consumer group that records consumption progress.
            Mandatory for ``startup_mode='consumer_group'``. With any other
            startup mode it is still passed to the connector, but only for
            commit tracking; it does NOT change the starting position.
        max_retries: How many times a failed SLS read is retried.
        batch_get_size: LogGroups fetched per request. Must lie within
            [1, 1000]; the Java side errors out above 1000.
        exit_after_finish: ``True`` makes the Flink job exit once the source
            has finished consuming.
        processor: SLS consumer processor (SPL) that pre-filters data before
            ingest.
        extra_options: Extra connector options passed straight to the SLS
            connector, for settings with no named parameter in ``read_sls``
            (e.g. ``regionId``, ``signVersion``, ``shardModDivisor``,
            ``shardModRemainder``). On a key collision with an option
            produced by a named parameter, the ``extra_options`` value wins.
            The ``"connector"`` key is reserved and must not appear.

    Returns:
        A DataFrame backed by the SLS source.

    Raises:
        ValueError: In any of these cases:

            - ``startup_mode`` is not a supported value.
            - ``batch_get_size`` lies outside [1, 1000].
            - ``start_time`` is combined with a non-timestamp
              ``startup_mode``.
            - ``consumer_group`` is missing for
              ``startup_mode='consumer_group'``.
            - ``schema`` is None.
            - ``start_time`` / ``stop_time`` is an aware ``datetime``.
            - ``extra_options`` holds a ``connector`` key.

    Examples:
        Minimal source (timestamp mode, defaults)::

            import pyflink.dataframe as pf

            df = pf.read_sls(
                "cn-hangzhou-intranet.log.aliyuncs.com",
                project="my-project",
                logstore="my-logstore",
                access_id=ALIYUN_AK_ID,
                access_key=ALIYUN_AK_SECRET,
                schema={
                    "__timestamp__": pf.DataType.bigint(),
                    "level": pf.DataType.string(),
                    "message": pf.DataType.string(),
                }
            )

        Consumer-group mode (resume from the recorded offset)::

            df = pf.read_sls(
                "cn-hangzhou-intranet.log.aliyuncs.com",
                project="my-project",
                logstore="my-logstore",
                access_id=ALIYUN_AK_ID,
                access_key=ALIYUN_AK_SECRET,
                schema={"level": pf.DataType.string()},
                startup_mode="consumer_group",
                consumer_group="my-analytics-job"
            )

        Historical replay with ``datetime`` bounds::

            from datetime import datetime

            df = pf.read_sls(
                "cn-hangzhou-intranet.log.aliyuncs.com",
                project="my-project",
                logstore="my-logstore",
                access_id=ALIYUN_AK_ID,
                access_key=ALIYUN_AK_SECRET,
                schema={"message": pf.DataType.string()},
                start_time=datetime(2026, 1, 15, 0, 0, 0),
                stop_time=datetime(2026, 1, 16, 0, 0, 0),
                time_zone="Asia/Shanghai",
                exit_after_finish=True
            )

        Forward-compat: passing a non-named option (and overriding a named
        one) via ``extra_options``::

            df = pf.read_sls(
                "cn-hangzhou-intranet.log.aliyuncs.com",
                project="my-project",
                logstore="my-logstore",
                access_id=ALIYUN_AK_ID,
                access_key=ALIYUN_AK_SECRET,
                schema={"message": pf.DataType.string()},
                extra_options={
                    "regionId": "cn-hangzhou",
                    "compressType": "lz4",
                    "batchGetSize": "256",  # overrides default 100
                }
            )
    """
    from pyflink.dataframe.context import get_or_create_table_environment
    from pyflink.dataframe.util.artifacts import add_built_in_connector
    from pyflink.table import TableDescriptor, Schema
    from pyflink.table.expressions import col

    # Argument checks run in a fixed order; the first failing one is reported.
    _validate_allowed_value(startup_mode, "startup_mode", _SLS_STARTUP_MODES)
    if not 1 <= batch_get_size <= 1000:
        raise ValueError(
            f"batch_get_size must be in [1, 1000], got: {batch_get_size}"
        )
    if start_time is not None and startup_mode != "timestamp":
        raise ValueError(
            "start_time is only valid when startup_mode='timestamp'"
        )
    if startup_mode == "consumer_group" and not consumer_group:
        raise ValueError(
            "consumer_group is required when startup_mode='consumer_group'"
        )
    if schema is None:
        raise ValueError("'schema' must be specified for SLS source.")

    # Options that are always emitted. Booleans and integers are stringified
    # here; startup_mode is passed through unchanged.
    connector_options = [
        ("endpoint", endpoint),
        ("project", project),
        ("logstore", logstore),
        ("enableNewSource", str(enable_new_source)),
        ("shardDiscoveryIntervalMs", str(shard_discovery_interval_ms)),
        ("startupMode", startup_mode),
        ("maxRetries", str(max_retries)),
        ("batchGetSize", str(batch_get_size)),
        ("exitAfterFinish", str(exit_after_finish)),
    ]

    # Options emitted only when the caller supplied a value. The two time
    # bounds are rendered through _format_time (start before stop).
    time_keys = ("startTime", "stopTime")
    for option_key, raw_value in (
        ("accessId", access_id),
        ("accessKey", access_key),
        ("startTime", start_time),
        ("stopTime", stop_time),
        ("timeZone", time_zone),
        ("consumerGroup", consumer_group),
        ("processor", processor),
    ):
        if raw_value is None:
            continue
        if option_key in time_keys:
            raw_value = _format_time(raw_value)
        connector_options.append((option_key, raw_value))

    table_env = get_or_create_table_environment()
    add_built_in_connector(table_env, "sls")

    table_schema = Schema.new_builder()
    for column_name, data_type in schema.items():
        table_schema.column(column_name, data_type._table_type)

    descriptor = TableDescriptor.for_connector("sls").schema(table_schema.build())
    descriptor = _add_connector_options(descriptor, connector_options)
    descriptor = _add_extra_options(descriptor, extra_options)

    source_table = table_env.from_descriptor(descriptor.build())
    if columns is not None:
        source_table = source_table.select(*map(col, columns))
    return DataFrame(source_table)
[docs] def read_hologres( endpoint: str, *, db_name: str, table_name: str, username: str, password: str, schema: Optional[Dict[str, "DataType"]] = None, primary_key: Optional[Union[str, List[str]]] = None, columns: Optional[List[str]] = None, connection_pool_size: int = 5, connection_pool_name: str = "default", connection_fixed: Optional[bool] = None, connection_max_idle_ms: int = 60000, connection_ssl_mode: Literal[ "disable", "require", "verify-ca", "verify-full" ] = "disable", connection_ssl_root_cert_location: Optional[str] = None, retry_count: int = 10, retry_sleep_step_ms: int = 5000, meta_cache_ttl_ms: int = 600000, serverless_computing: bool = False, binlog: bool = True, binlog_read_mode: Literal["AUTO", "JDBC", "HOLOHUB"] = "AUTO", binlog_change_log_mode: Literal[ "ALL", "UPSERT", "ALL_AS_APPEND_ONLY" ] = "ALL", binlog_startup_mode: Literal[ "INITIAL", "EARLIEST_OFFSET", "TIMESTAMP", "LATEST_OFFSET" ] = "INITIAL", binlog_batch_size: int = 512, binlog_request_timeout_ms: int = 300000, binlog_project_columns: Optional[bool] = None, binlog_compression: Optional[bool] = None, partition_binlog_mode: Literal[ "DISABLE", "STATIC", "DYNAMIC" ] = "DISABLE", partition_binlog_lateness_timeout_minutes: int = 60, partition_values_to_read: Optional[Union[str, List[str]]] = None, start_time: Optional[Union[str, datetime]] = None, fetch_size: int = 512, scan_timeout_seconds: int = 600, filter_push_down: bool = False, binlog_filter_push_down: bool = False, prefer_physical_column_over_metadata_column: bool = False, lookup_batch_size: int = 256, lookup_timeout_ms: int = 0, lookup_column_table: bool = False, lookup_insert_if_not_exists: bool = False, lookup_async: bool = True, lookup_filter_push_down: bool = False, cache: Literal["None", "LRU"] = "None", cache_size: int = 100000, cache_ttl_ms: Optional[int] = None, cache_empty: bool = True, async_: bool = False, extra_options: Optional[Dict[str, Any]] = None, ) -> DataFrame: """Read a Hologres table into a DataFrame. 
Wraps the Ververica Hologres source connector. Hologres source supports both bulk scans and binlog streaming; the consumption mode is controlled by ``binlog`` and the binlog-related parameters. The connector options are exposed with Pythonic snake_case names, in the order the Hologres documentation's "WITH parameters" table presents them. Args: endpoint: Hologres service endpoint (use the VPC endpoint for VVR). db_name: Hologres database name. Optionally suffixed with a compute group (e.g. ``"my_db/my_group"``). table_name: Hologres table name. Use ``"schema.tableName"`` to read from a non-public schema. username: Hologres account username (or AccessKey ID). password: Hologres account password (or AccessKey Secret). schema: Dict of ``{column_name: DataType}`` specifying the source schema. Required. primary_key: Optional primary key column name or list of column names. Set this when the upstream Hologres table has a primary key — the streaming change-log behavior depends on it. columns: Optional list of column names to read. If ``None``, all columns from ``schema`` are read. connection_pool_size: JDBC connection pool size. Applies to both sink and dim-table workloads. connection_pool_name: Connection pool name; tables sharing the same name within a Flink TaskManager share connections. connection_fixed: When ``True``, enable Hologres lightweight (fixed) connection mode. When ``None`` (the default), the connector picks a value based on the Hologres engine version. connection_max_idle_ms: Idle time (milliseconds) before a JDBC connection in the pool is released. connection_ssl_mode: SSL transport mode. One of ``"disable"``, ``"require"``, ``"verify-ca"``, or ``"verify-full"``. The verify-* modes require ``connection_ssl_root_cert_location``. connection_ssl_root_cert_location: Path to the CA certificate file. Files must be uploaded to VVP under ``/flink/usrlib/<name>``. retry_count: Number of retries on connection failures. 
retry_sleep_step_ms: Linear back-off step (milliseconds) added per retry attempt. meta_cache_ttl_ms: TTL (milliseconds) of the local Hologres ``TableSchema`` cache. serverless_computing: When ``True``, run scans on Hologres serverless compute (only available for batch reads). binlog: When ``True`` (the default), consume binlog changes; when ``False``, run a bulk scan only. binlog_read_mode: Binlog read transport. ``"AUTO"`` lets the connector pick between JDBC and HoloHub. binlog_change_log_mode: How binlog records are mapped to Flink change-log rows. ``"ALL"`` (the default) emits +I/-U/+U/-D; ``"UPSERT"`` emits +I/+U/-D and requires a ``primary_key``; ``"ALL_AS_APPEND_ONLY"`` treats every event as +I. binlog_startup_mode: Where the binlog consumer starts. ``"INITIAL"`` first snapshots, then tails the binlog; ``"EARLIEST_OFFSET"`` and ``"LATEST_OFFSET"`` start at the binlog ends; ``"TIMESTAMP"`` starts at ``start_time`` (required in this mode). Setting ``start_time`` alone is equivalent — the connector promotes the startup mode to TIMESTAMP whenever ``start_time`` is provided. binlog_batch_size: Number of rows read per binlog batch. binlog_request_timeout_ms: Binlog request timeout (milliseconds). binlog_project_columns: When ``True``, request only the declared columns from the binlog stream. ``None`` (default) lets the server pick. binlog_compression: When ``True``, enable LZ4 compression on the binlog transport. ``None`` (default) lets the server pick. partition_binlog_mode: How to consume binlog for partitioned tables. ``"DISABLE"`` (default) treats partition tables as ordinary tables. ``"STATIC"`` reads a fixed set of partitions (combine with ``partition_values_to_read``). ``"DYNAMIC"`` adapts to new partitions automatically. partition_binlog_lateness_timeout_minutes: Maximum lateness, in minutes, tolerated when ``partition_binlog_mode='DYNAMIC'``. partition_values_to_read: Partition values to consume when ``partition_binlog_mode='STATIC'``. 
Accepts a list of strings (e.g. ``["ds=20240101", "ds=20240102"]``) or a comma-separated string (e.g. ``"ds=20240101,ds=20240102"``). Wildcards / regex are not supported. start_time: Binlog consumption start time. When set, the Hologres connector forces TIMESTAMP startup and ignores ``binlog_startup_mode``, so it's fine to leave that argument at its default. Accepts a naive ``datetime`` without ``tzinfo`` (formatted as ``"yyyy-MM-dd HH:mm:ss"``) or a string already in that format. The Hologres connector parses this string in the JobManager JVM's default timezone, so the wall-clock value you pass must already be expressed in that zone. fetch_size: JDBC fetch size for bulk scans. scan_timeout_seconds: Bulk-scan request timeout, in seconds. Default is 600 (10 minutes), matching the Hologres connector's own default for native SQL DDL. filter_push_down: When ``True``, push filters down during bulk scans. Mutually exclusive with ``binlog_filter_push_down``. binlog_filter_push_down: When ``True``, push filters down on the binlog stream. prefer_physical_column_over_metadata_column: When ``True``, prefer a physical column over a connector-defined metadata column of the same name. Effective only in binlog mode. lookup_batch_size: Maximum number of rows per dim-table point-query batch. Effective only when the DataFrame is used as a dim table in a downstream lookup join. lookup_timeout_ms: Per-lookup timeout (milliseconds). ``0`` means no timeout. lookup_column_table: When ``True``, allow using a column-store Hologres table as a dim table. Performance is typically poor and the connector logs a warning. lookup_insert_if_not_exists: When ``True``, insert the current join key into Hologres if the dim-table lookup misses. lookup_async: When ``True`` (default), enable asynchronous dim-table lookups. Async lookup is enabled when either ``lookup_async`` or ``async_`` is ``True``. 
lookup_filter_push_down: When ``True``, push dim-table filters down to Hologres (only column-vs-constant comparisons are pushed down). Requires VVR 11.4+. cache: Dim-table cache policy. ``"None"`` (the default) disables caching; ``"LRU"`` enables an in-memory LRU cache. cache_size: Maximum number of rows held by the LRU cache. Default is 100000, matching the Hologres connector's own default for native SQL DDL. cache_ttl_ms: Cache entry TTL (milliseconds). ``None`` (the default) lets the connector pick based on ``cache``. cache_empty: When ``True`` (default), also cache empty lookup results. Disable when the upstream Hologres table is constantly receiving new rows that joins should pick up. async_: Specifies whether to return results asynchronously. Note, async lookup is controlled by both this and ``lookup_async``. To disable async lookup, you need to set both to ``False``. extra_options: Additional connector options forwarded through to the underlying Hologres connector. This is for options not exposed as named parameters of ``read_hologres``, e.g., ``connection.direct.enabled``, ``connection.max-alive-ms``. If a key matches an option generated by a named parameter of ``read_hologres``, the value in ``extra_options`` takes precedence. ``"connector"`` is reserved and must not be supplied. Returns: A DataFrame backed by the Hologres source. 
Raises: ValueError: If ``schema`` is missing; if any enum value is invalid (including ``cache``); if ``binlog_startup_mode='TIMESTAMP'`` without ``start_time``; if ``start_time`` is an aware ``datetime`` (with ``tzinfo``); if ``binlog_change_log_mode='UPSERT'`` without ``primary_key``; if ``connection_ssl_mode`` is ``"verify-ca"`` or ``"verify-full"`` without ``connection_ssl_root_cert_location``; if ``lookup_filter_push_down`` is enabled together with either ``filter_push_down`` or ``binlog_filter_push_down``, or if ``filter_push_down`` and ``binlog_filter_push_down`` are both enabled; if ``partition_values_to_read`` is set outside STATIC mode; or if ``extra_options`` contains a ``"connector"`` key. Examples: Streaming binlog source (INITIAL mode, defaults):: import pyflink.dataframe as pf df = pf.read_hologres( "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80", db_name="my_db", table_name="public.orders", username=ALIYUN_AK_ID, password=ALIYUN_AK_SECRET, schema={ "order_id": pf.DataType.bigint(), "buyer": pf.DataType.string(), "amount": pf.DataType.decimal(20, 4), }, primary_key="order_id", ) Bulk scan only (no binlog), with column projection:: df = pf.read_hologres( "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80", db_name="my_db", table_name="public.user_profiles", username=ALIYUN_AK_ID, password=ALIYUN_AK_SECRET, schema={ "user_id": pf.DataType.bigint(), "city": pf.DataType.string(), "country": pf.DataType.string(), }, primary_key="user_id", binlog=False, columns=["user_id", "city"], filter_push_down=True, ) Time-bounded binlog replay (TIMESTAMP startup):: from datetime import datetime df = pf.read_hologres( "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80", db_name="my_db", table_name="public.events", username=ALIYUN_AK_ID, password=ALIYUN_AK_SECRET, schema={"event_id": pf.DataType.bigint()}, binlog_startup_mode="TIMESTAMP", start_time=datetime(2026, 1, 15, 0, 0, 0), binlog_change_log_mode="ALL", ) Static-partitioned 
binlog (consume only two partitions):: df = pf.read_hologres( "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80", db_name="my_db", table_name="public.daily_logs", username=ALIYUN_AK_ID, password=ALIYUN_AK_SECRET, schema={"day": pf.DataType.string(), "msg": pf.DataType.string()}, partition_binlog_mode="STATIC", partition_values_to_read=["2026-01-15", "2026-01-16"], ) Dim-table source for a downstream lookup join (LRU cache, async lookup tuned):: dim = pf.read_hologres( "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80", db_name="my_db", table_name="public.user_profiles", username=ALIYUN_AK_ID, password=ALIYUN_AK_SECRET, schema={ "user_id": pf.DataType.bigint(), "country": pf.DataType.string(), }, primary_key="user_id", binlog=False, cache="LRU", cache_size=50000, cache_ttl_ms=600000, lookup_async=True, lookup_batch_size=512, ) Forward-compat: tune a non-named option (and override a named one) via ``extra_options``:: df = pf.read_hologres( "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80", db_name="my_db", table_name="public.orders", username=ALIYUN_AK_ID, password=ALIYUN_AK_SECRET, schema={"order_id": pf.DataType.bigint()}, fetch_size=512, extra_options={ "hologres.server.version": "3.1", "source.scan.fetch-size": "8192", # overrides default }, ) """ from pyflink.dataframe.context import get_or_create_table_environment from pyflink.dataframe.util.artifacts import add_built_in_connector from pyflink.table import TableDescriptor, Schema from pyflink.table.expressions import col _validate_read_hologres_options( binlog_read_mode=binlog_read_mode, binlog_change_log_mode=binlog_change_log_mode, binlog_startup_mode=binlog_startup_mode, partition_binlog_mode=partition_binlog_mode, connection_ssl_mode=connection_ssl_mode, connection_ssl_root_cert_location=connection_ssl_root_cert_location, start_time=start_time, filter_push_down=filter_push_down, binlog_filter_push_down=binlog_filter_push_down, 
lookup_filter_push_down=lookup_filter_push_down, partition_values_to_read=partition_values_to_read, cache=cache, primary_key=primary_key, ) if schema is None: raise ValueError("'schema' must be specified for Hologres source.") def _format_partition_values(value): if value is None: return None if isinstance(value, str): # An empty string would be joined back into [""] on the Java # side and trip HologresUtils.getPartitionsToRead with an # opaque "<empty> is not a valid partition value" runtime # error; reject up-front. if not value: raise ValueError( "partition_values_to_read must not be empty." ) return value if isinstance(value, list): if not value: raise ValueError( "partition_values_to_read must not be empty." ) for item in value: if not isinstance(item, str) or not item: raise ValueError( "partition_values_to_read entries must be non-empty strings." ) return ",".join(value) raise TypeError( "'partition_values_to_read' must be a string or list of strings." ) options = [ ("endpoint", endpoint), ("dbname", db_name), ("tablename", table_name), ("username", username), ("password", password), ("connection.pool.size", str(connection_pool_size)), ("connection.pool.name", connection_pool_name), ("connection.max-idle-ms", str(connection_max_idle_ms)), ("connection.ssl.mode", connection_ssl_mode), ("retry-count", str(retry_count)), ("retry-sleep-step-ms", str(retry_sleep_step_ms)), ("meta-cache-ttl-ms", str(meta_cache_ttl_ms)), ("serverless-computing.enabled", str(serverless_computing)), ("source.binlog", str(binlog)), ("source.binlog.read-mode", binlog_read_mode), ("source.binlog.change-log-mode", binlog_change_log_mode), ("source.binlog.startup-mode", binlog_startup_mode), ("source.binlog.batch-size", str(binlog_batch_size)), ("source.binlog.request-timeout-ms", str(binlog_request_timeout_ms)), ("source.binlog.partition-binlog-mode", partition_binlog_mode), ( "source.binlog.partition-binlog-lateness-timeout-minutes", str(partition_binlog_lateness_timeout_minutes), ), 
("source.scan.fetch-size", str(fetch_size)), ("source.scan.timeout-seconds", str(scan_timeout_seconds)), ("source.scan.filter-push-down.enabled", str(filter_push_down)), ( "source.binlog.filter-push-down.enabled", str(binlog_filter_push_down), ), ( "scan.prefer.physical-column.over.metadata-column", str(prefer_physical_column_over_metadata_column), ), ("lookup.read.batch-size", str(lookup_batch_size)), ("lookup.read.timeout-ms", str(lookup_timeout_ms)), ("lookup.read.column-table.enabled", str(lookup_column_table)), ("lookup.insert-if-not-exists", str(lookup_insert_if_not_exists)), ("lookup.async", str(lookup_async)), ("lookup.filter-push-down.enabled", str(lookup_filter_push_down)), ("cache", cache), ("cacheSize", str(cache_size)), ("cacheEmpty", str(cache_empty)), ("async", str(async_)), ] if connection_fixed is not None: options.append(("connection.fixed.enabled", str(connection_fixed))) if connection_ssl_root_cert_location is not None: options.append( ( "connection.ssl.root-cert.location", connection_ssl_root_cert_location, ) ) if binlog_project_columns is not None: options.append( ("source.binlog.project-columns.enabled", str(binlog_project_columns)) ) if binlog_compression is not None: options.append( ("source.binlog.compression.enabled", str(binlog_compression)) ) formatted_partition_values = _format_partition_values(partition_values_to_read) if formatted_partition_values is not None: options.append( ("source.binlog.partition-values-to-read", formatted_partition_values) ) if start_time is not None: options.append(("startTime", _format_time(start_time))) if cache_ttl_ms is not None: options.append(("cacheTTLMs", str(cache_ttl_ms))) primary_key_columns = _normalize_primary_key(primary_key, schema) t_env = get_or_create_table_environment() add_built_in_connector(t_env, "hologres") schema_builder = Schema.new_builder() primary_key_set = set(primary_key_columns) for name, dtype in schema.items(): table_type = dtype._table_type if name in primary_key_set: 
table_type = table_type.not_null() schema_builder.column(name, table_type) if primary_key_columns: schema_builder.primary_key(*primary_key_columns) builder = TableDescriptor.for_connector("hologres").schema( schema_builder.build() ) builder = _add_connector_options(builder, options) builder = _add_extra_options(builder, extra_options) table = t_env.from_descriptor(builder.build()) if columns is not None: table = table.select(*[col(c) for c in columns]) return DataFrame(table)
def read_custom(
    connector: str,
    *,
    schema: Optional[Dict[str, "DataType"]] = None,
    primary_key: Optional[Union[str, List[str]]] = None,
    columns: Optional[List[str]] = None,
    options: Optional[Dict[str, Any]] = None,
) -> DataFrame:
    """
    Read data from a custom connector into a DataFrame.

    This is a generic entrypoint for connectors that are not covered by the
    dedicated ``read_*`` helpers (e.g. ``read_parquet``, ``read_kafka``,
    ``read_odps``). It targets SQL connectors that are discoverable through
    Flink's standard factory mechanism.

    Args:
        connector: Factory identifier of the connector. This is the same
            value users put in SQL DDL ``WITH ('connector' = '...')``. For
            example, ``"datagen"``, ``"filesystem"``, or the name of a custom
            connector registered on the platform.
        schema: Dict of ``{column_name: DataType}`` describing the schema of
            the source. Required and must contain at least one column.
        primary_key: Optional primary key column name or list of column
            names. Set this only for connectors that expect primary key
            constraints in the table schema. Primary key columns are
            declared NOT NULL.
        columns: Optional list of column names to read (projection
            pushdown). If ``None``, all columns from ``schema`` are read.
            Every name must be a key of ``schema``.
        options: Optional dict of connector and table options. Each key is
            forwarded as-is with the same name it would have in
            ``WITH (...)`` in SQL DDL. Values are converted to strings.
            ``"connector"`` is reserved and must be specified via the
            ``connector`` argument.

    Returns:
        A new DataFrame backed by the configured source.

    Raises:
        ValueError: If ``connector`` is not a non-empty string; if
            ``schema`` is missing or empty; if ``options`` contains an
            invalid key or a ``"connector"`` key; if ``primary_key`` is
            invalid or references a column missing from ``schema``; or if
            ``columns`` names a column missing from ``schema``.
        TypeError: If ``schema`` or ``options`` is not a dict, or
            ``primary_key`` has an unsupported type.

    Example::

        >>> import pyflink.dataframe as pf
        >>>
        >>> # Read with a connector identified by ``"my-custom"`` (uploaded
        >>> # connector JAR registered with factory identifier ``my-custom``).
        >>> df = pf.read_custom(
        ...     "my-custom",
        ...     schema={
        ...         "id": pf.DataType.int64(),
        ...         "name": pf.DataType.string(),
        ...     },
        ...     options={
        ...         "host": "example.com",
        ...         "port": "9000",
        ...     },
        ... )
        >>>
        >>> # Read with a format and format options
        >>> df = pf.read_custom(
        ...     "my-custom",
        ...     schema={"id": pf.DataType.int64()},
        ...     primary_key="id",
        ...     options={
        ...         "endpoint": "example.com:9000",
        ...         "format": "json",
        ...         "json.ignore-parse-errors": "true",
        ...     },
        ... )
    """
    from pyflink.table import TableDescriptor, Schema
    from pyflink.table.expressions import col
    from pyflink.dataframe.context import get_or_create_table_environment
    from pyflink.dataframe.util.artifacts import add_built_in_connector

    if not isinstance(connector, str) or not connector:
        raise ValueError(
            "'connector' must be a non-empty string identifying the "
            "connector (the same value used as 'connector' in SQL DDL)."
        )
    if schema is None:
        raise ValueError(
            "schema is required for read_custom. "
            "Please provide a dict of {column_name: DataType}.\n"
            "Example:\n"
            "  pf.read_custom('my_connector', schema={\n"
            "      'id': pf.DataType.int64(),\n"
            "      'name': pf.DataType.string(),\n"
            "  })"
        )
    # Validate the schema shape here instead of failing later with an
    # AttributeError on ``.items()`` or an opaque planner error.
    if not isinstance(schema, dict):
        raise TypeError("'schema' must be a dict of {column_name: DataType}.")
    if not schema:
        raise ValueError("'schema' must contain at least one column.")

    connector_options = _collect_custom_options(options)
    primary_key_columns = _normalize_primary_key(
        primary_key,
        schema,
    )
    # The table's columns come exclusively from ``schema``, so any projected
    # name outside it can never resolve; report it before touching Flink.
    if columns is not None:
        unknown_columns = [name for name in columns if name not in schema]
        if unknown_columns:
            raise ValueError(
                f"Columns {unknown_columns} in 'columns' are not present "
                "in the schema."
            )

    t_env = get_or_create_table_environment()
    # If `connector` is a built-in connector that we do not
    # expose directly via a dedicated read_* helper, auto-load its jar.
    # Unknown / user-defined connector names are silently ignored by the
    # helper.
    add_built_in_connector(t_env, connector)

    builder = TableDescriptor.for_connector(connector)

    schema_builder = Schema.new_builder()
    primary_key_set = set(primary_key_columns)
    for name, dtype in schema.items():
        table_type = dtype._table_type
        # Flink requires primary key columns to be NOT NULL.
        if name in primary_key_set:
            table_type = table_type.not_null()
        schema_builder.column(name, table_type)
    if primary_key_columns:
        schema_builder.primary_key(*primary_key_columns)
    builder = builder.schema(schema_builder.build())

    for key, value in connector_options.items():
        builder = builder.option(key, str(value))

    table = t_env.from_descriptor(builder.build())
    if columns is not None:
        table = table.select(*[col(c) for c in columns])
    return DataFrame(table)
def _validate_odps_common_options(
    *,
    endpoint: str,
    project: str,
    table_name: str,
    access_id: str,
    access_key: str,
    tunnel_endpoint: Optional[str],
    quota_name: Optional[str],
) -> None:
    """Validate the connection arguments shared by ODPS helpers.

    Raises:
        ValueError: If any required argument is ``None`` or an empty
            string, or if both ``tunnel_endpoint`` and ``quota_name`` are
            given (they are mutually exclusive).
    """
    for name, value in (
        ("endpoint", endpoint),
        ("project", project),
        ("table_name", table_name),
        ("access_id", access_id),
        ("access_key", access_key),
    ):
        if value is None or value == "":
            raise ValueError(f"'{name}' must be specified for ODPS.")
    if tunnel_endpoint is not None and quota_name is not None:
        raise ValueError(
            "Only one of 'tunnel_endpoint' or 'quota_name' can be "
            "specified. Remove 'tunnel_endpoint' when using an ODPS "
            "exclusive data-transfer quota."
        )


def _format_odps_partition(
    partition,
    *,
    allow_dynamic: bool = False,
    name: str = "partition",
):
    """Normalize an ODPS partition spec into its option-string form.

    Accepted shapes:

    * ``None`` or a ``str``: returned unchanged.
    * A ``(column, value)`` tuple: rendered as ``"column=value"``.
    * A list of ``(column, value)`` tuples: rendered as
      ``"c1=v1,c2=v2"``.
    * A list of column-name strings (only when ``allow_dynamic`` is true):
      rendered as ``"c1,c2"``.

    Partition values are stringified with ``_to_str``, so booleans are
    rendered in lowercase. ``name`` is used only in error messages.

    Raises:
        TypeError: If ``partition`` has an unsupported type.
        ValueError: If the list is empty, an entry is malformed, or dynamic
            column names are given while ``allow_dynamic`` is false.
    """

    def _format_odps_fixed_partition(partition):
        parts = []
        for item in partition:
            if not isinstance(item, tuple) or len(item) != 2:
                raise ValueError(
                    f"'{name}' entries must be "
                    "(partition_column, partition_value) tuples."
                )
            partition_column, partition_value = item
            if not isinstance(partition_column, str) or not partition_column:
                raise ValueError(
                    f"'{name}' partition column names must be "
                    "non-empty strings."
                )
            if partition_value is None:
                raise ValueError(f"'{name}' partition values must not be None.")
            parts.append(f"{partition_column}={_to_str(partition_value)}")
        return ",".join(parts)

    if partition is None or isinstance(partition, str):
        return partition
    if isinstance(partition, tuple):
        return _format_odps_fixed_partition([partition])
    if not isinstance(partition, list):
        raise TypeError(
            f"'{name}' must be a string, a "
            "(partition_column, partition_value) tuple, or a list."
        )
    if not partition:
        raise ValueError(f"'{name}' list must not be empty.")
    # A list made only of strings is a dynamic partition column list.
    if all(isinstance(item, str) for item in partition):
        if not allow_dynamic:
            raise ValueError(
                f"'{name}' list entries must be "
                "(partition_column, partition_value) tuples."
            )
        for partition_column in partition:
            if not partition_column:
                raise ValueError(
                    f"'{name}' dynamic partition column names must be "
                    "non-empty strings."
                )
        return ",".join(partition)
    return _format_odps_fixed_partition(partition)


def _format_paimon_column_list(
    value: Optional[Union[str, List[str]]],
    name: str,
) -> Optional[str]:
    """Normalize a column list (``"a, b"`` or ``["a", "b"]``) to ``"a,b"``.

    Surrounding whitespace is stripped from each name. ``None`` is returned
    unchanged. ``name`` is used only in error messages.

    Raises:
        TypeError: If ``value`` is neither a string nor a list/tuple.
        ValueError: If the list is empty, an entry is not a non-empty
            string, or a column name is repeated.
    """
    if value is None:
        return None
    if isinstance(value, str):
        columns = [column.strip() for column in value.split(",")]
    elif isinstance(value, (list, tuple)):
        if not value:
            raise ValueError(f"'{name}' list must not be empty.")
        columns = []
        for column in value:
            if not isinstance(column, str):
                raise ValueError(f"'{name}' entries must be non-empty strings.")
            columns.append(column.strip())
    else:
        raise TypeError(f"'{name}' must be a string or a list of column names.")

    seen = set()
    for column in columns:
        if not column:
            raise ValueError(f"'{name}' entries must be non-empty strings.")
        if column in seen:
            raise ValueError(f"'{name}' must not contain duplicate column names.")
        seen.add(column)
    return ",".join(columns)


def _validate_paimon_common_options(
    *,
    file_format: str,
    changelog_producer: str,
    merge_engine: str,
    write_mode: Optional[str] = None,
) -> None:
    """Check the enum-valued Paimon options against their allowed sets.

    ``write_mode`` is validated only when it is not ``None``.

    Raises:
        ValueError: If any checked value is ``None`` or not allowed.
    """
    _validate_allowed_value(file_format, "file_format", _PAIMON_FILE_FORMATS)
    _validate_allowed_value(
        changelog_producer,
        "changelog_producer",
        _PAIMON_CHANGELOG_PRODUCERS,
    )
    _validate_allowed_value(merge_engine, "merge_engine", _PAIMON_MERGE_ENGINES)
    if write_mode is not None:
        _validate_allowed_value(write_mode, "write_mode", _PAIMON_WRITE_MODES)


def _validate_read_hologres_options(
    *,
    binlog_read_mode,
    binlog_change_log_mode,
    binlog_startup_mode,
    partition_binlog_mode,
    connection_ssl_mode,
    connection_ssl_root_cert_location,
    start_time,
    filter_push_down,
    binlog_filter_push_down,
    lookup_filter_push_down,
    partition_values_to_read,
    cache,
    primary_key,
) -> None:
    """Cross-validate ``read_hologres`` arguments.

    Raises:
        ValueError: If an enum-valued option is not allowed; if a verify-*
            SSL mode lacks a root certificate location; if
            ``binlog_startup_mode='TIMESTAMP'`` lacks ``start_time``; if
            ``binlog_change_log_mode='UPSERT'`` lacks ``primary_key``; if
            incompatible filter push-down flags are combined; or if
            ``partition_values_to_read`` is set outside STATIC mode.
    """
    _validate_allowed_value(
        binlog_read_mode, "binlog_read_mode", _HOLOGRES_BINLOG_READ_MODES
    )
    _validate_allowed_value(
        binlog_change_log_mode,
        "binlog_change_log_mode",
        _HOLOGRES_BINLOG_CHANGE_LOG_MODES,
    )
    _validate_allowed_value(
        binlog_startup_mode,
        "binlog_startup_mode",
        _HOLOGRES_BINLOG_STARTUP_MODES,
    )
    _validate_allowed_value(
        partition_binlog_mode,
        "partition_binlog_mode",
        _HOLOGRES_PARTITION_BINLOG_MODES,
    )
    _validate_allowed_value(
        connection_ssl_mode, "connection_ssl_mode", _HOLOGRES_SSL_MODES
    )
    _validate_allowed_value(cache, "cache", _HOLOGRES_CACHE_TYPES)
    _validate_hologres_ssl_pair(
        connection_ssl_mode, connection_ssl_root_cert_location
    )
    # NOTE: start_time without binlog_startup_mode='TIMESTAMP' is permitted.
    # The Hologres connector documents that startTime takes precedence and
    # forces TIMESTAMP startup mode when set (see HologresBinlogInputSplit).
    if binlog_startup_mode == "TIMESTAMP" and start_time is None:
        raise ValueError(
            "start_time is required when binlog_startup_mode='TIMESTAMP'."
        )
    if binlog_change_log_mode == "UPSERT" and not primary_key:
        raise ValueError(
            "binlog_change_log_mode='UPSERT' requires primary_key to be set; "
            "Flink rejects an upsert changelog source without a primary key."
        )
    if filter_push_down and binlog_filter_push_down:
        raise ValueError(
            "filter_push_down and binlog_filter_push_down cannot both be True."
        )
    # HologresTableSource#verifyPushdownConfigs throws when lookup
    # filter-push-down is combined with either source-side filter-push-down.
    if lookup_filter_push_down and filter_push_down:
        raise ValueError(
            "lookup_filter_push_down and filter_push_down cannot both be True."
        )
    if lookup_filter_push_down and binlog_filter_push_down:
        raise ValueError(
            "lookup_filter_push_down and binlog_filter_push_down "
            "cannot both be True."
        )
    if (
        partition_values_to_read is not None
        and partition_binlog_mode != "STATIC"
    ):
        raise ValueError(
            "partition_values_to_read is only valid when "
            "partition_binlog_mode='STATIC'."
        )


def _validate_hologres_ssl_pair(
    connection_ssl_mode, connection_ssl_root_cert_location
) -> None:
    """Require a root certificate path for verify-* Hologres SSL modes.

    An empty string is treated the same as a missing path: the option would
    be forwarded as ``""``, and that is not a usable certificate location.

    Raises:
        ValueError: If ``connection_ssl_mode`` is ``"verify-ca"`` or
            ``"verify-full"`` and no root certificate location is given.
    """
    if (
        connection_ssl_mode in ("verify-ca", "verify-full")
        and not connection_ssl_root_cert_location
    ):
        raise ValueError(
            f"connection_ssl_mode={connection_ssl_mode!r} requires "
            "connection_ssl_root_cert_location to be set "
            "(the Hologres JDBC driver rejects verify-* modes without a "
            "root certificate path)."
        )


def _paimon_common_options(
    *,
    path: Optional[str],
    auto_create: bool,
    file_format: str,
    bucket: int,
    bucket_key: Optional[Union[str, List[str]]],
    changelog_producer: str,
    full_compaction_delta_commits: Optional[int],
    lookup_cache_max_memory_size: str,
    merge_engine: str,
    partial_update_ignore_delete: bool,
    ignore_delete: bool,
    partition_default_name: str,
    partition_expiration_check_interval: str,
    partition_expiration_time: Optional[str],
    partition_timestamp_formatter: Optional[str],
    partition_timestamp_pattern: Optional[str],
    snapshot_num_retained_max: int,
    snapshot_num_retained_min: int,
    snapshot_time_retained: str,
    write_mode: Optional[str] = None,
) -> List[Tuple[str, Any]]:
    """Build the common Paimon ``(option_key, value)`` pairs.

    Values are not stringified here. Entries whose value is ``None`` stay in
    the list; ``_add_connector_options`` skips them when applying options.
    ``write-mode`` is appended only when ``write_mode`` is not ``None``.
    """
    options = [
        ("path", path),
        ("auto-create", auto_create),
        ("file.format", file_format),
        ("bucket", bucket),
        ("bucket-key", _format_paimon_column_list(bucket_key, "bucket_key")),
        ("changelog-producer", changelog_producer),
        ("full-compaction.delta-commits", full_compaction_delta_commits),
        ("lookup.cache-max-memory-size", lookup_cache_max_memory_size),
        ("merge-engine", merge_engine),
        ("partial-update.ignore-delete", partial_update_ignore_delete),
        ("ignore-delete", ignore_delete),
        ("partition.default-name", partition_default_name),
        (
            "partition.expiration-check-interval",
            partition_expiration_check_interval,
        ),
        ("partition.expiration-time", partition_expiration_time),
        ("partition.timestamp-formatter", partition_timestamp_formatter),
        ("partition.timestamp-pattern", partition_timestamp_pattern),
        ("snapshot.num-retained.max", snapshot_num_retained_max),
        ("snapshot.num-retained.min", snapshot_num_retained_min),
        ("snapshot.time-retained", snapshot_time_retained),
    ]
    if write_mode is not None:
        options.append(("write-mode", write_mode))
    return options


def _validate_allowed_value(value: str, name: str, allowed) -> None:
    """Raise ``ValueError`` unless ``value`` is a member of ``allowed``.

    Raises:
        ValueError: If ``value`` is ``None`` or not in ``allowed``.
    """
    if value is None:
        raise ValueError(f"'{name}' must not be None.")
    try:
        is_allowed = value in allowed
    except TypeError:
        # Unhashable values (e.g. a list) cannot be looked up in a set;
        # report them like any other invalid value rather than leaking an
        # "unhashable type" error.
        is_allowed = False
    if not is_allowed:
        raise ValueError(
            f"Invalid {name} '{value}'. Must be one of "
            f"{sorted(allowed)}."
        )


def _add_connector_options(builder, options):
    """Apply ``(key, value)`` pairs to ``builder``, skipping ``None`` values.

    Values are converted with ``str``. Returns the (possibly new) builder.
    """
    for key, value in options:
        if value is not None:
            builder = builder.option(key, str(value))
    return builder


def _add_extra_options(builder, extra_options):
    """Apply user-supplied ``extra_options`` to ``builder``.

    Options applied here come after any set earlier on the builder. Values
    are converted with ``str``. Non-string keys are passed to
    ``builder.option`` unchanged.

    Raises:
        TypeError: If ``extra_options`` is not a dict.
        ValueError: If a key equals ``"connector"`` (case-insensitive).
    """
    if extra_options is None:
        return builder
    if not isinstance(extra_options, dict):
        raise TypeError("'extra_options' must be a dict of option key/value pairs.")
    for key, value in extra_options.items():
        if isinstance(key, str) and key.lower() == "connector":
            raise ValueError(
                "'extra_options' must not override the 'connector' option."
            )
        builder = builder.option(key, str(value))
    return builder


def _to_str(value: Any) -> str:
    """Stringify ``value``, rendering booleans as ``"true"``/``"false"``."""
    if isinstance(value, bool):
        return str(value).lower()
    return str(value)


def _format_time(value: Union[str, datetime], format: str = "%Y-%m-%d %H:%M:%S") -> str:
    """Format a time value.

    ``datetime`` is formatted according to the given ``format``. ``str`` is
    returned unchanged. Aware ``datetime`` (with ``tzinfo``) is rejected,
    because ``strftime`` would silently drop the ``tzinfo``.
    """
    if isinstance(value, datetime):
        if value.tzinfo is not None:
            raise ValueError(
                "datetime with tzinfo is not supported: "
                "pass a naive datetime instead."
            )
        return value.strftime(format)
    return value


def _collect_custom_options(
    options: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
    """Validate and copy connector options.

    The ``"connector"`` key is rejected case-insensitively, matching
    ``_add_extra_options``, so a differently-cased spelling cannot slip
    through as a bogus option.

    Raises:
        TypeError: If ``options`` is not a dict.
        ValueError: If a key is not a non-empty string or names the
            reserved ``"connector"`` option.
    """
    if options is None:
        return {}
    if not isinstance(options, dict):
        raise TypeError("'options' must be a dict of {option_key: option_value}.")
    connector_options: Dict[str, Any] = {}
    for key, value in options.items():
        if not isinstance(key, str) or not key:
            raise ValueError(
                f"Invalid key '{key}' in options. "
                "Option keys must be non-empty strings."
            )
        if key.lower() == "connector":
            raise ValueError(
                "Connector identifier must be specified via the "
                "'connector' argument, not as options['connector']."
            )
        connector_options[key] = value
    return connector_options


def _normalize_primary_key(
    primary_key: Optional[Union[str, List[str]]],
    column_names,
) -> List[str]:
    """Normalize ``primary_key`` into a fresh list of column names.

    ``primary_key`` may be ``None`` (returns ``[]``), a single column name,
    or a list/tuple of names. ``column_names`` is any iterable of column
    names; a schema dict works because iterating a dict yields its keys.
    The returned list is a new object, so later mutation by the caller does
    not affect it.

    Raises:
        TypeError: If ``primary_key`` is not a string, list, or tuple.
        ValueError: If the list is empty, or a name is empty, non-string,
            repeated, or missing from ``column_names``.
    """
    if primary_key is None:
        return []
    if isinstance(primary_key, str):
        primary_key_columns = [primary_key]
    elif isinstance(primary_key, (list, tuple)):
        primary_key_columns = list(primary_key)
    else:
        raise TypeError(
            "'primary_key' must be a string column name or a list of "
            "column names."
        )
    if not primary_key_columns:
        raise ValueError("'primary_key' list must not be empty.")
    column_name_set = set(column_names)
    seen = set()
    for column in primary_key_columns:
        if not isinstance(column, str) or not column:
            raise ValueError("'primary_key' column names must be non-empty strings.")
        if column not in column_name_set:
            raise ValueError(
                f"Primary key column '{column}' is not present in "
                "the schema."
            )
        # Flink rejects primary keys with repeated columns; report it here
        # with a clear message instead.
        if column in seen:
            raise ValueError(
                f"'primary_key' must not contain duplicate column '{column}'."
            )
        seen.add(column)
    return primary_key_columns