PyFlink 1.20+vvr.11.7.dev0 documentation

pyflink.dataframe.io.read_kafka

read_kafka(
    bootstrap_servers: str,
    *,
    schema: Dict[str, DataType] | None = None,
    columns: List[str] | None = None,
    properties: Dict[str, str] | None = None,
    format: str = 'json',
    format_options: Dict[str, str] | None = None,
    key_format: str | None = None,
    key_format_options: Dict[str, str] | None = None,
    key_fields: List[str] | None = None,
    key_fields_prefix: str | None = None,
    value_format: str | None = None,
    value_format_options: Dict[str, str] | None = None,
    value_fields_include: Literal['ALL', 'EXCEPT_KEY'] = 'ALL',
    topic: str | List[str] | None = None,
    topic_pattern: str | None = None,
    group_id: str | None = None,
    startup_mode: Literal['earliest-offset', 'latest-offset', 'group-offsets', 'timestamp', 'specific-offsets'] = 'group-offsets',
    startup_specific_offsets: str | Dict[int, int] | None = None,
    startup_timestamp_millis: int | None = None,
    topic_partition_discovery_interval: str = '5 min',
    header_filter: str | None = None,
    check_duplicated_group_id: bool = False,
    bounded_mode: Literal['unbounded', 'group-offsets', 'latest-offset', 'timestamp', 'specific-offsets'] = 'unbounded',
    bounded_timestamp_millis: int | None = None,
    bounded_specific_offsets: str | Dict[int, int] | None = None,
) → DataFrame

Read data from one or more Kafka topics into a DataFrame.

Uses Flink’s Kafka SQL Connector under the hood. With the default bounded_mode="unbounded", this creates a streaming source that reads from Kafka continuously. For bounded (batch) reads, set the bounded_mode parameter.
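
Because the source is built on the Kafka SQL connector, it can help to picture the keyword arguments as connector options in a Flink SQL WITH (...) clause. The sketch below illustrates that correspondence for a handful of parameters only. It is an assumption for illustration, not the library's actual translation code: the helper kafka_with_options is hypothetical, while the option keys it emits (properties.bootstrap.servers, properties.group.id, topic, topic-pattern, scan.startup.mode, scan.bounded.mode, and format options prefixed with the format name) are option names of the open-source Flink Kafka SQL connector.

```python
from typing import Dict, List, Optional, Union


def kafka_with_options(
    bootstrap_servers: str,
    *,
    topic: Union[str, List[str], None] = None,
    topic_pattern: Optional[str] = None,
    group_id: Optional[str] = None,
    properties: Optional[Dict[str, str]] = None,
    format: str = "json",  # mirrors the read_kafka parameter name
    format_options: Optional[Dict[str, str]] = None,
    startup_mode: str = "group-offsets",
    bounded_mode: str = "unbounded",
) -> Dict[str, str]:
    """Hypothetical helper: map a subset of read_kafka arguments onto
    open-source Flink Kafka SQL connector options."""
    opts = {
        "connector": "kafka",
        "properties.bootstrap.servers": bootstrap_servers,
        "format": format,
        "scan.startup.mode": startup_mode,
        "scan.bounded.mode": bounded_mode,
    }
    if topic is not None:
        # A list of topics becomes the connector's semicolon-separated form.
        opts["topic"] = topic if isinstance(topic, str) else ";".join(topic)
    if topic_pattern is not None:
        opts["topic-pattern"] = topic_pattern
    if group_id is not None:
        opts["properties.group.id"] = group_id
    # Raw Kafka client properties travel under the "properties." prefix.
    for key, value in (properties or {}).items():
        opts[f"properties.{key}"] = value
    # format_options keys are unprefixed; the connector expects them
    # prefixed with the format name, e.g. "json.ignore-parse-errors".
    for key, value in (format_options or {}).items():
        opts[f"{format}.{key}"] = value
    return opts


def to_with_clause(opts: Dict[str, str]) -> str:
    """Render options as the body of a SQL WITH (...) clause."""
    return ",\n".join(f"  '{k}' = '{v}'" for k, v in opts.items())


print(to_with_clause(kafka_with_options(
    "localhost:9092",
    topic=["topic_a", "topic_b"],
    group_id="my-consumer-group",
    format_options={"ignore-parse-errors": "true"},
)))
```

The printed clause contains, for example, 'topic' = 'topic_a;topic_b' and 'json.ignore-parse-errors' = 'true'. Parameters such as header_filter and check_duplicated_group_id are left out because their connector option names are not given on this page.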

Parameters:
  • bootstrap_servers – Comma-separated list of Kafka broker addresses (e.g., "localhost:9092" or "broker1:9092,broker2:9092").

  • schema – Dict of {column_name: DataType} specifying the schema of the data in Kafka messages.

  • columns – Optional list of column names to read (projection pushdown). If None, all columns from the schema are read.

  • properties –

    Extra Kafka consumer properties. Each key-value pair in this dict is passed directly to the Kafka consumer.

    Example: {"allow.auto.create.topics": "false"}

    Note:

    • Property names should be valid Kafka consumer config keys.

    • key.deserializer and value.deserializer cannot be set, because the Kafka connector overrides them.

  • format – The format used to deserialize the value part of Kafka messages (equivalent to value_format). Supported options: "csv", "json", "avro", "debezium-json", "canal-json", "maxwell-json", "avro-confluent", "raw".

  • format_options – Additional options for the value format selected by format, as a dict. Keys are format-specific options without the format prefix (e.g., {"map-null-key.mode": "FAIL"} for JSON format).

  • key_format –

    The format used to deserialize the key part of Kafka messages.

    If a key format is defined, the key_fields option is required as well. Otherwise the Kafka records will have an empty key.

  • key_format_options – Additional options for the key format, as a dict. Same structure as format_options.

  • key_fields – List of column names that form the message key (e.g. ["field1", "field2"]).

  • key_fields_prefix –

    Prefix for key field column names to avoid name clashes with value fields. For example, with prefix "k_", a key field user_id becomes k_user_id in the schema.

    This prefix is only for table-column disambiguation; it is removed when parsing/serializing Kafka key fields.

    When this option is set, value_fields_include must be set to "EXCEPT_KEY".

  • value_format –

    The format used to deserialize the value part of Kafka messages.

    Only one of format and value_format is required. If both are set, value_format takes precedence.

  • value_format_options – Extra options for value_format.

  • value_fields_include –

    Whether key fields are included in value fields during value parsing.

    • "ALL" (default): all columns are treated as value fields.

    • "EXCEPT_KEY": fields in key_fields are excluded from value fields.

  • topic –

    Topic name(s) to read. It can be a single topic, a list of topics, or a semicolon-separated topic string (for example, "topic-1;topic-2").

    Exactly one of topic and topic_pattern must be set.

  • topic_pattern –

    Regular expression for matching topic names. All topics matching this pattern are consumed at runtime.

    Exactly one of topic and topic_pattern must be set.

  • group_id –

    Consumer group ID.

    If the group ID is new (so it has no committed offsets yet), set auto.offset.reset to earliest or latest in properties to control the initial startup position.

  • startup_mode –

    Startup position mode for the Kafka consumer. Values:

    • "group-offsets" (default): Start from committed offsets in the consumer group.

    • "earliest-offset": Start from the earliest available offset.

    • "latest-offset": Start from the latest offset.

    • "timestamp": Start from a user-specified timestamp (requires startup_timestamp_millis).

    • "specific-offsets": Start from specified offsets per partition (requires startup_specific_offsets).

    Note: This option only applies when the job starts without restored state. When restoring from a checkpoint or savepoint, the offsets recorded in the restored state take precedence.

  • startup_specific_offsets – Per-partition offsets to start reading from. Used when startup_mode="specific-offsets". Can be a string like "partition:0,offset:42;partition:1,offset:300" or a dict mapping partition IDs to offsets like {0: 42, 1: 300}.

  • startup_timestamp_millis – Epoch timestamp in milliseconds to start reading from. Used when startup_mode="timestamp".

  • topic_partition_discovery_interval –

    Interval for discovering new partitions and topics dynamically (e.g., "30 s", "5 min"). Default is 5 minutes.

    Note:

    • Set this to a non-positive interval to disable discovery.

    • In topic_pattern mode, discovery covers both new partitions of already-matched topics and newly created topics that match the pattern.

  • header_filter –

    Logical expression for filtering Kafka records by headers.

    Syntax:

    • Use key:value for one header condition.

    • Use & and | to combine conditions.

    • Use ! for negation.

    Example: "depart:toy|depart:book&!env:test" keeps records with depart=toy or depart=book, and without env=test.

    Notes:

    • Parentheses are not supported.

    • Operators are evaluated from left to right.

    • Header values are converted to UTF-8 strings before comparison.

  • check_duplicated_group_id –

    Whether to check if the consumer group ID already exists on the broker. Default is False.

    • True: check for a duplicate group ID before the job starts; if one is found, fail fast to avoid consumer-group conflicts.

    • False: start directly, without the conflict check.

  • bounded_mode –

    Bounded mode for the Kafka consumer (the connector option scan.bounded.mode). One of:

    • "unbounded" (default): Do not stop consuming.

    • "latest-offset": bounded by latest offsets. This is evaluated at the start of consumption from a given partition.

    • "group-offsets": bounded by committed offsets in ZooKeeper / Kafka brokers of a specific consumer group. This is evaluated at the start of consumption from a given partition.

    • "timestamp": bounded by a user-supplied timestamp (requires bounded_timestamp_millis).

    • "specific-offsets": bounded by user-supplied specific offsets for each partition (requires bounded_specific_offsets).

  • bounded_timestamp_millis – Epoch timestamp in milliseconds for the bounded end position. Used when bounded_mode="timestamp".

  • bounded_specific_offsets – Per-partition offsets for the bounded end position. Used when bounded_mode="specific-offsets". Same format as startup_specific_offsets.
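
The header_filter rules described above (no parentheses, strict left-to-right evaluation, comparison on UTF-8 strings) can be modeled in a few lines of plain Python, which is handy for checking what an expression will keep before deploying it. This is a hypothetical sketch of the documented semantics, not the connector's implementation. Two details are assumptions: a key:value condition holds when any header with that key has that value, and each condition is split at its first colon.

```python
import re
from typing import List, Optional, Tuple

# A Kafka header is a (key, value) pair; the value may be null.
Header = Tuple[str, Optional[bytes]]


def _condition_holds(cond: str, headers: List[Header]) -> bool:
    """Hypothetical helper: evaluate one 'key:value' or '!key:value' condition."""
    cond = cond.strip()
    negate = cond.startswith("!")
    if negate:
        cond = cond[1:]
    key, _, expected = cond.partition(":")
    # Assumption: true if ANY header with this key decodes (UTF-8) to the value.
    hit = any(
        k == key and v is not None and v.decode("utf-8", errors="replace") == expected
        for k, v in headers
    )
    return not hit if negate else hit


def header_filter_matches(expr: str, headers: List[Header]) -> bool:
    """Hypothetical helper: apply '&' and '|' strictly left to right,
    with no operator precedence and no parentheses."""
    # The capturing group keeps operators in the result, e.g.
    # ["depart:toy", "|", "depart:book", "&", "!env:test"].
    tokens = re.split(r"([&|])", expr)
    result = _condition_holds(tokens[0], headers)
    for op, cond in zip(tokens[1::2], tokens[2::2]):
        value = _condition_holds(cond, headers)
        result = (result and value) if op == "&" else (result or value)
    return result
```

For the documented example, header_filter_matches("depart:toy|depart:book&!env:test", [("depart", b"book")]) returns True, and adding ("env", b"test") to the headers makes it return False. Left-to-right evaluation matters: "a:1|b:2&c:3" with only the header a=1 is evaluated as (a:1 | b:2) & c:3 and returns False, whereas the usual precedence of & over | would return True.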
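
Several of the parameters above depend on each other. The hypothetical helper check_read_kafka_args below gathers those documented constraints into one pre-flight check so that mistakes surface before a job is submitted. It is an illustrative sketch only: read_kafka or the underlying connector may perform its own validation with different messages, and the sketch reads "exactly one of topic and topic_pattern" strictly, so passing neither is reported as well.

```python
from typing import Any, List

# Deserializers are overridden by the connector, so they must not be set.
_FORBIDDEN_PROPS = {"key.deserializer", "value.deserializer"}

# (mode parameter, mode value) -> parameter that this mode requires.
_MODE_REQUIREMENTS = {
    ("startup_mode", "timestamp"): "startup_timestamp_millis",
    ("startup_mode", "specific-offsets"): "startup_specific_offsets",
    ("bounded_mode", "timestamp"): "bounded_timestamp_millis",
    ("bounded_mode", "specific-offsets"): "bounded_specific_offsets",
}


def check_read_kafka_args(**kw: Any) -> List[str]:
    """Hypothetical pre-flight check of the constraints documented for
    read_kafka. Returns human-readable problems (empty list if none)."""
    problems: List[str] = []
    if (kw.get("topic") is None) == (kw.get("topic_pattern") is None):
        problems.append("set exactly one of topic and topic_pattern")
    if kw.get("key_format") and not kw.get("key_fields"):
        problems.append("key_format requires key_fields")
    if kw.get("key_fields_prefix") and kw.get("value_fields_include", "ALL") != "EXCEPT_KEY":
        problems.append('key_fields_prefix requires value_fields_include="EXCEPT_KEY"')
    for (mode_arg, mode_value), needed in _MODE_REQUIREMENTS.items():
        if kw.get(mode_arg) == mode_value and kw.get(needed) is None:
            problems.append(f'{mode_arg}="{mode_value}" requires {needed}')
    clash = _FORBIDDEN_PROPS & set(kw.get("properties") or {})
    if clash:
        problems.append(f"properties must not set {sorted(clash)}")
    return problems
```

A caller might collect problems = check_read_kafka_args(topic="events", startup_mode="timestamp") and raise ValueError("; ".join(problems)) if the list is non-empty, before calling read_kafka with the same keyword arguments.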

Returns:

A new DataFrame representing the Kafka source.

Example:

>>> import pyflink.dataframe as pf
>>>
>>> # Simple JSON consumption from a single topic
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="user_events",
...     schema={
...         "user_id": pf.DataType.int64(),
...         "event": pf.DataType.string(),
...         "ts": pf.DataType.timestamp(3),
...     },
... )
>>>
>>> # Read from earliest offset with explicit consumer group
>>> df = pf.read_kafka(
...     "broker1:9092,broker2:9092",
...     topic="orders",
...     schema={
...         "order_id": pf.DataType.int64(),
...         "amount": pf.DataType.float64(),
...     },
...     group_id="my-consumer-group",
...     startup_mode="earliest-offset",
... )
>>>
>>> # Read multiple topics
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic=["topic_a", "topic_b"],
...     schema={
...         "id": pf.DataType.int64(),
...         "data": pf.DataType.string(),
...     },
... )
>>>
>>> # Subscribe to topics by pattern
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic_pattern="events-.*",
...     schema={
...         "id": pf.DataType.int64(),
...         "payload": pf.DataType.string(),
...     },
... )
>>>
>>> # Bounded read (batch mode) — read up to the latest offset
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="orders",
...     schema={
...         "order_id": pf.DataType.int64(),
...         "amount": pf.DataType.float64(),
...     },
...     startup_mode="earliest-offset",
...     bounded_mode="latest-offset",
... )
>>>
>>> # Read with key+value formats and Avro
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="keyed_events",
...     schema={
...         "user_id": pf.DataType.int64(),
...         "event": pf.DataType.string(),
...     },
...     format="avro",
...     key_format="json",
...     key_fields=["user_id"],
... )
>>>
>>> # Start from a specific timestamp
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="events",
...     schema={
...         "id": pf.DataType.int64(),
...         "data": pf.DataType.string(),
...     },
...     startup_mode="timestamp",
...     startup_timestamp_millis=1700000000000,
... )
>>>
>>> # Start from specific offsets per partition
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="events",
...     schema={
...         "id": pf.DataType.int64(),
...         "data": pf.DataType.string(),
...     },
...     startup_mode="specific-offsets",
...     startup_specific_offsets={0: 42, 1: 300},
... )
>>>
>>> # JSON format with custom parse options
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="events",
...     schema={
...         "id": pf.DataType.int64(),
...         "data": pf.DataType.string(),
...     },
...     format="json",
...     format_options={"ignore-parse-errors": "true"},
... )
>>>
>>> # With Kafka security (SASL/SSL)
>>> # When using Aliyun Kafka, download the truststore file from the Kafka
>>> # console and upload it as an additional dependency file.
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="secure_events",
...     schema={
...         "id": pf.DataType.int64(),
...         "data": pf.DataType.string(),
...     },
...     properties={
...         "ssl.truststore.location": "/flink/usrlib/only.4096.client.truststore.jks",
...         "ssl.truststore.password": "KafkaOnsClient",
...         "ssl.endpoint.identification.algorithm": "",
...         "sasl.jaas.config": (
...             "org.apache.flink.kafka.shaded.org.apache.kafka.common.security"
...             ".plain.PlainLoginModule required "
...             'username="<username>" '
...             'password="<password>";'
...         ),
...         "sasl.mechanism": "PLAIN",
...         "security.protocol": "SASL_SSL",
...     },
... )
>>>
>>> # Read with column projection
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="events",
...     schema={
...         "id": pf.DataType.int64(),
...         "name": pf.DataType.string(),
...         "value": pf.DataType.float64(),
...     },
...     columns=["id", "name"],
... )
>>>
>>> # CDC with Debezium format
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="cdc_events",
...     schema={
...         "id": pf.DataType.int64(),
...         "name": pf.DataType.string(),
...     },
...     format="debezium-json",
... )
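
startup_timestamp_millis and bounded_timestamp_millis expect epoch milliseconds, such as the 1700000000000 in the timestamp example above. A small standard-library helper (the name to_epoch_millis is hypothetical) can derive that value from a timezone-aware datetime without floating-point rounding:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(dt: datetime) -> int:
    """Hypothetical helper: convert a timezone-aware datetime to epoch milliseconds."""
    if dt.tzinfo is None:
        # Naive datetimes are ambiguous (local time vs. UTC), so reject them.
        raise ValueError("dt must be timezone-aware")
    # timedelta // timedelta yields an int, avoiding float rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


start = to_epoch_millis(datetime(2023, 11, 14, 22, 13, 20, tzinfo=timezone.utc))
end = to_epoch_millis(datetime(2023, 11, 14, 23, 13, 20, tzinfo=timezone.utc))
print(start, end)  # 1700000000000 1700003600000
```

Passing start as startup_timestamp_millis with startup_mode="timestamp", and end as bounded_timestamp_millis with bounded_mode="timestamp", describes a bounded read over that one-hour window.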
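
The specific-offsets example passes a dict, and the parameter descriptions state that the string form, such as "partition:0,offset:42;partition:1,offset:300", is accepted as well. The hypothetical helpers below convert between the two forms, for instance when offsets are kept as text in a configuration file. Sorting by partition is a presentational choice of this sketch, not a documented requirement.

```python
from typing import Dict


def offsets_to_string(offsets: Dict[int, int]) -> str:
    """Hypothetical helper: render {partition: offset} in the documented string form."""
    return ";".join(
        f"partition:{partition},offset:{offset}"
        for partition, offset in sorted(offsets.items())
    )


def offsets_from_string(spec: str) -> Dict[int, int]:
    """Hypothetical helper: parse 'partition:0,offset:42;partition:1,offset:300'."""
    result: Dict[int, int] = {}
    # Skip empty entries, e.g. from a trailing semicolon.
    for entry in filter(None, (e.strip() for e in spec.split(";"))):
        # Each entry looks like "partition:0,offset:42".
        fields = dict(part.strip().split(":", 1) for part in entry.split(","))
        result[int(fields["partition"])] = int(fields["offset"])
    return result


print(offsets_to_string({1: 300, 0: 42}))
# partition:0,offset:42;partition:1,offset:300
```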

Created using Sphinx 7.4.7.

Built with the PyData Sphinx Theme 0.16.1.