pyflink.dataframe.dataframe.DataFrame.write_kafka
- DataFrame.write_kafka(bootstrap_servers: str, *, properties: Dict[str, str] | None = None, format: str = 'json', format_options: Dict[str, str] | None = None, key_format: str | None = None, key_format_options: Dict[str, str] | None = None, key_fields: List[str] | None = None, key_fields_prefix: str | None = None, value_format: str | None = None, value_format_options: Dict[str, str] | None = None, value_fields_include: Literal['ALL', 'EXCEPT_KEY'] = 'ALL', topic: str, partitioner: str = 'default', delivery_guarantee: Literal['none', 'at-least-once', 'exactly-once'] = 'at-least-once', transactional_id_prefix: str | None = None, parallelism: int | None = None) -> None
Write the DataFrame to a Kafka topic.
Uses Flink’s Kafka SQL Connector under the hood. The DataFrame rows are serialized using the specified format and written to the given Kafka topic.
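Because the method is described as a wrapper over Flink's Kafka SQL connector, it can help to see roughly what connector table the arguments documented below correspond to. The sketch below is an assumption, not this library's implementation: `kafka_sink_options` and `render_ddl` are hypothetical helpers, only a subset of the arguments is covered (no `value_format` or key/value format options), and the option keys used (`properties.bootstrap.servers`, `properties.*`, `key.fields`, `value.fields-include`, `sink.partitioner`, `sink.delivery-guarantee`, `sink.transactional-id-prefix`, `sink.parallelism`) are the ones documented for Flink's Kafka SQL connector.

```python
from typing import Dict, List, Optional


def kafka_sink_options(
    bootstrap_servers: str,
    topic: str,
    *,
    properties: Optional[Dict[str, str]] = None,
    format: str = "json",
    format_options: Optional[Dict[str, str]] = None,
    key_format: Optional[str] = None,
    key_fields: Optional[List[str]] = None,
    value_fields_include: str = "ALL",
    partitioner: str = "default",
    delivery_guarantee: str = "at-least-once",
    transactional_id_prefix: Optional[str] = None,
    parallelism: Optional[int] = None,
) -> Dict[str, str]:
    """Hypothetical mapping from write_kafka-style arguments to Flink Kafka SQL options."""
    opts = {
        "connector": "kafka",
        "topic": topic,
        "properties.bootstrap.servers": bootstrap_servers,
        "format": format,
    }
    # Extra producer settings are forwarded under the 'properties.' prefix.
    for key, value in (properties or {}).items():
        opts[f"properties.{key}"] = value
    # Format options get the format identifier as prefix,
    # e.g. 'timestamp-format.standard' -> 'json.timestamp-format.standard'.
    for key, value in (format_options or {}).items():
        opts[f"{format}.{key}"] = value
    if key_format is not None:
        opts["key.format"] = key_format
        # The connector expects key columns as a ';'-separated list.
        opts["key.fields"] = ";".join(key_fields or [])
    opts["value.fields-include"] = value_fields_include
    opts["sink.partitioner"] = partitioner
    opts["sink.delivery-guarantee"] = delivery_guarantee
    if transactional_id_prefix is not None:
        opts["sink.transactional-id-prefix"] = transactional_id_prefix
    if parallelism is not None:
        opts["sink.parallelism"] = str(parallelism)
    return opts


def render_ddl(table: str, columns: Dict[str, str], options: Dict[str, str]) -> str:
    """Render a Flink SQL CREATE TABLE statement; single quotes in options are doubled."""
    cols = ",\n".join(f"  `{name}` {sql_type}" for name, sql_type in columns.items())
    props = ",\n".join(
        "  '{}' = '{}'".format(k.replace("'", "''"), v.replace("'", "''"))
        for k, v in options.items()
    )
    return f"CREATE TABLE `{table}` (\n{cols}\n) WITH (\n{props}\n)"


if __name__ == "__main__":
    # Roughly corresponds to the "Write with key fields" example below.
    options = kafka_sink_options(
        "localhost:9092",
        "user_events",
        key_format="json",
        key_fields=["user_id"],
    )
    print(render_ddl("user_events_sink", {"user_id": "INT", "name": "STRING"}, options))
```

Whether `write_kafka` generates exactly this DDL, or configures the connector through another API, is not stated here; the sketch only shows how the documented arguments line up with connector options.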
- Parameters:
bootstrap_servers – Comma-separated list of Kafka broker addresses (e.g., "localhost:9092" or "broker1:9092,broker2:9092").
properties – Extra Kafka producer properties. Each key-value pair in this dict is passed directly to the Kafka producer. Example: {"batch.size": "16384"}. Note: property names should be valid Kafka producer configuration keys.
format – Format for serializing message values. Supported options: "csv", "json", "avro", "debezium-json", "canal-json", "maxwell-json", "avro-confluent", "raw".
format_options – Additional options for the value format, as a dict. Keys are format-specific options without the format prefix (e.g., {"timestamp-format.standard": "ISO-8601"} for the JSON format).
key_format – Format for serializing message keys (e.g., "json", "csv", "avro"). If specified, key_fields must also be provided.
key_format_options – Additional options for the key format, as a dict. Same structure as format_options.
key_fields – List of column names that form the message key. Required when key_format is specified.
key_fields_prefix – Prefix for key field column names, used to avoid name clashes with value fields. Requires value_fields_include="EXCEPT_KEY".
value_format –
The format used to serialize the value part of Kafka messages. Only one of format or value_format is required; if both are set, value_format takes precedence.
value_format_options – Extra options for value_format.
value_fields_include –
Whether key fields are also included in the value during serialization. One of:
- "ALL" (default): all columns are treated as value fields.
- "EXCEPT_KEY": columns listed in key_fields are excluded from the value fields.
topic – Target Kafka topic name to write to.
partitioner –
How to distribute records to Kafka partitions. One of:
- "default": Use Kafka's default partitioner (sticky partitioning for records with null keys, murmur2 hash of the serialized key otherwise).
- "fixed": Each Flink partition maps to at most one Kafka partition.
- "round-robin": Records are distributed round-robin across Kafka partitions (only for records without keys).
- A fully qualified class name of a custom FlinkKafkaPartitioner subclass.
delivery_guarantee –
Delivery semantics for the Kafka sink. One of:
- "at-least-once" (default): Messages are never lost but may be duplicated on failure.
- "exactly-once": Kafka transactions ensure no duplicates for consumers that read with isolation.level=read_committed. Requires transactional_id_prefix to be set and Flink checkpointing to be enabled.
- "none": No guarantees; messages may be lost or duplicated.
transactional_id_prefix – Prefix for Kafka transactional IDs. Required when delivery_guarantee="exactly-once". It should remain stable across restarts, and different applications must use different prefixes.
parallelism – Custom parallelism for the Kafka sink operator. By default, the sink inherits the parallelism of the upstream operator.
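Several of the parameters above depend on each other (key_format needs key_fields, key_fields_prefix needs value_fields_include="EXCEPT_KEY", "exactly-once" needs transactional_id_prefix). A minimal sketch of checking those documented constraints before submitting a job, assuming a hypothetical `check_write_kafka_args` helper that receives the DataFrame's column names; whether `write_kafka` validates arguments this way, or leaves it to Flink at job submission, is not stated here.

```python
import re
from typing import List, Optional, Sequence

BUILTIN_PARTITIONERS = {"default", "fixed", "round-robin"}
DELIVERY_GUARANTEES = {"none", "at-least-once", "exactly-once"}
# Loose shape check for a fully qualified Java class name (pkg.Class).
JAVA_CLASS_NAME = re.compile(r"[A-Za-z_$][\w$]*(?:\.[A-Za-z_$][\w$]*)+")


def check_write_kafka_args(
    columns: Sequence[str],
    *,
    key_format: Optional[str] = None,
    key_fields: Optional[List[str]] = None,
    key_fields_prefix: Optional[str] = None,
    value_fields_include: str = "ALL",
    partitioner: str = "default",
    delivery_guarantee: str = "at-least-once",
    transactional_id_prefix: Optional[str] = None,
) -> List[str]:
    """Hypothetical pre-flight check; returns human-readable problems (empty if none)."""
    problems: List[str] = []
    if key_format is not None and not key_fields:
        problems.append("key_format requires key_fields")
    for name in key_fields or []:
        if name not in columns:
            problems.append(f"key field {name!r} is not a DataFrame column")
    if value_fields_include not in ("ALL", "EXCEPT_KEY"):
        problems.append("value_fields_include must be 'ALL' or 'EXCEPT_KEY'")
    if key_fields_prefix is not None and value_fields_include != "EXCEPT_KEY":
        problems.append("key_fields_prefix requires value_fields_include='EXCEPT_KEY'")
    if partitioner not in BUILTIN_PARTITIONERS and not JAVA_CLASS_NAME.fullmatch(partitioner):
        problems.append(f"unknown partitioner {partitioner!r}")
    if delivery_guarantee not in DELIVERY_GUARANTEES:
        problems.append(f"unknown delivery_guarantee {delivery_guarantee!r}")
    elif delivery_guarantee == "exactly-once" and not transactional_id_prefix:
        problems.append("exactly-once requires transactional_id_prefix")
    # Flink checkpointing (also needed for exactly-once) cannot be checked from arguments.
    return problems
```

The checkpointing requirement for "exactly-once" is deliberately left out: it is a property of the execution environment, not of the arguments.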
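To make the "default" and "fixed" partitioner descriptions concrete, the sketch below reimplements two partition-selection rules in plain Python: Kafka's murmur2-based choice for keyed records (positive murmur2 hash of the serialized key modulo the partition count) and the index arithmetic of Flink's fixed partitioner (sink subtask index modulo the number of partitions). The helper names are hypothetical, sticky handling of null keys and the "round-robin" mode are not modeled, and this is an illustration rather than code the sink calls.

```python
from typing import List

MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """32-bit murmur2 in the variant Kafka uses for key hashing (unsigned result)."""
    length = len(data)
    m = 0x5BD1E995
    r = 24
    h = (0x9747B28C ^ length) & MASK32
    # Process the input four bytes at a time, little-endian.
    for i in range(length // 4):
        k = int.from_bytes(data[4 * i : 4 * i + 4], "little")
        k = (k * m) & MASK32
        k ^= k >> r
        k = (k * m) & MASK32
        h = (h * m) & MASK32
        h ^= k
    # Mix in the remaining 1-3 bytes (mirrors the fall-through switch in Java).
    tail = length & ~3
    rem = length % 4
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & MASK32
    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15
    return h


def default_partition(key: bytes, num_partitions: int) -> int:
    """Partition for a keyed record: clear the sign bit, then take the modulus."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


def fixed_partition(subtask_index: int, partitions: List[int]) -> int:
    """'fixed' mode: each sink subtask always writes to the same Kafka partition."""
    return partitions[subtask_index % len(partitions)]


if __name__ == "__main__":
    # Records with the same serialized key always land in the same partition.
    print(default_partition(b'{"user_id":1}', 6))
    print(fixed_partition(4, [0, 1, 2]))
```

One consequence worth noting: with "default", the partition depends on the serialized key bytes, so changing key_format (for example from "json" to "csv") can move existing keys to different partitions.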
Example:
>>> import pyflink.dataframe as pf
>>>
>>> df = pf.from_records(
...     [(1, "alice"), (2, "bob")],
...     schema=["user_id", "name"],
... )
>>>
>>> # Simple write with JSON format
>>> df.write_kafka("localhost:9092", topic="user_events")
>>>
>>> # Write with Avro format
>>> df.write_kafka(
...     "localhost:9092",
...     topic="user_events",
...     format="avro",
... )
>>>
>>> # Write with key fields
>>> df.write_kafka(
...     "localhost:9092",
...     topic="user_events",
...     key_format="json",
...     key_fields=["user_id"],
... )
>>>
>>> # Write with exactly-once delivery guarantee
>>> df.write_kafka(
...     "localhost:9092",
...     topic="user_events",
...     delivery_guarantee="exactly-once",
...     transactional_id_prefix="my-app-sink",
... )
>>>
>>> # Write with fixed partitioning and parallelism
>>> df.write_kafka(
...     "localhost:9092",
...     topic="user_events",
...     partitioner="fixed",
...     parallelism=4,
... )
>>>
>>> # Write with JSON format options
>>> df.write_kafka(
...     "localhost:9092",
...     topic="user_events",
...     format="json",
...     format_options={
...         "timestamp-format.standard": "ISO-8601",
...     },
... )
>>>
>>> # Write with Kafka security (SASL/SSL)
>>> df.write_kafka(
...     "localhost:9092",
...     topic="secure_events",
...     properties={
...         "security.protocol": "SASL_SSL",
...         "sasl.mechanism": "PLAIN",
...         "sasl.jaas.config": (
...             "org.apache.kafka.common.security.plain"
...             ".PlainLoginModule required"
...             ' username="user" password="pass";'
...         ),
...     },
... )
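In the "Write with key fields" example, each row becomes a Kafka record whose key holds only the key_fields columns and whose value depends on value_fields_include. The hypothetical `split_key_value` helper below illustrates that split for JSON using Python's json module with compact separators; Flink's JSON format produces comparable payloads, but byte-for-byte equality is not claimed. For key_fields_prefix, the sketch assumes (following Flink's Kafka connector documentation) that key_fields are given with the prefixed column names and that the prefix is stripped inside the key payload.

```python
import json
from typing import Any, Dict, List, Optional, Tuple


def split_key_value(
    row: Dict[str, Any],
    key_fields: List[str],
    value_fields_include: str = "ALL",
    key_fields_prefix: Optional[str] = None,
) -> Tuple[bytes, bytes]:
    """Illustrative JSON key/value payloads for one row (not the connector's code)."""
    prefix = key_fields_prefix or ""

    def strip(name: str) -> str:
        # Key payloads use the column name without the prefix.
        return name[len(prefix):] if prefix and name.startswith(prefix) else name

    key = {strip(name): row[name] for name in key_fields}
    if value_fields_include == "EXCEPT_KEY":
        # Key columns are dropped from the value payload.
        value = {name: v for name, v in row.items() if name not in key_fields}
    else:
        # "ALL": every column, including key columns, goes into the value.
        value = dict(row)
    compact = {"separators": (",", ":")}
    return json.dumps(key, **compact).encode(), json.dumps(value, **compact).encode()


if __name__ == "__main__":
    for row in [{"user_id": 1, "name": "alice"}, {"user_id": 2, "name": "bob"}]:
        print(split_key_value(row, ["user_id"]))
```

With the default value_fields_include="ALL", user_id appears in both key and value; "EXCEPT_KEY" avoids that duplication at the cost of consumers needing the key to reconstruct the full row.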