pyflink.dataframe.dataframe.DataFrame.write_hologres
- DataFrame.write_hologres(endpoint: str, *, db_name: str, table_name: str, username: str, password: str, primary_key: str | List[str] | None = None, connection_pool_size: int = 5, connection_pool_name: str = 'default', connection_fixed: bool | None = None, connection_max_idle_ms: int = 60000, connection_ssl_mode: Literal['disable', 'require', 'verify-ca', 'verify-full'] = 'disable', connection_ssl_root_cert_location: str | None = None, retry_count: int = 10, retry_sleep_step_ms: int = 5000, meta_cache_ttl_ms: int = 600000, serverless_computing: bool = False, write_mode: Literal['AUTO', 'INSERT', 'COPY_STREAM', 'COPY_BULK_LOAD', 'COPY_BULK_LOAD_ON_CONFLICT'] = 'AUTO', on_conflict_action: Literal['INSERT_OR_IGNORE', 'INSERT_OR_UPDATE', 'INSERT_OR_REPLACE'] = 'INSERT_OR_UPDATE', create_missing_partition: bool = True, delete_strategy: Literal['IGNORE_DELETE', 'NON_PK_FIELD_TO_NULL', 'DELETE_ROW_ON_PK', 'CHANGELOG_STANDARD'] = 'CHANGELOG_STANDARD', ignore_null_when_update: bool = False, ignore_null_when_update_by_expr: bool = False, default_for_not_null_column: bool = True, remove_u0000_in_text: bool = True, partial_insert: bool = False, deduplication: bool = True, aggressive_flush: bool = False, check_and_put_column: str | None = None, check_and_put_operator: str = 'GREATER', check_and_put_null_as: str | None = None, insert_batch_size: int = 512, insert_batch_byte_size: int = 2097152, insert_flush_interval_ms: int = 10000, copy_format: Literal['binary', 'text', 'binaryrow'] = 'binary', insert_conflict_update_set: str | None = None, insert_conflict_where: str | None = None, parallelism: int | None = None, extra_options: Dict[str, Any] | None = None) -> None
Write the DataFrame to a Hologres table.
Wraps the Ververica Hologres sink connector. The DataFrame’s resolved schema is propagated to the connector automatically; use primary_key to declare a primary key constraint.
- Parameters:
endpoint – Hologres service endpoint (use the VPC endpoint for VVR).
db_name – Hologres database name. May include a compute-group suffix (e.g. "my_db/my_group").
table_name – Target Hologres table name. Use "schema.tableName" for non-public schemas.
username – Hologres account username (or AccessKey ID).
password – Hologres account password (or AccessKey Secret).
primary_key – Primary key column or list of columns. Required for update/upsert semantics; should match the Hologres table’s primary key definition.
connection_pool_size – JDBC connection pool size for the sink.
connection_pool_name – Connection pool name; sinks sharing the same name within a Flink TaskManager share connections.
connection_fixed – When True, enable Hologres lightweight (fixed) connection mode. None (the default) lets the connector pick a value based on the Hologres engine version.
connection_max_idle_ms – Idle time (milliseconds) before a JDBC connection in the pool is released.
connection_ssl_mode – SSL transport mode. One of "disable", "require", "verify-ca", or "verify-full".
connection_ssl_root_cert_location – Path to the CA certificate file (under /flink/usrlib/<name> on VVP).
retry_count – Number of retries on connection failures.
retry_sleep_step_ms – Linear back-off step (milliseconds).
meta_cache_ttl_ms – TTL (milliseconds) of the local Hologres TableSchema cache.
serverless_computing – When True, route bulk imports through Hologres serverless compute.
write_mode – Write protocol. "AUTO" (default) lets the Hologres connector pick the protocol based on the Flink runtime mode and Hologres engine version: it selects a COPY_* path for batch jobs against Hologres 3.1+ and otherwise falls back to "INSERT". "INSERT" forces JDBC INSERTs. "COPY_STREAM" uses streaming COPY. The two COPY_BULK_LOAD* modes perform bulk loads (with COPY_BULK_LOAD, PK conflicts raise an error).
on_conflict_action – PK conflict strategy. "INSERT_OR_UPDATE" (default) updates conflicting rows, "INSERT_OR_IGNORE" skips them, and "INSERT_OR_REPLACE" deletes them and then inserts the new rows.
create_missing_partition – When True (the default), the sink auto-creates a partition child table on first write if it doesn’t already exist. Set to False to require that partitions be created out of band, in which case writes to a missing partition will fail. Only valid in INSERT mode.
delete_strategy – How to handle retraction (-D, -U) records. "CHANGELOG_STANDARD" (default) processes them per Flink’s normal change-log semantics; the other values apply to partial-update scenarios. "NON_PK_FIELD_TO_NULL" and "DELETE_ROW_ON_PK" require primary_key to be set.
ignore_null_when_update – When True, ignore null values during updates (effective only in INSERT mode).
ignore_null_when_update_by_expr – Same as ignore_null_when_update, but for expression-based updates (requires Hologres 4.0+).
default_for_not_null_column – When True (the default), auto-fill defaults when writing null into NOT NULL columns.
remove_u0000_in_text – When True (the default), strip the illegal \u0000 character from string columns.
partial_insert – When True, only write the columns declared in the Flink schema.
deduplication – When True (the default), deduplicate rows by primary key inside each batch.
aggressive_flush – When True, flush even partially-filled batches when the sink is idle.
check_and_put_column – Column used for conditional updates. The update only happens when new.<column> <operator> old.<column> evaluates true. Not supported with any COPY_* write_mode. "AUTO" is accepted because it normally resolves to "INSERT", but be aware that batch jobs on Hologres 3.1+ may resolve "AUTO" to "COPY_BULK_LOAD*"; in that case the Hologres connector will reject the combination at submit time. Set write_mode="INSERT" explicitly if you need check-and-put in a batch job.
check_and_put_operator – Comparison operator for conditional updates. Default is "GREATER".
check_and_put_null_as – Value used in place of NULL during the comparison.
insert_batch_size – Maximum rows per INSERT batch.
insert_batch_byte_size – Maximum bytes per INSERT batch (default: 2 MB).
insert_flush_interval_ms – Maximum wait time (milliseconds) before flushing an INSERT batch.
copy_format – Wire format used in COPY modes. "binary" (the default) is the only value accepted with write_mode='AUTO' or write_mode='INSERT' (AUTO may resolve to INSERT at runtime). The COPY_* modes additionally accept "text" (typically used with COPY_BULK_LOAD*) and "binaryrow" (a COPY_STREAM optimization, Hologres 4.1+); the Hologres connector may impose further constraints based on the engine version.
insert_conflict_update_set – SET clause executed on PK conflict (a Hologres expression, e.g. "col=excluded.col + 1"). Conflicts with the check_and_put_* options.
insert_conflict_where – WHERE clause limiting which conflicting rows are updated. Conflicts with the check_and_put_* options.
parallelism – Custom parallelism for the Hologres sink operator. None (the default) leaves the option unset, letting Flink inherit parallelism from the upstream operator.
extra_options – Additional connector options forwarded through to the underlying Hologres connector. This is for options not exposed as named parameters of write_hologres, e.g., connection.direct.enabled, connection.max-alive-ms. If a key matches an option generated by a named parameter of write_hologres, the value in extra_options takes precedence. "connector" is reserved and must not be supplied.
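Among the parameters above, insert_batch_size, insert_batch_byte_size and insert_flush_interval_ms each describe a maximum for an INSERT batch. One way to read them is as three independent flush triggers. The sketch below is a simplified model of that reading, not the connector's implementation: should_flush and its positional arguments are hypothetical names, and it assumes a batch is flushed as soon as any one limit is reached.

```python
def should_flush(
    rows: int,
    size_bytes: int,
    waited_ms: int,
    *,
    insert_batch_size: int = 512,
    insert_batch_byte_size: int = 2 * 1024 * 1024,  # 2097152, the documented default
    insert_flush_interval_ms: int = 10_000,
) -> bool:
    """Hypothetical model: flush once any documented maximum is reached."""
    return (
        rows >= insert_batch_size
        or size_bytes >= insert_batch_byte_size
        or waited_ms >= insert_flush_interval_ms
    )


# A small, young batch stays buffered; a full one is flushed.
print(should_flush(rows=10, size_bytes=100, waited_ms=500))
print(should_flush(rows=512, size_bytes=100, waited_ms=500))
```

Under this model, raising insert_batch_size alone does not delay writes beyond insert_flush_interval_ms, which is why the two are usually tuned together.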
- Raises:
ValueError – If any enum value is invalid; if copy_format is non-binary with write_mode='AUTO' or write_mode='INSERT'; if connection_ssl_mode is "verify-ca" or "verify-full" without connection_ssl_root_cert_location; if the check_and_put_* options are combined with an incompatible on_conflict_action or with insert_conflict_where; if check_and_put_column is combined with an explicit COPY_* write_mode; if delete_strategy is "NON_PK_FIELD_TO_NULL" or "DELETE_ROW_ON_PK" without primary_key; or if extra_options contains a "connector" key.
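Several of the conditions listed under Raises can be checked before a job is built, for example in a unit test for a configuration module. The sketch below is a hypothetical stand-alone helper (precheck_hologres_options is not part of the library) that mirrors only the conditions spelled out explicitly above. It leaves out enum validation and the on_conflict_action compatibility rule, because the text does not say which actions are incompatible.

```python
from typing import Any, Dict, List, Optional, Union


def precheck_hologres_options(
    *,
    write_mode: str = "AUTO",
    copy_format: str = "binary",
    connection_ssl_mode: str = "disable",
    connection_ssl_root_cert_location: Optional[str] = None,
    check_and_put_column: Optional[str] = None,
    delete_strategy: str = "CHANGELOG_STANDARD",
    primary_key: Union[str, List[str], None] = None,
    extra_options: Optional[Dict[str, Any]] = None,
) -> None:
    """Hypothetical helper mirroring some documented ValueError conditions."""
    if copy_format != "binary" and write_mode in ("AUTO", "INSERT"):
        raise ValueError("non-binary copy_format needs an explicit COPY_* write_mode")
    if (
        connection_ssl_mode in ("verify-ca", "verify-full")
        and connection_ssl_root_cert_location is None
    ):
        raise ValueError("connection_ssl_root_cert_location is required")
    if check_and_put_column is not None and write_mode.startswith("COPY_"):
        raise ValueError("check_and_put_column cannot be used with COPY_* modes")
    if (
        delete_strategy in ("NON_PK_FIELD_TO_NULL", "DELETE_ROW_ON_PK")
        and not primary_key
    ):
        raise ValueError("delete_strategy requires primary_key")
    if extra_options and "connector" in extra_options:
        raise ValueError('"connector" is reserved')


# Passes silently: text format is paired with an explicit COPY_* mode.
precheck_hologres_options(write_mode="COPY_BULK_LOAD", copy_format="text")
```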
Examples
Streaming upsert sink (default AUTO write_mode: the connector picks INSERT for streaming jobs and a COPY_* path for batch jobs against Hologres 3.1+):
import pyflink.dataframe as pf

# ALIYUN_AK_ID / ALIYUN_AK_SECRET are placeholders for credentials defined elsewhere.
events = pf.read_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.events",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    schema={
        "event_id": pf.DataType.bigint(),
        "user": pf.DataType.string(),
    },
    primary_key="event_id",
)

events.write_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.events_processed",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    primary_key="event_id",
)
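The AUTO rule described for write_mode can be summarised as a small predicate. This is only a restatement of the documented rule as a sketch: auto_resolves_to_copy is a hypothetical name, the version is assumed to be available as a (major, minor) tuple, and the real connector may take further factors into account.

```python
from typing import Tuple


def auto_resolves_to_copy(is_batch_job: bool, hologres_version: Tuple[int, int]) -> bool:
    """Hypothetical restatement: AUTO picks a COPY_* path only for batch jobs on 3.1+."""
    return is_batch_job and hologres_version >= (3, 1)


# A streaming job like the one above would stay on INSERT under this rule.
print(auto_resolves_to_copy(is_batch_job=False, hologres_version=(3, 1)))
```

This is the case to watch for when combining AUTO with options that are only valid in INSERT mode.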
High-throughput bulk load (COPY_BULK_LOAD on append-only data):
df.write_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.daily_summary",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    write_mode="COPY_BULK_LOAD",
    copy_format="text",
    insert_batch_size=4096,
    insert_batch_byte_size=8 * 1024 * 1024,
)
Conditional update with check-and-put (only overwrite when the new version is greater):
df.write_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.profiles",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    primary_key="user_id",
    on_conflict_action="INSERT_OR_UPDATE",
    check_and_put_column="version",
    check_and_put_operator="GREATER",
    check_and_put_null_as="0",
)
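The effect of the check-and-put settings in this example can be modelled in plain Python. The sketch below is an illustration of the documented condition new.<column> GREATER old.<column>, not the connector's code. allows_update is a hypothetical name; it assumes the column values are already integers, that check_and_put_null_as is set, and that the substitute applies to a NULL on either side. Behaviour with unsubstituted NULLs is left out because the parameter description does not specify it.

```python
from typing import Optional


def allows_update(
    new_version: Optional[int],
    old_version: Optional[int],
    null_as: int,
) -> bool:
    """Hypothetical model of check-and-put with operator GREATER."""
    # Assumption: null_as replaces NULL on both sides before comparing.
    new_val = null_as if new_version is None else new_version
    old_val = null_as if old_version is None else old_version
    return new_val > old_val


# A stale record (version 3) does not overwrite a newer row (version 5).
print(allows_update(new_version=3, old_version=5, null_as=0))
# A row whose stored version is NULL is treated as version 0 and gets updated.
print(allows_update(new_version=1, old_version=None, null_as=0))
```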
Partial update (write only listed columns), tuned batch size:
df.write_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.user_state",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    primary_key="user_id",
    partial_insert=True,
    insert_batch_size=1024,
    insert_flush_interval_ms=2000,
    ignore_null_when_update=True,
)
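To make the combination of partial_insert and ignore_null_when_update concrete, the sketch below models a single upsert on rows represented as dictionaries. It is a hypothetical illustration of the described semantics (merge_ignoring_nulls is not a library function): columns missing from the incoming row are left untouched, and incoming nulls do not overwrite stored values.

```python
from typing import Any, Dict


def merge_ignoring_nulls(
    existing: Dict[str, Any], incoming: Dict[str, Any]
) -> Dict[str, Any]:
    """Hypothetical model of a partial update that skips null values."""
    merged = dict(existing)  # columns absent from `incoming` keep their value
    for column, value in incoming.items():
        if value is not None:  # a null in the new record is ignored
            merged[column] = value
    return merged


stored = {"user_id": 1, "name": "alice", "city": "Hangzhou"}
update = {"user_id": 1, "name": None, "city": "Shanghai"}
print(merge_ignoring_nulls(stored, update))
# {'user_id': 1, 'name': 'alice', 'city': 'Shanghai'}
```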
Forward compatibility: tune a non-named option (and override a named one) via extra_options:
df.write_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.events",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    extra_options={
        "hologres.server.version": "3.1",
        "sink.insert.batch-size": "2048",  # overrides the value from the named parameter
    },
)
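The precedence rule for extra_options amounts to a dictionary merge in which the user-supplied entries win. The following sketch shows that rule in isolation. merge_connector_options and the generated dictionary are hypothetical; the actual option keys produced by the named parameters are not listed on this page, so the "512" entry is only an assumed stand-in for the default insert_batch_size.

```python
from typing import Any, Dict, Optional


def merge_connector_options(
    generated: Dict[str, Any],
    extra_options: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical model: extra_options override generated options."""
    extra = dict(extra_options or {})
    if "connector" in extra:
        raise ValueError('"connector" is reserved and must not be supplied')
    # Later entries win, so extra_options take precedence on key clashes.
    return {**generated, **extra}


generated = {"sink.insert.batch-size": "512"}  # assumed output of named parameters
merged = merge_connector_options(
    generated,
    {"hologres.server.version": "3.1", "sink.insert.batch-size": "2048"},
)
print(merged["sink.insert.batch-size"])  # 2048
```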