PyFlink 1.20+vvr.11.7.dev0 documentation

pyflink.dataframe.io.read_hologres

read_hologres(endpoint: str, *, db_name: str, table_name: str, username: str, password: str, schema: Dict[str, DataType] | None = None, primary_key: str | List[str] | None = None, columns: List[str] | None = None, connection_pool_size: int = 5, connection_pool_name: str = 'default', connection_fixed: bool | None = None, connection_max_idle_ms: int = 60000, connection_ssl_mode: Literal['disable', 'require', 'verify-ca', 'verify-full'] = 'disable', connection_ssl_root_cert_location: str | None = None, retry_count: int = 10, retry_sleep_step_ms: int = 5000, meta_cache_ttl_ms: int = 600000, serverless_computing: bool = False, binlog: bool = True, binlog_read_mode: Literal['AUTO', 'JDBC', 'HOLOHUB'] = 'AUTO', binlog_change_log_mode: Literal['ALL', 'UPSERT', 'ALL_AS_APPEND_ONLY'] = 'ALL', binlog_startup_mode: Literal['INITIAL', 'EARLIEST_OFFSET', 'TIMESTAMP', 'LATEST_OFFSET'] = 'INITIAL', binlog_batch_size: int = 512, binlog_request_timeout_ms: int = 300000, binlog_project_columns: bool | None = None, binlog_compression: bool | None = None, partition_binlog_mode: Literal['DISABLE', 'STATIC', 'DYNAMIC'] = 'DISABLE', partition_binlog_lateness_timeout_minutes: int = 60, partition_values_to_read: str | List[str] | None = None, start_time: str | datetime | None = None, fetch_size: int = 512, scan_timeout_seconds: int = 600, filter_push_down: bool = False, binlog_filter_push_down: bool = False, prefer_physical_column_over_metadata_column: bool = False, lookup_batch_size: int = 256, lookup_timeout_ms: int = 0, lookup_column_table: bool = False, lookup_insert_if_not_exists: bool = False, lookup_async: bool = True, lookup_filter_push_down: bool = False, cache: Literal['None', 'LRU'] = 'None', cache_size: int = 100000, cache_ttl_ms: int | None = None, cache_empty: bool = True, async_: bool = False, extra_options: Dict[str, Any] | None = None) → DataFrame

Read a Hologres table into a DataFrame.

Wraps the Ververica Hologres source connector. The Hologres source supports both bulk scans and binlog streaming; the consumption mode is controlled by binlog and the binlog-related parameters. The connector options are exposed under Pythonic snake_case names, in the same order as the “WITH parameters” table in the Hologres documentation.

Parameters:
  • endpoint – Hologres service endpoint (use the VPC endpoint for VVR).

  • db_name – Hologres database name. Optionally suffixed with a compute group (e.g. "my_db/my_group").

  • table_name – Hologres table name. Use "schema.tableName" to read from a non-public schema.

  • username – Hologres account username (or AccessKey ID).

  • password – Hologres account password (or AccessKey Secret).

  • schema – Dict of {column_name: DataType} specifying the source schema. Required.

  • primary_key – Optional primary key column name or list of column names. Set this when the upstream Hologres table has a primary key — the streaming change-log behavior depends on it.

  • columns – Optional list of column names to read. If None, all columns from schema are read.

  • connection_pool_size – JDBC connection pool size. Applies to both sink and dim-table workloads.

  • connection_pool_name – Connection pool name; tables sharing the same name within a Flink TaskManager share connections.

  • connection_fixed – When True, enable Hologres lightweight (fixed) connection mode. When None (the default), the connector picks a value based on the Hologres engine version.

  • connection_max_idle_ms – Idle time (milliseconds) before a JDBC connection in the pool is released.

  • connection_ssl_mode – SSL transport mode. One of "disable", "require", "verify-ca", or "verify-full". The verify-* modes require connection_ssl_root_cert_location.

  • connection_ssl_root_cert_location – Path to the CA certificate file. Files must be uploaded to VVP under /flink/usrlib/<name>.

  • retry_count – Number of retries on connection failures.

  • retry_sleep_step_ms – Linear back-off step (milliseconds) added per retry attempt.

  • meta_cache_ttl_ms – TTL (milliseconds) of the local Hologres TableSchema cache.

  • serverless_computing – When True, run scans on Hologres serverless compute (only available for batch reads).

  • binlog – When True (the default), consume binlog changes; when False, run a bulk scan only.

  • binlog_read_mode – Binlog read transport. "AUTO" lets the connector pick between JDBC and HoloHub.

  • binlog_change_log_mode – How binlog records are mapped to Flink change-log rows. "ALL" (the default) emits +I/-U/+U/-D; "UPSERT" emits +I/+U/-D and requires a primary_key; "ALL_AS_APPEND_ONLY" treats every event as +I.

  • binlog_startup_mode – Where the binlog consumer starts. "INITIAL" first snapshots, then tails the binlog; "EARLIEST_OFFSET" and "LATEST_OFFSET" start at the binlog ends; "TIMESTAMP" starts at start_time (required in this mode). Setting start_time alone is equivalent — the connector promotes the startup mode to TIMESTAMP whenever start_time is provided.

  • binlog_batch_size – Number of rows read per binlog batch.

  • binlog_request_timeout_ms – Binlog request timeout (milliseconds).

  • binlog_project_columns – When True, request only the declared columns from the binlog stream. None (default) lets the server pick.

  • binlog_compression – When True, enable LZ4 compression on the binlog transport. None (default) lets the server pick.

  • partition_binlog_mode – How to consume binlog for partitioned tables. "DISABLE" (default) treats partition tables as ordinary tables. "STATIC" reads a fixed set of partitions (combine with partition_values_to_read). "DYNAMIC" adapts to new partitions automatically.

  • partition_binlog_lateness_timeout_minutes – Maximum lateness, in minutes, tolerated when partition_binlog_mode='DYNAMIC'.

  • partition_values_to_read – Partition values to consume when partition_binlog_mode='STATIC'. Accepts a list of strings (e.g. ["ds=20240101", "ds=20240102"]) or a comma-separated string (e.g. "ds=20240101,ds=20240102"). Wildcards / regex are not supported.

  • start_time – Binlog consumption start time. When set, the Hologres connector forces TIMESTAMP startup and ignores binlog_startup_mode, so it’s fine to leave that argument at its default. Accepts a naive datetime without tzinfo (formatted as "yyyy-MM-dd HH:mm:ss") or a string already in that format. The Hologres connector parses this string in the JobManager JVM’s default timezone, so the wall-clock value you pass must already be expressed in that zone.

  • fetch_size – JDBC fetch size for bulk scans.

  • scan_timeout_seconds – Bulk-scan request timeout, in seconds. Default is 600 (10 minutes), matching the Hologres connector’s own default for native SQL DDL.

  • filter_push_down – When True, push filters down during bulk scans. Mutually exclusive with binlog_filter_push_down.

  • binlog_filter_push_down – When True, push filters down on the binlog stream.

  • prefer_physical_column_over_metadata_column – When True, prefer a physical column over a connector-defined metadata column of the same name. Effective only in binlog mode.

  • lookup_batch_size – Maximum number of rows per dim-table point-query batch. Effective only when the DataFrame is used as a dim table in a downstream lookup join.

  • lookup_timeout_ms – Per-lookup timeout (milliseconds). 0 means no timeout.

  • lookup_column_table – When True, allow using a column-store Hologres table as a dim table. Performance is typically poor and the connector logs a warning.

  • lookup_insert_if_not_exists – When True, insert the current join key into Hologres if the dim-table lookup misses.

  • lookup_async – When True (default), enable asynchronous dim-table lookups. Async lookup is enabled when either lookup_async or async_ is True.

  • lookup_filter_push_down – When True, push dim-table filters down to Hologres (only column-vs-constant comparisons are pushed down). Requires VVR 11.4+.

  • cache – Dim-table cache policy. "None" (the default) disables caching; "LRU" enables an in-memory LRU cache.

  • cache_size – Maximum number of rows held by the LRU cache. Default is 100000, matching the Hologres connector’s own default for native SQL DDL.

  • cache_ttl_ms – Cache entry TTL (milliseconds). None (the default) lets the connector pick based on cache.

  • cache_empty – When True (default), also cache empty lookup results. Disable when the upstream Hologres table is constantly receiving new rows that joins should pick up.

  • async_ – Specifies whether to return results asynchronously. Async lookup is controlled by both this parameter and lookup_async; to disable async lookup, set both to False.

  • extra_options – Additional connector options forwarded through to the underlying Hologres connector. This is for options not exposed as named parameters of read_hologres, e.g., connection.direct.enabled, connection.max-alive-ms. If a key matches an option generated by a named parameter of read_hologres, the value in extra_options takes precedence. "connector" is reserved and must not be supplied.
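
The retry_count and retry_sleep_step_ms defaults together determine how long a failing connection may be retried before the error surfaces. The sketch below estimates that budget under one reading of "linear back-off step added per retry attempt", namely that the wait before the n-th retry is n * retry_sleep_step_ms. That interpretation is an assumption, and retry_sleep_schedule_ms is a hypothetical helper, not part of PyFlink.

```python
from typing import List


def retry_sleep_schedule_ms(retry_count: int = 10,
                            retry_sleep_step_ms: int = 5000) -> List[int]:
    """Return the assumed wait (ms) before each retry: step, 2*step, 3*step, ..."""
    # Assumption: the back-off grows by exactly one step per attempt.
    return [attempt * retry_sleep_step_ms
            for attempt in range(1, retry_count + 1)]


# With the documented defaults (10 retries, 5000 ms step).
schedule = retry_sleep_schedule_ms()
total_wait_ms = sum(schedule)
```

Under this assumption, lowering retry_count shrinks the worst-case wait much faster than lowering retry_sleep_step_ms, because the total grows quadratically with the number of retries.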

Returns:

A DataFrame backed by the Hologres source.

Raises:

ValueError – Raised in any of the following cases:

  • schema is missing;

  • any enum value is invalid (including cache);

  • binlog_startup_mode='TIMESTAMP' without start_time;

  • start_time is an aware datetime (with tzinfo);

  • binlog_change_log_mode='UPSERT' without primary_key;

  • connection_ssl_mode is "verify-ca" or "verify-full" without connection_ssl_root_cert_location;

  • lookup_filter_push_down is enabled together with either filter_push_down or binlog_filter_push_down, or filter_push_down and binlog_filter_push_down are both enabled;

  • partition_values_to_read is set outside STATIC mode;

  • extra_options contains a "connector" key.
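
The documented ValueError conditions can be checked up front, for example when the arguments come from a configuration file. The following is a minimal sketch that mirrors most of the cases listed above on a plain dict of keyword arguments. precheck_hologres_args is a hypothetical helper, it is not how read_hologres itself is implemented, and it omits the enum-value checks.

```python
from datetime import datetime
from typing import Any, Dict, List


def precheck_hologres_args(args: Dict[str, Any]) -> List[str]:
    """Collect problems that the documentation says raise ValueError."""
    problems: List[str] = []
    if args.get("schema") is None:
        problems.append("schema is required")

    start_time = args.get("start_time")
    if args.get("binlog_startup_mode") == "TIMESTAMP" and start_time is None:
        problems.append("TIMESTAMP startup requires start_time")
    if isinstance(start_time, datetime) and start_time.tzinfo is not None:
        problems.append("start_time must be a naive datetime")

    if args.get("binlog_change_log_mode") == "UPSERT" and not args.get("primary_key"):
        problems.append("UPSERT change-log mode requires primary_key")

    ssl_mode = args.get("connection_ssl_mode", "disable")
    if ssl_mode in ("verify-ca", "verify-full") and not args.get(
            "connection_ssl_root_cert_location"):
        problems.append(ssl_mode + " requires connection_ssl_root_cert_location")

    scan_pd = bool(args.get("filter_push_down", False))
    binlog_pd = bool(args.get("binlog_filter_push_down", False))
    lookup_pd = bool(args.get("lookup_filter_push_down", False))
    if scan_pd and binlog_pd:
        problems.append("filter_push_down and binlog_filter_push_down are exclusive")
    if lookup_pd and (scan_pd or binlog_pd):
        problems.append("lookup_filter_push_down cannot be combined with other push-downs")

    if (args.get("partition_values_to_read") is not None
            and args.get("partition_binlog_mode", "DISABLE") != "STATIC"):
        problems.append("partition_values_to_read requires partition_binlog_mode='STATIC'")

    if "connector" in (args.get("extra_options") or {}):
        problems.append('extra_options must not contain "connector"')
    return problems


# A valid minimal call and an invalid one (placeholder schema values).
ok = precheck_hologres_args({"schema": {"order_id": "bigint"}})
bad = precheck_hologres_args({
    "binlog_change_log_mode": "UPSERT",
    "extra_options": {"connector": "hologres"},
})
```

Returning a list rather than raising on the first problem is a design choice for this sketch: it lets a caller report every misconfiguration at once.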

Examples

Streaming binlog source (INITIAL mode, defaults):

import pyflink.dataframe as pf
df = pf.read_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.orders",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    schema={
        "order_id": pf.DataType.bigint(),
        "buyer": pf.DataType.string(),
        "amount": pf.DataType.decimal(20, 4),
    },
    primary_key="order_id",
)

Bulk scan only (no binlog), with column projection:

df = pf.read_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.user_profiles",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    schema={
        "user_id": pf.DataType.bigint(),
        "city": pf.DataType.string(),
        "country": pf.DataType.string(),
    },
    primary_key="user_id",
    binlog=False,
    columns=["user_id", "city"],
    filter_push_down=True,
)

Binlog replay from a given timestamp (TIMESTAMP startup):

from datetime import datetime
df = pf.read_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.events",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    schema={"event_id": pf.DataType.bigint()},
    binlog_startup_mode="TIMESTAMP",
    start_time=datetime(2026, 1, 15, 0, 0, 0),
    binlog_change_log_mode="ALL",
)
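
Because start_time must be a naive datetime expressed in the JobManager JVM's default timezone, an aware timestamp (for example one in UTC) has to be converted before it is passed in. A minimal sketch of that conversion follows. to_jobmanager_wall_clock is a hypothetical helper, and the UTC+8 offset is only an assumed JobManager zone for illustration.

```python
from datetime import datetime, timedelta, timezone, tzinfo


def to_jobmanager_wall_clock(moment: datetime, jobmanager_tz: tzinfo) -> datetime:
    """Convert an aware datetime to a naive wall-clock value in jobmanager_tz."""
    if moment.tzinfo is None:
        raise ValueError("expected an aware datetime")
    # Shift to the target zone, then drop tzinfo so the value is naive.
    return moment.astimezone(jobmanager_tz).replace(tzinfo=None)


# Assumption for illustration: the JobManager JVM runs in UTC+8.
assumed_jobmanager_tz = timezone(timedelta(hours=8))

utc_moment = datetime(2026, 1, 15, 0, 0, 0, tzinfo=timezone.utc)
start_time = to_jobmanager_wall_clock(utc_moment, assumed_jobmanager_tz)

# The string form accepted by start_time ("yyyy-MM-dd HH:mm:ss").
start_time_text = start_time.strftime("%Y-%m-%d %H:%M:%S")
```

Either start_time or start_time_text could then be passed as the start_time argument, since the parameter accepts both a naive datetime and a string in that format.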

Static-partitioned binlog (consume only two partitions):

df = pf.read_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.daily_logs",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    schema={"day": pf.DataType.string(), "msg": pf.DataType.string()},
    partition_binlog_mode="STATIC",
    partition_values_to_read=["2026-01-15", "2026-01-16"],
)
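
partition_values_to_read accepts either a list of strings or a single comma-separated string. The sketch below shows how the two forms can be treated as equivalent. It assumes the list form is simply joined with commas, which the parameter description implies but does not state, and partition_values_as_option is a hypothetical helper.

```python
from typing import List, Optional, Union


def partition_values_as_option(
        values: Union[str, List[str], None]) -> Optional[str]:
    """Normalize partition values to one comma-separated string."""
    if values is None:
        return None
    if isinstance(values, str):
        values = values.split(",")
    # Trim whitespace and drop empty entries such as a trailing comma.
    cleaned = [value.strip() for value in values if value.strip()]
    return ",".join(cleaned)


from_list = partition_values_as_option(["2026-01-15", "2026-01-16"])
from_text = partition_values_as_option("2026-01-15, 2026-01-16")
```

Wildcards and regular expressions are not supported, so every partition has to be listed explicitly in either form.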

Dim-table source for a downstream lookup join (LRU cache, async lookup tuned):

dim = pf.read_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.user_profiles",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    schema={
        "user_id": pf.DataType.bigint(),
        "country": pf.DataType.string(),
    },
    primary_key="user_id",
    binlog=False,
    cache="LRU",
    cache_size=50000,
    cache_ttl_ms=600000,
    lookup_async=True,
    lookup_batch_size=512,
)
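
Async lookup is enabled when either lookup_async or async_ is True, so turning it off requires setting both to False. The small sketch below spells out that rule. effective_async_lookup is a hypothetical helper that only restates the documented behavior.

```python
from itertools import product


def effective_async_lookup(lookup_async: bool = True, async_: bool = False) -> bool:
    """Async lookup is on when either flag is True."""
    return lookup_async or async_


# Enumerate every combination of the two flags.
combinations = {
    (lookup_async, async_): effective_async_lookup(lookup_async, async_)
    for lookup_async, async_ in product([True, False], repeat=2)
}

# The only combination that disables async lookup.
disabled = [flags for flags, enabled in combinations.items() if not enabled]
```

With the documented defaults (lookup_async=True, async_=False), async lookup is on, so passing async_=False alone does not disable it.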

Forward-compat: tune a non-named option (and override a named one) via extra_options:

df = pf.read_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.orders",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    schema={"order_id": pf.DataType.bigint()},
    fetch_size=512,
    extra_options={
        "hologres.server.version": "3.1",
        "source.scan.fetch-size": "8192",  # takes precedence over fetch_size=512 above
    },
)
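
The precedence rule for extra_options (its values win over options generated by named parameters, and "connector" is rejected) can be pictured as a dictionary merge. This is a sketch of the documented behavior, not the actual implementation. merge_connector_options is hypothetical, and the mapping of fetch_size to "source.scan.fetch-size" is inferred from the example above.

```python
from typing import Any, Dict, Optional


def merge_connector_options(
        named: Dict[str, Any],
        extra_options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Merge options so that extra_options overrides named-parameter options."""
    extra = dict(extra_options or {})
    if "connector" in extra:
        raise ValueError('"connector" is reserved and must not be supplied')
    merged = dict(named)
    merged.update(extra)  # later keys win, so extra_options takes precedence
    return merged


# Assumed option generated by fetch_size=512.
named_options = {"source.scan.fetch-size": "512"}
merged = merge_connector_options(
    named_options,
    {"hologres.server.version": "3.1", "source.scan.fetch-size": "8192"},
)
```

Because the override is silent, keeping a named parameter and a conflicting extra_options key in the same call, as the example above does for demonstration, is best avoided in real jobs.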
