pyflink.dataframe.io.read_hologres
read_hologres(endpoint: str, *, db_name: str, table_name: str, username: str, password: str, schema: Dict[str, DataType] | None = None, primary_key: str | List[str] | None = None, columns: List[str] | None = None, connection_pool_size: int = 5, connection_pool_name: str = 'default', connection_fixed: bool | None = None, connection_max_idle_ms: int = 60000, connection_ssl_mode: Literal['disable', 'require', 'verify-ca', 'verify-full'] = 'disable', connection_ssl_root_cert_location: str | None = None, retry_count: int = 10, retry_sleep_step_ms: int = 5000, meta_cache_ttl_ms: int = 600000, serverless_computing: bool = False, binlog: bool = True, binlog_read_mode: Literal['AUTO', 'JDBC', 'HOLOHUB'] = 'AUTO', binlog_change_log_mode: Literal['ALL', 'UPSERT', 'ALL_AS_APPEND_ONLY'] = 'ALL', binlog_startup_mode: Literal['INITIAL', 'EARLIEST_OFFSET', 'TIMESTAMP', 'LATEST_OFFSET'] = 'INITIAL', binlog_batch_size: int = 512, binlog_request_timeout_ms: int = 300000, binlog_project_columns: bool | None = None, binlog_compression: bool | None = None, partition_binlog_mode: Literal['DISABLE', 'STATIC', 'DYNAMIC'] = 'DISABLE', partition_binlog_lateness_timeout_minutes: int = 60, partition_values_to_read: str | List[str] | None = None, start_time: str | datetime | None = None, fetch_size: int = 512, scan_timeout_seconds: int = 600, filter_push_down: bool = False, binlog_filter_push_down: bool = False, prefer_physical_column_over_metadata_column: bool = False, lookup_batch_size: int = 256, lookup_timeout_ms: int = 0, lookup_column_table: bool = False, lookup_insert_if_not_exists: bool = False, lookup_async: bool = True, lookup_filter_push_down: bool = False, cache: Literal['None', 'LRU'] = 'None', cache_size: int = 100000, cache_ttl_ms: int | None = None, cache_empty: bool = True, async_: bool = False, extra_options: Dict[str, Any] | None = None) -> DataFrame
Read a Hologres table into a DataFrame.
Wraps the Ververica Hologres source connector. The Hologres source supports both bulk scans and binlog streaming; the consumption mode is controlled by `binlog` and the binlog-related parameters. Connector options are exposed under Pythonic snake_case names, in the order in which the “WITH parameters” table of the Hologres documentation lists them.
- Parameters:
endpoint – Hologres service endpoint (use the VPC endpoint for VVR).
db_name – Hologres database name, optionally suffixed with a compute group (e.g. `"my_db/my_group"`).
table_name – Hologres table name. Use `"schema.tableName"` to read from a non-public schema.
username – Hologres account username (or AccessKey ID).
password – Hologres account password (or AccessKey Secret).
schema – Dict of `{column_name: DataType}` specifying the source schema. Required.
primary_key – Optional primary key column name or list of column names. Set this when the upstream Hologres table has a primary key, because the streaming change-log behavior depends on it.
columns – Optional list of column names to read. If `None`, all columns from `schema` are read.
connection_pool_size – JDBC connection pool size. Applies to both sink and dim-table workloads.
connection_pool_name – Connection pool name; tables sharing the same name within a Flink TaskManager share connections.
connection_fixed – When `True`, enable the Hologres lightweight (fixed) connection mode. When `None` (the default), the connector picks a value based on the Hologres engine version.
connection_max_idle_ms – Idle time (milliseconds) after which a JDBC connection in the pool is released.
connection_ssl_mode – SSL transport mode: one of `"disable"`, `"require"`, `"verify-ca"`, or `"verify-full"`. The verify-* modes require `connection_ssl_root_cert_location`.
connection_ssl_root_cert_location – Path to the CA certificate file. The file must be uploaded to VVP, where it is available under `/flink/usrlib/<name>`.
retry_count – Number of retries on connection failures.
retry_sleep_step_ms – Linear back-off step (milliseconds) added per retry attempt.
meta_cache_ttl_ms – TTL (milliseconds) of the local Hologres `TableSchema` cache.
serverless_computing – When `True`, run scans on Hologres serverless compute (available for batch reads only).
binlog – When `True` (the default), consume binlog changes; when `False`, run a bulk scan only.
binlog_read_mode – Binlog read transport. `"AUTO"` (the default) lets the connector choose between JDBC and HoloHub.
binlog_change_log_mode – How binlog records are mapped to Flink change-log rows. `"ALL"` (the default) emits +I/-U/+U/-D; `"UPSERT"` emits +I/+U/-D and requires `primary_key`; `"ALL_AS_APPEND_ONLY"` emits every event as +I.
binlog_startup_mode – Where the binlog consumer starts. `"INITIAL"` (the default) first reads a full snapshot, then tails the binlog; `"EARLIEST_OFFSET"` and `"LATEST_OFFSET"` start at the earliest and latest binlog offsets, respectively; `"TIMESTAMP"` starts at `start_time`, which is required in this mode. Setting `start_time` alone is equivalent: the connector promotes the startup mode to TIMESTAMP whenever `start_time` is provided.
binlog_batch_size – Number of rows read per binlog batch.
binlog_request_timeout_ms – Binlog request timeout (milliseconds).
binlog_project_columns – When `True`, request only the declared columns from the binlog stream. `None` (the default) lets the server decide.
binlog_compression – When `True`, enable LZ4 compression on the binlog transport. `None` (the default) lets the server decide.
partition_binlog_mode – How binlog is consumed for partitioned tables. `"DISABLE"` (the default) treats partitioned tables as ordinary tables; `"STATIC"` reads a fixed set of partitions (combine with `partition_values_to_read`); `"DYNAMIC"` picks up new partitions automatically.
partition_binlog_lateness_timeout_minutes – Maximum lateness, in minutes, tolerated when `partition_binlog_mode='DYNAMIC'`.
partition_values_to_read – Partition values to consume when `partition_binlog_mode='STATIC'`. Accepts a list of strings (e.g. `["ds=20240101", "ds=20240102"]`) or a comma-separated string (e.g. `"ds=20240101,ds=20240102"`). Wildcards and regular expressions are not supported.
start_time – Binlog consumption start time. When set, the Hologres connector forces TIMESTAMP startup and ignores `binlog_startup_mode`, so that argument can stay at its default. Accepts either a naive `datetime` (no `tzinfo`), which is formatted as `"yyyy-MM-dd HH:mm:ss"`, or a string already in that format. The Hologres connector parses this string in the default time zone of the JobManager JVM, so the wall-clock value you pass must already be expressed in that zone.
fetch_size – JDBC fetch size for bulk scans.
scan_timeout_seconds – Bulk-scan request timeout, in seconds. Default is 600 (10 minutes), matching the Hologres connector’s own default for native SQL DDL.
filter_push_down – When `True`, push filters down during bulk scans. Mutually exclusive with `binlog_filter_push_down`.
binlog_filter_push_down – When `True`, push filters down on the binlog stream. Mutually exclusive with `filter_push_down`.
prefer_physical_column_over_metadata_column – When `True`, prefer a physical column over a connector-defined metadata column of the same name. Effective only in binlog mode.
lookup_batch_size – Maximum number of rows per dim-table point-query batch. Effective only when the DataFrame is used as a dim table in a downstream lookup join.
lookup_timeout_ms – Per-lookup timeout (milliseconds). `0` means no timeout.
lookup_column_table – When `True`, allow a column-store Hologres table to be used as a dim table. Performance is typically poor, and the connector logs a warning.
lookup_insert_if_not_exists – When `True`, insert the current join key into Hologres if the dim-table lookup misses.
lookup_async – When `True` (the default), enable asynchronous dim-table lookups. Async lookup is enabled when either `lookup_async` or `async_` is `True`.
lookup_filter_push_down – When `True`, push dim-table filters down to Hologres (only column-vs-constant comparisons are pushed down). Requires VVR 11.4+. Cannot be combined with `filter_push_down` or `binlog_filter_push_down`.
cache – Dim-table cache policy. `"None"` (a string, and the default) disables caching; `"LRU"` enables an in-memory LRU cache.
cache_size – Maximum number of rows held by the LRU cache. The default of 100000 matches the Hologres connector's own default for native SQL DDL.
cache_ttl_ms – Cache entry TTL (milliseconds). `None` (the default) lets the connector pick a value based on `cache`.
cache_empty – When `True` (the default), also cache empty lookup results. Disable this when the upstream Hologres table constantly receives new rows that joins should pick up.
async_ – Whether to return results asynchronously. The trailing underscore avoids a clash with the Python keyword `async`. Async lookup is enabled when either this or `lookup_async` is `True`, so disabling it requires setting both to `False`.
extra_options – Additional connector options forwarded to the underlying Hologres connector, for options not exposed as named parameters of `read_hologres` (e.g. `connection.direct.enabled`, `connection.max-alive-ms`). If a key matches an option generated by a named parameter of `read_hologres`, the value in `extra_options` takes precedence. The `"connector"` key is reserved and must not be supplied.
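`retry_count` and `retry_sleep_step_ms` together define a linear back-off. The sketch below computes the resulting wait schedule so you can estimate how long a job may stall on a flaky connection. It assumes, without confirmation from this reference, that the n-th retry sleeps n * `retry_sleep_step_ms` before reconnecting; the helper `linear_backoff_schedule` is hypothetical and not part of `pyflink.dataframe`.

```python
from typing import List


def linear_backoff_schedule(retry_count: int = 10, retry_sleep_step_ms: int = 5000) -> List[int]:
    """Return the sleep (ms) before each retry under an assumed linear back-off.

    Hypothetical model: retry n (1-based) waits n * retry_sleep_step_ms.
    """
    if retry_count < 0 or retry_sleep_step_ms < 0:
        raise ValueError("retry_count and retry_sleep_step_ms must be non-negative")
    return [n * retry_sleep_step_ms for n in range(1, retry_count + 1)]


# With the defaults (10 retries, 5000 ms step) the waits grow 5 s, 10 s, ..., 50 s.
schedule = linear_backoff_schedule()
print(schedule[:3], "...", schedule[-1])              # [5000, 10000, 15000] ... 50000
print("worst-case total wait (ms):", sum(schedule))   # 275000
```

Under this model, the total worst-case wait grows quadratically with `retry_count`, so raising the retry count has a larger effect than raising the step size.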
- Returns:
A DataFrame backed by the Hologres source.
- Raises:
ValueError – If `schema` is missing; if any enum value is invalid (including `cache`); if `binlog_startup_mode='TIMESTAMP'` without `start_time`; if `start_time` is an aware `datetime` (with `tzinfo`); if `binlog_change_log_mode='UPSERT'` without `primary_key`; if `connection_ssl_mode` is `"verify-ca"` or `"verify-full"` without `connection_ssl_root_cert_location`; if `lookup_filter_push_down` is enabled together with either `filter_push_down` or `binlog_filter_push_down`, or if `filter_push_down` and `binlog_filter_push_down` are both enabled; if `partition_values_to_read` is set outside STATIC mode; or if `extra_options` contains a `"connector"` key.
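The cross-parameter rules listed under Raises can be restated as plain checks, which is handy for validating a configuration before submitting a job. This is a hypothetical sketch: `check_read_hologres_args` is not part of `pyflink.dataframe`, it omits the enum checks, and the schema values are placeholder strings standing in for `DataType` objects.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional, Union


def check_read_hologres_args(
    schema: Optional[Dict[str, Any]],
    primary_key: Union[str, List[str], None] = None,
    binlog_startup_mode: str = "INITIAL",
    start_time: Union[str, datetime, None] = None,
    binlog_change_log_mode: str = "ALL",
    connection_ssl_mode: str = "disable",
    connection_ssl_root_cert_location: Optional[str] = None,
    filter_push_down: bool = False,
    binlog_filter_push_down: bool = False,
    lookup_filter_push_down: bool = False,
    partition_binlog_mode: str = "DISABLE",
    partition_values_to_read: Union[str, List[str], None] = None,
    extra_options: Optional[Dict[str, Any]] = None,
) -> None:
    """Hypothetical restatement of the documented ValueError conditions (enum checks omitted)."""
    if schema is None:
        raise ValueError("schema is required")
    if binlog_startup_mode == "TIMESTAMP" and start_time is None:
        raise ValueError("binlog_startup_mode='TIMESTAMP' requires start_time")
    if isinstance(start_time, datetime) and start_time.tzinfo is not None:
        raise ValueError("start_time must be a naive datetime (no tzinfo)")
    if binlog_change_log_mode == "UPSERT" and not primary_key:
        raise ValueError("binlog_change_log_mode='UPSERT' requires primary_key")
    if connection_ssl_mode in ("verify-ca", "verify-full") and not connection_ssl_root_cert_location:
        raise ValueError(f"connection_ssl_mode={connection_ssl_mode!r} requires a root certificate")
    if lookup_filter_push_down and (filter_push_down or binlog_filter_push_down):
        raise ValueError("lookup_filter_push_down cannot be combined with scan or binlog push-down")
    if filter_push_down and binlog_filter_push_down:
        raise ValueError("filter_push_down and binlog_filter_push_down are mutually exclusive")
    if partition_values_to_read is not None and partition_binlog_mode != "STATIC":
        raise ValueError("partition_values_to_read requires partition_binlog_mode='STATIC'")
    if extra_options and "connector" in extra_options:
        raise ValueError("'connector' is reserved and must not appear in extra_options")


try:
    # "BIGINT" is a placeholder for pf.DataType.bigint().
    check_read_hologres_args({"id": "BIGINT"}, binlog_change_log_mode="UPSERT")
except ValueError as err:
    print(err)  # binlog_change_log_mode='UPSERT' requires primary_key
```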
Examples
Streaming binlog source (INITIAL mode, defaults):
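```python
import os

import pyflink.dataframe as pf

# Illustrative only: read the AccessKey pair from environment variables.
ALIYUN_AK_ID = os.environ["ALIYUN_AK_ID"]
ALIYUN_AK_SECRET = os.environ["ALIYUN_AK_SECRET"]

df = pf.read_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.orders",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    schema={
        "order_id": pf.DataType.bigint(),
        "buyer": pf.DataType.string(),
        "amount": pf.DataType.decimal(20, 4),
    },
    primary_key="order_id",
)
```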
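The example above keeps the default `binlog_change_log_mode="ALL"`. To make the three modes concrete, the following plain-Python model shows which Flink row kinds each binlog record would yield under each mode, following the mapping documented for `binlog_change_log_mode`. It assumes that an update is recorded in the binlog as a before/after pair of records; the record-type names and the `to_row_kinds` helper are hypothetical placeholders, not connector code.

```python
from typing import Dict, List, Optional

# Hypothetical model of binlog_change_log_mode. An update is assumed to arrive as an
# UPDATE_BEFORE / UPDATE_AFTER record pair; each record maps to at most one row kind
# (None means the mode drops that record).
ROW_KIND_BY_MODE: Dict[str, Dict[str, Optional[str]]] = {
    "ALL": {"INSERT": "+I", "UPDATE_BEFORE": "-U", "UPDATE_AFTER": "+U", "DELETE": "-D"},
    "UPSERT": {"INSERT": "+I", "UPDATE_BEFORE": None, "UPDATE_AFTER": "+U", "DELETE": "-D"},
    "ALL_AS_APPEND_ONLY": {"INSERT": "+I", "UPDATE_BEFORE": "+I", "UPDATE_AFTER": "+I", "DELETE": "+I"},
}


def to_row_kinds(mode: str, records: List[str]) -> List[str]:
    """Map binlog record types to Flink row kinds, skipping records the mode discards."""
    table = ROW_KIND_BY_MODE[mode]
    return [kind for kind in (table[r] for r in records) if kind is not None]


one_update = ["UPDATE_BEFORE", "UPDATE_AFTER"]
for mode in ROW_KIND_BY_MODE:
    print(mode, to_row_kinds(mode, one_update))
# ALL ['-U', '+U']
# UPSERT ['+U']
# ALL_AS_APPEND_ONLY ['+I', '+I']
```

Dropping the before-image is why UPSERT needs a `primary_key`: downstream operators must know which row a bare +U replaces.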
Bulk scan only (no binlog), with column projection:
```python
df = pf.read_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.user_profiles",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    schema={
        "user_id": pf.DataType.bigint(),
        "city": pf.DataType.string(),
        "country": pf.DataType.string(),
    },
    primary_key="user_id",
    binlog=False,
    columns=["user_id", "city"],
    filter_push_down=True,
)
```
Binlog replay from a fixed start time (TIMESTAMP startup):
```python
from datetime import datetime

df = pf.read_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.events",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    schema={"event_id": pf.DataType.bigint()},
    binlog_startup_mode="TIMESTAMP",  # optional: start_time alone implies TIMESTAMP
    start_time=datetime(2026, 1, 15, 0, 0, 0),  # naive, in the JobManager JVM's time zone
    binlog_change_log_mode="ALL",
)
```
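The interaction between `start_time` and `binlog_startup_mode` described in the parameter list can be summarized as a small resolution function. This is a hypothetical sketch (`resolve_startup` is not part of `pyflink.dataframe`): it formats a naive `datetime` with the Python equivalent of `"yyyy-MM-dd HH:mm:ss"`, rejects aware values, and promotes the mode to TIMESTAMP whenever a start time is given. Validating string input with `strptime` is this sketch's own choice.

```python
from datetime import datetime
from typing import Optional, Tuple, Union

HOLO_TS_FORMAT = "%Y-%m-%d %H:%M:%S"  # Python spelling of "yyyy-MM-dd HH:mm:ss"


def resolve_startup(
    binlog_startup_mode: str = "INITIAL",
    start_time: Union[str, datetime, None] = None,
) -> Tuple[str, Optional[str]]:
    """Hypothetical sketch: return (effective startup mode, start-time string)."""
    if start_time is None:
        if binlog_startup_mode == "TIMESTAMP":
            raise ValueError("binlog_startup_mode='TIMESTAMP' requires start_time")
        return binlog_startup_mode, None
    if isinstance(start_time, datetime):
        if start_time.tzinfo is not None:
            raise ValueError("start_time must be naive; express it in the JobManager's time zone")
        start_time = start_time.strftime(HOLO_TS_FORMAT)
    else:
        # This sketch rejects strings that are not already in the expected format.
        datetime.strptime(start_time, HOLO_TS_FORMAT)
    # Any start_time promotes the startup mode to TIMESTAMP, ignoring binlog_startup_mode.
    return "TIMESTAMP", start_time


print(resolve_startup(start_time=datetime(2026, 1, 15)))  # ('TIMESTAMP', '2026-01-15 00:00:00')
print(resolve_startup("LATEST_OFFSET"))                   # ('LATEST_OFFSET', None)
```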
Static-partitioned binlog (consume only two partitions):
```python
df = pf.read_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.daily_logs",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    schema={"day": pf.DataType.string(), "msg": pf.DataType.string()},
    partition_binlog_mode="STATIC",
    partition_values_to_read=["2026-01-15", "2026-01-16"],
)
```
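`partition_values_to_read` accepts either a list or a comma-separated string, and is only allowed in STATIC mode. The sketch below shows one way to normalize both input forms into a single comma-separated value, which is the form the string variant already uses. The helper `normalize_partition_values` is hypothetical, and trimming whitespace around each value is an assumption of this sketch rather than documented behavior.

```python
from typing import List, Optional, Union


def normalize_partition_values(
    partition_binlog_mode: str,
    partition_values_to_read: Union[str, List[str], None],
) -> Optional[str]:
    """Hypothetical sketch: turn both accepted input forms into one comma-separated string."""
    if partition_values_to_read is None:
        return None
    if partition_binlog_mode != "STATIC":
        raise ValueError("partition_values_to_read requires partition_binlog_mode='STATIC'")
    if isinstance(partition_values_to_read, str):
        values = partition_values_to_read.split(",")
    else:
        values = list(partition_values_to_read)
    # Values are matched literally: no wildcard or regex expansion happens here.
    return ",".join(v.strip() for v in values if v.strip())


print(normalize_partition_values("STATIC", ["2026-01-15", "2026-01-16"]))  # 2026-01-15,2026-01-16
print(normalize_partition_values("STATIC", "2026-01-15, 2026-01-16"))      # 2026-01-15,2026-01-16
```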
Dim-table source for a downstream lookup join (LRU cache, async lookup tuned):
```python
dim = pf.read_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.user_profiles",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    schema={
        "user_id": pf.DataType.bigint(),
        "country": pf.DataType.string(),
    },
    primary_key="user_id",
    binlog=False,
    cache="LRU",
    cache_size=50000,
    cache_ttl_ms=600000,
    lookup_async=True,
    lookup_batch_size=512,
)
```
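The dim-table example combines `cache="LRU"`, `cache_size`, and `cache_ttl_ms`; `cache_empty` decides whether misses are cached too. The conceptual model below illustrates how those four settings interact for a single lookup key. It is a hypothetical sketch, not the connector's Java cache: `LruLookupCache` and its `load` callback (standing in for a point query against Hologres) are invented for illustration, and here `cache_ttl_ms=None` simply means "never expire", whereas the real connector picks its own value.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Hashable, Optional


class LruLookupCache:
    """Hypothetical model of cache="LRU" with cache_size, cache_ttl_ms, and cache_empty."""

    def __init__(self, cache_size: int = 100000, cache_ttl_ms: Optional[int] = None,
                 cache_empty: bool = True, clock: Callable[[], float] = time.monotonic) -> None:
        self._size = cache_size
        # None means "never expire" in this model (a simplification).
        self._ttl_s = None if cache_ttl_ms is None else cache_ttl_ms / 1000.0
        self._cache_empty = cache_empty
        self._clock = clock
        self._entries: "OrderedDict[Hashable, tuple]" = OrderedDict()

    def get(self, key: Hashable, load: Callable[[Hashable], Optional[Any]]) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is not None:
            value, stored_at = entry
            if self._ttl_s is None or self._clock() - stored_at < self._ttl_s:
                self._entries.move_to_end(key)  # mark as most recently used
                return value
            del self._entries[key]  # expired: fall through to a fresh lookup
        value = load(key)  # stands in for a point query against Hologres
        if value is not None or self._cache_empty:
            self._entries[key] = (value, self._clock())
            if len(self._entries) > self._size:
                self._entries.popitem(last=False)  # evict the least recently used key
        return value


db = {1: "CN", 2: "DE"}
calls = []


def load(key):
    calls.append(key)
    return db.get(key)


cache = LruLookupCache(cache_size=2, cache_empty=False)
cache.get(1, load)
cache.get(1, load)  # cache hit, no second load
cache.get(3, load)
cache.get(3, load)  # empty result was not cached, so it loads again
print(calls)  # [1, 3, 3]
```

With `cache_empty=True` the second lookup of key 3 would also be a hit, which is exactly why that setting can hide rows that arrive in Hologres after the first miss.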
Forward compatibility: set an option that has no named parameter (and override a named one) via `extra_options`:
```python
df = pf.read_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.orders",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    schema={"order_id": pf.DataType.bigint()},
    fetch_size=512,
    extra_options={
        "hologres.server.version": "3.1",  # option with no named parameter
        "source.scan.fetch-size": "8192",  # overrides the value set by fetch_size
    },
)
```
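The precedence rule for `extra_options` (its keys win over options generated from named parameters, and `"connector"` is reserved) amounts to a dictionary merge. The sketch below is a hypothetical model: `merge_connector_options` is not part of `pyflink.dataframe`, and the named-option dict is assumed to already hold connector option keys (such as `source.scan.fetch-size` from the example above). Booleans are rendered as lowercase `"true"`/`"false"`, as Flink string options expect.

```python
from typing import Any, Dict, Optional


def _to_option_string(value: Any) -> str:
    """Render a Python value as a connector option string (lowercase booleans)."""
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


def merge_connector_options(named: Dict[str, Any],
                            extra_options: Optional[Dict[str, Any]]) -> Dict[str, str]:
    """Hypothetical sketch: named-parameter options first, then extra_options on top."""
    extra = dict(extra_options or {})
    if "connector" in extra:
        raise ValueError("'connector' is reserved and must not be supplied in extra_options")
    merged = {key: _to_option_string(value) for key, value in named.items()}
    merged.update({key: _to_option_string(value) for key, value in extra.items()})  # extra wins
    return merged


opts = merge_connector_options(
    {"source.scan.fetch-size": 512},
    {"hologres.server.version": "3.1", "source.scan.fetch-size": "8192"},
)
print(opts)  # {'source.scan.fetch-size': '8192', 'hologres.server.version': '3.1'}
```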