diff --git a/docs/external-stream.md b/docs/external-stream.md index 7b9fea9c..ac2881aa 100644 --- a/docs/external-stream.md +++ b/docs/external-stream.md @@ -4,10 +4,11 @@ You can create **External Streams** in Timeplus to query data in the external sy You can run streaming analytics with the external streams in the similar way as other streams. -Timeplus supports 5 types of external streams: +Timeplus supports 6 types of external streams: * [Kafka External Stream](/kafka-source) * [Pulsar External Stream](/pulsar-source) * [NATS JetStream External Stream](/nats-jetstream-source) +* [Python External Stream Source](/python-external-stream-source) and [Sink](/python-external-stream-sink), only available in Timeplus Enterprise * [Timeplus External Stream](/timeplus-source), only available in Timeplus Enterprise * [Log External Stream](/log-stream) (experimental) diff --git a/docs/python-external-stream-sink.mdx b/docs/python-external-stream-sink.mdx new file mode 100644 index 00000000..d529e7d7 --- /dev/null +++ b/docs/python-external-stream-sink.mdx @@ -0,0 +1,10 @@ +--- +id: python-external-stream-sink +title: Python Sink +--- + +import ExternalPythonBasics from './shared/python-external-stream.md'; +import ExternalPythonWrite from './shared/python-external-stream-write.md'; + + + diff --git a/docs/python-external-stream-source.mdx b/docs/python-external-stream-source.mdx new file mode 100644 index 00000000..d21489a4 --- /dev/null +++ b/docs/python-external-stream-source.mdx @@ -0,0 +1,10 @@ +--- +id: python-external-stream-source +title: Python Source +--- + +import ExternalPythonBasics from './shared/python-external-stream.md'; +import ExternalPythonRead from './shared/python-external-stream-read.md'; + + + diff --git a/docs/shared/python-external-stream-read.md b/docs/shared/python-external-stream-read.md new file mode 100644 index 00000000..caa0454e --- /dev/null +++ b/docs/shared/python-external-stream-read.md @@ -0,0 +1,112 @@ +## Read Data from a Python 
External Stream + +The read function is the entry point Timeplus calls to pull rows from your Python code. It is **synchronous** (no `async`/`await`) and receives **no implicit arguments** — any configuration must arrive through the init function. Each value it produces is a row whose columns match the stream's schema in declared order; for a single-column stream you may yield bare scalars. + +### Streaming source (generator) + +Yield a row or a batch of rows at a time. The query stays alive as long as the generator does, which makes generators the right shape for clocks, polling loops, websocket feeds, message-bus consumers, and other long-lived sources. + +```sql +CREATE EXTERNAL STREAM py_clock (tick uint32, label string) +AS $$ +import time + +def py_clock(): + n = 0 + while True: + yield (n, "tick") + n += 1 + time.sleep(1) +$$ +SETTINGS + type = 'python', + mode = 'streaming'; +``` + +`read_function_name` is omitted, so it defaults to the stream name `py_clock`. Setting `mode = 'streaming'` makes the engine reject a non-generator return value, which catches mistakes like returning a list early. + +### Batch source (list) + +Return a list of rows once. Use this shape for one-shot pulls — REST snapshots, file scans, or any source where a single call yields the full result. + +```sql +CREATE EXTERNAL STREAM py_users (id int32, name string) +AS $$ +import json +import urllib.request + +def py_users(): + with urllib.request.urlopen("https://api.example.com/users") as r: + payload = json.load(r) + return [(u["id"], u["name"]) for u in payload] +$$ +SETTINGS + type = 'python', + mode = 'batch'; +``` + +### Long-lived setup with init / deinit + +Open a client once, stash it on `builtins`, and tear it down at the end of the query. Init parameters arrive as a single string, so JSON is convenient when you have more than one value to pass. 
+ +```sql +CREATE EXTERNAL STREAM py_cookie_counter +( + previous_cleanup_count int32, + secret_flavor string +) +AS $$ +import builtins, json + +def open_bakery(config): + builtins._tp_cookie_secret_flavor = json.loads(config)["flavor"] + +def close_bakery(): + if hasattr(builtins, "_tp_cookie_secret_flavor"): + del builtins._tp_cookie_secret_flavor + +def serve_cookie_report(): + return [(0, getattr(builtins, "_tp_cookie_secret_flavor", ""))] +$$ +SETTINGS + type = 'python', + read_function_name = 'serve_cookie_report', + init_function_name = 'open_bakery', + init_function_parameters = '{"flavor":"double-chocolate"}', + deinit_function_name = 'close_bakery'; +``` + +Remember that init and deinit run **per query**, not once per stream creation — the `builtins` state above is set up and torn down each time a query reads from `py_cookie_counter`. Use stream-specific `builtins` attribute names and delete them in deinit so later Python sessions do not see stale state. + +### Calling back to `timeplusd` + +The injected `__timeplus_local_api_user` and `__timeplus_local_api_password` globals let the read function authenticate to the same server without hard-coded credentials. The example below queries an internal stream over the REST interface and turns the result into a row. + +```sql +CREATE EXTERNAL STREAM py_user_count (total int64) +AS $$ +import base64, urllib.request + +def py_user_count(): + creds = base64.b64encode( + f"{__timeplus_local_api_user}:{__timeplus_local_api_password}".encode() + ).decode() + req = urllib.request.Request( + "http://localhost:8123/?query=SELECT+count()+FROM+table(users)", + headers={"Authorization": f"Basic {creds}"}, + ) + with urllib.request.urlopen(req) as r: + return [(int(r.read().strip()),)] +$$ +SETTINGS + type = 'python', + mode = 'batch'; +``` + +Treat `__timeplus_local_api_password` as a secret — do not log it, do not echo it back into output rows, and do not pass it into subprocesses. 
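Streaming generators that hold a connection, file, or client should release it in a `finally` block: Python runs `finally` both when a generator is exhausted normally and when it is closed or interrupted at a yield point, so cleanup happens even when the query goes away mid-stream. A minimal sketch, runnable as plain Python outside Timeplus — the `log` parameter is a hypothetical stand-in for a client handle (a real read function receives no arguments and would instead close something stashed on `builtins`):

```python
def resilient_ticks(log):
    # `log` is a stand-in for a client or socket an init function would
    # normally stash on `builtins`; a real read function takes no args.
    try:
        n = 0
        while True:
            yield (n, "tick")
            n += 1
    finally:
        # Runs on normal exhaustion, on generator close, and when an
        # interrupt is delivered at a yield point.
        log.append("closed")

# Simulate a query that reads two rows and then goes away.
events = []
gen = resilient_ticks(events)
print(next(gen), next(gen))  # (0, 'tick') (1, 'tick')
gen.close()                  # cleanup still runs
print(events)                # ['closed']
```

The same `try`/`finally` shape works inside the SQL-embedded examples above; it complements, rather than replaces, a `deinit` function, since `finally` fires at the point of interruption while deinit fires once per query.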
+ +### Cancellation and errors + +When a query is cancelled (for example by `KILL QUERY` or by closing the client), the running Python code receives a `KeyboardInterrupt`. Streaming generators stop at the next yield point; long-blocking calls inside C extensions may delay the interrupt until they return. + +If the read function raises, the query fails and the Python traceback is included in the error response — wrap recoverable errors inside the function and decide explicitly whether to re-raise or continue. diff --git a/docs/shared/python-external-stream-write.md b/docs/shared/python-external-stream-write.md new file mode 100644 index 00000000..8030d887 --- /dev/null +++ b/docs/shared/python-external-stream-write.md @@ -0,0 +1,87 @@ +## Write Data to a Python External Stream + +The write function is invoked once per chunk, not once per row. Its arguments are **column-oriented**: one Python list per output column, in declared order, all of equal length. Iterate with `zip` to recover row tuples. + +Column values follow the same Python type mapping as [Python UDF](/py-udf#data-type-mapping). One detail worth highlighting for sinks: a `string` (or `fixed_string`) column arrives as Python `bytes`, not `str`. Decode with `.decode()` (UTF-8) before passing values into APIs that require text. + +### Sink basics + +```sql +CREATE EXTERNAL STREAM py_metric_sink (host string, value float32) +AS $$ +def py_metric_sink(host, value): + for h, v in zip(host, value): + print(f"{h.decode()}={v}") +$$ +SETTINGS type = 'python'; +``` + +Insert a few rows: + +```sql +INSERT INTO py_metric_sink (host, value) VALUES ('a', 1.0), ('b', 2.0); +``` + +Behind the scenes Timeplus calls `py_metric_sink([b'a', b'b'], [1.0, 2.0])` — one call carrying both rows, with the `string` column delivered as `bytes`. A larger INSERT or a downstream query that delivers many chunks results in one call per chunk. 
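Because the write function is plain Python, you can exercise the chunk shape directly outside the server. The sketch below mirrors the sink above; it returns the formatted lines instead of printing them so the result is easy to inspect (a testing convenience only, not something Timeplus requires):

```python
def py_metric_sink(host, value):
    # One Python list per column, all of equal length; zip recovers
    # row tuples. `string` columns arrive as bytes and need a decode.
    return [f"{h.decode()}={v}" for h, v in zip(host, value)]

# The same chunk Timeplus would deliver for the two-row INSERT above:
print(py_metric_sink([b"a", b"b"], [1.0, 2.0]))  # ['a=1.0', 'b=2.0']
```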
+ +If `write_function_name` is omitted Timeplus uses `read_function_name` (which itself defaults to the stream name), so the Python function above only needs to be named once. + +### Materialized view → external stream + +Routing a continuous query into a sink is the most common production pattern. Define the sink once, then point a materialized view at it: + +```sql +CREATE EXTERNAL STREAM py_alert_sink (host string, value float32) +AS $$ +def py_alert_sink(host, value): + for h, v in zip(host, value): + notify(h.decode(), v) # your notifier +$$ +SETTINGS type = 'python'; + +CREATE MATERIALIZED VIEW high_value_alerts INTO py_alert_sink AS + SELECT host, value FROM metrics WHERE value > 100; +``` + +The materialized view feeds chunks into the sink as they are produced; each chunk becomes one call to `py_alert_sink`. + +### Custom protocol example: webhook POST + +Load the destination URL in init, reuse that configuration for every chunk, and clear it in deinit. Init parameters carry the URL so the Python body is reusable across environments. To pool an actual HTTP connection, swap `urllib` for a session-aware client (for example `requests.Session()`) and stash the session itself on `builtins`. 
+ +```sql +CREATE EXTERNAL STREAM py_webhook (event_id string, body string) +AS $$ +import builtins, json, urllib.request + +def open_client(config): + builtins._tp_webhook = json.loads(config)["url"] + +def close_client(): + if hasattr(builtins, "_tp_webhook"): + del builtins._tp_webhook + +def post_event(event_id, body): + for eid, b in zip(event_id, body): + payload = {"id": eid.decode(), "body": b.decode()} + req = urllib.request.Request( + builtins._tp_webhook, + data=json.dumps(payload).encode(), + headers={"Content-Type": "application/json"}, + method="POST", + ) + urllib.request.urlopen(req).read() +$$ +SETTINGS + type = 'python', + init_function_name = 'open_client', + init_function_parameters = '{"url":"https://hooks.example.com/notify"}', + deinit_function_name = 'close_client', + write_function_name = 'post_event'; +``` + +Replace `urllib` with any HTTP, S3, queue, or proprietary client your environment ships with. Manage Python dependencies through the [Python UDF](/py-udf) library configuration — the same runtime backs both features. + +### Failure behavior + +If the write function raises, the INSERT fails and the Python traceback is included in the error response. Side effects already performed by your Python code (HTTP requests sent, files written, queue messages published) are **not** rolled back by Timeplus — design idempotent writes, or batch your side effect inside a single transactional call your downstream system controls. diff --git a/docs/shared/python-external-stream.md b/docs/shared/python-external-stream.md new file mode 100644 index 00000000..e67a3a33 --- /dev/null +++ b/docs/shared/python-external-stream.md @@ -0,0 +1,75 @@ +## Overview + +Python External Stream lets you read from and write to arbitrary sources by embedding a Python body directly in the DDL. It is available in **Timeplus Enterprise 3.2.2+**. 
+ +Unlike the Kafka, Pulsar, and NATS JetStream external streams — which speak a specific wire protocol — a Python External Stream is a generic escape hatch: you bring the protocol, the client library, and the logic. Timeplus calls your functions inside the embedded CPython runtime. When reading, return values become row batches; when writing, the sink function receives column batches. The same DDL object can serve as both a source (via `read_function_name`) and a sink (via `write_function_name`). + +## Create a Python External Stream + +```sql +CREATE EXTERNAL STREAM [IF NOT EXISTS] stream_name ( ) +AS $$ +def read_fn(): + ... + +def write_fn(col1, ...): + ... + +def init_fn(config): # optional + ... + +def deinit_fn(): # optional + ... +$$ +SETTINGS + type = 'python', -- required + read_function_name = '..', -- defaults to the stream name + write_function_name = '..', -- defaults to read_function_name + init_function_name = '..', + init_function_parameters = '..', -- requires init_function_name + deinit_function_name = '..', + mode = 'auto' -- 'auto' (default), 'streaming', or 'batch' +``` + +### Settings + +* **type**: must be `'python'`. Required. +* **read_function_name**: name of the Python function used when the stream is read from. Defaults to the stream name. +* **write_function_name**: name of the Python function used when the stream is written to (sink). Defaults to `read_function_name`. +* **init_function_name**: name of a Python function called once before read/write processing begins. Use it to open connections, warm caches, or prepare state for the entry function to consume. +* **init_function_parameters**: a single string passed as the only argument to the init function. Any format works (JSON, `key=value`, or a plain string) — parsing is up to your Python code. Requires `init_function_name`; otherwise the stream fails to create with `Setting 'init_function_parameters' requires 'init_function_name' to be configured`. 
+* **deinit_function_name**: name of a Python function called once after read/write processing completes, for cleanup. +* **mode**: Python execution mode — `'auto'` (default), `'streaming'`, or `'batch'`. See [Modes](#modes). + +## Modes + +The `mode` setting controls how Timeplus interprets your read function's return value: + +* **`auto`** (default) — Timeplus inspects the return value at runtime. A generator drives a streaming read; a list is consumed as a single batch. +* **`streaming`** — the read function must return a generator. Timeplus pulls rows as they are yielded and the query stays alive until the generator stops. Returning a list from a streaming-mode function fails the query. +* **`batch`** — the read function must return a list. Timeplus consumes the list once and the query finishes. Returning a generator from a batch-mode function fails the query. + +Set `mode` explicitly when you want the engine to enforce the expected shape. Leave it as `auto` when you want flexibility. + +## Lifecycle + +Each query that reads from or writes to a Python External Stream creates its own Python module. The lifecycle for one query is: + +1. The DDL body is compiled into a fresh module. +2. Local API credential globals are injected into the module (see [Local API credentials](#local-api-credentials)). +3. If `init_function_name` is set, the init function is called once. When `init_function_parameters` is non-empty, it is passed as the only argument; otherwise init receives no arguments. +4. The read or write entry function is called as data flows. +5. When the query ends — normally or via cancellation — `deinit_function_name`, if set, is called. + +Each query gets its own module, so ordinary module globals created by the DDL body are not reused across queries. 
If you stash state on Python's `builtins` module, use a stream-specific attribute name and remove it in deinit; `builtins` is shared by the embedded interpreter, so leftover attributes can be visible to later Python sessions in the same server process. Treat clients or caches opened in init as per-query resources and close them in deinit. + +If the init function raises, the query fails before any read or write happens, and `deinit_function_name` is **not** called. If init opens more than one external resource, clean up already-opened resources before re-raising. + +## Local API credentials {#local-api-credentials} + +When the local API user is enabled on the server, Timeplus injects two module-level globals into every Python External Stream module so your code can authenticate back to the same `timeplusd` over the native TCP protocol or the REST HTTP interface without hard-coding credentials: + +* `__timeplus_local_api_user` — the ephemeral local API username. +* `__timeplus_local_api_password` — the matching token. Treat this as a secret; do not log it. + +Both globals are available as bare names inside the Python body — no `os.environ` lookup needed. They are regenerated on every server restart and never written to disk. diff --git a/docs/sql-create-external-stream.md b/docs/sql-create-external-stream.md index 43207cd8..6bc5948d 100644 --- a/docs/sql-create-external-stream.md +++ b/docs/sql-create-external-stream.md @@ -55,6 +55,28 @@ Please check the [Pulsar External Stream](/pulsar-source) for more details. Please check the [NATS JetStream External Stream](/nats-jetstream-source) for more details. +## Python External Stream + +```sql +CREATE EXTERNAL STREAM [IF NOT EXISTS] stream_name ( ) +AS $$ +def read_fn(): + ... 
+$$ +SETTINGS + type = 'python', -- required + read_function_name = '..', + write_function_name = '..', + init_function_name = '..', + init_function_parameters = '..', + deinit_function_name = '..', + mode = 'auto' -- 'auto' (default), 'streaming', or 'batch' +``` + +Available in **Timeplus Enterprise 3.2.2+**. + +Please check the [Python External Stream Source](/python-external-stream-source) for read-side settings, generator/batch sources, and lifecycle hooks, and the [Python External Stream Sink](/python-external-stream-sink) for write-side semantics, materialized-view sinks, and custom-protocol examples. + ## Timeplus External Stream ```sql CREATE EXTERNAL STREAM [IF NOT EXISTS] stream_name ( ) diff --git a/docusaurus.config.js b/docusaurus.config.js index 27f8428c..7ecef858 100644 --- a/docusaurus.config.js +++ b/docusaurus.config.js @@ -113,6 +113,10 @@ const config = { from: '/tiered-storage', to: '/append-stream-tiered-storage', }, + { + from: '/python-external-stream', + to: '/python-external-stream-source', + }, ], }, ], diff --git a/sidebars.js b/sidebars.js index 21b9f455..3115b440 100644 --- a/sidebars.js +++ b/sidebars.js @@ -165,6 +165,11 @@ const sidebars = { label: "PostgreSQL", id: "pg-external-table", }, + { + type: "doc", + label: "Python", + id: "python-external-stream-source", + }, { type: "doc", label: "S3", @@ -545,6 +550,11 @@ const sidebars = { label: "NATS JetStream", id: "nats-jetstream-sink", }, + { + type: "doc", + label: "Python", + id: "python-external-stream-sink", + }, { type: "doc", label: "S3", diff --git a/tools/spellchecker/dic.txt b/tools/spellchecker/dic.txt index c651dd75..8fc752c5 100644 --- a/tools/spellchecker/dic.txt +++ b/tools/spellchecker/dic.txt @@ -886,6 +886,8 @@ DefaultTableEngine defaultValue defaultValueOfArgumentType defaultValueOfTypeName +deinit +deinit_function_name DelayedInserts DELETEs delim @@ -1316,6 +1318,7 @@ globalNotIn GlobalThread GlobalThreadActive globalVariable +globals globbing glushkovds 
gofakeit @@ -1526,6 +1529,8 @@ infty Ingestions ingressClassName init +init_function_name +init_function_parameters init_sql initcap initcapUTF