BatchTransform
BatchTransoform(
func: BatchTransformFunc,
inputs: List[PipelineInput],
outputs: List[TableOrName],
chunk_size: int = 1000,
kwargs: Optional[Dict[str, Any]] = None,
transform_keys: Optional[List[str]] = None,
labels: Optional[Labels] = None,
executor_config: Optional[ExecutorConfig] = None,
filters: Optional[Union[LabelDict, Callable[[], LabelDict]]] = None,
order_by: Optional[List[str]] = None,
order: Literal["asc", "desc"] = "asc",
)
Arguments
func
Function which is a body of transform, it receives the same number of
pd.DataFrame
-s in the same order as specified in inputs
It should return a single pd.DataFrame
if the output
has one element or a
tuple of pd.DataFrame
of the same length as output
which will be interpreted
as corresponding to elements in output
.
inputs
A list of input tables for a given transformation. Each element might be either:
- a string, this string will be interpreted as a name of
Table
fromCatalog
- an SQLAlchemy ORM table, this table will be added implicitly to
Catalog
and used as an input - a qualifier
Required
with parameter either a string or an SQLAlchemy table, in this case same rules apply to the inner part and qualifierRequired
tells Datapipe that rows from this table must be present at calculation of transformations to compute
Example:
# ...
BatchTransform(
func=apply_detection_model,
inputs=[
# This is a table from Catalog.
# keys: <image_id>
"images",
# This is an SQLAlchemy table defined with declarative ORM.
# keys: <model_id>
DectionModel,
# This is a table from Catalog, which contains the identifier of current
# model, entries from DetectionModel will be filtered joining on `model_id`.
# keys: <model_id>
Required("current_model"),
],
# ...
)
# ...