PyFlink 1.20+vvr.11.7.dev0 documentation
pyflink.dataframe.io.read_custom

read_custom(connector: str, *, schema: Dict[str, DataType] | None = None, primary_key: str | List[str] | None = None, columns: List[str] | None = None, options: Dict[str, Any] | None = None) → DataFrame

Read data from a custom connector into a DataFrame.

This is a generic entry point for connectors that are not covered by the dedicated read_* helpers (e.g., read_parquet, read_kafka, read_odps). It works with SQL connectors that Flink discovers through its standard table factory mechanism.

Parameters:
  • connector – Factory identifier of the connector. This is the same value that goes in 'connector' = '...' inside the WITH (...) clause of SQL DDL, for example "datagen", "filesystem", or the identifier of a custom connector registered on the platform.

  • schema – Dict of {column_name: DataType} describing the schema of the source. Required, even though the signature gives it a default of None.

  • primary_key – Optional primary key column name or list of column names. Set this only for connectors that expect primary key constraints in the table schema.

  • columns – Optional list of column names, taken from schema, to read (projection pushdown). If None, all columns in schema are read.

  • options – Optional dict of connector and table options. Each key is forwarded unchanged, using the same name it would have in the WITH (...) clause of SQL DDL. Values are converted to strings. The "connector" key is reserved; specify the connector through the connector argument instead.
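
The documented parameter rules can be illustrated in plain Python: primary_key accepts either a single name or a list, columns selects from schema, and options keys pass through unchanged while values become strings and "connector" stays reserved. This is a minimal sketch under stated assumptions, not PyFlink's implementation. The helper names normalize_primary_key, check_projection, and build_with_options are hypothetical. Using str() for the value conversion is an assumption. Raising an error on column names missing from schema is an assumption about sensible validation, not documented behavior.

```python
from typing import Any, Dict, List, Optional, Union


def normalize_primary_key(primary_key: Optional[Union[str, List[str]]]) -> List[str]:
    """Hypothetical helper: turn the str-or-list primary_key into a list."""
    if primary_key is None:
        return []
    if isinstance(primary_key, str):
        return [primary_key]
    return list(primary_key)


def check_projection(schema: Dict[str, Any], columns: Optional[List[str]]) -> List[str]:
    """Hypothetical helper: all schema columns if columns is None, else the subset."""
    if columns is None:
        return list(schema)
    # Assumption: projected names must exist in schema.
    unknown = [c for c in columns if c not in schema]
    if unknown:
        raise ValueError(f"columns not in schema: {unknown}")
    return list(columns)


def build_with_options(connector: str, options: Optional[Dict[str, Any]] = None) -> Dict[str, str]:
    """Hypothetical helper: key/value pairs as they would appear in WITH (...)."""
    options = options or {}
    if "connector" in options:
        # "connector" is reserved; it must come from the connector argument.
        raise ValueError("pass the connector via the `connector` argument, not `options`")
    with_options = {"connector": connector}
    for key, value in options.items():
        # Keys are forwarded as-is; values become strings (str() is an assumption).
        with_options[key] = str(value)
    return with_options


print(build_with_options("my-custom", {"host": "example.com", "port": 9000}))
```

This page does not specify the exact string conversion (for example, whether True becomes "True" or "true"). Passing option values as strings, as the examples below do, avoids depending on that detail.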

Returns:

A new DataFrame backed by the configured source.

Example:

>>> import pyflink.dataframe as pf
>>>
>>> # Read from a connector whose factory identifier is "my-custom"
>>> # (for example, one provided by an uploaded connector JAR).
>>> df = pf.read_custom(
...     "my-custom",
...     schema={
...         "id": pf.DataType.int64(),
...         "name": pf.DataType.string(),
...     },
...     options={
...         "host": "example.com",
...         "port": "9000",
...     },
... )
>>>
>>> # Read with a primary key, a format, and format options
>>> df = pf.read_custom(
...     "my-custom",
...     schema={"id": pf.DataType.int64()},
...     primary_key="id",
...     options={
...         "endpoint": "example.com:9000",
...         "format": "json",
...         "json.ignore-parse-errors": "true",
...     },
... )
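
For readers who know Flink SQL, the second example conceptually corresponds to a CREATE TABLE statement that carries the same WITH options. This page does not say that read_custom generates DDL internally. The sketch below renders such a statement from plain Python values. The helper render_ddl and the table name my_source are hypothetical. Column types are written as Flink SQL type names, and treating BIGINT as the counterpart of pf.DataType.int64() is an assumption about the type mapping. In Flink SQL, a primary key must be declared NOT ENFORCED.

```python
from typing import Dict, List, Optional


def _sql_string(value: object) -> str:
    """Quote a value as a SQL string literal, doubling embedded single quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def render_ddl(
    table: str,
    connector: str,
    schema: Dict[str, str],
    primary_key: Optional[List[str]] = None,
    options: Optional[Dict[str, str]] = None,
) -> str:
    """Hypothetical helper: render a Flink SQL CREATE TABLE statement."""
    lines = [f"  `{name}` {sql_type}" for name, sql_type in schema.items()]
    if primary_key:
        # Flink SQL only supports NOT ENFORCED primary key constraints.
        keys = ", ".join(f"`{k}`" for k in primary_key)
        lines.append(f"  PRIMARY KEY ({keys}) NOT ENFORCED")
    # The connector argument becomes the 'connector' entry of the WITH clause.
    with_items = {"connector": connector, **(options or {})}
    props = ",\n".join(f"  {_sql_string(k)} = {_sql_string(v)}" for k, v in with_items.items())
    return f"CREATE TABLE `{table}` (\n" + ",\n".join(lines) + f"\n) WITH (\n{props}\n)"


print(render_ddl(
    "my_source",
    "my-custom",
    schema={"id": "BIGINT"},
    primary_key=["id"],
    options={
        "endpoint": "example.com:9000",
        "format": "json",
        "json.ignore-parse-errors": "true",
    },
))
```

The printed statement lists the id column and its primary key constraint, followed by a WITH clause that begins with 'connector' = 'my-custom' and contains the three options unchanged.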
