Copy, adapt, and run the reference scripts that live alongside this documentation.
Provision a pipeline, wait until it is ready, and print the ingest details plus signed manifest URLs.
"""Example script that provisions a Stegawave pipeline and
prints ingest + playback details.
"""
import sys
from typing import Iterable
from stegawave import StegawaveClient, models
def build_pipeline_request() -> models.CreatePipelineRequest:
    """Assemble the sample pipeline request used by the provisioning example.

    The request describes an SRT listener input on port 9000, two output
    renditions (1080p and 720p) and two origin endpoints (one CMAF, one TS).

    Returns:
        models.CreatePipelineRequest: the fully populated request object.
    """
    # Input: SRT listener with passphrase generation turned on.
    listener = models.SrtListenerSettings(
        IngestPort=9000,
        MinLatency=2000,
        PassphraseEnabled=True,
    )
    source = models.InputConfig(
        Type="SRT_LISTENER",
        whitelist=["0.0.0.0/0"],
        SrtListenerSettings=listener,
    )

    # Encoder: one OutputConfig per rung; only name, size and bitrates differ.
    ladder = [
        ("cmaf-1080p", "1920x1080", 7_500_000, 128_000),
        ("cmaf-720p", "1280x720", 5_200_000, 96_000),
    ]
    renditions = [
        models.OutputConfig(
            OutputName=label,
            resolution=size,
            FramerateNumerator=30,
            FramerateDenominator=1,
            VideoBitrate=video_bps,
            AudioBitrate=audio_bps,
            SampleRate=48_000,
        )
        for label, size, video_bps, audio_bps in ladder
    ]
    transcoder = models.EncoderConfig(vodArchive=False, Outputs=renditions)

    # Packager: a CMAF endpoint with HLS + DASH, and a TS endpoint with HLS only.
    hybrid_endpoint = models.OriginEndpoint(
        name="cmaf-hybrid",
        ContainerType="CMAF",
        description="Public CMAF endpoint",
        HlsManifests=[models.HlsManifest(ManifestName="index")],
        DashManifests=[models.DashManifest(ManifestName="index")],
        StartoverWindowSeconds=86_400,
    )
    ts_endpoint = models.OriginEndpoint(
        name="cmaf-live",
        ContainerType="TS",
        description="Live TS endpoint",
        HlsManifests=[models.HlsManifest(ManifestName="index")],
        StartoverWindowSeconds=86_400,
    )
    origin = models.PackagerConfig(originEndpoints=[hybrid_endpoint, ts_endpoint])

    return models.CreatePipelineRequest(
        name="launch-stream",
        description="Product launch livestream",
        segmentDuration=4,
        autoStart=True,
        input=source,
        encoder=transcoder,
        packager=origin,
    )
def main():
    """Provision the sample pipeline, wait for it, and print ingest and playback details.

    Returns:
        int: 0 on success, 1 when the create request is reported as not valid.
    """
    with StegawaveClient() as client:
        request = build_pipeline_request()
        response = client.create_pipeline(request)
        if not response.valid():
            print("Pipeline creation request was rejected", file=sys.stderr)
            return 1

        event_id = response.eventID
        print(f"Pipeline provisioning started. eventID={event_id}")

        # Block until the pipeline is ready before asking for session details.
        client.wait_for_event(event_id, poll_interval=5.0)
        session = client.get_pipeline_session(event_id)

        status = session.status
        input_details = session.get_input()
        manifests = session.get_manifests()
        # Materialize the signed URLs: the emptiness check below is only
        # meaningful on a concrete sequence (a lazy iterable is always truthy).
        manifest_urls = list(session.signed_manifest_uris("john_doe") or [])

        print(f"Session details for eventID={event_id}:")
        print(f" Status: {status.status}")
        print(f" Created: {status.createdAt}")
        print(f" Last updated: {status.lastUpdated}")
        print(f" Input protocol: {input_details.protocol}")
        print(f" Input URIs: {', '.join(input_details.uris) if input_details.uris else 'Unavailable'}")

        if manifests:
            print(" Manifests:")
            for manifest in manifests:
                print(f" - {manifest.uri}")
        else:
            print(" Manifests: Unavailable")

        print("\nInput endpoint:")
        print(f" {input_details.uri or 'Unavailable'}")

        print("\nPlayback manifests:")
        if manifest_urls:
            for url in manifest_urls:
                print(f" {url}")
        else:
            print(" No CDN endpoints reported yet")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
"""Example script that demonstrates the /get-pipeline endpoint.
Run with no arguments to list every event:
python get_pipelines.py
Pass an event ID to fetch a single event:
python get_pipelines.py example-event-id
"""
import sys
from stegawave import StegawaveClient
def main():
    """List every pipeline, or show a single one when an event ID is passed.

    The optional event ID is taken from the first command-line argument.

    Returns:
        int: always 0.
    """
    cli_args = sys.argv[1:]
    event_id = cli_args[0] if cli_args else None

    with StegawaveClient() as client:
        if event_id:
            # An event ID narrows the /get-pipeline response to that one event.
            details = client.get_pipeline(event_id)
            print(f"Details for eventID={details.eventID}:")
            print(f" Status: {details.status}")

            source = details.input
            if source and source.endpoints:
                print(f" Input endpoints: {', '.join(source.endpoints)}")
            if source and source.passphrase:
                print(f" Passphrase: {source.passphrase}")

            cdn = details.cdn
            if cdn and cdn.endpoints:
                print(" CDN endpoints:")
                for endpoint in cdn.endpoints:
                    print(f" - {endpoint}")

            if details.detected_users:
                print(f" Detected users: {details.detected_users}")
            if details.lastUpdated:
                print(f" Last updated: {details.lastUpdated}")
            return 0

        # No event ID: call /get-pipeline without a query string to list everything.
        found = client.list_pipelines().pipelines or []
        if not found:
            print("No events returned from /get-pipeline.")
            return 0

        print(f"Retrieved {len(found)} event(s) from /get-pipeline:")
        for item in found:
            label = item.eventID or item.name or "unknown"
            state = item.status or "unknown"
            created_at = item.createdAt or "unknown"
            print(f"- {label} (status={state}, createdAt={created_at})")
        return 0


if __name__ == "__main__":
    raise SystemExit(main())
"""Example script that sends a start command for an existing Stegawave pipeline."""
import sys
from stegawave import StegawaveClient
def main():
    """Send a start command for an existing pipeline and optionally wait for readiness.

    The event ID is read from the first command-line argument.

    Returns:
        int: 0 on success, 1 when the event ID argument is missing.
    """
    if len(sys.argv) < 2:
        # Name the required argument so the usage hint is actionable.
        print("Usage: python start_pipeline.py <event_id>", file=sys.stderr)
        return 1

    event_id = sys.argv[1]

    # Example tunables; edit these to change how the script waits.
    wait_ready = True
    timeout = 60.0
    poll_interval = 5.0

    with StegawaveClient() as client:
        response = client.start_pipeline(event_id)
        print(f"Start requested for eventID={response.eventID} (state={response.state or 'unknown'})")

        if wait_ready:
            status = client.wait_for_event(
                event_id,
                timeout=timeout,
                poll_interval=poll_interval,
            )
            print("Pipeline is ready:")
            print(f" Status: {status.status}")
            if status.input and status.input.endpoints:
                print(f" Input endpoints: {', '.join(status.input.endpoints)}")
            if status.cdn and status.cdn.endpoints:
                print(" CDN endpoints:")
                for uri in status.cdn.endpoints:
                    print(f" - {uri}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
"""Example script that deletes a Stegawave pipeline by eventID."""
import sys
from stegawave import StegawaveClient
def main():
    """Request deletion of the pipeline whose event ID is given on the command line.

    Returns:
        int: 0 on success, 1 when the event ID argument is missing.
    """
    if len(sys.argv) < 2:
        # Name the required argument so the usage hint is actionable.
        print("Usage: python delete_pipeline.py <event_id>", file=sys.stderr)
        return 1

    event_id = sys.argv[1]

    with StegawaveClient() as client:
        response = client.delete_pipeline(event_id)
        print(f"Delete requested for eventID={response.eventID} (status={response.status})")
        # The note is optional; only print it when the API supplied one.
        if response.note:
            print(f"Note: {response.note}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
"""Example script that fetches viewer playback tokens for one or more user keys."""
import sys
from stegawave import StegawaveClient
def main():
    """Fetch playback tokens for a fixed set of sample user keys and print them.

    Returns:
        int: always 0.
    """
    with StegawaveClient() as client:
        # exp_hours=0 requests tokens without an expiration.
        issued = client.fetch_token(["user1", "user2", "user3"], exp_hours=0)
        for key, jwt in issued.tokens.items():
            print(f"{key}: {jwt}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
"""Example script that wires IPTV catalog results into a decode job."""
import sys
from stegawave import StegawaveClient, models
def main():
    """Query the IPTV catalog and submit the first returned stream URL as a decode job.

    The event ID is read from the first command-line argument.

    Returns:
        int: 0 on success; 1 when the event ID is missing, the query returns
        no results, or the first result carries no stream URLs.
    """
    if len(sys.argv) < 2:
        # Name the required argument so the usage hint is actionable.
        print("Usage: python iptv_decode.py <event_id>", file=sys.stderr)
        return 1

    event_id = sys.argv[1]

    with StegawaveClient() as client:
        # Placeholder server and credentials; replace with real values.
        query = models.IptvQueryRequest(
            server='http://iptv.example.com',
            username='user123',
            password='pass123',
            channelName='example_channel',
            categoryId=None,
            format='m3u8',
            preferHD=False,
            preferUK=False,
            avoidVIP=False,
        )
        iptv_response = client.iptv_query(query)
        if not iptv_response.results:
            print("No IPTV streams returned for the supplied parameters", file=sys.stderr)
            return 1

        # Only the first match and its first URL are used.
        first_stream = iptv_response.results[0]
        if not first_stream.stream_urls:
            print("IPTV result did not include any stream URLs", file=sys.stderr)
            return 1
        selected_url = first_stream.stream_urls[0]

        decode_request = models.DecodeJobRequest(eventID=event_id, input_stream=selected_url)
        decode_response = client.submit_decode_job(decode_request)

        # Report the end-to-end selection so operators can verify routing.
        print(f"Submitted decode job for eventID={decode_response.eventID}")
        print(f" IPTV stream: {first_stream.name} (stream_id={first_stream.stream_id})")
        print(f" Input URL: {selected_url}")
        if decode_response.clientID:
            print(f" Client ID: {decode_response.clientID}")
        print(f" Message: {decode_response.message}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
"""Example script that submits a decode job for a stream URL.
Usage:
python decode.py <event_id> <stream_url>
"""
import sys
from stegawave import StegawaveClient, models
def main():
    """Submit a decode job for the event ID and stream URL given on the command line.

    Returns:
        int: 0 once the job has been submitted, 1 when fewer than two
        arguments were supplied.
    """
    argv = sys.argv
    if len(argv) < 3:
        print("Usage: python decode.py <event_id> <stream_url>", file=sys.stderr)
        return 1

    event_id, stream_url = argv[1], argv[2]

    with StegawaveClient() as client:
        job = models.DecodeJobRequest(eventID=event_id, input_stream=stream_url)
        result = client.submit_decode_job(job)

        print("Decode job submitted successfully")
        print(f" Event ID: {result.eventID}")
        print(f" Stream URL: {stream_url}")
        # The client ID is only printed when the response includes one.
        if result.clientID:
            print(f" Client ID: {result.clientID}")
        print(f" Message: {result.message}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
"""Example script that lists and downloads VOD archive files from a pipeline.
Usage:
# List archive files only
python vod_archive.py <event_id>
# List and download all files
python vod_archive.py <event_id> --download ./archive
"""
import sys
from stegawave import StegawaveClient
def main():
    """List a pipeline's VOD archive files, or download them with ``--download <path>``.

    Command line: ``<event_id> [--download <path>]``.

    Returns:
        int: 0 on success; 1 when the event ID is missing or ``--download``
        is given without a destination path.
    """
    usage = "Usage: python vod_archive.py <event_id> [--download <path>]"
    if len(sys.argv) < 2:
        print(usage, file=sys.stderr)
        return 1

    event_id = sys.argv[1]
    download_path = None

    if len(sys.argv) >= 3 and sys.argv[2] == "--download":
        if len(sys.argv) < 4:
            # Do not silently fall back to listing when the path was forgotten.
            print(usage, file=sys.stderr)
            return 1
        download_path = sys.argv[3]

    with StegawaveClient() as client:
        if download_path is not None:
            # Download all files to the specified directory
            print(f"Downloading VOD archive for eventID={event_id}...")
            response = client.get_vod_archive(
                event_id=event_id,
                download=True,
                download_path=download_path,
            )
            print(f"\nDownloaded {response.archive.fileCount} files ({response.archive.totalSize})")
            print(f"Location: {download_path}")
        else:
            # Just list the files with their metadata
            response = client.get_vod_archive(event_id=event_id, expires_in=3600)
            print(f"VOD Archive for: {response.eventName} (eventID={response.eventID})")
            print(f"Total files: {response.archive.fileCount}")
            print(f"Total size: {response.archive.totalSize}")
            print(f"URLs expire at: {response.archive.expiresAt}")
            print("\nFiles:")
            for entry in response.files:
                print(f" [{entry.index}] {entry.filename}")
                print(f" Size: {entry.size}")
                print(f" Modified: {entry.lastModified}")
                # Only the first 80 characters of the URL are shown.
                print(f" URL: {entry.url[:80]}...")
                print()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
Models Reference
Complete reference for all Pydantic models available in the SDK. Import from stegawave.models.
Pipeline Creation Models
CreatePipelineRequest
Main request model for creating a new pipeline.
| Field |
Type |
Required |
Description |
name |
str |
✓ |
Pipeline name |
description |
str |
|
Optional description |
segmentDuration |
int |
|
Segment duration in seconds (1-30, default: 4) |
autoStart |
bool |
|
Automatically start pipeline after creation (default: false) |
input |
InputConfig |
✓ |
Input configuration |
encoder |
EncoderConfig |
✓ |
Encoder configuration |
packager |
PackagerConfig |
✓ |
Packager configuration |
InputConfig
Defines the input source type and settings.
| Field |
Type |
Required |
Description |
Type |
InputType |
✓ |
"RTP", "RTP_FEC", "RIST", "ZIXI_PUSH", "SRT_LISTENER", "SRT_CALLER" |
whitelist |
List[str] |
|
CIDR blocks for IP whitelisting (defaults to [0.0.0.0/0]) |
SrtListenerSettings |
SrtListenerSettings |
|
Required for SRT_LISTENER inputs |
SrtCallerSources |
List[SrtCallerSource] |
|
Required for SRT_CALLER inputs |
SrtListenerSettings
Configuration for SRT listener inputs.
| Field |
Type |
Required |
Description |
IngestPort |
int |
✓ |
Port number (1024-65535, excluding 2077/2088) |
MinLatency |
int |
|
Min latency in ms (0-60000, default: 2000) |
MaxLatency |
int |
|
Max latency in ms (0-60000) |
PassphraseEnabled |
bool |
|
Auto-generate 32-char passphrase |
Passphrase |
str |
|
Explicit 32-character passphrase |
SrtCallerSource
Configuration for SRT caller inputs (outbound connections).
| Field |
Type |
Required |
Description |
SrtListenerAddress |
str |
✓ |
Remote SRT listener hostname/IP |
SrtListenerPort |
int |
✓ |
Remote port (1-65535) |
StreamId |
str |
|
Stream ID for routing (max 512 chars) |
SrtCallerDecryption |
SrtCallerDecryption |
|
Decryption settings for an encrypted remote listener |
EncoderConfig
Encoder settings for video/audio transcoding.
| Field |
Type |
Required |
Description |
vodArchive |
bool |
|
Enable VOD archiving to S3 (default: false) |
Outputs |
List[OutputConfig] |
✓ |
List of output renditions |
InputSpecification |
InputSpecification |
|
Input codec/bitrate/resolution constraints |
OutputConfig
Configuration for a single output rendition (ABR tier).
| Field |
Type |
Required |
Description |
resolution |
str |
* |
Resolution as "WIDTHxHEIGHT" (e.g., "1920x1080") |
Width |
int |
* |
Width in pixels (1-7680) |
Height |
int |
* |
Height in pixels (1-4320) |
FramerateNumerator |
int |
✓ |
Framerate numerator (e.g., 30 for 30fps) |
FramerateDenominator |
int |
✓ |
Framerate denominator (e.g., 1 for 30fps) |
VideoBitrate |
int | str |
|
Video bitrate (supports "5M", "3000k") |
AudioBitrate |
int | str |
|
Audio bitrate (default: 128000) |
SampleRate |
int |
|
Audio sample rate in Hz (default: 48000) |
Profile |
H264Profile | H265Profile |
|
Codec profile (e.g., "HIGH", "MAIN") |
AdaptiveQuantization |
AdaptiveQuantization |
|
"AUTO", "OFF", "LOW", "MEDIUM", "HIGH", "HIGHER", "MAX" |
OutputName |
str |
|
Custom name for this output |
* Provide either resolution string or Width/Height, not both.
PackagerConfig
Packager and origin endpoint configuration.
| Field |
Type |
Required |
Description |
originEndpoints |
List[OriginEndpoint] |
✓ |
List of origin endpoints |
OriginEndpoint
Configuration for a packager origin endpoint.
| Field |
Type |
Required |
Description |
name |
str |
✓ |
Endpoint name |
ContainerType |
ContainerType |
|
"CMAF", "TS", "ISM" (default: "CMAF") |
description |
str |
|
Optional description |
HlsManifests |
List[HlsManifest] |
* |
HLS manifest configurations |
DashManifests |
List[DashManifest] |
* |
DASH manifest configurations |
MssManifests |
List[MssManifest] |
* |
MSS manifest configurations |
StartoverWindowSeconds |
int |
|
DVR window in seconds (60-1209600) |
TsUseAudioRenditionGroup |
bool |
|
Allow audio/video rendition mixing (TS only) |
* At least one manifest type must be provided.
HlsManifest
HLS manifest configuration.
| Field |
Type |
Required |
Description |
ManifestName |
str |
|
Manifest filename without extension (default: "index") |
ManifestWindowSeconds |
int |
|
Manifest window duration (30-3600, default: 360) |
ProgramDateTimeIntervalSeconds |
int |
|
PDT tag interval (0-3600) |
ChildManifestName |
str |
|
Child manifest name for master/child setup |
DashManifest
DASH manifest configuration.
| Field |
Type |
Required |
Description |
ManifestName |
str |
|
Manifest filename without extension (default: "index") |
ManifestWindowSeconds |
int |
|
MPD window duration (30-3600, default: 360) |
MinUpdatePeriodSeconds |
int |
|
Min manifest refresh interval (1-120) |
MinBufferTimeSeconds |
int |
|
Min player buffer requirement (1-900) |
SuggestedPresentationDelaySeconds |
int |
|
Recommended playback delay from live edge (1-900) |
Response Models
CreatePipelineResponse
| Field |
Type |
Description |
message |
str |
Response message |
eventID |
str |
Unique pipeline identifier |
status |
str |
Pipeline status ("provisioning") |
Methods: is_success(), valid() - check if creation succeeded
PipelineStatusResponse
| Field |
Type |
Description |
eventID |
str |
Pipeline ID |
name |
str |
Pipeline name |
description |
str |
Pipeline description |
status |
str |
Current status |
createdAt |
datetime |
Creation timestamp |
lastUpdated |
datetime |
Last update timestamp |
input |
PipelineInputStatus |
Input details and endpoints |
encoder |
PipelineEncoderStatus |
Encoder profiles and settings |
packager |
PipelinePackagerStatus |
Packager endpoints |
cdn |
PipelineCdnStatus |
CDN endpoints |
detected_users |
List[str] |
List of detected watermark users |
Methods: is_ready(), is_terminal_failure()
TokenResponse
| Field |
Type |
Description |
tokens |
Dict[str, str] |
User key → JWT token mapping |
StateResponse
| Field |
Type |
Description |
message |
str |
Response message |
eventID |
str |
Pipeline ID |
action |
str |
"status", "start", or "stop" |
state |
str |
Current state (e.g., "RUNNING", "STOPPED", "STARTING") |
DeleteResponse
| Field |
Type |
Description |
message |
str |
Response message |
eventID |
str |
Deleted pipeline ID |
status |
str |
Deletion status |
note |
str |
Optional note |
ResetHistoryResponse
Requirement: Pipeline must be stopped before calling reset_history(). If the pipeline is running, the API returns a 400 error.
| Field |
Type |
Description |
message |
str |
Response message |
eventID |
str |
Pipeline ID |
note |
Optional[str] |
Optional additional information |
VodArchiveResponse
| Field |
Type |
Description |
eventID |
str |
Pipeline ID |
clientID |
str |
Client ID |
eventName |
str |
Pipeline name |
archive |
VodArchiveInfo |
Archive metadata (file count, size, expiry) |
files |
List[VodArchiveFile] |
List of archive files with presigned URLs |
IPTV & Decode Models
IptvQueryRequest
| Field |
Type |
Required |
Description |
server |
HttpUrl |
✓ |
IPTV server URL |
username |
str |
✓ |
IPTV username |
password |
str |
✓ |
IPTV password |
channelName |
str |
✓ |
Channel name to search |
categoryId |
int |
|
Category filter |
format |
str |
|
Stream format preference |
preferHD |
bool |
|
Prefer HD streams |
preferUK |
bool |
|
Prefer UK streams |
avoidVIP |
bool |
|
Avoid VIP-only streams |
IptvQueryResponse
| Field |
Type |
Description |
results |
List[IptvStream] |
List of matching IPTV streams |
IptvStream
| Field |
Type |
Description |
name |
str |
Stream name |
stream_id |
int |
Stream identifier |
stream_urls |
List[HttpUrl] |
Playback URLs |
DecodeJobRequest
| Field |
Type |
Required |
Description |
eventID |
str |
✓ |
Pipeline ID to decode against |
input_stream |
HttpUrl |
✓ |
Stream URL to decode |
DecodeJobResponse
| Field |
Type |
Description |
message |
str |
Response message |
eventID |
str |
Pipeline ID |
clientID |
str |
Detected client/user ID (if found) |
Type Aliases & Enums
InputType
Literal type for input protocols:
"RTP" | "RTP_FEC" | "RIST" | "ZIXI_PUSH" | "SRT_LISTENER" | "SRT_CALLER"
ContainerType
Literal type for packager container formats:
"CMAF" | "TS" | "ISM"
AdaptiveQuantization
Literal type for adaptive quantization modes:
"AUTO" | "OFF" | "LOW" | "MEDIUM" | "HIGH" | "HIGHER" | "MAX"
H264Profile
Literal type for H.264 codec profiles:
"BASELINE" | "MAIN" | "HIGH" | "HIGH_10BIT" | "HIGH_422" | "HIGH_422_10BIT"
H265Profile
Literal type for H.265 codec profiles:
"MAIN" | "MAIN_10BIT"
Client Methods Reference
Complete API reference for StegawaveClient methods.
Pipeline Management
create_pipeline(request: CreatePipelineRequest) → CreatePipelineResponse
Create a new pipeline. Returns immediately without waiting for provisioning.
response = client.create_pipeline(request)
print(response.eventID)
create_pipeline_session(request: CreatePipelineRequest, wait: bool = True) → PipelineSession
Create pipeline and return a session wrapper. If wait=True, blocks until ready.
session = client.create_pipeline_session(request, wait=True)
print(session.input_uri)
get_pipeline(event_id: str) → PipelineStatusResponse
Get status for a specific pipeline by event ID.
status = client.get_pipeline("event-123")
print(status.status, status.createdAt)
list_pipelines() → PipelineListResponse
List all pipelines for this client.
listing = client.list_pipelines()
for pipeline in listing.pipelines:
print(pipeline.eventID, pipeline.status)
get_pipeline_session(event_id: str) → PipelineSession
Get a session wrapper for an existing pipeline.
session = client.get_pipeline_session("event-123")
print(session.status.status)
start_pipeline(event_id: str) → StateResponse
Start a stopped pipeline.
response = client.start_pipeline("event-123")
print(response.state)
stop_pipeline(event_id: str) → StateResponse
Stop a running pipeline.
response = client.stop_pipeline("event-123")
print(response.state)
delete_pipeline(event_id: str) → DeleteResponse
Delete a pipeline and all associated resources.
response = client.delete_pipeline("event-123")
print(response.status)
reset_history(event_id: str) → ResetHistoryResponse
Reset the channel history (DVR/startover window) for a pipeline. The pipeline must be stopped first or the API will return a 400 error. This operation clears the DVR window and manifest history.
# Stop pipeline, reset history, then restart
client.stop_pipeline("event-123")
response = client.reset_history("event-123")
print(response.message) # "Channel history reset successfully"
client.start_pipeline("event-123")
wait_for_event(event_id: str, timeout: float = 300.0, poll_interval: float = 5.0) → PipelineStatusResponse
Poll until pipeline is ready or timeout expires.
status = client.wait_for_event("event-123", timeout=600)
print("Ready:", status.is_ready())
Token & Playback
fetch_token(user_keys: List[str], exp_hours: int = 0) → TokenResponse
Generate viewer tokens for one or more user keys. exp_hours=0 means no expiration.
tokens = client.fetch_token(["user1", "user2"], exp_hours=4)
print(tokens.tokens["user1"])
IPTV & Decode
iptv_query(request: IptvQueryRequest) → IptvQueryResponse
Search IPTV catalogs for matching channels.
response = client.iptv_query(request)
for stream in response.results:
print(stream.name, stream.stream_urls)
submit_decode_job(request: DecodeJobRequest) → DecodeJobResponse
Submit a watermark decode job for a stream URL.
response = client.submit_decode_job(request)
if response.clientID:
print("Detected:", response.clientID)
VOD Archive
get_vod_archive(event_id: str, expires_in: int = 3600, download: bool = False, download_path: str | None = None) → VodArchiveResponse
List VOD archive files with presigned URLs. Optionally download all files.
# List files only
response = client.get_vod_archive("event-123", expires_in=7200)
for file in response.files:
print(file.filename, file.url)
# Download all files
response = client.get_vod_archive(
"event-123",
download=True,
download_path="./archive"
)
PipelineSession Reference
The PipelineSession class wraps a pipeline with convenience methods for status polling, input details, and signed manifest generation.
Properties
| Property |
Type |
Description |
event_id |
str |
Pipeline event ID |
status |
PipelineStatusResponse |
Latest status (cached) |
input_uri |
str | None |
Primary input endpoint URI |
Methods
refresh() → PipelineStatusResponse
Fetch latest status from API and update cache.
status = session.refresh()
print(status.status)
wait_until_ready(timeout: float = 300.0, poll_interval: float = 5.0) → PipelineStatusResponse
Poll until pipeline is ready or timeout expires.
status = session.wait_until_ready(timeout=600, poll_interval=5)
print("Ready:", status.is_ready())
get_input() → InputDetails
Normalize input details into a structured object.
input_details = session.get_input()
print(input_details.protocol)
print(input_details.uri)
print(input_details.uris)
print(input_details.allowed_ips)
get_manifests() → List[ManifestDetails] | None
Extract manifest metadata from CDN endpoints.
manifests = session.get_manifests()
if manifests:
for manifest in manifests:
print(manifest.protocol, manifest.uri)
signed_manifest_uris(user_key: str, exp_hours: int = 0, parameter: str = "token") → List[str]
Generate signed CDN manifest URLs for a viewer.
urls = session.signed_manifest_uris("viewer-123", exp_hours=4)
for url in urls:
print(url)