PyFlink 1.20+vvr.11.7.dev0 documentation

pyflink.dataframe.dataframe.DataFrame.write_kafka

DataFrame.write_kafka(bootstrap_servers: str, *, properties: Dict[str, str] | None = None, format: str = 'json', format_options: Dict[str, str] | None = None, key_format: str | None = None, key_format_options: Dict[str, str] | None = None, key_fields: List[str] | None = None, key_fields_prefix: str | None = None, value_format: str | None = None, value_format_options: Dict[str, str] | None = None, value_fields_include: Literal['ALL', 'EXCEPT_KEY'] = 'ALL', topic: str, partitioner: str = 'default', delivery_guarantee: Literal['none', 'at-least-once', 'exactly-once'] = 'at-least-once', transactional_id_prefix: str | None = None, parallelism: int | None = None) → None

Write the DataFrame to a Kafka topic.

Uses Flink’s Kafka SQL Connector under the hood. The DataFrame rows are serialized using the specified format and written to the given Kafka topic.
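Because the method is built on Flink's Kafka SQL connector, a call can be pictured as writing into a Kafka sink table declared in Flink SQL. The sketch below is a hypothetical illustration, not the method's actual implementation: the helper `render_kafka_sink_ddl`, the table name, and the column types are invented for the example, and it assumes the keyword arguments translate directly into the connector's `WITH` options (`topic`, `properties.bootstrap.servers`, `format`, plus `properties.*` and `<format>.*` pass-through keys).

```python
from typing import Dict, List, Optional, Tuple


def render_kafka_sink_ddl(
    table_name: str,
    columns: List[Tuple[str, str]],
    bootstrap_servers: str,
    topic: str,
    fmt: str = "json",
    properties: Optional[Dict[str, str]] = None,
    format_options: Optional[Dict[str, str]] = None,
) -> str:
    """Hypothetical helper: render a Flink SQL Kafka sink table definition."""
    options: Dict[str, str] = {
        "connector": "kafka",
        "topic": topic,
        "properties.bootstrap.servers": bootstrap_servers,
        "format": fmt,
    }
    # Kafka producer properties go under the connector's "properties." prefix.
    for key, value in (properties or {}).items():
        options["properties." + key] = value
    # Format options are supplied unprefixed and namespaced by the format name.
    for key, value in (format_options or {}).items():
        options[fmt + "." + key] = value

    column_lines = [f"  `{name}` {sql_type}" for name, sql_type in columns]
    option_lines = []
    for key, value in options.items():
        escaped = value.replace("'", "''")  # SQL string literals double single quotes
        option_lines.append(f"  '{key}' = '{escaped}'")
    return (
        f"CREATE TEMPORARY TABLE `{table_name}` (\n"
        + ",\n".join(column_lines)
        + "\n) WITH (\n"
        + ",\n".join(option_lines)
        + "\n)"
    )


ddl = render_kafka_sink_ddl(
    "user_events_sink",
    [("user_id", "BIGINT"), ("name", "STRING")],
    "localhost:9092",
    "user_events",
    properties={"batch.size": "16384"},
    format_options={"timestamp-format.standard": "ISO-8601"},
)
print(ddl)
```

Rendering the options this way shows where the `properties` and `format_options` dicts would end up if the mapping is as assumed; the exact statement the method generates internally is not documented here.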

Parameters:
  • bootstrap_servers – Comma-separated list of Kafka broker addresses (e.g., "localhost:9092" or "broker1:9092,broker2:9092").

  • properties –

    Extra Kafka producer properties. Each key-value pair in this dict is passed through to the Kafka producer as-is.

    Example: {"batch.size": "16384"}

    Note: property names should be valid Kafka producer configuration keys.

  • format – Format for serializing message values. Supported options: "csv", "json", "avro", "debezium-json", "canal-json", "maxwell-json", "avro-confluent", "raw".

  • format_options – Additional options for the value format, as a dict. Keys are format-specific options without the format prefix (e.g., {"timestamp-format.standard": "ISO-8601"} for JSON format).

  • key_format – Format for serializing message keys (e.g., "json", "csv", "avro"). If specified, key_fields should also be provided.

  • key_format_options – Additional options for the key format, as a dict. Same structure as format_options.

  • key_fields – List of column names that form the message key. Required when key_format is specified.

  • key_fields_prefix – Prefix for key field column names to avoid name clashes with value fields. Requires value_fields_include="EXCEPT_KEY".

  • value_format –

    The format used to serialize the value part of Kafka messages.

    Only one of format or value_format is required. If both are set, value_format takes precedence.

  • value_format_options – Extra options for value_format.

  • value_fields_include –

    Whether key fields are included in value fields during value serialization.

    • "ALL" (default): all columns are treated as value fields.

    • "EXCEPT_KEY": fields in key_fields are excluded from value fields.

  • topic – Target Kafka topic name to write to.

  • partitioner –

    How to distribute records to Kafka partitions. One of:

    • "default": Use Kafka’s default partitioner (sticky partitioning for null keys, murmur2 hash for keys).

    • "fixed": Each Flink partition maps to at most one Kafka partition.

    • "round-robin": Records are distributed round-robin across partitions (only for records without keys).

    • A fully qualified class name of a custom FlinkKafkaPartitioner subclass.

  • delivery_guarantee –

    Delivery semantic for the Kafka sink. One of:

    • "at-least-once" (default): Messages are never lost but may be duplicated on failure.

    • "exactly-once": Kafka transactions ensure no duplicates. Requires transactional_id_prefix to be set and Flink checkpointing to be enabled.

    • "none": No guarantees — messages may be lost or duplicated.

  • transactional_id_prefix – Prefix for Kafka transaction IDs. Required when delivery_guarantee="exactly-once". Should be stable across restarts. Different applications must use different prefixes.

  • parallelism – Custom parallelism for the Kafka sink operator. By default, the sink inherits the parallelism of the upstream operator.
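Several of the parameters above constrain each other. The sketch below checks those documented rules up front and maps the key, value, and sink arguments to Kafka SQL connector option names. It is a hypothetical helper (`kafka_sink_options` is not part of pyflink.dataframe) and assumes the constraints are exactly the ones stated in this reference; it cannot verify that Flink checkpointing is enabled for exactly-once delivery.

```python
from typing import Dict, List, Optional


def kafka_sink_options(
    *,
    format: str = "json",
    value_format: Optional[str] = None,
    key_format: Optional[str] = None,
    key_fields: Optional[List[str]] = None,
    key_fields_prefix: Optional[str] = None,
    value_fields_include: str = "ALL",
    partitioner: str = "default",
    delivery_guarantee: str = "at-least-once",
    transactional_id_prefix: Optional[str] = None,
    parallelism: Optional[int] = None,
) -> Dict[str, str]:
    """Hypothetical: validate arguments, then map them to connector options."""
    # Rules taken from the parameter descriptions above.
    if key_format is not None and not key_fields:
        raise ValueError("key_fields is required when key_format is set")
    if key_fields_prefix is not None and value_fields_include != "EXCEPT_KEY":
        raise ValueError("key_fields_prefix requires value_fields_include='EXCEPT_KEY'")
    if delivery_guarantee == "exactly-once" and not transactional_id_prefix:
        raise ValueError("delivery_guarantee='exactly-once' requires transactional_id_prefix")

    options: Dict[str, str] = {
        # value_format takes precedence over format when both are given.
        "value.format": value_format or format,
        "value.fields-include": value_fields_include,
        "sink.partitioner": partitioner,
        "sink.delivery-guarantee": delivery_guarantee,
    }
    if key_format is not None:
        options["key.format"] = key_format
        # The Kafka SQL connector takes key columns as a ';'-separated list.
        options["key.fields"] = ";".join(key_fields)
    if key_fields_prefix is not None:
        options["key.fields-prefix"] = key_fields_prefix
    if transactional_id_prefix is not None:
        options["sink.transactional-id-prefix"] = transactional_id_prefix
    if parallelism is not None:
        options["sink.parallelism"] = str(parallelism)
    return options


opts = kafka_sink_options(key_format="json", key_fields=["user_id"])
print(opts)
```

Failing fast on inconsistent arguments, rather than waiting for the job to be submitted, is the design choice this sketch illustrates.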

Example:

>>> import pyflink.dataframe as pf
>>>
>>> df = pf.from_records(
...     [(1, "alice"), (2, "bob")],
...     schema=["user_id", "name"],
... )
>>>
>>> # Simple write with JSON format
>>> df.write_kafka("localhost:9092", topic="user_events")
>>>
>>> # Write with Avro format
>>> df.write_kafka(
...     "localhost:9092",
...     topic="user_events",
...     format="avro",
... )
>>>
>>> # Write with key fields
>>> df.write_kafka(
...     "localhost:9092",
...     topic="user_events",
...     key_format="json",
...     key_fields=["user_id"],
... )
>>>
>>> # Write with exactly-once delivery guarantee (requires Flink checkpointing to be enabled)
>>> df.write_kafka(
...     "localhost:9092",
...     topic="user_events",
...     delivery_guarantee="exactly-once",
...     transactional_id_prefix="my-app-sink",
... )
>>>
>>> # Write with fixed partitioning and parallelism
>>> df.write_kafka(
...     "localhost:9092",
...     topic="user_events",
...     partitioner="fixed",
...     parallelism=4,
... )
>>>
>>> # Write with JSON format options
>>> df.write_kafka(
...     "localhost:9092",
...     topic="user_events",
...     format="json",
...     format_options={
...         "timestamp-format.standard": "ISO-8601",
...     },
... )
>>>
>>> # Write with Kafka security (SASL/SSL)
>>> df.write_kafka(
...     "localhost:9092",
...     topic="secure_events",
...     properties={
...         "security.protocol": "SASL_SSL",
...         "sasl.mechanism": "PLAIN",
...         "sasl.jaas.config": (
...             "org.apache.kafka.common.security.plain"
...             ".PlainLoginModule required"
...             ' username="user" password="pass";'
...         ),
...     },
... )
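The "default" and "fixed" partitioner options can be made concrete with a small pure-Python sketch. `murmur2` is a port of the hash that Kafka's default partitioner applies to serialized keys, `default_partition` assumes Kafka's keyed rule `toPositive(murmur2(key)) % numPartitions`, and `fixed_partition` mirrors the rule in Flink's `FlinkFixedPartitioner` (subtask index modulo the number of partitions). None of these helpers belong to the pyflink.dataframe API, the sticky behavior for null keys is not modeled, and the key bytes in the usage are an assumed JSON serialization of `user_id=1`.

```python
from typing import List

MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """Port of Kafka's Utils.murmur2; returns a signed 32-bit int like Java."""
    m = 0x5BD1E995
    r = 24
    length = len(data)
    h = (0x9747B28C ^ length) & MASK32

    # Mix four bytes at a time, read little-endian.
    for i in range(length // 4):
        k = int.from_bytes(data[4 * i:4 * i + 4], "little")
        k = (k * m) & MASK32
        k ^= k >> r
        k = (k * m) & MASK32
        h = (h * m) & MASK32
        h ^= k

    # Mix the trailing 1-3 bytes (Java's fall-through switch).
    tail = length % 4
    base = length & ~3
    if tail == 3:
        h ^= data[base + 2] << 16
    if tail >= 2:
        h ^= data[base + 1] << 8
    if tail >= 1:
        h ^= data[base]
        h = (h * m) & MASK32

    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15
    return h - (1 << 32) if h & 0x80000000 else h


def default_partition(key: bytes, num_partitions: int) -> int:
    """Keyed records: toPositive(murmur2(key)) % numPartitions."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


def fixed_partition(subtask_index: int, partitions: List[int]) -> int:
    """Each Flink subtask always writes to a single Kafka partition."""
    return partitions[subtask_index % len(partitions)]


print(murmur2(b"21"))  # -973932308
key = b'{"user_id":1}'  # assumed JSON key bytes for user_id=1
print(default_partition(key, 6))
print([fixed_partition(i, [0, 1, 2]) for i in range(4)])  # [0, 1, 2, 0]
```

Because the keyed hash depends only on the serialized key bytes, records with the same key_fields values land in the same partition as long as the partition count does not change.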
