PyFlink 1.20+vvr.11.7.dev0 documentation
pyflink.dataframe.dataframe.DataFrame.write_hologres

DataFrame.write_hologres(endpoint: str, *, db_name: str, table_name: str, username: str, password: str, primary_key: str | List[str] | None = None, connection_pool_size: int = 5, connection_pool_name: str = 'default', connection_fixed: bool | None = None, connection_max_idle_ms: int = 60000, connection_ssl_mode: Literal['disable', 'require', 'verify-ca', 'verify-full'] = 'disable', connection_ssl_root_cert_location: str | None = None, retry_count: int = 10, retry_sleep_step_ms: int = 5000, meta_cache_ttl_ms: int = 600000, serverless_computing: bool = False, write_mode: Literal['AUTO', 'INSERT', 'COPY_STREAM', 'COPY_BULK_LOAD', 'COPY_BULK_LOAD_ON_CONFLICT'] = 'AUTO', on_conflict_action: Literal['INSERT_OR_IGNORE', 'INSERT_OR_UPDATE', 'INSERT_OR_REPLACE'] = 'INSERT_OR_UPDATE', create_missing_partition: bool = True, delete_strategy: Literal['IGNORE_DELETE', 'NON_PK_FIELD_TO_NULL', 'DELETE_ROW_ON_PK', 'CHANGELOG_STANDARD'] = 'CHANGELOG_STANDARD', ignore_null_when_update: bool = False, ignore_null_when_update_by_expr: bool = False, default_for_not_null_column: bool = True, remove_u0000_in_text: bool = True, partial_insert: bool = False, deduplication: bool = True, aggressive_flush: bool = False, check_and_put_column: str | None = None, check_and_put_operator: str = 'GREATER', check_and_put_null_as: str | None = None, insert_batch_size: int = 512, insert_batch_byte_size: int = 2097152, insert_flush_interval_ms: int = 10000, copy_format: Literal['binary', 'text', 'binaryrow'] = 'binary', insert_conflict_update_set: str | None = None, insert_conflict_where: str | None = None, parallelism: int | None = None, extra_options: Dict[str, Any] | None = None) → None

Write the DataFrame to a Hologres table.

Wraps the Ververica Hologres sink connector. The DataFrame’s resolved schema is propagated to the connector automatically — use primary_key to declare a primary key constraint.

Parameters:
  • endpoint – Hologres service endpoint (use the VPC endpoint for VVR).

  • db_name – Hologres database name. May include a compute-group suffix (e.g. "my_db/my_group").

  • table_name – Target Hologres table name. Use "schema.tableName" for non-public schemas.

  • username – Hologres account username (or AccessKey ID).

  • password – Hologres account password (or AccessKey Secret).

  • primary_key – Primary key column or list of columns. Required for update/upsert semantics; should match the Hologres table’s primary key definition.

  • connection_pool_size – JDBC connection pool size for the sink.

  • connection_pool_name – Connection pool name; sinks sharing the same name within a Flink TaskManager share connections.

  • connection_fixed – When True, enable Hologres lightweight (fixed) connection mode. None (the default) lets the connector pick a value based on the Hologres engine version.

  • connection_max_idle_ms – Idle time (milliseconds) before a JDBC connection in the pool is released.

  • connection_ssl_mode – SSL transport mode. One of "disable", "require", "verify-ca", or "verify-full".

  • connection_ssl_root_cert_location – Path to the CA certificate file (under /flink/usrlib/<name> on VVP).

  • retry_count – Number of retries on connection failures.

  • retry_sleep_step_ms – Linear back-off step (milliseconds): the wait before a retry grows by this amount with each successive attempt.

  • meta_cache_ttl_ms – TTL (milliseconds) of the local Hologres TableSchema cache.

  • serverless_computing – When True, route bulk imports through Hologres serverless compute.

  • write_mode – Write protocol. "AUTO" (default) lets the Hologres connector pick the protocol based on the Flink runtime mode and Hologres engine version: it selects a COPY_* path for batch jobs against Hologres 3.1+ and otherwise falls back to "INSERT". "INSERT" forces JDBC INSERTs. "COPY_STREAM" uses streaming COPY. "COPY_BULK_LOAD" and "COPY_BULK_LOAD_ON_CONFLICT" perform bulk loads; with "COPY_BULK_LOAD", a primary-key conflict raises an error.

  • on_conflict_action – PK conflict strategy. "INSERT_OR_UPDATE" (default) updates conflicting rows, "INSERT_OR_IGNORE" skips them, and "INSERT_OR_REPLACE" deletes-then-inserts.

  • create_missing_partition – When True (the default), the sink auto-creates a partition child table on first write if it doesn’t already exist. Set to False to require that partitions be created out of band, in which case writes to a missing partition will fail. Only valid in INSERT mode.

  • delete_strategy – How to handle retraction (-D, -U) records. "CHANGELOG_STANDARD" (default) processes them per Flink’s normal change-log semantics; the other values apply to partial-update scenarios. "NON_PK_FIELD_TO_NULL" and "DELETE_ROW_ON_PK" require primary_key to be set.

  • ignore_null_when_update – When True, ignore null values during updates (effective only in INSERT mode).

  • ignore_null_when_update_by_expr – Same as above but for expression-based updates (requires Hologres 4.0+).

  • default_for_not_null_column – When True (the default), auto-fill default values when writing null into NOT NULL columns.

  • remove_u0000_in_text – When True (the default), strip the illegal \u0000 character from string columns.

  • partial_insert – When True, only write the columns declared in the Flink schema.

  • deduplication – When True (the default), deduplicate rows by primary key inside each batch.

  • aggressive_flush – When True, flush even partially-filled batches when the sink is idle.

  • check_and_put_column – Column used for conditional updates. The update only happens when new.<column> <operator> old.<column> evaluates true. Not supported with any COPY_* write_mode. "AUTO" is accepted because it normally resolves to "INSERT", but be aware that batch jobs on Hologres 3.1+ may resolve "AUTO" to "COPY_BULK_LOAD*"; in that case the Hologres connector will reject the combination at submit time. Set write_mode="INSERT" explicitly if you need check-and-put in a batch job.

  • check_and_put_operator – Comparison operator for conditional updates. Default is "GREATER".

  • check_and_put_null_as – Value used in place of NULL during the comparison.

  • insert_batch_size – Maximum rows per INSERT batch.

  • insert_batch_byte_size – Maximum bytes per INSERT batch (default: 2 MB).

  • insert_flush_interval_ms – Maximum wait time (milliseconds) before flushing an INSERT batch.

  • copy_format – Wire format used in COPY modes. "binary" (the default) is the only value accepted with write_mode='AUTO' or write_mode='INSERT' (AUTO may resolve to INSERT at runtime). The COPY_* modes additionally accept "text" (typically used with COPY_BULK_LOAD*) and "binaryrow" (a COPY_STREAM optimization, Hologres 4.1+); the Hologres connector may impose further constraints based on the engine version.

  • insert_conflict_update_set – SET clause executed on PK conflict (a Hologres expression, e.g. "col=excluded.col + 1"). Conflicts with the check_and_put_* options.

  • insert_conflict_where – WHERE clause limiting which conflicting rows are updated. Conflicts with the check_and_put_* options.

  • parallelism – Custom parallelism for the Hologres sink operator. None (the default) leaves the option unset, letting Flink inherit parallelism from the upstream operator.

  • extra_options – Additional connector options forwarded through to the underlying Hologres connector. This is for options not exposed as named parameters of write_hologres, e.g., connection.direct.enabled, connection.max-alive-ms. If a key matches an option generated by a named parameter of write_hologres, the value in extra_options takes precedence. "connector" is reserved and must not be supplied.
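As a rough illustration of how retry_count and retry_sleep_step_ms interact, the sketch below computes a linear back-off schedule. It assumes the wait before the n-th retry is n * retry_sleep_step_ms; the connector's exact formula is not stated on this page, and backoff_schedule_ms is a hypothetical helper, not part of PyFlink.

```python
from typing import List


def backoff_schedule_ms(retry_count: int = 10, retry_sleep_step_ms: int = 5000) -> List[int]:
    """Return the assumed wait (in ms) before each retry under linear back-off."""
    # Assumption: the n-th retry waits n * step milliseconds.
    return [attempt * retry_sleep_step_ms for attempt in range(1, retry_count + 1)]


schedule = backoff_schedule_ms()
print(schedule[:3])   # waits before the first three retries
print(sum(schedule))  # worst-case total wait across all retries, in ms
```

Under that assumption, the defaults (10 retries, 5000 ms step) add up to 275 seconds of waiting in the worst case, which is worth keeping in mind when choosing checkpoint or job timeouts.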

Raises:

ValueError – Raised in any of the following cases:
  • Any enum value is invalid.
  • copy_format is non-binary with write_mode='AUTO' or write_mode='INSERT'.
  • connection_ssl_mode is "verify-ca" or "verify-full" without connection_ssl_root_cert_location.
  • The check_and_put_* options are combined with an incompatible on_conflict_action or with insert_conflict_where.
  • check_and_put_column is combined with an explicit COPY_* write_mode.
  • delete_strategy is "NON_PK_FIELD_TO_NULL" or "DELETE_ROW_ON_PK" without primary_key.
  • extra_options contains a "connector" key.
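Several of the documented ValueError conditions can be checked before a job is built. The following is a minimal sketch of such a pre-flight check, not the library's own validation: precheck_hologres_args is a hypothetical helper, and it deliberately skips the on_conflict_action compatibility rule because the incompatible values are not spelled out here.

```python
from typing import Any, Dict, List, Optional, Union

COPY_MODES = {"COPY_STREAM", "COPY_BULK_LOAD", "COPY_BULK_LOAD_ON_CONFLICT"}


def precheck_hologres_args(
    *,
    write_mode: str = "AUTO",
    copy_format: str = "binary",
    connection_ssl_mode: str = "disable",
    connection_ssl_root_cert_location: Optional[str] = None,
    check_and_put_column: Optional[str] = None,
    delete_strategy: str = "CHANGELOG_STANDARD",
    primary_key: Union[str, List[str], None] = None,
    extra_options: Optional[Dict[str, Any]] = None,
) -> None:
    """Hypothetical check mirroring some of the documented ValueError cases."""
    if copy_format != "binary" and write_mode in ("AUTO", "INSERT"):
        raise ValueError("copy_format must be 'binary' with write_mode AUTO or INSERT")
    if connection_ssl_mode in ("verify-ca", "verify-full") and not connection_ssl_root_cert_location:
        raise ValueError(f"{connection_ssl_mode} requires connection_ssl_root_cert_location")
    if check_and_put_column is not None and write_mode in COPY_MODES:
        raise ValueError("check_and_put_column is not supported with COPY_* write modes")
    if delete_strategy in ("NON_PK_FIELD_TO_NULL", "DELETE_ROW_ON_PK") and not primary_key:
        raise ValueError(f"delete_strategy {delete_strategy} requires primary_key")
    if extra_options and "connector" in extra_options:
        raise ValueError('"connector" is reserved and must not appear in extra_options')


try:
    precheck_hologres_args(write_mode="COPY_BULK_LOAD", check_and_put_column="version")
except ValueError as exc:
    print(f"rejected: {exc}")
```

Note that a check like this cannot catch the case where "AUTO" resolves to a COPY_* mode at runtime; that combination is rejected by the connector at submit time.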

Examples

Streaming upsert sink (default AUTO write_mode: the connector picks INSERT for streaming jobs and a COPY_* path for batch jobs against Hologres 3.1+):

import pyflink.dataframe as pf

# ALIYUN_AK_ID and ALIYUN_AK_SECRET are placeholders; define them with your own credentials.
events = pf.read_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.events",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    schema={
        "event_id": pf.DataType.bigint(),
        "user": pf.DataType.string(),
    },
    primary_key="event_id",
)
events.write_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.events_processed",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    primary_key="event_id",
)
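To make the three on_conflict_action values concrete, the toy model below applies an incoming row to an in-memory table keyed by primary key. It is only a sketch of the semantics described for that parameter: apply_on_conflict and demo are hypothetical names, and treating "INSERT_OR_UPDATE" as changing only the supplied columns is an assumption about how the update behaves when the incoming row omits columns.

```python
from typing import Any, Dict

Row = Dict[str, Any]


def apply_on_conflict(table: Dict[Any, Row], pk: str, new_row: Row,
                      action: str = "INSERT_OR_UPDATE") -> None:
    """Apply new_row to an in-memory table, resolving a PK conflict per action."""
    key = new_row[pk]
    if key not in table:
        table[key] = dict(new_row)   # no conflict: plain insert
    elif action == "INSERT_OR_IGNORE":
        pass                         # keep the existing row untouched
    elif action == "INSERT_OR_UPDATE":
        table[key].update(new_row)   # assumption: only supplied columns change
    elif action == "INSERT_OR_REPLACE":
        table[key] = dict(new_row)   # delete-then-insert: old columns are dropped
    else:
        raise ValueError(f"unknown on_conflict_action: {action}")


def demo(action: str) -> Row:
    table = {1: {"event_id": 1, "user": "alice", "country": "DE"}}
    apply_on_conflict(table, "event_id", {"event_id": 1, "user": "bob"}, action)
    return table[1]


for action in ("INSERT_OR_IGNORE", "INSERT_OR_UPDATE", "INSERT_OR_REPLACE"):
    print(action, demo(action))
```

In a real table the replaced row would hold nulls or column defaults for the omitted columns rather than dropping them; the dictionary model only shows that the old values do not survive.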

High-throughput bulk load (COPY_BULK_LOAD on append-only data):

df.write_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.daily_summary",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    write_mode="COPY_BULK_LOAD",
    copy_format="text",
    insert_batch_size=4096,
    insert_batch_byte_size=8 * 1024 * 1024,
)

Conditional update with check-and-put (only overwrite when the new version is greater):

df.write_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.profiles",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    primary_key="user_id",
    on_conflict_action="INSERT_OR_UPDATE",
    check_and_put_column="version",
    check_and_put_operator="GREATER",
    check_and_put_null_as="0",
)
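The condition that check-and-put evaluates on a primary-key conflict (new.<column> <operator> old.<column>) can be sketched in plain Python. The helper below is hypothetical and models only the "GREATER" operator, since no other operator is listed on this page. It also assumes that check_and_put_null_as substitutes for a NULL in the existing row, and that a comparison against an unsubstituted NULL is not true, as in SQL. Values are integers here for simplicity, although the real parameter takes a string.

```python
from typing import Optional


def should_put(new_value: int, old_value: Optional[int],
               op: str = "GREATER", null_as: Optional[int] = None) -> bool:
    """Return True if a conflicting row would be overwritten (toy model)."""
    if op != "GREATER":
        raise ValueError("this sketch only models the GREATER operator")
    if old_value is None:
        old_value = null_as          # assumption: null_as replaces a NULL old value
    if old_value is None:
        return False                 # assumption: comparison with NULL is not true
    return new_value > old_value


print(should_put(5, 3))                 # newer version arrives
print(should_put(3, 5))                 # stale version arrives
print(should_put(1, None, null_as=0))   # existing version is NULL, treated as 0
```

This is why a monotonically increasing column such as a version number or an event timestamp is a natural choice for check_and_put_column: late or replayed records fail the comparison and leave the stored row alone.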

Partial update (write only listed columns), tuned batch size:

df.write_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.user_state",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    primary_key="user_id",
    partial_insert=True,
    insert_batch_size=1024,
    insert_flush_interval_ms=2000,
    ignore_null_when_update=True,
)
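The three batching parameters (insert_batch_size, insert_batch_byte_size, insert_flush_interval_ms) each cap a different dimension of an INSERT batch. The toy buffer below sketches how such limits typically combine. BatchBuffer is a hypothetical class, and it assumes a flush is due as soon as any one threshold is reached, with the interval measured from the first buffered row; the connector's actual scheduling may differ.

```python
from typing import List, Optional


class BatchBuffer:
    """Toy write buffer that reports which threshold would trigger a flush."""

    def __init__(self, max_rows: int = 512, max_bytes: int = 2097152,
                 flush_interval_ms: int = 10000) -> None:
        self.max_rows = max_rows
        self.max_bytes = max_bytes
        self.flush_interval_ms = flush_interval_ms
        self.rows: List[bytes] = []
        self.size_bytes = 0
        self.first_row_at_ms: Optional[int] = None

    def add(self, row: bytes, now_ms: int) -> Optional[str]:
        """Buffer a row and return the flush reason, if any."""
        if self.first_row_at_ms is None:
            self.first_row_at_ms = now_ms
        self.rows.append(row)
        self.size_bytes += len(row)
        return self.flush_reason(now_ms)

    def flush_reason(self, now_ms: int) -> Optional[str]:
        """Return 'rows', 'bytes', 'interval', or None if no flush is due."""
        if not self.rows:
            return None
        if len(self.rows) >= self.max_rows:
            return "rows"
        if self.size_bytes >= self.max_bytes:
            return "bytes"
        if now_ms - self.first_row_at_ms >= self.flush_interval_ms:
            return "interval"
        return None

    def flush(self) -> List[bytes]:
        """Hand back the buffered rows and reset the buffer."""
        flushed, self.rows = self.rows, []
        self.size_bytes = 0
        self.first_row_at_ms = None
        return flushed


# Same settings as the partial-update example: 1024 rows, 2000 ms interval.
buf = BatchBuffer(max_rows=1024, flush_interval_ms=2000)
for i in range(3):
    reason = buf.add(b"row-%d" % i, now_ms=0)
print(reason)                          # too few rows and too little time so far
print(buf.flush_reason(now_ms=2000))   # the interval has now elapsed
```

With a low-volume stream, the interval is usually what bounds end-to-end latency, so lowering insert_flush_interval_ms matters more than raising insert_batch_size.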

Forward compatibility: set an option that has no named parameter (and override one that does) via extra_options:

df.write_hologres(
    "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
    db_name="my_db",
    table_name="public.events",
    username=ALIYUN_AK_ID,
    password=ALIYUN_AK_SECRET,
    extra_options={
        "hologres.server.version": "3.1",
        "sink.insert.batch-size": "2048",  # takes precedence over insert_batch_size
    },
)
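The precedence rule for extra_options amounts to a dictionary merge in which the extra keys win. The sketch below illustrates that rule only: merge_connector_options is a hypothetical helper, and the idea that insert_batch_size is emitted under the key "sink.insert.batch-size" is inferred from the example above rather than confirmed.

```python
from typing import Any, Dict, Optional


def merge_connector_options(named: Dict[str, Any],
                            extra_options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Merge options so that extra_options overrides named-parameter options."""
    extra = dict(extra_options or {})
    if "connector" in extra:
        raise ValueError('"connector" is reserved and must not be supplied')
    # Later keys win in a dict merge, so extra_options takes precedence.
    return {**named, **extra}


# Assumed key for the insert_batch_size default of 512.
named_options = {"sink.insert.batch-size": "512"}
merged = merge_connector_options(
    named_options,
    {"hologres.server.version": "3.1", "sink.insert.batch-size": "2048"},
)
print(merged)
```

Because the override is silent, it is easy to end up with a named parameter and an extra_options key that disagree; setting each option in only one place avoids that confusion.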
