Pipelines
Submit long-running bioinformatics jobs, monitor their progress, and download results when complete — all asynchronously.
Upload → create pipeline → wait → download
The canonical async pipeline pattern: upload inputs, submit the job, poll until done, then download all outputs.
from smartsbio import SmartsBio
import time

client = SmartsBio(api_key="sk_live_...")
ws_id = client.workspaces.list()[0].id

# 1. Upload the alignment and its index into the workspace.
print("[1/4] Uploading BAM + index...")
bam_file = client.files.upload("sample.bam", workspace_id=ws_id, path="input/")
index_file = client.files.upload("sample.bam.bai", workspace_id=ws_id, path="input/")

# 2. Submit the job; create() returns as soon as the pipeline is queued.
print("[2/4] Submitting pipeline...")
job = client.pipelines.create(
    tool_id="gatk_variant_calling",
    workspace_id=ws_id,
    input={
        "bam": bam_file["key"],
        "bai": index_file["key"],
        "reference": "GRCh38",
        "intervals": "wgs",
        "output_path": "variants/",
    },
)
print(f" Pipeline {job['id']} queued")

# 3. Poll every 20 seconds until the job leaves the active states.
print("[3/4] Waiting for completion...")
while True:
    if job["status"] not in ("queued", "running"):
        break
    time.sleep(20)
    job = client.pipelines.get(job["id"], workspace_id=ws_id)
    print(f" {job['status']} {job['progress_pct']}%")

if job["status"] != "completed":
    raise RuntimeError(f"Pipeline failed: {job['error']}")

# 4. Fetch every output artifact produced by the run.
print("[4/4] Downloading outputs...")
for output_key in job["output_paths"]:
    saved_path = client.files.download(output_key, workspace_id=ws_id, dest="./output/")
    print(f" Saved: {saved_path}")
print("\nDone.")
Using the .wait() helper
Both SDKs include a .wait() convenience method that handles polling and raises on failure, reducing boilerplate.
from smartsbio import SmartsBio

client = SmartsBio(api_key="sk_live_...")
ws_id = client.workspaces.list()[0].id

# Upload the input VCF.
vcf_upload = client.files.upload("sample.vcf", workspace_id=ws_id, path="input/")

# Submit the annotation pipeline; create() returns once it is queued.
submitted = client.pipelines.create(
    tool_id="gatk_variant_annotation",
    workspace_id=ws_id,
    input={
        "file_path": vcf_upload["key"],
        "annotations": ["ClinVar", "gnomAD_AF", "CADD"],
        "output_path": "annotated/",
    },
)


def _report(p):
    # Progress callback, invoked on every poll by .wait().
    print(f" {p['status']} {p['progress_pct']}%")


# .wait() blocks until the pipeline finishes, polling every 15 seconds,
# and raises on failure — no hand-written poll loop needed.
pipeline = client.pipelines.wait(
    submitted["id"],
    workspace_id=ws_id,
    poll_interval=15,
    on_progress=_report,
)

# Download outputs
for key in pipeline["output_paths"]:
    local = client.files.download(key, workspace_id=ws_id, dest="./annotated/")
    print(f"Saved: {local}")
Monitor all active pipelines
List all running or queued pipelines in a workspace and report their status.
from smartsbio import SmartsBio

client = SmartsBio(api_key="sk_live_...")
ws_id = client.workspaces.list()[0].id

# Collect both running and queued pipelines for this workspace.
running = client.pipelines.list(workspace_id=ws_id, status="running")
queued = client.pipelines.list(workspace_id=ws_id, status="queued")
active = running + queued

if not active:
    print("No active pipelines.")
else:
    print(f"{'ID':<15} {'Tool':<30} {'Status':<10} {'Progress'}")
    for p in active:
        # Ten-segment progress bar: one filled cell per 10 percent.
        filled = p['progress_pct'] // 10
        bar = '█' * filled + '░' * (10 - filled)
        print(f"{p['id']:<15} {p['tool_id']:<30} {p['status']:<10} [{bar}] {p['progress_pct']}%")
Batch pipeline submission
Submit multiple samples concurrently, then wait for all to complete and download their outputs.
from smartsbio import SmartsBio
import concurrent.futures
import os
import time

client = SmartsBio(api_key="sk_live_...")
ws_id = client.workspaces.list()[0].id

vcf_files = [f for f in os.listdir("./samples/") if f.endswith(".vcf")]


def upload_and_submit(vcf_filename: str) -> dict:
    """Upload one sample VCF and submit an annotation pipeline for it.

    Returns the pipeline record as created (status will be "queued").
    """
    upload = client.files.upload(
        f"./samples/{vcf_filename}", workspace_id=ws_id, path="batch/input/"
    )
    return client.pipelines.create(
        tool_id="gatk_variant_annotation",
        workspace_id=ws_id,
        input={
            "file_path": upload["key"],
            "annotations": ["ClinVar", "gnomAD_AF"],
            "output_path": f"batch/annotated/{vcf_filename}/",
        },
    )


# Upload + submit concurrently; 4 workers bounds the number of parallel calls.
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    pipelines = list(pool.map(upload_and_submit, vcf_files))
print(f"Submitted {len(pipelines)} pipelines")

# Poll until every pipeline reaches a terminal state. Only refresh the
# pipelines that are still queued/running — the original re-fetched every
# pipeline each cycle, wasting one API call per already-finished job.
active = {p["id"]: p for p in pipelines}
while any(p["status"] in ("queued", "running") for p in active.values()):
    time.sleep(30)
    for pid, p in active.items():
        if p["status"] not in ("queued", "running"):
            continue  # already terminal; no need to re-poll
        p = client.pipelines.get(pid, workspace_id=ws_id)
        active[pid] = p  # value reassignment is safe during items() iteration
        print(f" {pid} {p['status']} {p['progress_pct']}%")

# Download outputs of completed runs; report any failures.
for p in active.values():
    if p["status"] == "completed":
        for key in p["output_paths"]:
            client.files.download(key, workspace_id=ws_id, dest="./results/")
    else:
        print(f" FAILED: {p['id']} {p['error']}")