Skip to main content
Ctrl+K
PyFlink 1.20+vvr.11.7.dev0 documentation - Home PyFlink 1.20+vvr.11.7.dev0 documentation - Home
  • API Reference
  • Examples
  • API Reference
  • Examples

Section Navigation

  • PyFlink Table
  • PyFlink DataStream
  • PyFlink DataFrame
    • DataFrame
    • DataFrame Creation
    • Input/Output
    • SQL
    • DataType
    • User Defined Functions
    • Configuration
    • GPU Support
    • AI / LLM
  • PyFlink Common
  • API Reference
  • PyFlink DataFrame
  • DataFrame
  • pyflink.dataframe.dataframe.DataFrame.write_custom

pyflink.dataframe.dataframe.DataFrame.write_custom#

DataFrame.write_custom(connector: str, *, primary_key: str | List[str] | None = None, options: Dict[str, Any] | None = None) → None[source]#

Write the DataFrame using a custom connector.

This is a generic entrypoint for connectors that are not covered by the dedicated write_* helpers (e.g. write_parquet, write_kafka, write_odps). It targets SQL connectors that are discoverable through Flink’s standard factory mechanism.

The DataFrame’s physical columns are forwarded to the sink. Primary key constraints are not inherited from upstream tables; pass primary_key when the sink schema should include one.

Parameters:
  • connector – Factory identifier of the connector. This is the same value users put in SQL DDL WITH ('connector' = '...').

  • primary_key – Optional primary key column name or list of column names. Set this when the custom sink connector should receive a primary key constraint in its table schema.

  • options – Optional dict of connector and table options. Each key is forwarded as-is with the same name it would have in WITH (...) in SQL DDL. Values are converted to strings. "connector" is reserved and must be specified via the connector argument.

Example:

>>> import pyflink.dataframe as pf
>>>
>>> df = pf.from_records([(1, "a")], schema=["id", "name"])
>>>
>>> # Write through a custom connector identified by ``"my-custom"``.
>>> df.write_custom(
...     "my-custom",
...     options={
...         "host": "example.com",
...         "port": "9000",
...     },
... )
>>>
>>> # Use a format and additional sink options
>>> df.write_custom(
...     "my-custom",
...     primary_key="id",
...     options={
...         "endpoint": "example.com:9000",
...         "format": "json",
...         "json.map-null-key.mode": "FAIL",
...         "sink.parallelism": "4",
...     },
... )

previous

pyflink.dataframe.dataframe.DataFrame.write_hologres

next

pyflink.dataframe.dataframe.DataFrame.explain

On this page
  • DataFrame.write_custom()

This Page

  • Show Source

Created using Sphinx 7.4.7.

Built with the PyData Sphinx Theme 0.16.1.