# Migration from v0.13 to v0.14
## DatatableTransform can become BatchTransform
Previously, if you needed to transform a whole table at once, you had to use `DatatableTransform`. Now you can substitute it with a `BatchTransform` that has empty `transform_keys`.
Before:

```python
from typing import Dict, List, Optional

import pandas as pd


# Updates global count of input lines
def count(
    ds: DataStore,
    input_dts: List[DataTable],
    output_dts: List[DataTable],
    kwargs: Dict,
    run_config: Optional[RunConfig] = None,
) -> None:
    assert len(input_dts) == 1
    assert len(output_dts) == 1

    input_dt = input_dts[0]
    output_dt = output_dts[0]

    output_dt.store_chunk(
        pd.DataFrame(
            {"result_id": [0], "count": [len(input_dt.meta_table.get_existing_idx())]}
        )
    )

# ...

DatatableTransform(
    count,
    inputs=["input"],
    outputs=["result"],
)
```
After:

```python
import pandas as pd

from datapipe.step.batch_transform import BatchTransform


# Updates global count of input lines
def count(
    input_df: pd.DataFrame,
) -> pd.DataFrame:
    return pd.DataFrame({"result_id": [0], "count": [len(input_df)]})

# ...

BatchTransform(
    count,
    inputs=["input"],
    outputs=["result"],
    # Important: we have to specify an empty list in order for the
    # transformation to operate on the whole input at once
    transform_keys=[],
)
```
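For contrast, a non-empty `transform_keys` makes the transformation incremental over those keys: the function is called on batches of changed key values rather than on the whole table. Below is a minimal sketch of a per-group count, assuming both tables share a `group_id` index column; the table names are illustrative, not from the original example:

```python
import pandas as pd

from datapipe.step.batch_transform import BatchTransform


# Called per batch of changed group_id values rather than on the whole table
def count_by_group(input_df: pd.DataFrame) -> pd.DataFrame:
    return input_df.groupby("group_id").size().rename("count").reset_index()


BatchTransform(
    count_by_group,
    inputs=["input"],
    outputs=["result_by_group"],
    transform_keys=["group_id"],
)
```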
## SQLAlchemy tables can be used directly without duplication in Catalog
Starting with v0.14, a SQLAlchemy table can be passed directly to the `inputs=` or `outputs=` parameters without duplicating the entry in `Catalog`.
Note that in order for `datapipe db create-all` to work, the same SQLAlchemy metadata must be used both for the declarative base and in datapipe.
Example:

```python
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class Input(Base):
    __tablename__ = "input"

    group_id: Mapped[int] = mapped_column(primary_key=True)
    item_id: Mapped[int] = mapped_column(primary_key=True)


class Output(Base):
    __tablename__ = "output"

    group_id: Mapped[int] = mapped_column(primary_key=True)
    count: Mapped[int]

# ...

pipeline = Pipeline(
    [
        BatchGenerate(
            generate_data,
            outputs=[Input],
        ),
        DatatableBatchTransform(
            count_tbl,
            inputs=[Input],
            outputs=[Output],
        ),
    ]
)

# Note! `sqla_metadata` is taken from the SQLAlchemy DeclarativeBase
dbconn = DBConn("sqlite+pysqlite3:///db.sqlite", sqla_metadata=Base.metadata)
ds = DataStore(dbconn)

app = DatapipeApp(ds=ds, catalog=Catalog({}), pipeline=pipeline)
```
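For comparison, before v0.14 the same schema had to be described a second time in a `Catalog` entry. Roughly like this; a sketch assuming the `TableStoreDB`-based catalog pattern of earlier versions, with import paths as assumptions:

```python
from sqlalchemy import Column, Integer

from datapipe.compute import Catalog, Table
from datapipe.store.database import TableStoreDB

# No longer needed in v0.14: the schema was duplicated between the
# SQLAlchemy model and the Catalog entry
catalog = Catalog(
    {
        "input": Table(
            store=TableStoreDB(
                dbconn,
                "input",
                [
                    Column("group_id", Integer, primary_key=True),
                    Column("item_id", Integer, primary_key=True),
                ],
            )
        ),
    }
)
```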
## Table can be provided directly without Catalog
Similar to the usage pattern for SQLAlchemy tables, it is also possible to pass a `datapipe.compute.Table` instance directly, without registering it in the catalog.
```python
from datapipe.compute import Table
from datapipe.store.filedir import PILFile, TableStoreFiledir
from datapipe.step.batch_transform import BatchTransform
from datapipe.step.update_external_table import UpdateExternalTable

input_images_tbl = Table(
    name="input_images",
    store=TableStoreFiledir("input/{id}.jpeg", PILFile("jpg")),
)

preprocessed_images_tbl = Table(
    name="preprocessed_images",
    store=TableStoreFiledir("output/{id}.png", PILFile("png")),
)

# ...

pipeline = Pipeline(
    [
        UpdateExternalTable(output=input_images_tbl),
        BatchTransform(
            batch_preprocess_images,
            inputs=[input_images_tbl],
            outputs=[preprocessed_images_tbl],
            chunk_size=100,
        ),
    ]
)
```
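As with the SQLAlchemy example above, the resulting pipeline can be wired into a `DatapipeApp` with an empty `Catalog`. A minimal sketch; the `DBConn`, `DataStore` and `DatapipeApp` import paths are assumptions:

```python
from datapipe.compute import Catalog, DatapipeApp
from datapipe.datatable import DataStore
from datapipe.store.database import DBConn

# The database here only holds datapipe's meta tables; the images
# themselves live on disk via TableStoreFiledir
dbconn = DBConn("sqlite+pysqlite3:///db.sqlite")
ds = DataStore(dbconn)

app = DatapipeApp(ds=ds, catalog=Catalog({}), pipeline=pipeline)
```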