pyflink.dataframe.io.read_sls
- read_sls(endpoint: str, *, project: str, logstore: str, access_id: str | None = None, access_key: str | None = None, schema: Dict[str, DataType] | None = None, columns: List[str] | None = None, enable_new_source: bool = True, shard_discovery_interval_ms: int = 60000, startup_mode: Literal['timestamp', 'latest', 'earliest', 'consumer_group'] = 'timestamp', start_time: str | datetime | None = None, stop_time: str | datetime | None = None, time_zone: str | None = None, consumer_group: str | None = None, max_retries: int = 3, batch_get_size: int = 100, exit_after_finish: bool = False, processor: str | None = None, extra_options: Dict[str, str] | None = None) -> DataFrame
Read an SLS (Aliyun Log Service) logstore into a DataFrame.
Wraps the Ververica SLS source connector. The connector options are exposed with Pythonic snake_case names, in the same order as the SLS documentation’s “WITH parameters” table.
- Parameters:
endpoint – SLS private-network service endpoint address.
project – Name of the SLS project.
logstore – SLS LogStore (or MetricStore) name.
access_id – Alibaba Cloud account AccessKey ID. If both access_id and access_key are None (the default), the SLS connector falls back to the VVP STS token.
access_key – Alibaba Cloud account AccessKey Secret. See access_id for the STS fallback behaviour.
schema – Dict of {column_name: DataType} specifying the source's schema. Required.
columns – Optional list of column names to read. If None, all columns from schema are read.
enable_new_source – When True (the default), use the FLIP-27 SLS source, which adapts automatically to shard changes. When False, use the legacy source implementation.
shard_discovery_interval_ms – Polling interval, in milliseconds, for shard-change discovery; applies only when enable_new_source is True. A non-positive value disables shard discovery.
startup_mode – Where the source begins consuming:
  - "timestamp" (default): start from start_time; if start_time is unset, the SLS connector uses the current time.
  - "latest": skip to the most recent log group.
  - "earliest": start from the oldest available log group.
  - "consumer_group": resume from the offset recorded for consumer_group, which is required in this mode.
start_time – Consumption start time; valid only when startup_mode='timestamp'. Accepts a naive datetime (formatted as "yyyy-MM-dd HH:mm:ss") or a string in that format. A datetime with tzinfo is rejected; use the time_zone parameter to specify the timezone explicitly.
stop_time – Consumption end time, intended for historical replay only. Accepts the same forms as start_time.
time_zone – Timezone in which the SLS connector interprets the start_time/stop_time strings (e.g. "Asia/Shanghai", "UTC"). When unset, the connector falls back to the JobManager JVM's default timezone.
consumer_group – Consumer-group name used to record consumption progress. Required when startup_mode='consumer_group'. In other startup modes, consumer_group is still forwarded to the connector but is used only for commit tracking; it does NOT change where consumption starts.
max_retries – Number of retries after a failed SLS read.
batch_get_size – Number of LogGroups fetched per request. Must be in the range [1, 1000]; read_sls raises ValueError otherwise, and the underlying Java connector also rejects values above 1000.
exit_after_finish – When True, the Flink job exits after the source finishes consuming.
processor – SLS consumer processor (SPL) used to pre-filter data before ingestion.
extra_options – Additional options forwarded to the underlying SLS connector, for options not exposed as named parameters of read_sls, e.g. regionId, signVersion, shardModDivisor, shardModRemainder. If a key matches an option generated by a named parameter of read_sls, the value in extra_options takes precedence. The "connector" key is reserved and must not be supplied.
- Returns:
A DataFrame backed by the SLS source.
- Raises:
ValueError – If any of the following holds:
  - schema is None;
  - startup_mode is not one of "timestamp", "latest", "earliest", "consumer_group";
  - batch_get_size is outside [1, 1000];
  - start_time is set while startup_mode is not "timestamp";
  - consumer_group is missing when startup_mode='consumer_group';
  - start_time or stop_time is an aware datetime (one with tzinfo);
  - extra_options contains a "connector" key.
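The argument checks listed under Raises can be sketched as a plain validation function. This is a minimal illustration, not the library's implementation: check_read_sls_args is a hypothetical name, it takes only the parameters these checks involve, schema values are placeholders for DataType objects, and the aware-datetime check is left out for brevity.

```python
from typing import Dict, Optional

STARTUP_MODES = ("timestamp", "latest", "earliest", "consumer_group")


def check_read_sls_args(
    schema: Optional[Dict[str, object]],
    startup_mode: str = "timestamp",
    start_time: Optional[object] = None,
    consumer_group: Optional[str] = None,
    batch_get_size: int = 100,
    extra_options: Optional[Dict[str, str]] = None,
) -> None:
    """Hypothetical helper: raise ValueError for the combinations listed under Raises."""
    if schema is None:
        raise ValueError("schema is required")
    if startup_mode not in STARTUP_MODES:
        raise ValueError(f"invalid startup_mode: {startup_mode!r}")
    if not 1 <= batch_get_size <= 1000:
        raise ValueError(f"batch_get_size must be in [1, 1000], got {batch_get_size}")
    if start_time is not None and startup_mode != "timestamp":
        raise ValueError("start_time is only valid with startup_mode='timestamp'")
    if startup_mode == "consumer_group" and consumer_group is None:
        raise ValueError("consumer_group is required with startup_mode='consumer_group'")
    if extra_options is not None and "connector" in extra_options:
        raise ValueError("'connector' is reserved and must not appear in extra_options")


# Passes: consumer_group is supplied for consumer_group mode.
# (The schema value "string" is a placeholder for pf.DataType.string().)
check_read_sls_args({"level": "string"}, startup_mode="consumer_group", consumer_group="my-analytics-job")

# Fails: start_time is only meaningful in timestamp mode.
try:
    check_read_sls_args({"level": "string"}, startup_mode="latest", start_time="2026-01-15 00:00:00")
except ValueError as err:
    print(err)  # start_time is only valid with startup_mode='timestamp'
```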
Examples
Minimal source (timestamp mode, defaults):
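```python
import os

import pyflink.dataframe as pf

# Credentials read from environment variables; the variable names are illustrative.
ALIYUN_AK_ID = os.environ["ALIYUN_AK_ID"]
ALIYUN_AK_SECRET = os.environ["ALIYUN_AK_SECRET"]

df = pf.read_sls(
    "cn-hangzhou-intranet.log.aliyuncs.com",
    project="my-project",
    logstore="my-logstore",
    access_id=ALIYUN_AK_ID,
    access_key=ALIYUN_AK_SECRET,
    schema={
        "__timestamp__": pf.DataType.bigint(),
        "level": pf.DataType.string(),
        "message": pf.DataType.string(),
    },
)
```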
Consumer-group mode (resume from recorded offset):
```python
df = pf.read_sls(
    "cn-hangzhou-intranet.log.aliyuncs.com",
    project="my-project",
    logstore="my-logstore",
    access_id=ALIYUN_AK_ID,
    access_key=ALIYUN_AK_SECRET,
    schema={"level": pf.DataType.string()},
    startup_mode="consumer_group",
    consumer_group="my-analytics-job",
)
```
Historical replay with datetime bounds:
```python
from datetime import datetime

df = pf.read_sls(
    "cn-hangzhou-intranet.log.aliyuncs.com",
    project="my-project",
    logstore="my-logstore",
    access_id=ALIYUN_AK_ID,
    access_key=ALIYUN_AK_SECRET,
    schema={"message": pf.DataType.string()},
    start_time=datetime(2026, 1, 15, 0, 0, 0),
    stop_time=datetime(2026, 1, 16, 0, 0, 0),
    time_zone="Asia/Shanghai",
    exit_after_finish=True,
)
```
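How naive datetime bounds like these could be turned into the documented "yyyy-MM-dd HH:mm:ss" strings can be sketched with the standard library. to_sls_time is a hypothetical helper, not part of pyflink.dataframe. Rejecting datetimes with tzinfo follows the rule documented for start_time; validating string inputs with strptime is an assumption, since this page does not say whether read_sls checks them.

```python
from datetime import datetime
from typing import Optional, Union

# Python strftime spelling of the documented "yyyy-MM-dd HH:mm:ss" pattern.
SLS_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def to_sls_time(value: Union[str, datetime, None]) -> Optional[str]:
    """Hypothetical helper: normalize a start_time/stop_time value to a string."""
    if value is None:
        return None
    if isinstance(value, datetime):
        if value.tzinfo is not None:
            # Datetimes with tzinfo are rejected; the zone goes in time_zone instead.
            raise ValueError("use a naive datetime and set time_zone explicitly")
        return value.strftime(SLS_TIME_FORMAT)
    # Assumption: strings are checked against the format; strptime raises ValueError otherwise.
    datetime.strptime(value, SLS_TIME_FORMAT)
    return value


print(to_sls_time(datetime(2026, 1, 15, 0, 0, 0)))  # 2026-01-15 00:00:00
```

The resulting strings carry no offset, which is why the example also sets time_zone: the connector interprets them in that zone.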
Forward compatibility: passing options that have no named parameter (and overriding a named one) via extra_options:
```python
df = pf.read_sls(
    "cn-hangzhou-intranet.log.aliyuncs.com",
    project="my-project",
    logstore="my-logstore",
    access_id=ALIYUN_AK_ID,
    access_key=ALIYUN_AK_SECRET,
    schema={"message": pf.DataType.string()},
    extra_options={
        "regionId": "cn-hangzhou",
        "compressType": "lz4",
        "batchGetSize": "256",  # overrides the batch_get_size default of 100
    },
)
```
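The precedence rule for extra_options can be sketched as a dictionary merge in which the extra entries are applied last. build_connector_options and its parameter-to-key table are hypothetical: apart from batchGetSize, which the example above overrides, the connector key names are assumptions about the underlying SLS connector, and skipping unset (None) parameters is likewise assumed.

```python
from typing import Dict, Optional

# Hypothetical map from read_sls parameter names to connector option keys.
# Only "batchGetSize" is confirmed by the extra_options example; the rest are assumed.
_NAMED_TO_CONNECTOR_KEY: Dict[str, str] = {
    "batch_get_size": "batchGetSize",
    "max_retries": "maxRetries",
    "exit_after_finish": "exitAfterFinish",
}


def build_connector_options(
    named: Dict[str, object],
    extra_options: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    """Merge named parameters and extra_options; extra_options wins on conflicts."""
    extra = dict(extra_options or {})
    if "connector" in extra:
        # The connector identifier is reserved and cannot be overridden.
        raise ValueError("'connector' is reserved and must not be set in extra_options")
    options: Dict[str, str] = {}
    for name, value in named.items():
        if value is None:
            # Assumption: unset parameters are simply not forwarded.
            continue
        key = _NAMED_TO_CONNECTOR_KEY[name]
        # Connector options are strings; render booleans as "true"/"false".
        options[key] = str(value).lower() if isinstance(value, bool) else str(value)
    # dict.update overwrites existing keys, so extra_options takes precedence.
    options.update(extra)
    return options


opts = build_connector_options(
    {"batch_get_size": 100, "max_retries": 3},
    {"regionId": "cn-hangzhou", "batchGetSize": "256"},
)
print(opts)  # {'batchGetSize': '256', 'maxRetries': '3', 'regionId': 'cn-hangzhou'}
```

In this sketch the named batch_get_size of 100 is replaced by the "256" from extra_options, while regionId, which has no named parameter, passes through unchanged.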