Transport Streaming Settings
This document describes the structured streaming settings that control buffering, flow control, ordering, and replay for real-time transports.
Who this is for
- Connector/transport authors configuring runtime behavior.
- Operators tuning backpressure and replay limits.
What you'll get
- The full transport settings schema.
- The merge/override rules for settings.
- Guidance for safe defaults in production.
At a glance
- Structured settings live under
streamingonTransportandStory.spec.transports[]. - Settings are merged with transport-level defaults first, then story-level overrides.
- The hub enforces buffering, flow control, ordering/replay, routing, fan-in, partitioning, and recording.
Merge and override rules
Structured settings are merged in this order (later entries override earlier ones):
Transport.spec.streaming(cluster-wide defaults)Story.spec.transports[].streaming(per-story overrides)
Provider-specific settings can still be supplied via:
Transport.spec.defaultSettings(raw JSON)Story.spec.transports[].settings(raw JSON)
Structured streaming fields overlay the raw JSON settings at the same level.
All duration strings use Go duration format (for example 250ms, 1s, 5m).
Validation
- Structured
streamingfields are validated by admission webhooks (enums, ranges, required fields). - Raw JSON settings are treated as provider-owned and are not validated by the operator.
Defaults and guardrails
- If
flowControlis omitted, the system defaults tocreditswithackEvery.messages=10,ackEvery.maxDelay=250ms,pauseThreshold=90, andresumeThreshold=50. Setmode: noneto disable. - If
routing.maxDownstreamsis omitted, it defaults to32. - If
lanesis omitted on the Transport, defaults are injected foraudio,video,binary, andpayloadwithmaxMessages=100andmaxBytes=1MiB. If lanes are provided, missing per-lane caps are defaulted, but lanes are not auto-added. - If buffer limits are omitted or set to
0, the hub falls back toBUBU_HUB_BUFFER_MAX_MESSAGES(default 100),BUBU_HUB_BUFFER_MAX_BYTES(default 10MiB), anddrop_newest. - Operator guardrails can cap active streams, buffer count, and downstream fan-out regardless of per-transport settings.
Example: Transport defaults
apiVersion: transport.bubustack.io/v1alpha1
kind: Transport
metadata:
name: livekit
spec:
provider: livekit
driver: livekit
supportedAudio:
- name: opus
streaming:
backpressure:
buffer:
maxMessages: 500
maxBytes: 10485760
maxAgeSeconds: 10
dropPolicy: drop_oldest
flowControl:
mode: credits
initialCredits:
messages: 100
bytes: 1048576
ackEvery:
messages: 10
maxDelay: 250ms
pauseThreshold:
bufferPct: 90
resumeThreshold:
bufferPct: 50
delivery:
ordering: per_stream
semantics: at_least_once
replay:
mode: memory
retentionSeconds: 30
checkpointInterval: 1s
routing:
mode: auto
fanOut: sequential
maxDownstreams: 8
rules:
- name: route-english
when: "{{ packet.metadata.lang == \"en\" }}"
action: allow
target:
steps: ["translate"]
- name: drop-noisy
when: "{{ packet.metadata.noisy == true }}"
action: deny
target:
steps: ["transcribe"]
lanes:
- name: audio
kind: audio
direction: bidirectional
maxMessages: 250
maxBytes: 1048576
- name: payload
kind: payload
direction: bidirectional
maxMessages: 500
maxBytes: 4194304
fanIn:
mode: all
timeoutSeconds: 30
maxEntries: 1000
partitioning:
mode: hash
key: metadata.user_id
partitions: 16
sticky: true
lifecycle:
strategy: drain_cutover
drainTimeoutSeconds: 15
maxInFlight: 200
observability:
metrics:
enabled: true
tracing:
enabled: true
sampleRate: 5
samplePolicy: rate
watermark:
enabled: true
timestampSource: metadata.event_time_ms
recording:
mode: metadata
sampleRate: 1
retentionSeconds: 300
redactFields:
- payload.api_key
Example: Story override
apiVersion: bubustack.io/v1alpha1
kind: Story
metadata:
name: realtime-demo
spec:
transports:
- name: audio
transportRef: livekit
streaming:
backpressure:
buffer:
maxMessages: 1000
dropPolicy: drop_newest
delivery:
semantics: best_effort
Settings reference
streaming.backpressure.buffer
maxMessages(int, >= 0): cap the number of buffered messages.maxBytes(int, >= 0): cap total buffered bytes.maxAgeSeconds(int, >= 0): max time a message can stay buffered.dropPolicy(drop_newest|drop_oldest): which messages to drop on overflow.
streaming.flowControl
mode(none|credits|window): flow control strategy.initialCredits.messages/initialCredits.bytes: starting credits.ackEvery.messages/ackEvery.bytes: ack/credit interval thresholds.ackEvery.maxDelay(duration string): max time between ack/credit updates.pauseThreshold.bufferPct(0..100): pause upstream when buffer crosses this percent.resumeThreshold.bufferPct(0..100): resume upstream below this percent.
streaming.delivery
ordering(none|per_stream|per_partition)semantics(best_effort|at_least_once)replay.mode(none|memory|durable)replay.retentionSeconds(int, >= 0)replay.checkpointInterval(duration string)
streaming.routing
mode(auto|hub|p2p): routing topology selection.fanOut(sequential|parallel): dispatch policy when multiple downstream steps consume a stream.maxDownstreams(int, >= 1): guardrail on downstream fan-out per step.rules[].name(string): optional identifier for a routing rule.rules[].when(string): Go template expression evaluated in streaming runtime scope.rules[].action(allow|deny): how the rule affects matching targets.rules[].target.steps(list): downstream step names the rule applies to (empty means all).
streaming.lanes
lanes[].name(string): unique lane identifier (e.g.,audio,payload).lanes[].kind(audio|video|binary|payload|control): lane media type.lanes[].direction(publish|subscribe|bidirectional).lanes[].maxMessages(int, >= 0): per-lane buffered message cap (per downstream).lanes[].maxBytes(int, >= 0): per-lane buffered byte cap (per downstream).
streaming.fanIn
mode(all|any|quorum): join behavior when multiple upstream steps feed a step.quorum(int, >= 1): minimum number of upstreams whenmode=quorum.timeoutSeconds(int, >= 0): max time to wait before abandoning a fan-in join.maxEntries(int, >= 1): cap concurrent fan-in joins held in memory.
streaming.partitioning
mode(none|preserve|hash)key(string): value used for hashing (e.g.,metadata.user_id,payload.session,inputs.tenant_id).partitions(int, >= 1): number of partitions for hash mode.sticky(bool): keep existing partition assignment if present.
streaming.lifecycle
strategy(rolling|drain_cutover|blue_green)drainTimeoutSeconds(int, >= 0)maxInFlight(int, >= 0): cap in-flight messages during upgrades/reconnects.
streaming.observability
metrics.enabled(bool): enable per-stream metrics.tracing.enabled(bool)tracing.sampleRate(0..100)tracing.samplePolicy(string):rate,always, ornever.watermark.enabled(bool)watermark.timestampSource(string): e.g.metadata.event_time_ms,metadata.bubu.envelope.timestamp_ms,audio.timestamp_ms.
streaming.recording
mode(off|metadata|payload)sampleRate(0..100)retentionSeconds(int, >= 0)redactFields(list of JSON paths relative to the packet, e.g.,payload.api_key,metadata.authorization)
Defaults and behavior
at_least_oncesemantics implyper_streamordering if not explicitly set.- Replay modes automatically enable at-least-once semantics.
- If
replay.modeis set andcheckpointIntervalis omitted, the hub uses a default checkpoint interval. - Buffer thresholds are interpreted as utilization percentages (0..100).
routing.fanOut=paralleldispatches downstream engram deliveries concurrently.routing.mode=p2pis honored only when the Story topology supports direct connections; otherwise the hub is used.- Routing rules are evaluated per downstream step; deny rules override allow rules.
- If any
allowrules exist, routing becomes an allowlist (only allowed targets route). lifecycle.drainTimeoutSecondsis enforced whenstrategyisdrain_cutoverorblue_green(and also when a drain timeout is set without an explicit strategy).- When drain mode is enabled, the hub issues
flowControl.pausewhile buffering for reconnect/cutover and resumes once buffers drain. partitioning.mode=hashuses the configured key to selectpartitionin the stream envelope.recordingwrites JSON payloads to the configured storage backend; retention enforcement is backend-dependent.- Lane caps apply per downstream stream buffer and are enforced before global buffer caps.
- Fan-in joins default to
all(wait for every upstream listed instep.needs) unless overridden.