pyflink.dataframe.dataframe.DataFrame.write_custom
- DataFrame.write_custom(connector: str, *, primary_key: str | List[str] | None = None, options: Dict[str, Any] | None = None) → None
Write the DataFrame using a custom connector.
This is a generic entry point for connectors that are not covered by the dedicated write_* helpers (e.g. write_parquet, write_kafka, write_odps). It targets SQL connectors that are discoverable through Flink’s standard factory mechanism.

The DataFrame’s physical columns are forwarded to the sink. Primary key constraints are not inherited from upstream tables; pass primary_key when the sink schema should include one.

- Parameters:
  - connector – Factory identifier of the connector. This is the same value users put in SQL DDL WITH ('connector' = '...').
  - primary_key – Optional primary key column name or list of column names. Set this when the custom sink connector should receive a primary key constraint in its table schema.
  - options – Optional dict of connector and table options. Each key is forwarded as-is with the same name it would have in WITH (...) in SQL DDL. Values are converted to strings. "connector" is reserved and must be specified via the connector argument.
Example:
>>> import pyflink.dataframe as pf
>>>
>>> df = pf.from_records([(1, "a")], schema=["id", "name"])
>>>
>>> # Write through a custom connector identified by ``"my-custom"``.
>>> df.write_custom(
...     "my-custom",
...     options={
...         "host": "example.com",
...         "port": "9000",
...     },
... )
>>>
>>> # Use a format and additional sink options
>>> df.write_custom(
...     "my-custom",
...     primary_key="id",
...     options={
...         "endpoint": "example.com:9000",
...         "format": "json",
...         "json.map-null-key.mode": "FAIL",
...         "sink.parallelism": "4",
...     },
... )
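To make the parameter descriptions concrete, the following is a minimal, standalone sketch of how the connector, options, and primary_key arguments could map onto a SQL DDL WITH (...) clause and a key column list. It is not the library's implementation: the helper names normalize_primary_key and render_with_clause are hypothetical, the use of plain str() for value conversion is an assumption, and raising ValueError for a reserved "connector" key is one plausible way to enforce the rule stated above.

```python
from typing import Any, Dict, List, Optional, Union


def normalize_primary_key(primary_key: Union[str, List[str], None]) -> List[str]:
    """Hypothetical helper: return the primary key as a list of column names."""
    if primary_key is None:
        return []
    if isinstance(primary_key, str):
        return [primary_key]
    return list(primary_key)


def _quote(text: str) -> str:
    """Quote a value as a SQL string literal, doubling embedded single quotes."""
    return "'" + text.replace("'", "''") + "'"


def render_with_clause(connector: str, options: Optional[Dict[str, Any]] = None) -> str:
    """Hypothetical helper: render the WITH (...) clause the arguments correspond to."""
    options = options or {}
    if "connector" in options:
        # Assumption: the reserved key is rejected rather than silently overridden.
        raise ValueError("'connector' is reserved; pass it via the connector argument")
    pairs = {"connector": connector}
    # Assumption: every value is converted with str(); keys are forwarded unchanged.
    pairs.update({key: str(value) for key, value in options.items()})
    body = ", ".join(f"{_quote(key)} = {_quote(value)}" for key, value in pairs.items())
    return f"WITH ({body})"


if __name__ == "__main__":
    print(render_with_clause("my-custom", {"host": "example.com", "port": 9000}))
    print(normalize_primary_key("id"))
```

Under these assumptions, the first example call above would correspond to a table declared with 'connector' = 'my-custom', 'host' = 'example.com', and 'port' = '9000' in its WITH clause. In Flink SQL DDL, a primary key constraint is declared as PRIMARY KEY (...) NOT ENFORCED.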