Pipelines

Create and monitor long-running bioinformatics jobs asynchronously. Pipelines run on managed AWS infrastructure — you submit, poll for status, and download outputs when done. Requires the tools scope.

1. POST /v1/pipelines
2. GET /v1/pipelines/:id
3. status = completed
4. download outputs
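The four steps above amount to a simple polling loop. As a minimal sketch (not part of the SDK: `poll_until_done` and its `get_status` argument are hypothetical; `get_status` stands in for any callable that returns a pipeline dict, e.g. a wrapper around `client.pipelines.get`):

```python
import time

def poll_until_done(get_status, poll_interval=30, timeout=14400):
    """Poll get_status() until the pipeline reaches a terminal state.

    get_status -- any zero-argument callable returning a dict with a
    "status" key, e.g.:
        lambda: client.pipelines.get(pipe_id, workspace_id=ws_id)
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        pipeline = get_status()
        if pipeline["status"] in ("completed", "failed", "cancelled"):
            return pipeline
        time.sleep(poll_interval)
    raise TimeoutError("pipeline did not reach a terminal state in time")
```

The SDK's built-in `.wait()` (documented below) does the same thing with progress callbacks; this sketch is only meant to show the shape of the loop.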

Predefined Pipelines

SmartsBio ships 12 ready-to-run multi-step workflows. Instead of specifying a tool_id, pass a pipeline_id and the system handles orchestration, step ordering, and intermediate file handoff automatically. Use the list_pipelines tool to fetch the current list and required inputs at runtime.

# Discover all predefined pipelines and their required inputs
result = client.tools.run(tool_id="list_pipelines", input={})
for p in result["pipelines"]:
    print(f"{p['id']:40s} ~{p['estimated_runtime']}")
    for param in p["required_inputs"]:
        print(f"  {param['name']:30s} {param['description']}")

Pipeline Reference

| pipeline_id | Name | Required Inputs | Main Outputs | Est. Runtime |
|---|---|---|---|---|
| alignment-wes | WES Alignment | fastq_r1, fastq_r2, reference (GRCh38 / GRCh37) | sorted.markdup.bqsr.bam, .bai, fastqc_report.html, multiqc_report.html | 2–4 hours |
| whole-genome-sequencing | Whole Genome Sequencing | fastq_r1, fastq_r2, reference | sorted.markdup.bqsr.bam, .bai, qc_report.html | 4–8 hours |
| rna-seq-analysis | RNA-seq Analysis | fastq_r1, fastq_r2 (optional), gtf, reference | counts_matrix.csv, alignment_summary.txt, multiqc_report.html | 1–3 hours |
| atac-seq | ATAC-seq | fastq_r1, fastq_r2, reference, genome (hg38 / mm10) | peaks.narrowPeak, peaks.bed, fragment_sizes.pdf, multiqc_report.html | 1–2 hours |
| chip-seq | ChIP-seq | fastq_chip, fastq_input, reference, genome | peaks.narrowPeak, motifs/, bigwig/ | 1–3 hours |
| gatk-variant-calling | GATK Variant Calling | bam, bai, reference (GRCh38 / GRCh37) | variants.vcf.gz, variants.vcf.gz.tbi, genotyping_summary.txt | 1–4 hours |
| somatic-variant-calling | Somatic Variant Calling | tumor_bam, tumor_bai, normal_bam, normal_bai, reference | somatic.filtered.vcf.gz, somatic.filtered.vcf.gz.tbi, contamination.table | 2–5 hours |
| quality-control | Quality Control | fastq (list of one or more FASTQ files) | multiqc_report.html, per_file_fastqc/ | 10–30 minutes |
| protein-binder-design-validated | Protein Binder Design (Validated) | target_pdb or target_sequence, binding_site (optional), n_designs | designs.pdb (top N), affinity_scores.csv, design_report.pdf | 30–90 minutes |
| nanobody-discovery | Nanobody Discovery | antigen_pdb or antigen_sequence, epitope_residues (optional), n_candidates | nanobody_candidates.pdb, binding_scores.csv, screening_report.pdf | 1–2 hours |
| enzyme-engineering | Enzyme Engineering | scaffold_pdb or scaffold_sequence, reaction_smiles, optimization_target | engineered_variants.pdb, activity_predictions.csv, stability_scores.csv | 1–3 hours |
| structure-based-drug-discovery | Structure-Based Drug Discovery | target_pdb or uniprot_id, ligand_library or smiles_list | docking_results.csv, top_poses.sdf, binding_site_report.pdf | 30–120 minutes |
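Because required inputs differ per pipeline, it can be convenient to check an input dict against the schema returned by `list_pipelines` before submitting and failing server-side. `validate_inputs` below is a hypothetical client-side helper, not part of the SDK; it only assumes the `required_inputs` shape shown in the discovery snippet above:

```python
def validate_inputs(pipeline_schema, input_params):
    """Return the names of required inputs missing from input_params.

    pipeline_schema is one entry from the list_pipelines result, i.e. a
    dict with a "required_inputs" list of {"name", "description"} dicts.
    """
    required = {p["name"] for p in pipeline_schema["required_inputs"]}
    return sorted(required - set(input_params))
```

An empty return value means the submission has every required key; note this checks key presence only, not whether the referenced file keys actually exist in the workspace.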

Run a Predefined Pipeline

Pass pipeline_id instead of tool_id in the request body. All file references use workspace-relative S3 keys returned by the Files API.

# 1. Upload inputs
fastq_r1 = client.files.upload("sample_R1.fastq.gz", workspace_id=ws_id, path="input/")
fastq_r2 = client.files.upload("sample_R2.fastq.gz", workspace_id=ws_id, path="input/")

# 2. Start the predefined WES alignment pipeline
pipeline = client.pipelines.create(
    pipeline_id="alignment-wes",      # ← predefined pipeline ID
    workspace_id=ws_id,
    input={
        "fastq_r1": fastq_r1["key"],
        "fastq_r2": fastq_r2["key"],
        "reference": "GRCh38",
        "output_path": "results/wes/",
    },
)
print(f"Started {pipeline['id']}  status={pipeline['status']}")

# 3. Wait and download
pipeline = client.pipelines.wait(
    pipeline["id"],
    workspace_id=ws_id,
    poll_interval=30,
    on_progress=lambda p: print(f"  {p['progress_pct']}%"),
)
for key in pipeline["output_paths"]:
    client.files.download(key, workspace_id=ws_id, dest="./output/")

Example — Protein Binder Design

# Upload target structure
target = client.files.upload("target.pdb", workspace_id=ws_id, path="input/")

# Start protein binder design pipeline
pipeline = client.pipelines.create(
    pipeline_id="protein-binder-design-validated",
    workspace_id=ws_id,
    input={
        "target_pdb": target["key"],
        "binding_site": "A:45-60,A:102-115",  # optional chain:residue ranges
        "n_designs": 10,
        "output_path": "results/binders/",
    },
)

# Wait for completion (~30-90 min)
pipeline = client.pipelines.wait(
    pipeline["id"],
    workspace_id=ws_id,
    poll_interval=60,
    on_progress=lambda p: print(f"  Step: {p.get('current_step', '?')}  {p['progress_pct']}%"),
)

# Download top designs and affinity scores
for key in pipeline["output_paths"]:
    client.files.download(key, workspace_id=ws_id, dest="./binders/")

Custom Tool Pipelines

Run any single pipeline-capable tool by specifying tool_id instead of pipeline_id. See Available Tools for the full list — tools marked Pipeline or Both support this mode.

The Pipeline object

| Field | Type | Description |
|---|---|---|
| id | string | Unique pipeline ID (e.g. pipe_abc123). |
| tool_id | string | Tool that was run, or pipeline:{id} for predefined pipelines. |
| workspace_id | string | Workspace the pipeline belongs to. |
| status | string | One of queued, running, completed, failed, cancelled. |
| progress_pct | integer | Completion percentage 0–100. Updated during execution. |
| current_step | string or null | Human-readable name of the step currently executing (predefined pipelines only). |
| input | object | The exact input parameters submitted. |
| output_paths | string[] | Paths of output files relative to workspace root (populated when completed). |
| logs_path | string | Path to the execution log file, relative to workspace root. |
| error | string or null | Error message if failed, otherwise null. |
| created_at | string | ISO 8601 creation timestamp. |
| started_at | string or null | When execution began. |
| completed_at | string or null | When execution finished (success or failure). |
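When polling manually, the `status` field is what decides when to stop. A small sketch over the Pipeline object (assuming only the field values listed above; these helpers are illustrative, not part of the SDK):

```python
TERMINAL_STATUSES = {"completed", "failed", "cancelled"}

def is_terminal(pipeline):
    """True once the pipeline will no longer change state."""
    return pipeline["status"] in TERMINAL_STATUSES

def summarize(pipeline):
    """One-line progress summary built from Pipeline object fields."""
    step = pipeline.get("current_step") or "-"
    return f"{pipeline['id']} {pipeline['status']} {pipeline['progress_pct']}% step={step}"
```

`current_step` is null for custom tool pipelines, so `summarize` falls back to "-" rather than assuming it is set.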
POST /v1/pipelines (tools scope)

Create and queue a pipeline run. Returns immediately with the pipeline ID and status: "queued". Supply either pipeline_id (predefined) or tool_id (custom tool) — not both.

| Body field | Type | Description |
|---|---|---|
| pipeline_id | string | ID of a predefined pipeline (e.g. alignment-wes). Use this or tool_id. |
| tool_id | string | Pipeline-capable tool ID (e.g. gatk_toolkit). Use this or pipeline_id. |
| workspace_id * | string | Workspace for input/output files. |
| input * | object | Pipeline- or tool-specific input parameters (file keys, settings, etc.). |
# Predefined pipeline
pipeline = client.pipelines.create(
    pipeline_id="gatk-variant-calling",
    workspace_id=ws_id,
    input={
        "bam": bam["key"],
        "bai": bai["key"],
        "reference": "GRCh38",
        "output_path": "variants/",
    },
)

# Custom tool pipeline
pipeline = client.pipelines.create(
    tool_id="bwa_toolkit",
    workspace_id=ws_id,
    input={"fastq": fastq["key"], "reference": "GRCh38"},
)

print(f"Pipeline {pipeline['id']} is {pipeline['status']}")
GET /v1/pipelines/:id (tools scope)

Get the current status of a pipeline. Poll until status is completed, failed, or cancelled. Recommended interval: 15–30 seconds.

| Parameter | Type | Description |
|---|---|---|
| id * | string (path) | Pipeline ID. |
| workspace_id * | string (query) | Workspace the pipeline belongs to. |
pipeline = client.pipelines.get("pipe_abc123", workspace_id=ws_id)
print(f"Step: {pipeline.get('current_step', '?')}")
print(f"Status: {pipeline['status']}  {pipeline['progress_pct']}%")
GET /v1/pipelines (tools scope)

List pipelines in a workspace, optionally filtered by status.

| Parameter | Type | Description |
|---|---|---|
| workspace_id * | string | Workspace to list pipelines from. |
| status | string | Filter: queued, running, completed, failed. |
| limit | integer | Max results (default 20, max 100). |
pipelines = client.pipelines.list(workspace_id=ws_id)
for p in pipelines:
    print(f"  {p['id']}  {p['tool_id']:<35}  {p['status']}  {p['progress_pct']}%")

running = client.pipelines.list(workspace_id=ws_id, status="running")
DELETE /v1/pipelines/:id (tools scope)

Cancel a queued or running pipeline. Has no effect on completed or already-cancelled pipelines.

result = client.pipelines.cancel("pipe_abc123", workspace_id=ws_id)
print(result["message"])  # "Pipeline cancelled."

Polling with .wait()

Both SDKs include a convenience .wait() method that polls until the pipeline finishes and raises on failure.

pipeline = client.pipelines.create(
    pipeline_id="rna-seq-analysis",
    workspace_id=ws_id,
    input={"fastq_r1": r1["key"], "fastq_r2": r2["key"],
           "gtf": gtf["key"], "reference": "GRCh38"},
)

pipeline = client.pipelines.wait(
    pipeline["id"],
    workspace_id=ws_id,
    poll_interval=30,           # seconds
    timeout=14400,              # 4 hours
    on_progress=lambda p: print(f"  [{p.get('current_step','?')}]  {p['progress_pct']}%"),
)

if pipeline["status"] == "completed":
    for key in pipeline["output_paths"]:
        client.files.download(key, workspace_id=ws_id, dest="./output/")
else:
    print(f"Failed: {pipeline['error']}")

Accessing logs

The logs_path field is populated as soon as a pipeline starts running.

pipeline = client.pipelines.get(pipeline_id, workspace_id=ws_id)
if pipeline.get("logs_path"):
    client.files.download(pipeline["logs_path"], workspace_id=ws_id, dest="./logs/")