Your First Pipeline
This guide walks through building a minimal pipeline that demonstrates datapipe's core behaviour: running only the work that needs to be done.
What we'll build
A pipeline with two steps:
- Generate a small table of words.
- Transform each word into its character count.
When a word changes, only its downstream computation re-runs. Everything unchanged is skipped.
Prerequisites
Install datapipe with the SQLite extra for local development:
pip install "datapipe-core[sqlite]"
The pipeline
Create a file app.py:
import pandas as pd
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from datapipe.compute import Catalog, DatapipeApp, Pipeline
from datapipe.datatable import DataStore
from datapipe.step.batch_generate import BatchGenerate
from datapipe.step.batch_transform import BatchTransform
from datapipe.store.database import DBConn
class Base(DeclarativeBase):
pass
class Word(Base):
__tablename__ = "words"
word_id: Mapped[int] = mapped_column(primary_key=True)
text: Mapped[str]
class WordLength(Base):
__tablename__ = "word_lengths"
word_id: Mapped[int] = mapped_column(primary_key=True)
length: Mapped[int]
def generate_words():
yield pd.DataFrame([
{"word_id": 1, "text": "hello"},
{"word_id": 2, "text": "world"},
{"word_id": 3, "text": "datapipe"},
])
def compute_lengths(df: pd.DataFrame) -> pd.DataFrame:
return df.assign(length=df["text"].str.len())[["word_id", "length"]]
pipeline = Pipeline([
BatchGenerate(generate_words, outputs=[Word]),
BatchTransform(
compute_lengths,
inputs=[Word],
outputs=[WordLength],
),
])
dbconn = DBConn("sqlite+pysqlite3:///first_pipeline.sqlite", sqla_metadata=Base.metadata)
ds = DataStore(dbconn)
app = DatapipeApp(ds, Catalog({}), pipeline)
Run it
Create the database tables (do this once):
datapipe db create-all
Run the pipeline:
datapipe run
You should see both steps execute: generate_words fills the words table, then compute_lengths produces a row in word_lengths for each word.
Run again:
datapipe run
This time nothing is reprocessed — datapipe sees that the source data hasn't changed and skips both steps. This is the core behaviour: work is only done when it needs to be.
See the step list
datapipe step list
This shows all steps in your pipeline and how many records are pending for each.
What just happened
BatchGenerateis a special step that populates a table from an external source (here, a Python generator). It runs in full each time and datapipe detects which rows changed.BatchTransformreceives apd.DataFrameof the rows that need processing and returns apd.DataFrameof results. Datapipe tracks theupdate_ts/process_tspair for every record to determine what to pass in.- The metadata (which rows were processed, when, with what result) is stored in the SQLite file alongside your data tables.
Next steps
- Read Concepts to understand how incremental tracking works.
- See How to pull data from external sources for more
BatchGeneratepatterns. - See How to run model inference for multi-input transforms.