Skip to main content

Pipelines

Submit long-running bioinformatics jobs, monitor their progress, and download results when complete — all asynchronously.

Upload → create pipeline → wait → download

The canonical async pipeline pattern: upload inputs, submit the job, poll until done, then download all outputs.

from smartsbio import SmartsBio
import time

client = SmartsBio(api_key="sk_live_...")
ws_id = client.workspaces.list()[0].id

# 1. Upload the BAM and its index into the workspace's input/ prefix.
print("[1/4] Uploading BAM + index...")
bam_upload = client.files.upload("sample.bam", workspace_id=ws_id, path="input/")
bai_upload = client.files.upload("sample.bam.bai", workspace_id=ws_id, path="input/")

# 2. Submit the variant-calling pipeline; create() does not block.
print("[2/4] Submitting pipeline...")
pipeline = client.pipelines.create(
    tool_id="gatk_variant_calling",
    workspace_id=ws_id,
    input={
        "bam": bam_upload["key"],
        "bai": bai_upload["key"],
        "reference": "GRCh38",
        "intervals": "wgs",
        "output_path": "variants/",
    },
)
print(f"  Pipeline {pipeline['id']} queued")

# 3. Re-fetch the pipeline every 20s until it leaves queued/running.
print("[3/4] Waiting for completion...")
while True:
    if pipeline["status"] not in ("queued", "running"):
        break
    time.sleep(20)
    pipeline = client.pipelines.get(pipeline["id"], workspace_id=ws_id)
    print(f"  {pipeline['status']}  {pipeline['progress_pct']}%")

# Any terminal state other than "completed" is treated as failure.
if pipeline["status"] != "completed":
    raise RuntimeError(f"Pipeline failed: {pipeline['error']}")

# 4. Pull every output object down to ./output/.
print("[4/4] Downloading outputs...")
for output_key in pipeline["output_paths"]:
    saved_path = client.files.download(output_key, workspace_id=ws_id, dest="./output/")
    print(f"  Saved: {saved_path}")

print("\nDone.")

Using the .wait() helper

Both SDKs include a .wait() convenience method that handles polling and raises on failure, reducing boilerplate.

from smartsbio import SmartsBio

client = SmartsBio(api_key="sk_live_...")
ws_id = client.workspaces.list()[0].id

# Stage the input VCF in the workspace.
vcf_upload = client.files.upload("sample.vcf", workspace_id=ws_id, path="input/")

# Submit the annotation pipeline; returns immediately with a pipeline record.
pipeline = client.pipelines.create(
    tool_id="gatk_variant_annotation",
    workspace_id=ws_id,
    input={
        "file_path": vcf_upload["key"],
        "annotations": ["ClinVar", "gnomAD_AF", "CADD"],
        "output_path": "annotated/",
    },
)


# Progress callback invoked by .wait() on each poll.
def report_progress(p):
    print(f"  {p['status']}  {p['progress_pct']}%")


# .wait() handles the polling loop (every 15s) and raises on failure.
pipeline = client.pipelines.wait(
    pipeline["id"],
    workspace_id=ws_id,
    poll_interval=15,
    on_progress=report_progress,
)

# Retrieve every output file produced by the completed pipeline.
for output_key in pipeline["output_paths"]:
    saved_path = client.files.download(output_key, workspace_id=ws_id, dest="./annotated/")
    print(f"Saved: {saved_path}")

Monitor all active pipelines

List all running or queued pipelines in a workspace and report their status.

from smartsbio import SmartsBio

client = SmartsBio(api_key="sk_live_...")
ws_id = client.workspaces.list()[0].id

# Collect running pipelines first, then queued ones (same order as two
# separate list calls concatenated).
active = []
for state in ("running", "queued"):
    active.extend(client.pipelines.list(workspace_id=ws_id, status=state))

if not active:
    print("No active pipelines.")
else:
    # Fixed-width table header followed by one row per pipeline.
    print(f"{'ID':<15}  {'Tool':<30}  {'Status':<10}  {'Progress'}")
    for p in active:
        # Ten-segment progress bar: one filled block per 10%.
        filled = p['progress_pct'] // 10
        bar = '█' * filled + '░' * (10 - filled)
        print(f"{p['id']:<15}  {p['tool_id']:<30}  {p['status']:<10}  [{bar}] {p['progress_pct']}%")

Batch pipeline submission

Submit multiple samples concurrently, then wait for all to complete and download their outputs.

from smartsbio import SmartsBio
import os, time, concurrent.futures

client = SmartsBio(api_key="sk_live_...")
ws_id = client.workspaces.list()[0].id

vcf_files = [f for f in os.listdir("./samples/") if f.endswith(".vcf")]

# Upload one sample and submit its annotation pipeline.
def upload_and_submit(vcf_filename: str) -> dict:
    """Upload a VCF from ./samples/ and create a pipeline for it.

    Returns the pipeline record from pipelines.create().
    """
    upload = client.files.upload(
        f"./samples/{vcf_filename}", workspace_id=ws_id, path="batch/input/"
    )
    return client.pipelines.create(
        tool_id="gatk_variant_annotation",
        workspace_id=ws_id,
        input={
            "file_path": upload["key"],
            "annotations": ["ClinVar", "gnomAD_AF"],
            # One output folder per sample so results don't collide.
            "output_path": f"batch/annotated/{vcf_filename}/",
        },
    )

# Fan out upload+submit across 4 worker threads (SDK calls are I/O-bound).
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    pipelines = list(pool.map(upload_and_submit, vcf_files))

print(f"Submitted {len(pipelines)} pipelines")

# Poll until done. Only pipelines still queued/running are re-fetched;
# once a pipeline reaches a terminal state it is dropped from `pending`
# so we stop spending API calls (and log lines) on it.
active = {p["id"]: p for p in pipelines}
pending = {pid for pid, p in active.items() if p["status"] in ("queued", "running")}
while pending:
    time.sleep(30)
    for pid in list(pending):
        p = client.pipelines.get(pid, workspace_id=ws_id)
        active[pid] = p
        print(f"  {pid}  {p['status']}  {p['progress_pct']}%")
        if p["status"] not in ("queued", "running"):
            pending.discard(pid)

# Download outputs of completed pipelines; report the rest as failures.
for p in active.values():
    if p["status"] == "completed":
        for key in p["output_paths"]:
            client.files.download(key, workspace_id=ws_id, dest="./results/")
    else:
        print(f"  FAILED: {p['id']}  {p['error']}")