pyflink.dataframe.dataframe.DataFrame.write_sls
- DataFrame.write_sls(endpoint: str, *, project: str, logstore: str, access_id: str | None = None, access_key: str | None = None, topic_field: str | None = None, time_field: str | None = None, source_field: str | None = None, partition_field: str | None = None, buckets: int = 64, flush_interval_ms: int = 2000, write_null_properties: bool = True, extra_options: Dict[str, str] | None = None) -> None
Write the DataFrame to an SLS (Aliyun Log Service) logstore.
Wraps the Ververica SLS sink connector. The DataFrame’s resolved schema is propagated to the connector automatically.
- Parameters:
endpoint – SLS private-network service endpoint address.
project – Name of the SLS project.
logstore – SLS LogStore (or MetricStore) name.
access_id – Alibaba Cloud account AccessKey ID. If both access_id and access_key are None (the default), the SLS connector falls back to the VVP STS token.
access_key – Alibaba Cloud account AccessKey Secret. See access_id for the STS fallback behavior.
topic_field – Name of a column whose value overrides the SLS __topic__ attribute of each record.
time_field – Name of an INT column whose value overrides the SLS __timestamp__ attribute. If unset, the current write time is used.
source_field – Name of a column whose value overrides the SLS __source__ attribute.
partition_field – Name of a column whose value is hashed for shard routing, so that records with the same hash land on the same shard.
buckets – Number of hash buckets used when partition_field is set. Must be a power of 2 in [1, 256] and should be at least the number of shards in the logstore.
flush_interval_ms – Interval, in milliseconds, that triggers a write to SLS.
write_null_properties – When True (the default), null field values are written as empty strings. When False, null fields are omitted entirely.
extra_options – Additional options passed through to the underlying SLS connector, for options that are not exposed as named parameters of write_sls (e.g., IOThreadNum, baseRetryBackOffTimeMs, maxRetryBackOffTimeMs). If a key matches an option generated by a named parameter of write_sls, the value in extra_options takes precedence. The "connector" key is reserved and must not be supplied.
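To make the write_null_properties semantics concrete, the pure-Python sketch below turns one row into the key/value properties of a log entry. The helper row_to_properties and the dict-based row are hypothetical illustrations of the documented behavior, not the connector's code, and the sketch assumes non-null values are rendered as strings.

```python
from typing import Dict, Optional


def row_to_properties(
    row: Dict[str, Optional[object]], write_null_properties: bool = True
) -> Dict[str, str]:
    """Hypothetical helper mirroring the documented null handling."""
    props: Dict[str, str] = {}
    for name, value in row.items():
        if value is None:
            if write_null_properties:
                # Default behavior: a null value is written as an empty string.
                props[name] = ""
            # With write_null_properties=False the field is omitted entirely.
            continue
        props[name] = str(value)
    return props


row = {"level": "INFO", "message": None}
print(row_to_properties(row))         # {'level': 'INFO', 'message': ''}
print(row_to_properties(row, False))  # {'level': 'INFO'}
```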
- Raises:
ValueError – If buckets is not a power of 2 in [1, 256], or if extra_options contains a "connector" key.
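The two ValueError conditions can be restated as a small standalone check, sketched below. The function check_write_sls_args is hypothetical and only mirrors the documented rules; it is not how write_sls itself is implemented. The power-of-2 test uses the standard trick that a positive power of 2 has exactly one bit set, so n & (n - 1) == 0.

```python
from typing import Dict, Optional


def check_write_sls_args(buckets: int, extra_options: Optional[Dict[str, str]]) -> None:
    """Hypothetical re-statement of the documented ValueError conditions."""
    # In range [1, 256] and exactly one bit set => power of 2 (1, 2, 4, ..., 256).
    if not (1 <= buckets <= 256 and buckets & (buckets - 1) == 0):
        raise ValueError(f"buckets must be a power of 2 in [1, 256], got {buckets}")
    if extra_options and "connector" in extra_options:
        raise ValueError('"connector" is reserved and cannot be set via extra_options')


for n in (1, 64, 256):
    check_write_sls_args(n, None)  # all accepted

try:
    check_write_sls_args(100, None)
except ValueError as exc:
    print(exc)  # buckets must be a power of 2 in [1, 256], got 100
```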
Examples
Minimal sink (uses the default partitioning and write_null_properties behavior):
```python
import pyflink.dataframe as pf

# ALIYUN_AK_ID and ALIYUN_AK_SECRET stand for your AccessKey ID and Secret.
pf.from_records(
    [("INFO", "started"), ("WARN", "slow")],
    schema=["level", "message"],
).write_sls(
    "cn-hangzhou-intranet.log.aliyuncs.com",
    project="my-project",
    logstore="my-logstore",
    access_id=ALIYUN_AK_ID,
    access_key=ALIYUN_AK_SECRET,
)
```
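The examples pass ALIYUN_AK_ID and ALIYUN_AK_SECRET as placeholders. One way to avoid hard-coding them is to read them from environment variables and pass nothing when they are absent, so that access_id and access_key both stay None and the connector falls back to the VVP STS token, as described for access_id. The helper sls_credentials and the environment variable names it reads are hypothetical.

```python
import os
from typing import Dict, Mapping, Optional


def sls_credentials(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build keyword arguments for write_sls (hypothetical helper).

    Reads ALIYUN_AK_ID and ALIYUN_AK_SECRET (assumed variable names). If either
    one is missing, returns an empty dict so that both access_id and
    access_key stay None and the documented STS fallback applies.
    """
    env = os.environ if env is None else env
    ak_id = env.get("ALIYUN_AK_ID")
    ak_secret = env.get("ALIYUN_AK_SECRET")
    if ak_id and ak_secret:
        return {"access_id": ak_id, "access_key": ak_secret}
    return {}
```

Usage would then look like df.write_sls(endpoint, project="my-project", logstore="my-logstore", **sls_credentials()). Returning nothing when only one variable is set is a deliberate choice: the page documents the fallback only for the case where both values are None.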
Partitioned writes (route by user_id into 128 buckets):
```python
# df is an existing DataFrame that has a user_id column.
df.write_sls(
    "cn-hangzhou-intranet.log.aliyuncs.com",
    project="my-project",
    logstore="my-logstore",
    access_id=ALIYUN_AK_ID,
    access_key=ALIYUN_AK_SECRET,
    partition_field="user_id",
    buckets=128,
)
```
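Conceptually, partition_field and buckets map each record to one of a fixed number of hash buckets, so equal keys always end up together. The sketch below illustrates that idea with zlib.crc32. The connector's actual hash function and its bucket-to-shard mapping are not documented on this page, so treat bucket_for (a hypothetical name) only as an illustration of why equal user_id values are co-located, and of one reason a power-of-2 bucket count is convenient: masking with buckets - 1 gives the same result as taking the remainder.

```python
import zlib


def bucket_for(key: str, buckets: int = 64) -> int:
    """Illustrative bucket choice; NOT the connector's real hash function."""
    # Because buckets is a power of 2, "& (buckets - 1)" equals "% buckets".
    return zlib.crc32(key.encode("utf-8")) & (buckets - 1)


keys = ["u1", "u2", "u1", "u3"]
# The first and third results are always equal because the keys are equal.
print([bucket_for(k, 128) for k in keys])
```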
Field overrides (use columns as SLS metadata attributes):
```python
# category, host and event_time_seconds (an INT column) are columns of df.
df.write_sls(
    "cn-hangzhou-intranet.log.aliyuncs.com",
    project="my-project",
    logstore="my-logstore",
    access_id=ALIYUN_AK_ID,
    access_key=ALIYUN_AK_SECRET,
    topic_field="category",
    source_field="host",
    time_field="event_time_seconds",
)
```
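time_field expects an INT column. The example's column name, event_time_seconds, suggests whole seconds since the Unix epoch; assuming that unit, the sketch below derives such a value from a timezone-aware datetime in plain Python. The helper name to_epoch_seconds is hypothetical, and how you add the resulting column to df depends on the DataFrame API.

```python
from datetime import datetime, timezone


def to_epoch_seconds(ts: datetime) -> int:
    """Convert an aware datetime to whole Unix seconds (assumed unit for time_field)."""
    if ts.tzinfo is None:
        # Naive datetimes would be interpreted in local time; refuse them.
        raise ValueError("use a timezone-aware datetime")
    # int() drops any fractional seconds.
    return int(ts.timestamp())


print(to_epoch_seconds(datetime(2024, 1, 1, tzinfo=timezone.utc)))  # 1704067200
```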
Escape hatch via extra_options (tunes an option that has no named parameter and overrides buckets):
```python
df.write_sls(
    "cn-hangzhou-intranet.log.aliyuncs.com",
    project="my-project",
    logstore="my-logstore",
    access_id=ALIYUN_AK_ID,
    access_key=ALIYUN_AK_SECRET,
    partition_field="user_id",
    extra_options={
        "IOThreadNum": "16",
        "buckets": "256",  # takes precedence over the default buckets=64
    },
)
```
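The precedence rule for extra_options behaves like a dictionary merge in which extra_options is applied last. In the sketch below, merge_sls_options is a hypothetical helper and the generated dict is an assumed stand-in for what the named parameters produce; only the "buckets" and "IOThreadNum" keys come from this page, and the reserved-key validation is left out to keep the focus on ordering.

```python
from typing import Dict, Optional


def merge_sls_options(
    generated: Dict[str, str], extra_options: Optional[Dict[str, str]] = None
) -> Dict[str, str]:
    """Hypothetical merge: extra_options overrides keys generated from named parameters."""
    # In {**a, **b}, keys from b win, so extra_options takes precedence.
    return {**generated, **(extra_options or {})}


generated = {"buckets": "64"}  # assumed result of the default buckets=64
merged = merge_sls_options(generated, {"IOThreadNum": "16", "buckets": "256"})
print(merged)  # {'buckets': '256', 'IOThreadNum': '16'}
```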