PyFlink 1.20+vvr.11.7.dev0 documentation

pyflink.dataframe.io.read_sls

read_sls(endpoint: str, *, project: str, logstore: str, access_id: str | None = None, access_key: str | None = None, schema: Dict[str, DataType] | None = None, columns: List[str] | None = None, enable_new_source: bool = True, shard_discovery_interval_ms: int = 60000, startup_mode: Literal['timestamp', 'latest', 'earliest', 'consumer_group'] = 'timestamp', start_time: str | datetime | None = None, stop_time: str | datetime | None = None, time_zone: str | None = None, consumer_group: str | None = None, max_retries: int = 3, batch_get_size: int = 100, exit_after_finish: bool = False, processor: str | None = None, extra_options: Dict[str, str] | None = None) → DataFrame

Read an SLS (Aliyun Log Service) logstore into a DataFrame.

Wraps the Ververica SLS source connector. The connector options are exposed with Pythonic snake_case names, in the same order as the SLS documentation’s “WITH parameters” table.

Parameters:
  • endpoint – SLS private-network service endpoint address.

  • project – Name of the SLS project.

  • logstore – SLS LogStore (or MetricStore) name.

  • access_id – Alibaba Cloud account AccessKey ID. If both access_id and access_key are None (the default), the SLS connector falls back to the VVP STS token.

  • access_key – Alibaba Cloud account AccessKey Secret. See access_id for the STS fallback behaviour.

  • schema – Dict of {column_name: DataType} specifying the source’s schema. Required.

  • columns – Optional list of column names to read. If None, all columns from schema are read.

  • enable_new_source – When True (the default), use the FLIP-27 SLS source that auto-adapts to shard changes. When False, use the legacy source implementation.

  • shard_discovery_interval_ms – Polling interval (milliseconds) for shard-change discovery, applied when enable_new_source is True. A non-positive value disables shard discovery.

  • startup_mode –

    Where the source begins consuming.

    • "timestamp" (default): start from start_time; if unset, the SLS connector uses the current time.

    • "latest": skip to the most recent log group.

    • "earliest": start from the oldest available log group.

    • "consumer_group": resume from the offset recorded for consumer_group (which is required in this mode).

  • start_time – Consumption start time. Only valid when startup_mode='timestamp'. Accepts a naive datetime (formatted as "yyyy-MM-dd HH:mm:ss") or a string already in that format. An aware datetime (one with tzinfo) is rejected; use the time_zone parameter to specify the timezone explicitly.

  • stop_time – Consumption end time. Use for historical replay only. Accepts the same forms as start_time.

  • time_zone – Timezone in which start_time / stop_time strings are interpreted by the SLS connector (e.g. "Asia/Shanghai", "UTC"). When unset, the SLS connector falls back to the JobManager JVM’s default timezone.

  • consumer_group – Consumer-group name used to record consumption progress. Required when startup_mode='consumer_group'. In other startup modes, consumer_group is still forwarded to the connector but is used only for commit tracking; it does not change where consumption starts.

  • max_retries – Number of retries after a failed SLS read.

  • batch_get_size – Number of LogGroups fetched per request. Must be in [1, 1000]; the underlying Java connector rejects values above 1000.

  • exit_after_finish – When True, the Flink job exits after the source finishes consuming.

  • processor – SLS consumer processor (SPL) used to pre-filter data before ingest.

  • extra_options – Additional connector options forwarded through to the underlying SLS connector. This is for options not exposed as named parameters of read_sls, e.g., regionId, signVersion, shardModDivisor, shardModRemainder. If a key matches an option generated by a named parameter of read_sls, the value in extra_options takes precedence. "connector" is reserved and must not be supplied.

Returns:

A DataFrame backed by the SLS source.

Raises:

ValueError – If any of the following holds:

  • schema is None.

  • startup_mode is invalid.

  • batch_get_size is out of range.

  • start_time is set with a non-timestamp startup_mode.

  • consumer_group is missing when startup_mode='consumer_group'.

  • start_time / stop_time is an aware datetime (with tzinfo).

  • extra_options contains a connector key.
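
The documented ValueError conditions can be checked before a job is submitted. The following is a minimal sketch of such a pre-flight check, not the library's actual implementation: check_read_sls_args and its error messages are hypothetical, and the function only mirrors the rules listed above using the standard library.

```python
from datetime import datetime
from typing import Dict, Optional, Union

# The four startup modes listed in the read_sls signature.
VALID_STARTUP_MODES = ("timestamp", "latest", "earliest", "consumer_group")

TimeArg = Optional[Union[str, datetime]]


def _is_aware(value: TimeArg) -> bool:
    """Return True if value is a datetime that carries tzinfo."""
    return isinstance(value, datetime) and value.tzinfo is not None


def check_read_sls_args(
    *,
    schema: Optional[Dict[str, object]],
    startup_mode: str = "timestamp",
    start_time: TimeArg = None,
    stop_time: TimeArg = None,
    consumer_group: Optional[str] = None,
    batch_get_size: int = 100,
    extra_options: Optional[Dict[str, str]] = None,
) -> None:
    """Hypothetical helper: raise ValueError for the documented invalid inputs."""
    if schema is None:
        raise ValueError("schema is required")
    if startup_mode not in VALID_STARTUP_MODES:
        raise ValueError(f"invalid startup_mode: {startup_mode!r}")
    if not 1 <= batch_get_size <= 1000:
        raise ValueError("batch_get_size must be in [1, 1000]")
    if start_time is not None and startup_mode != "timestamp":
        raise ValueError("start_time requires startup_mode='timestamp'")
    if startup_mode == "consumer_group" and not consumer_group:
        raise ValueError("consumer_group is required in consumer_group mode")
    for name, value in (("start_time", start_time), ("stop_time", stop_time)):
        if _is_aware(value):
            raise ValueError(f"{name} must be naive; use time_zone instead")
    if extra_options and "connector" in extra_options:
        raise ValueError("'connector' is reserved in extra_options")
```

Running a check like this in unit tests surfaces bad argument combinations without needing access to an SLS endpoint.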

Examples

Minimal source (timestamp mode, defaults):

import pyflink.dataframe as pf

# ALIYUN_AK_ID and ALIYUN_AK_SECRET are placeholders for your AccessKey pair.
df = pf.read_sls(
    "cn-hangzhou-intranet.log.aliyuncs.com",
    project="my-project",
    logstore="my-logstore",
    access_id=ALIYUN_AK_ID,
    access_key=ALIYUN_AK_SECRET,
    schema={
        "__timestamp__": pf.DataType.bigint(),
        "level": pf.DataType.string(),
        "message": pf.DataType.string(),
    }
)

Consumer-group mode (resume from recorded offset):

df = pf.read_sls(
    "cn-hangzhou-intranet.log.aliyuncs.com",
    project="my-project",
    logstore="my-logstore",
    access_id=ALIYUN_AK_ID,
    access_key=ALIYUN_AK_SECRET,
    schema={"level": pf.DataType.string()},
    startup_mode="consumer_group",
    consumer_group="my-analytics-job"
)

Historical replay with datetime bounds:

from datetime import datetime
df = pf.read_sls(
    "cn-hangzhou-intranet.log.aliyuncs.com",
    project="my-project",
    logstore="my-logstore",
    access_id=ALIYUN_AK_ID,
    access_key=ALIYUN_AK_SECRET,
    schema={"message": pf.DataType.string()},
    start_time=datetime(2026, 1, 15, 0, 0, 0),
    stop_time=datetime(2026, 1, 16, 0, 0, 0),
    time_zone="Asia/Shanghai",
    exit_after_finish=True
)
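
Because start_time and stop_time reject aware datetimes, code that already holds timezone-aware values needs to convert them first. A minimal sketch of one way to do that, assuming Python 3.9+ with the standard zoneinfo module: to_sls_time is a hypothetical helper, not part of PyFlink, that shifts the value into the target zone and renders the "yyyy-MM-dd HH:mm:ss" wall-clock string to pass alongside time_zone.

```python
from datetime import datetime, timezone
from typing import Tuple
from zoneinfo import ZoneInfo


def to_sls_time(aware: datetime, time_zone: str) -> Tuple[str, str]:
    """Hypothetical helper: return (wall-clock string, time_zone) for read_sls.

    The string uses the "yyyy-MM-dd HH:mm:ss" layout and is expressed in
    the given IANA time zone.
    """
    if aware.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    local = aware.astimezone(ZoneInfo(time_zone))
    return local.strftime("%Y-%m-%d %H:%M:%S"), time_zone


# 16:00 UTC on 14 January is midnight on 15 January in Asia/Shanghai (UTC+8).
start, tz = to_sls_time(
    datetime(2026, 1, 14, 16, 0, 0, tzinfo=timezone.utc), "Asia/Shanghai"
)
print(start, tz)  # 2026-01-15 00:00:00 Asia/Shanghai
```

The returned pair would then be supplied as start_time=start and time_zone=tz, so the connector interprets the string in the intended zone rather than the JobManager JVM's default.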

Forward-compat: passing a non-named option (and overriding a named one) via extra_options:

df = pf.read_sls(
    "cn-hangzhou-intranet.log.aliyuncs.com",
    project="my-project",
    logstore="my-logstore",
    access_id=ALIYUN_AK_ID,
    access_key=ALIYUN_AK_SECRET,
    schema={"message": pf.DataType.string()},
    extra_options={
        "regionId": "cn-hangzhou",
        "compressType": "lz4",
        "batchGetSize": "256",  # overrides default 100
    }
)
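
The precedence rule for extra_options can be pictured as a dictionary merge in which later keys win. The sketch below is an illustration under that assumption, not the library's code: merge_connector_options is a hypothetical name, and "batchGetSize" is the only connector key whose mapping from a named parameter is confirmed by the example above.

```python
from typing import Dict, Optional


def merge_connector_options(
    named_options: Dict[str, str],
    extra_options: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    """Hypothetical merge: extra_options override options from named parameters."""
    extra = dict(extra_options or {})
    if "connector" in extra:
        # "connector" is documented as reserved.
        raise ValueError("'connector' is reserved and must not be supplied")
    merged = dict(named_options)
    merged.update(extra)  # keys in extra_options take precedence
    return merged


# Option generated by the named parameter batch_get_size (default 100).
named = {"batchGetSize": "100"}
extra = {"regionId": "cn-hangzhou", "compressType": "lz4", "batchGetSize": "256"}

merged = merge_connector_options(named, extra)
print(merged["batchGetSize"])  # 256
```

Keeping overrides in extra_options to a minimum makes it easier to see which value the connector actually receives.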
