pyflink.dataframe.io.read_custom
- read_custom(connector: str, *, schema: Dict[str, DataType] | None = None, primary_key: str | List[str] | None = None, columns: List[str] | None = None, options: Dict[str, Any] | None = None) -> DataFrame
Read data from a custom connector into a DataFrame.
This is a generic entry point for connectors that are not covered by the dedicated
read_* helpers (e.g. read_parquet, read_kafka, read_odps). It targets SQL connectors that are discoverable through Flink’s standard factory mechanism.
- Parameters:
connector – Factory identifier of the connector. This is the same value users put in the SQL DDL clause WITH ('connector' = '...'). For example, "datagen", "filesystem", or the name of a custom connector registered on the platform.
schema – Dict of {column_name: DataType} describing the schema of the source. Required.
primary_key – Optional primary key column name or list of column names. Set this only for connectors that expect primary key constraints in the table schema.
columns – Optional list of column names to read (projection pushdown). If None, all columns from schema are read.
options – Optional dict of connector and table options. Each key is forwarded as-is, under the same name it would have in WITH (...) in SQL DDL. Values are converted to strings. "connector" is reserved and must be specified via the connector argument.
- Returns:
A new DataFrame backed by the configured source.
Example:
>>> import pyflink.dataframe as pf
>>>
>>> # Read with a connector identified by ``"my-custom"`` (uploaded
>>> # connector JAR registered with factory identifier ``my-custom``).
>>> df = pf.read_custom(
...     "my-custom",
...     schema={
...         "id": pf.DataType.int64(),
...         "name": pf.DataType.string(),
...     },
...     options={
...         "host": "example.com",
...         "port": "9000",
...     },
... )
>>>
>>> # Read with a format and format options
>>> df = pf.read_custom(
...     "my-custom",
...     schema={"id": pf.DataType.int64()},
...     primary_key="id",
...     options={
...         "endpoint": "example.com:9000",
...         "format": "json",
...         "json.ignore-parse-errors": "true",
...     },
... )
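The relationship between the arguments above and SQL DDL can be made concrete with a small standalone sketch. It does not use pyflink and is not the library's implementation. The helpers normalize_options and render_ddl are hypothetical names introduced here, the schema is given as plain SQL type strings rather than DataType objects, and the plain str() conversion is an assumption, since this page only states that values are converted to strings.

```python
from typing import Any, Dict, List, Optional, Union


def normalize_options(connector: str, options: Optional[Dict[str, Any]] = None) -> Dict[str, str]:
    """Hypothetical helper: build the string-valued WITH (...) entries."""
    if options and "connector" in options:
        # Mirrors the documented rule that "connector" is reserved.
        raise ValueError('"connector" is reserved; pass it via the connector argument')
    with_options = {"connector": connector}
    for key, value in (options or {}).items():
        # Assumption: plain str() conversion; the exact rule is not documented here.
        with_options[key] = str(value)
    return with_options


def render_ddl(
    table_name: str,
    connector: str,
    schema: Dict[str, str],
    primary_key: Union[str, List[str], None] = None,
    options: Optional[Dict[str, Any]] = None,
) -> str:
    """Hypothetical helper: render a CREATE TABLE statement for the same source."""
    lines = [f"  `{name}` {sql_type}" for name, sql_type in schema.items()]
    if primary_key:
        keys = [primary_key] if isinstance(primary_key, str) else list(primary_key)
        key_list = ", ".join(f"`{key}`" for key in keys)
        # Flink SQL only accepts primary keys declared as NOT ENFORCED.
        lines.append(f"  PRIMARY KEY ({key_list}) NOT ENFORCED")
    with_lines = []
    for key, value in normalize_options(connector, options).items():
        escaped = value.replace("'", "''")  # escape quotes inside SQL string literals
        with_lines.append(f"  '{key}' = '{escaped}'")
    return (
        f"CREATE TEMPORARY TABLE `{table_name}` (\n"
        + ",\n".join(lines)
        + "\n) WITH (\n"
        + ",\n".join(with_lines)
        + "\n)"
    )


if __name__ == "__main__":
    print(
        render_ddl(
            "src",
            "my-custom",
            {"id": "BIGINT"},
            primary_key="id",
            options={"endpoint": "example.com:9000", "format": "json"},
        )
    )
```

Reading the printed statement next to the second example above shows which argument ends up where: connector and options populate the WITH clause, while schema and primary_key populate the column list. Passing option values as strings, as the examples on this page do, avoids depending on how non-string values are converted.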