pyflink.dataframe.io.read_kafka
- read_kafka(bootstrap_servers: str, *, schema: Dict[str, DataType] | None = None, columns: List[str] | None = None, properties: Dict[str, str] | None = None, format: str = 'json', format_options: Dict[str, str] | None = None, key_format: str | None = None, key_format_options: Dict[str, str] | None = None, key_fields: List[str] | None = None, key_fields_prefix: str | None = None, value_format: str | None = None, value_format_options: Dict[str, str] | None = None, value_fields_include: Literal['ALL', 'EXCEPT_KEY'] = 'ALL', topic: str | List[str] | None = None, topic_pattern: str | None = None, group_id: str | None = None, startup_mode: Literal['earliest-offset', 'latest-offset', 'group-offsets', 'timestamp', 'specific-offsets'] = 'group-offsets', startup_specific_offsets: str | Dict[int, int] | None = None, startup_timestamp_millis: int | None = None, topic_partition_discovery_interval: str = '5 min', header_filter: str | None = None, check_duplicated_group_id: bool = False, bounded_mode: Literal['unbounded', 'group-offsets', 'latest-offset', 'timestamp', 'specific-offsets'] = 'unbounded', bounded_timestamp_millis: int | None = None, bounded_specific_offsets: str | Dict[int, int] | None = None) -> DataFrame
Read data from one or more Kafka topics into a DataFrame.
Uses Flink’s Kafka SQL Connector under the hood. By default, this creates a streaming source that continuously reads from Kafka. For bounded (batch) reads, use the bounded_mode parameter.
- Parameters:
bootstrap_servers – Comma-separated list of Kafka broker addresses (e.g., "localhost:9092" or "broker1:9092,broker2:9092").
schema – Dict of {column_name: DataType} specifying the schema of the data in Kafka messages.
columns – Optional list of column names to read (projection pushdown). If None, all columns from the schema are read.
properties –
Extra Kafka consumer properties. Each key in this dict is directly passed to Kafka.
Example: {"allow.auto.create.topics": "false"}
Note:
Property names should be valid Kafka consumer config keys.
key.deserializer and value.deserializer cannot be set, because the Kafka connector overrides them.
format – The format used to deserialize the value part of Kafka messages (equivalent to value_format). Supported options: "csv", "json", "avro", "debezium-json", "canal-json", "maxwell-json", "avro-confluent", "raw".
format_options – Additional options for the value format used by format, as a dict. Keys are format-specific options without the format prefix (e.g., {"map-null-key.mode": "FAIL"} for JSON format).
key_format –
The format used to deserialize the key part of Kafka messages.
If a key format is defined, the key_fields option is required as well. Otherwise the Kafka records will have an empty key.
key_format_options – Additional options for the key format, as a dict. Same structure as format_options.
key_fields – List of column names that form the message key (e.g. ["field1", "field2"]).
key_fields_prefix –
Prefix for key field column names to avoid name clashes with value fields. For example, with prefix "k_", a key field user_id becomes k_user_id in the schema.
This prefix is only for table-column disambiguation; it is removed when parsing/serializing Kafka key fields.
When this option is set, value_fields_include must be set to "EXCEPT_KEY".
value_format –
The format used to deserialize the value part of Kafka messages.
Only one of format or value_format is required. If both are configured, value_format takes precedence.
value_format_options – Extra options for value_format, as a dict.
value_fields_include –
Whether key fields are included in value fields during value parsing.
"ALL" (default): all columns are treated as value fields.
"EXCEPT_KEY": fields in key_fields are excluded from value fields.
topic –
Topic name(s) to read. It can be a single topic, a list of topics, or a semicolon-separated topic string (for example, "topic-1;topic-2").
Exactly one of topic and topic_pattern can be set.
topic_pattern –
Regular expression for matching topic names. All topics matching this pattern are consumed at runtime.
Exactly one of topic and topic_pattern can be set.
group_id –
Consumer group id.
If a new group id is used for the first time, configure auto.offset.reset to earliest or latest in properties to control the initial startup position.
startup_mode –
Startup position mode for the Kafka consumer. Values:
"group-offsets" (default): Start from committed offsets in the consumer group.
"earliest-offset": Start from the earliest available offset.
"latest-offset": Start from the latest offset.
"timestamp": Start from a user-specified timestamp (requires startup_timestamp_millis).
"specific-offsets": Start from specified offsets per partition (requires startup_specific_offsets).
Note: This option only takes effect on a stateless startup. When a job restarts from a checkpoint or savepoint, the progress restored from state has higher priority.
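The pairing between startup_mode values and their companion arguments can be checked before calling read_kafka. The following is a minimal sketch that only encodes the requirements listed above; missing_startup_argument is a hypothetical helper for illustration and is not part of pyflink.dataframe.

```python
from typing import Any, Optional

# Startup modes accepted by read_kafka, as listed in the signature.
STARTUP_MODES = (
    "earliest-offset",
    "latest-offset",
    "group-offsets",
    "timestamp",
    "specific-offsets",
)

# Companion argument required by each mode; modes not listed need none.
REQUIRED_ARGUMENT = {
    "timestamp": "startup_timestamp_millis",
    "specific-offsets": "startup_specific_offsets",
}


def missing_startup_argument(startup_mode: str, **kwargs: Any) -> Optional[str]:
    """Return the name of a required but missing argument, or None.

    Hypothetical helper for illustration; not part of pyflink.dataframe.
    """
    if startup_mode not in STARTUP_MODES:
        raise ValueError(f"unknown startup_mode: {startup_mode!r}")
    required = REQUIRED_ARGUMENT.get(startup_mode)
    if required is not None and kwargs.get(required) is None:
        return required
    return None
```

For instance, missing_startup_argument("timestamp") reports that startup_timestamp_millis is still needed, while "group-offsets" needs nothing extra.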
startup_specific_offsets – Per-partition offsets to start reading from. Used when startup_mode="specific-offsets". Can be a string like "partition:0,offset:42;partition:1,offset:300" or a dict mapping partition IDs to offsets like {0: 42, 1: 300}.
startup_timestamp_millis – Epoch timestamp in milliseconds to start reading from. Used when startup_mode="timestamp".
topic_partition_discovery_interval –
Interval for discovering new partitions and topics dynamically (e.g., "30 s", "5 min"). Default is 5 minutes.
Note:
Set this to a non-positive interval to disable discovery.
In topic_pattern mode, this discovers both new partitions in existing matched topics and newly created topics that match the pattern.
header_filter –
Logical expression for filtering Kafka records by headers.
Syntax:
Use key:value for one header condition.
Use & and | to combine conditions.
Use ! for negation.
Example: "depart:toy|depart:book&!env:test" keeps records with depart=toy or depart=book, and without env=test.
Notes:
Parentheses are not supported.
Operators are evaluated from left to right.
Header values are converted to UTF-8 strings before comparison.
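The left-to-right rule matters because & does not bind tighter than | here. The sketch below is a plain-Python reading of the syntax described above, not the connector's actual implementation; matches_header_filter is a hypothetical name, and it assumes that a missing header never matches a key:value condition and that headers have already been decoded to UTF-8 strings.

```python
import re
from typing import Dict, Optional


def matches_header_filter(expression: str, headers: Dict[str, str]) -> bool:
    """Evaluate a header filter strictly left to right (illustrative only).

    Assumptions: no parentheses, no operator precedence, and an absent
    header does not satisfy a key:value condition.
    """
    # Split on the operators while keeping them as separate tokens.
    tokens = [t.strip() for t in re.split(r"([&|])", expression)]
    result: Optional[bool] = None
    pending_operator: Optional[str] = None
    for token in tokens:
        if token in ("&", "|"):
            pending_operator = token
            continue
        negate = token.startswith("!")
        condition = token[1:] if negate else token
        key, _, expected = condition.partition(":")
        value = headers.get(key) == expected
        if negate:
            value = not value
        if result is None:
            result = value  # first condition seeds the running result
        elif pending_operator == "&":
            result = result and value
        else:
            result = result or value
    return bool(result)
```

Under this reading, a record with depart=toy and env=test is dropped by "depart:toy|depart:book&!env:test", because the expression is evaluated as (depart:toy | depart:book) & !env:test.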
check_duplicated_group_id –
Whether to check if the consumer group ID already exists on the broker. Default is False.
True: check for a duplicate group id before the job starts. If one is found, fail fast to avoid consumer-group conflicts.
False: start directly without a conflict check.
bounded_mode –
Bounded mode for the Kafka consumer (connector option scan.bounded.mode). One of:
"unbounded" (default): Do not stop consuming.
"latest-offset": bounded by latest offsets. This is evaluated at the start of consumption from a given partition.
"group-offsets": bounded by committed offsets in ZooKeeper / Kafka brokers of a specific consumer group. This is evaluated at the start of consumption from a given partition.
"timestamp": bounded by a user-supplied timestamp (requires bounded_timestamp_millis).
"specific-offsets": bounded by user-supplied specific offsets for each partition (requires bounded_specific_offsets).
bounded_timestamp_millis – Epoch timestamp in milliseconds for the bounded end position. Used when bounded_mode="timestamp".
bounded_specific_offsets – Per-partition offsets for the bounded end position. Used when bounded_mode="specific-offsets". Same format as startup_specific_offsets.
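Both startup_specific_offsets and bounded_specific_offsets accept either a dict or the string form. As a rough illustration of how the two forms correspond, the sketch below converts between them; offsets_to_string and offsets_from_string are hypothetical helpers, not functions provided by pyflink.dataframe, and the parser assumes well-formed input.

```python
from typing import Dict


def offsets_to_string(offsets: Dict[int, int]) -> str:
    """Render {partition: offset} as 'partition:P,offset:O;...' (illustrative)."""
    return ";".join(
        f"partition:{partition},offset:{offset}"
        for partition, offset in sorted(offsets.items())
    )


def offsets_from_string(spec: str) -> Dict[int, int]:
    """Parse 'partition:P,offset:O;...' back into {partition: offset}."""
    offsets: Dict[int, int] = {}
    for entry in spec.split(";"):
        partition_part, offset_part = entry.split(",")
        partition = int(partition_part.split(":")[1])
        offset = int(offset_part.split(":")[1])
        offsets[partition] = offset
    return offsets
```

With these helpers, {0: 42, 1: 300} and "partition:0,offset:42;partition:1,offset:300" describe the same positions.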
- Returns:
A new DataFrame representing the Kafka source.
Example:
>>> import pyflink.dataframe as pf
>>>
>>> # Simple JSON consumption from a single topic
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="user_events",
...     schema={
...         "user_id": pf.DataType.int64(),
...         "event": pf.DataType.string(),
...         "ts": pf.DataType.timestamp(3),
...     },
... )
>>>
>>> # Read from earliest offset with explicit consumer group
>>> df = pf.read_kafka(
...     "broker1:9092,broker2:9092",
...     topic="orders",
...     schema={
...         "order_id": pf.DataType.int64(),
...         "amount": pf.DataType.float64(),
...     },
...     group_id="my-consumer-group",
...     startup_mode="earliest-offset",
... )
>>>
>>> # Read multiple topics
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic=["topic_a", "topic_b"],
...     schema={
...         "id": pf.DataType.int64(),
...         "data": pf.DataType.string(),
...     },
... )
>>>
>>> # Subscribe to topics by pattern
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic_pattern="events-.*",
...     schema={
...         "id": pf.DataType.int64(),
...         "payload": pf.DataType.string(),
...     },
... )
>>>
>>> # Bounded read (batch mode): read up to the latest offset
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="orders",
...     schema={
...         "order_id": pf.DataType.int64(),
...         "amount": pf.DataType.float64(),
...     },
...     startup_mode="earliest-offset",
...     bounded_mode="latest-offset",
... )
>>>
>>> # Read with key+value formats and Avro
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="keyed_events",
...     schema={
...         "user_id": pf.DataType.int64(),
...         "event": pf.DataType.string(),
...     },
...     format="avro",
...     key_format="json",
...     key_fields=["user_id"],
... )
>>>
>>> # Start from a specific timestamp
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="events",
...     schema={
...         "id": pf.DataType.int64(),
...         "data": pf.DataType.string(),
...     },
...     startup_mode="timestamp",
...     startup_timestamp_millis=1700000000000,
... )
>>>
>>> # Start from specific offsets per partition
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="events",
...     schema={
...         "id": pf.DataType.int64(),
...         "data": pf.DataType.string(),
...     },
...     startup_mode="specific-offsets",
...     startup_specific_offsets={0: 42, 1: 300},
... )
>>>
>>> # JSON format with custom parse options
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="events",
...     schema={
...         "id": pf.DataType.int64(),
...         "data": pf.DataType.string(),
...     },
...     format="json",
...     format_options={"ignore-parse-errors": "true"},
... )
>>>
>>> # With Kafka security (SASL/SSL)
>>> # When using Aliyun Kafka, download the truststore file from the Kafka
>>> # console and upload it as an additional dependency file.
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="secure_events",
...     schema={
...         "id": pf.DataType.int64(),
...         "data": pf.DataType.string(),
...     },
...     properties={
...         "ssl.truststore.location": "/flink/usrlib/only.4096.client.truststore.jks",
...         "ssl.truststore.password": "KafkaOnsClient",
...         "ssl.endpoint.identification.algorithm": "",
...         "sasl.jaas.config": (
...             "org.apache.flink.kafka.shaded.org.apache.kafka.common.security"
...             ".plain.PlainLoginModule required "
...             f'username="<username>" '
...             f'password="<password>";'
...         ),
...         "sasl.mechanism": "PLAIN",
...         "security.protocol": "SASL_SSL",
...     },
... )
>>>
>>> # Read with column projection
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="events",
...     schema={
...         "id": pf.DataType.int64(),
...         "name": pf.DataType.string(),
...         "value": pf.DataType.float64(),
...     },
...     columns=["id", "name"],
... )
>>>
>>> # CDC with Debezium format
>>> df = pf.read_kafka(
...     "localhost:9092",
...     topic="cdc_events",
...     schema={
...         "id": pf.DataType.int64(),
...         "name": pf.DataType.string(),
...     },
...     format="debezium-json",
... )
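The interaction between key_fields, key_fields_prefix, and value_fields_include can be easier to follow in plain Python. The sketch below is illustrative only and does not call pyflink.dataframe: split_key_value_fields is a hypothetical helper, and it assumes that key_fields lists the prefixed column names as they appear in the schema, which follows the convention of Flink's Kafka SQL connector but may not match this wrapper exactly.

```python
from typing import List, Optional, Tuple


def split_key_value_fields(
    columns: List[str],
    key_fields: List[str],
    key_fields_prefix: Optional[str] = None,
    value_fields_include: str = "ALL",
) -> Tuple[List[str], List[str]]:
    """Return (field names seen by the key format, columns seen by the value format).

    Hypothetical helper; assumes key_fields holds prefixed schema column names.
    """
    if key_fields_prefix and value_fields_include != "EXCEPT_KEY":
        # Documented rule: a prefix requires value_fields_include="EXCEPT_KEY".
        raise ValueError('key_fields_prefix requires value_fields_include="EXCEPT_KEY"')
    prefix = key_fields_prefix or ""
    # The prefix is removed before names are handed to the key format.
    key_format_fields = [
        name[len(prefix):] if name.startswith(prefix) else name
        for name in key_fields
    ]
    if value_fields_include == "EXCEPT_KEY":
        value_columns = [name for name in columns if name not in key_fields]
    else:
        value_columns = list(columns)
    return key_format_fields, value_columns
```

For a schema with columns k_user_id, user_id, and event, key field k_user_id, and prefix "k_", the key format would see user_id while the value format would see user_id and event.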