and pass that into another (class-based) task, how should we do that? Should we pass it at task initialization (e.g.,
snapshot_date = Parameter(name="snapshot_date", default=dt.datetime.today())
) or at the task call (i.e.,
my_task = MyTask(snapshot_date=snapshot_date)
)? This is assuming that snapshot_date is a column that serves as some partition. Hence, I need some way to know when the flow started running, the same way the weekly interval scheduler knows when to run the flow… does that make sense?
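For reference, a minimal sketch of the call-time version (assuming Prefect 1.x; MyTask and the partition logic are hypothetical stand-ins) could look like the following. Note that a Parameter default such as dt.datetime.today() is evaluated when the flow object is built, so reading the runtime context is usually the safer way to learn when a run actually started:

import prefect
from prefect import Flow, Parameter, Task


class MyTask(Task):
    # Hypothetical task that writes data under a snapshot_date partition.
    def run(self, snapshot_date):
        if snapshot_date is None:
            # Fall back to when this particular run was scheduled to start.
            snapshot_date = prefect.context.get("scheduled_start_time")
        self.logger.info("Writing partition for %s", snapshot_date)
        return snapshot_date


with Flow("snapshot-example") as flow:
    snapshot_date = Parameter("snapshot_date", default=None, required=False)
    partition = MyTask()(snapshot_date=snapshot_date)

# flow.run(parameters={"snapshot_date": "2021-01-04"})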
A related case: I'd like to take snapshot_date and then pass that to an intermediate task that computes checkpoint_dirname for deferred execution. checkpoint_dirname can be undefined or defined based on some parameter dataclass (params). If undefined, I'd want that parameter to be snapshot_date. Is something like this valid within a Flow?

snapshot_date = Parameter(name="snapshot_date", default=dt.datetime.today())
checkpoint_dirname = Parameter(name="checkpoint_dirname", default=params.checkpoint_dirname or snapshot_date)
The first one is the init. The second one is the run… so at this point it doesn't really matter, right? You can use the Parameter in either place. What you can't do is:

my_task = MyTask(snapshot_date=snapshot_date)()

with Flow(...) as flow:
    snapshot_date = Parameter("snapshot_date", ...)
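In other words (a rough sketch, assuming Prefect 1.x and the hypothetical MyTask from above), the Parameter has to exist inside the flow context before it is handed to the task, and the call is what wires it into the DAG:

from prefect import Flow, Parameter, Task


class MyTask(Task):
    def run(self, snapshot_date):
        # The concrete parameter value is only available here, at run time.
        return f"partition={snapshot_date}"


with Flow("param-binding") as flow:
    # Define the Parameter inside the flow context first...
    snapshot_date = Parameter("snapshot_date", default="2021-01-04")
    # ...then bind it when the task is called, which adds the edge to the DAG.
    result = MyTask()(snapshot_date=snapshot_date)

# Calling a task (the trailing "()") outside the `with Flow(...)` block fails,
# because there is no active flow context to register the task or Parameter with.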
What about a single config object that wraps all the parameters in a flow? Has that come up before? For example, if config is some dataclass that wraps a bunch of flow parameters and sub-configurations, each task in my flow would be initialized via something like:
with Flow(...) as flow:
    config = build_pipeline_config(...)
    my_task = MyTask(config=config)
    my_task2 = MyTask2(config=config)(my_task=my_task)
It depends on what build_pipeline_config is. As long as that evaluates correctly, I would personally do a one-liner task for that.
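A sketch of that idea (Prefect 1.x assumed; PipelineConfig, build_pipeline_config, and the two tasks are hypothetical names): the config is produced by a small task inside the flow and passed to each task call, so its value is resolved at run time rather than at flow-build time:

from dataclasses import dataclass

from prefect import Flow, Parameter, Task, task


@dataclass
class PipelineConfig:
    snapshot_date: str
    checkpoint_dirname: str


@task
def build_pipeline_config(snapshot_date, checkpoint_dirname):
    # One-liner-style task that assembles the dataclass at run time.
    return PipelineConfig(snapshot_date, checkpoint_dirname or snapshot_date)


class MyTask(Task):
    def run(self, config):
        return f"{config.checkpoint_dirname}/{config.snapshot_date}"


class MyTask2(Task):
    def run(self, config, upstream):
        return (config.snapshot_date, upstream)


with Flow("config-example") as flow:
    snapshot_date = Parameter("snapshot_date", default="2021-01-04")
    checkpoint_dirname = Parameter("checkpoint_dirname", default=None, required=False)

    config = build_pipeline_config(snapshot_date, checkpoint_dirname)
    first = MyTask()(config=config)
    second = MyTask2()(config=config, upstream=first)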
Or you can use the task decorator to make an if-else and then assign it inside the Flow:

snapshot_date = Parameter(name="snapshot_date", default=dt.datetime.today())
checkpoint_dirname = Parameter(name="checkpoint_dirname", default=params.checkpoint_dirname)
# fall back to the snapshot date when no checkpoint dirname was supplied
checkpoint_dirname = task(lambda x, y: (x or y))(checkpoint_dirname, snapshot_date)
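Putting it together (a sketch with a hypothetical params dataclass of build-time settings), the fallback is evaluated inside the lambda task at run time, not when the flow is defined:

import datetime as dt
from dataclasses import dataclass
from typing import Optional

from prefect import Flow, Parameter, task


@dataclass
class Params:
    checkpoint_dirname: Optional[str] = None  # undefined by default


params = Params()

with Flow("checkpoint-fallback") as flow:
    # Note: this default is computed once, when the flow object is built.
    snapshot_date = Parameter("snapshot_date", default=dt.datetime.today().isoformat())
    checkpoint_dirname = Parameter(
        "checkpoint_dirname", default=params.checkpoint_dirname, required=False
    )
    # Pick the first defined value at run time.
    checkpoint_dirname = task(lambda x, y: x or y)(checkpoint_dirname, snapshot_date)

# flow.run()  # checkpoint_dirname resolves to the snapshot date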
If the config is a dictionary, you could pass that whole dictionary as a Parameter, or if it is a file, say on S3, you can load it in as a task and the path would be the Parameter.
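A sketch of the path-as-Parameter option (hypothetical names; a plain JSON loader stands in for whatever S3 client you would actually use):

import json

from prefect import Flow, Parameter, task


@task
def load_config(path):
    # Stand-in loader; for S3 you would fetch the object here (e.g. with boto3)
    # before parsing it.
    with open(path) as f:
        return json.load(f)


@task
def use_config(config):
    return config["snapshot_date"]


with Flow("config-from-path") as flow:
    # The path is the Parameter; the contents are loaded by a task at run time.
    config_path = Parameter("config_path", default="config.json")
    config = load_config(config_path)
    use_config(config)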
You might be able to get the config to work without Parameters if you use script-based storage? It may be a global and you might not need to pass it into tasks, but I think the thing is that when you scale out to Dask, everything gets serialized together and sent to a worker, so being explicit about the inputs to a task helps with mapping. (Not 100% sure.)
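For illustration, being explicit typically means handing the config to the mapped call (wrapped in unmapped so it is not iterated over) instead of relying on a module-level global surviving serialization. A sketch with hypothetical names:

from prefect import Flow, Parameter, task, unmapped


@task
def build_config(snapshot_date):
    return {"snapshot_date": snapshot_date}


@task
def process_partition(partition, config):
    # config arrives as an explicit input, so Dask workers receive it with the task.
    return (partition, config["snapshot_date"])


with Flow("explicit-inputs") as flow:
    snapshot_date = Parameter("snapshot_date", default="2021-01-04")
    partitions = Parameter("partitions", default=["a", "b", "c"])

    config = build_config(snapshot_date)
    # unmapped() passes the same config to every mapped task run.
    results = process_partition.map(partitions, config=unmapped(config))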