PyFlink 1.20+vvr.11.7.dev0 documentation

pyflink.dataframe.dataframe.DataFrame.write_sls

DataFrame.write_sls(endpoint: str, *, project: str, logstore: str, access_id: str | None = None, access_key: str | None = None, topic_field: str | None = None, time_field: str | None = None, source_field: str | None = None, partition_field: str | None = None, buckets: int = 64, flush_interval_ms: int = 2000, write_null_properties: bool = True, extra_options: Dict[str, str] | None = None) → None

Write the DataFrame to an SLS (Aliyun Log Service) logstore.

Wraps the Ververica SLS sink connector. The DataFrame’s resolved schema is propagated to the connector automatically.

Parameters:
  • endpoint – SLS private-network service endpoint address.

  • project – Name of the SLS project.

  • logstore – SLS LogStore (or MetricStore) name.

  • access_id – Alibaba Cloud account AccessKey ID. If both access_id and access_key are None (the default), the SLS connector falls back to the VVP STS token.

  • access_key – Alibaba Cloud account AccessKey Secret. See access_id for the STS fallback behaviour.

  • topic_field – Name of a column whose value overrides the SLS __topic__ attribute for each record.

  • time_field – Name of an INT column whose value overrides the SLS __timestamp__ attribute. If unset, the current write time is used.

  • source_field – Name of a column whose value overrides the SLS __source__ attribute.

  • partition_field – Name of a column whose value is hashed for shard routing, so that records with the same hash land on the same shard.

  • buckets – Hash-bucket count when partition_field is set. Must be a power of 2 in [1, 256] and should be at least the shard count of the logstore.

  • flush_interval_ms – Interval, in milliseconds, at which writes to SLS are triggered.

  • write_null_properties – When True (the default), null field values are written as empty strings. When False, null fields are omitted entirely.

  • extra_options – Additional connector options forwarded through to the underlying SLS connector. This is for options not exposed as named parameters of write_sls, e.g., IOThreadNum, baseRetryBackOffTimeMs, maxRetryBackOffTimeMs. If a key matches an option generated by a named parameter of write_sls, the value in extra_options takes precedence. "connector" is reserved and must not be supplied.

Raises:

ValueError – If buckets is not a power of 2 in [1, 256], or extra_options contains a "connector" key.
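
The two ValueError conditions above can be checked before a job is submitted. The following is a minimal sketch, not the library's own validation code; the function name check_write_sls_args is hypothetical and only mirrors the documented rules.

```python
from typing import Dict, Optional


def check_write_sls_args(buckets: int = 64,
                         extra_options: Optional[Dict[str, str]] = None) -> None:
    """Hypothetical pre-flight check mirroring the documented ValueError rules."""
    # A positive integer is a power of 2 exactly when it has a single set bit.
    is_power_of_two = buckets > 0 and (buckets & (buckets - 1)) == 0
    if not (is_power_of_two and 1 <= buckets <= 256):
        raise ValueError(f"buckets must be a power of 2 in [1, 256], got {buckets}")
    # "connector" is reserved and must not appear in extra_options.
    if extra_options and "connector" in extra_options:
        raise ValueError('extra_options must not contain the reserved key "connector"')
```

Calling check_write_sls_args(128) returns silently, while check_write_sls_args(100) or check_write_sls_args(64, {"connector": "sls"}) raises ValueError.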

Examples

Minimal sink (uses the default partitioning and write_null_properties behavior):

import pyflink.dataframe as pf

# ALIYUN_AK_ID and ALIYUN_AK_SECRET are placeholders for your own credentials.
pf.from_records(
    [("INFO", "started"), ("WARN", "slow")],
    schema=["level", "message"],
).write_sls(
    "cn-hangzhou-intranet.log.aliyuncs.com",
    project="my-project",
    logstore="my-logstore",
    access_id=ALIYUN_AK_ID,
    access_key=ALIYUN_AK_SECRET,
)

Partitioned writes (route by user_id into 128 buckets):

df.write_sls(
    "cn-hangzhou-intranet.log.aliyuncs.com",
    project="my-project",
    logstore="my-logstore",
    access_id=ALIYUN_AK_ID,
    access_key=ALIYUN_AK_SECRET,
    partition_field="user_id",
    buckets=128
)
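
Since buckets must be a power of 2 in [1, 256] and should be at least the shard count, one way to pick a value is to round the shard count up to the next power of 2. This is a sketch only; suggest_buckets is a hypothetical helper, not part of PyFlink, and the shard count must be looked up separately.

```python
def suggest_buckets(shard_count: int) -> int:
    """Hypothetical helper: smallest power of 2 >= shard_count, capped at 256."""
    if shard_count < 1:
        raise ValueError("shard_count must be positive")
    buckets = 1
    # Double until the shard count is covered or the documented maximum is reached.
    while buckets < shard_count and buckets < 256:
        buckets *= 2
    return buckets
```

For a logstore with, say, 100 shards this returns 128. Above 256 shards the result is capped at 256, so the "at least the shard count" recommendation cannot be met.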

Field overrides (use columns as SLS metadata attributes):

df.write_sls(
    "cn-hangzhou-intranet.log.aliyuncs.com",
    project="my-project",
    logstore="my-logstore",
    access_id=ALIYUN_AK_ID,
    access_key=ALIYUN_AK_SECRET,
    topic_field="category",
    source_field="host",
    time_field="event_time_seconds"
)
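
time_field expects an INT column. Assuming, as the column name event_time_seconds in the example suggests, that the value is Unix time in whole seconds, such a value could be derived from a Python datetime before the DataFrame is built. The helper name to_epoch_seconds and the choice to treat naive datetimes as UTC are assumptions of this sketch.

```python
from datetime import datetime, timezone


def to_epoch_seconds(ts: datetime) -> int:
    """Convert a datetime to whole Unix seconds (fractional part truncated)."""
    # Assumption: naive datetimes are interpreted as UTC rather than local time.
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    return int(ts.timestamp())
```

For example, to_epoch_seconds(datetime(2024, 1, 1, tzinfo=timezone.utc)) returns 1704067200.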

Escape hatch via extra_options (tunes a non-named option and overrides buckets):

df.write_sls(
    "cn-hangzhou-intranet.log.aliyuncs.com",
    project="my-project",
    logstore="my-logstore",
    access_id=ALIYUN_AK_ID,
    access_key=ALIYUN_AK_SECRET,
    partition_field="user_id",
    extra_options={
        "IOThreadNum": "16",
        "buckets": "256",  # overrides default 64
    }
)
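
The precedence rule for extra_options can be modelled as a plain dictionary merge in which later keys win. This is a sketch of the documented behavior, not the actual implementation; merge_options is a hypothetical name, and only option keys that appear on this page are used.

```python
from typing import Dict, Optional


def merge_options(named: Dict[str, str],
                  extra_options: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Hypothetical model of the documented precedence: extra_options wins on key clashes."""
    extra = dict(extra_options or {})
    if "connector" in extra:
        raise ValueError('"connector" is reserved and must not be supplied')
    merged = dict(named)   # copy so the caller's dict is not mutated
    merged.update(extra)   # values from extra_options override named-parameter options
    return merged


named = {"buckets": "64"}  # option generated from the default buckets=64
merged = merge_options(named, {"IOThreadNum": "16", "buckets": "256"})
```

Here merged holds "buckets" set to "256" alongside "IOThreadNum", while named is left unchanged.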
