Pipelines
Create and monitor long-running bioinformatics jobs asynchronously. Pipelines run on managed AWS infrastructure — you submit, poll for status, and download outputs when done. Requires the tools scope.
Predefined Pipelines
SmartsBio ships 12 ready-to-run multi-step workflows. Instead of specifying a tool_id, pass a pipeline_id and the system handles orchestration, step ordering, and intermediate file handoff automatically. Use the list_pipelines tool to fetch the current list and required inputs at runtime.
# Discover all predefined pipelines and their required inputs
result = client.tools.run(tool_id="list_pipelines", input={})
for p in result["pipelines"]:
    print(f"{p['id']:40s} ~{p['estimated_runtime']}")
    for param in p["required_inputs"]:
        print(f"  {param['name']:30s} {param['description']}")
Pipeline Reference
| pipeline_id | Name | Required Inputs | Main Outputs | Est. Runtime |
|---|---|---|---|---|
| alignment-wes | WES Alignment | fastq_r1, fastq_r2, reference (GRCh38 / GRCh37) | sorted.markdup.bqsr.bam, .bai, fastqc_report.html, multiqc_report.html | 2–4 hours |
| whole-genome-sequencing | Whole Genome Sequencing | fastq_r1, fastq_r2, reference | sorted.markdup.bqsr.bam, .bai, qc_report.html | 4–8 hours |
| rna-seq-analysis | RNA-seq Analysis | fastq_r1, fastq_r2 (optional), gtf, reference | counts_matrix.csv, alignment_summary.txt, multiqc_report.html | 1–3 hours |
| atac-seq | ATAC-seq | fastq_r1, fastq_r2, reference, genome (hg38 / mm10) | peaks.narrowPeak, peaks.bed, fragment_sizes.pdf, multiqc_report.html | 1–2 hours |
| chip-seq | ChIP-seq | fastq_chip, fastq_input, reference, genome | peaks.narrowPeak, motifs/, bigwig/ | 1–3 hours |
| gatk-variant-calling | GATK Variant Calling | bam, bai, reference (GRCh38 / GRCh37) | variants.vcf.gz, variants.vcf.gz.tbi, genotyping_summary.txt | 1–4 hours |
| somatic-variant-calling | Somatic Variant Calling | tumor_bam, tumor_bai, normal_bam, normal_bai, reference | somatic.filtered.vcf.gz, somatic.filtered.vcf.gz.tbi, contamination.table | 2–5 hours |
| quality-control | Quality Control | fastq (list of one or more FASTQ files) | multiqc_report.html, per_file_fastqc/ | 10–30 minutes |
| protein-binder-design-validated | Protein Binder Design (Validated) | target_pdb or target_sequence, binding_site (optional), n_designs | designs.pdb (top N), affinity_scores.csv, design_report.pdf | 30–90 minutes |
| nanobody-discovery | Nanobody Discovery | antigen_pdb or antigen_sequence, epitope_residues (optional), n_candidates | nanobody_candidates.pdb, binding_scores.csv, screening_report.pdf | 1–2 hours |
| enzyme-engineering | Enzyme Engineering | scaffold_pdb or scaffold_sequence, reaction_smiles, optimization_target | engineered_variants.pdb, activity_predictions.csv, stability_scores.csv | 1–3 hours |
| structure-based-drug-discovery | Structure-Based Drug Discovery | target_pdb or uniprot_id, ligand_library or smiles_list | docking_results.csv, top_poses.sdf, binding_site_report.pdf | 30–120 minutes |
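Because the catalog can change, it is safest to resolve required inputs from the live list_pipelines response rather than hard-coding them from the table above. A minimal sketch of a lookup helper over the documented response shape (`required_input_names` and the stand-in `catalog` payload are illustrative, not part of the SDK):

```python
# Hypothetical helper: pull the required input names for one pipeline
# out of the list_pipelines response shape shown above.
def required_input_names(pipelines, pipeline_id):
    """Return the required input names for pipeline_id, or None if unknown."""
    for p in pipelines:
        if p["id"] == pipeline_id:
            return [param["name"] for param in p["required_inputs"]]
    return None

# Example against a minimal stand-in payload:
catalog = [
    {"id": "alignment-wes",
     "required_inputs": [{"name": "fastq_r1"}, {"name": "fastq_r2"},
                         {"name": "reference"}]},
]
print(required_input_names(catalog, "alignment-wes"))
# → ['fastq_r1', 'fastq_r2', 'reference']
```

In practice you would pass `result["pipelines"]` from the list_pipelines call above instead of the stand-in `catalog`.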
Run a Predefined Pipeline
Pass pipeline_id instead of tool_id in the request body. All file references use workspace-relative S3 keys returned by the Files API.
# 1. Upload inputs
fastq_r1 = client.files.upload("sample_R1.fastq.gz", workspace_id=ws_id, path="input/")
fastq_r2 = client.files.upload("sample_R2.fastq.gz", workspace_id=ws_id, path="input/")
# 2. Start the predefined WES alignment pipeline
pipeline = client.pipelines.create(
    pipeline_id="alignment-wes",  # ← predefined pipeline ID
    workspace_id=ws_id,
    input={
        "fastq_r1": fastq_r1["key"],
        "fastq_r2": fastq_r2["key"],
        "reference": "GRCh38",
        "output_path": "results/wes/",
    },
)
print(f"Started {pipeline['id']} status={pipeline['status']}")
# 3. Wait and download
pipeline = client.pipelines.wait(
    pipeline["id"],
    workspace_id=ws_id,
    poll_interval=30,
    on_progress=lambda p: print(f"  {p['progress_pct']}%"),
)
for key in pipeline["output_paths"]:
    client.files.download(key, workspace_id=ws_id, dest="./output/")
Example — Protein Binder Design
# Upload target structure
target = client.files.upload("target.pdb", workspace_id=ws_id, path="input/")
# Start protein binder design pipeline
pipeline = client.pipelines.create(
    pipeline_id="protein-binder-design-validated",
    workspace_id=ws_id,
    input={
        "target_pdb": target["key"],
        "binding_site": "A:45-60,A:102-115",  # optional chain:residue ranges
        "n_designs": 10,
        "output_path": "results/binders/",
    },
)
# Wait for completion (~30-90 min)
pipeline = client.pipelines.wait(
    pipeline["id"],
    workspace_id=ws_id,
    poll_interval=60,
    on_progress=lambda p: print(f"  Step: {p.get('current_step', '?')} {p['progress_pct']}%"),
)
# Download top designs and affinity scores
for key in pipeline["output_paths"]:
    client.files.download(key, workspace_id=ws_id, dest="./binders/")
Custom Tool Pipelines
Run any single pipeline-capable tool by specifying tool_id instead of pipeline_id. See Available Tools for the full list — tools marked Pipeline or Both support this mode.
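When selecting a tool_id programmatically, you might first filter the tools catalog down to pipeline-capable entries. A minimal sketch, assuming each tool record carries a mode field named `mode` with values like "Pipeline", "Both", or "Interactive" (the actual field name and values may differ; check the Available Tools reference):

```python
def pipeline_capable(tools):
    """Keep only tools whose mode is 'Pipeline' or 'Both' (hypothetical field)."""
    return [t["id"] for t in tools if t.get("mode") in ("Pipeline", "Both")]

# Stand-in catalog entries for illustration:
tools = [
    {"id": "bwa_toolkit", "mode": "Pipeline"},
    {"id": "gatk_toolkit", "mode": "Both"},
    {"id": "seq_viewer", "mode": "Interactive"},
]
print(pipeline_capable(tools))  # → ['bwa_toolkit', 'gatk_toolkit']
```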
The Pipeline object
| Field | Type | Description |
|---|---|---|
| id | string | Unique pipeline ID (e.g. pipe_abc123). |
| tool_id | string | Tool that was run — or pipeline:{id} for predefined pipelines. |
| workspace_id | string | Workspace the pipeline belongs to. |
| status | string | queued → running → completed | failed | cancelled. |
| progress_pct | integer | Completion percentage 0–100. Updated during execution. |
| current_step | string | null | Human-readable name of the step currently executing (predefined pipelines only). |
| input | object | The exact input parameters submitted. |
| output_paths | string[] | Paths of output files relative to workspace root (populated when completed). |
| logs_path | string | Path to the execution log file, relative to workspace root. |
| error | string | null | Error message if failed, otherwise null. |
| created_at | string | ISO 8601 creation timestamp. |
| started_at | string | null | When execution began. |
| completed_at | string | null | When execution finished (success or failure). |
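The timestamp fields make it easy to compute wall-clock runtime once a pipeline has finished. A small sketch over the documented started_at/completed_at fields (`runtime_seconds` is an illustrative helper, not an SDK method; the `.replace("Z", "+00:00")` step is only needed because older Python versions do not parse a trailing "Z" in fromisoformat):

```python
from datetime import datetime

def runtime_seconds(pipeline):
    """Wall-clock runtime from started_at/completed_at (ISO 8601).
    Returns None while the pipeline has not both started and finished."""
    start, end = pipeline.get("started_at"), pipeline.get("completed_at")
    if not (start and end):
        return None
    parse = lambda s: datetime.fromisoformat(s.replace("Z", "+00:00"))
    return (parse(end) - parse(start)).total_seconds()

p = {"started_at": "2024-05-01T10:00:00Z", "completed_at": "2024-05-01T12:30:00Z"}
print(runtime_seconds(p))  # → 9000.0 (2.5 hours)
```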
/v1/pipelines · tools scope
Create and queue a pipeline run. Returns immediately with the pipeline ID and status: "queued". Supply either pipeline_id (predefined) or tool_id (custom tool) — not both.
| Body field | Type | Description |
|---|---|---|
| pipeline_id | string | ID of a predefined pipeline (e.g. alignment-wes). Use this or tool_id. |
| tool_id | string | Pipeline-capable tool ID (e.g. gatk_toolkit). Use this or pipeline_id. |
| workspace_id * | string | Workspace for input/output files. |
| input * | object | Pipeline or tool-specific input parameters (file keys, settings, etc.). |
# Predefined pipeline
pipeline = client.pipelines.create(
    pipeline_id="gatk-variant-calling",
    workspace_id=ws_id,
    input={
        "bam": bam["key"],
        "bai": bai["key"],
        "reference": "GRCh38",
        "output_path": "variants/",
    },
)

# Custom tool pipeline
pipeline = client.pipelines.create(
    tool_id="bwa_toolkit",
    workspace_id=ws_id,
    input={"fastq": fastq["key"], "reference": "GRCh38"},
)
print(f"Pipeline {pipeline['id']} is {pipeline['status']}")
/v1/pipelines/:id · tools scope
Get the current status of a pipeline. Poll until status is completed, failed, or cancelled. Recommended interval: 15–30 seconds.
| Parameter | Type | Description |
|---|---|---|
| id * | string (path) | Pipeline ID. |
| workspace_id * | string (query) | Workspace the pipeline belongs to. |
pipeline = client.pipelines.get("pipe_abc123", workspace_id=ws_id)
print(f"Step: {pipeline.get('current_step', '?')}")
print(f"Status: {pipeline['status']} {pipeline['progress_pct']}%")
/v1/pipelines · tools scope
List pipelines in a workspace, optionally filtered by status.
| Parameter | Type | Description |
|---|---|---|
| workspace_id * | string | Workspace to list pipelines from. |
| status | string | Filter: queued, running, completed, failed. |
| limit | integer | Max results (default 20, max 100). |
pipelines = client.pipelines.list(workspace_id=ws_id)
for p in pipelines:
    print(f"  {p['id']}  {p['tool_id']:<35} {p['status']} {p['progress_pct']}%")

running = client.pipelines.list(workspace_id=ws_id, status="running")
/v1/pipelines/:id · tools scope
Cancel a queued or running pipeline. Has no effect on completed or already-cancelled pipelines.
result = client.pipelines.cancel("pipe_abc123", workspace_id=ws_id)
print(result["message"])  # "Pipeline cancelled."
Polling with .wait()
Both SDKs include a convenience .wait() method that polls until the pipeline reaches a terminal state (completed, failed, or cancelled).
pipeline = client.pipelines.create(
    pipeline_id="rna-seq-analysis",
    workspace_id=ws_id,
    input={"fastq_r1": r1["key"], "fastq_r2": r2["key"],
           "gtf": gtf["key"], "reference": "GRCh38"},
)
pipeline = client.pipelines.wait(
    pipeline["id"],
    workspace_id=ws_id,
    poll_interval=30,  # seconds
    timeout=14400,     # 4 hours
    on_progress=lambda p: print(f"  [{p.get('current_step','?')}] {p['progress_pct']}%"),
)
if pipeline["status"] == "completed":
    for key in pipeline["output_paths"]:
        client.files.download(key, workspace_id=ws_id, dest="./output/")
else:
    print(f"Failed: {pipeline['error']}")
Accessing logs
The logs_path field is populated as soon as a pipeline starts running.
pipeline = client.pipelines.get(pipeline_id, workspace_id=ws_id)
if pipeline.get("logs_path"):
    client.files.download(pipeline["logs_path"], workspace_id=ws_id, dest="./logs/")
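Whether a run succeeded or failed, you typically want the log file alongside any outputs. A small sketch of a pure helper over the Pipeline object's documented fields (`artifact_keys` is illustrative, not an SDK method):

```python
def artifact_keys(pipeline):
    """All workspace-relative keys worth downloading: outputs plus the log file."""
    keys = list(pipeline.get("output_paths") or [])
    if pipeline.get("logs_path"):
        keys.append(pipeline["logs_path"])
    return keys

p = {"output_paths": ["results/wes/sorted.markdup.bqsr.bam"],
     "logs_path": "logs/pipe_abc123.log"}
print(artifact_keys(p))
# → ['results/wes/sorted.markdup.bqsr.bam', 'logs/pipe_abc123.log']
```

You can then feed each key to client.files.download as in the examples above.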