Danny Vilela (08/12/2021, 10:15 PM):
Say we have

```python
snapshot_date = Parameter(name="snapshot_date", default=dt.datetime.today())
```

and pass that into another (class-based) task, how should we do that? Should we pass it to the task initialization (e.g., `my_task = MyTask(snapshot_date=snapshot_date)`) or the task call (i.e., `my_task(snapshot_date=snapshot_date)`)? This is assuming that `MyTask` uses `snapshot_date` within its `run` method.
Danny Vilela (08/12/2021, 10:46 PM):
There's a `snapshot_date` column that serves as some partition. Hence, I need some way to know when the flow started running, the same way the weekly interval scheduler knows when to run the flow… does that make sense?
Kevin Kho:
You can make the default the string `"today"` and then pass that to an intermediate task that computes `dt.datetime.today()`, for deferred execution.
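The deferred-execution idea can be sketched in plain Python, outside Prefect: a `Parameter` default like `dt.datetime.today()` is evaluated once, when the flow is defined, so every later run would see the same frozen timestamp. The `resolve_snapshot_date` helper below is hypothetical; in a real flow it would be the intermediate task Kevin describes.

```python
import datetime as dt

# Pitfall: evaluated once, at flow-definition time -- every later run
# would see this same frozen timestamp if used as a Parameter default.
FROZEN_DEFAULT = dt.datetime.today()

def resolve_snapshot_date(value):
    """Hypothetical intermediate-task body: turn the sentinel string
    "today" into an actual datetime at run time; pass through anything else."""
    if value == "today":
        return dt.datetime.today()
    return value
```

With `default="today"`, the datetime is only computed when the intermediate task executes, so each flow run gets its own date.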
Kevin Kho:
You would do `my_task = MyTask(snapshot_date=snapshot_date)` if this exists in your flow, for example:

```python
with Flow(...) as flow:
    snapshot_date = Parameter("snapshot_date", ...)
    my_task = MyTask(snapshot_date=snapshot_date)()
```
Danny Vilela (08/12/2021, 10:55 PM):
I have a `checkpoint_dirname` that can be undefined or defined based on some parameter dataclass field `params.checkpoint_dirname`. If undefined, I'd want that parameter to be `snapshot_date`. Is something like this valid within a `Flow` context?

```python
snapshot_date = Parameter(name="snapshot_date", default=dt.datetime.today())
checkpoint_dirname = Parameter(name="checkpoint_dirname", default=params.checkpoint_dirname or snapshot_date)
```
Kevin Kho:
The first `()` is the init. The second one is the run… so at this point it doesn't really matter, right? You can use the `run`.

What you can't do is call the task outside the `Flow` context:

```python
my_task = MyTask(snapshot_date=snapshot_date)()

with Flow(...) as flow:
    snapshot_date = Parameter("snapshot_date", ...)
```
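The two-phase call can be mimicked with a toy class (this is a plain-Python stand-in for illustration, not Prefect's actual `Task`): the first parentheses do init-time configuration, the second bind runtime inputs, and a runtime input wins over the init-time one.

```python
class MyTask:
    """Toy stand-in for a class-based task (not Prefect's Task class)."""

    def __init__(self, snapshot_date=None):
        # First (): init-time configuration, stored on the instance.
        self.snapshot_date = snapshot_date

    def __call__(self, snapshot_date=None):
        # Second (): the "run" call.
        return self.run(snapshot_date=snapshot_date)

    def run(self, snapshot_date=None):
        # A runtime input takes precedence over the init-time value.
        return snapshot_date if snapshot_date is not None else self.snapshot_date
```

So `MyTask(snapshot_date=d)()` and `MyTask()(snapshot_date=d)` both end up with `d` inside `run`, which is why, at this point, either call site works.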
Danny Vilela (08/12/2021, 10:58 PM):
I'm also thinking about a `PipelineConfig` that wraps all the parameters in a flow. Has that come up before? For example, if `PipelineConfig` is some dataclass that wraps a bunch of flow parameters and sub-configurations, each task in my flow would be initialized via something like:

```python
with Flow(...) as flow:
    config = build_pipeline_config(...)
    my_task = MyTask(config=config)
    my_task2 = MyTask2(config=config)(my_task=my_task)
```
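For concreteness, such a wrapper might look like the sketch below. The field names and the `RetryConfig` sub-configuration are invented for illustration; nothing here is a Prefect API.

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class RetryConfig:
    """Hypothetical sub-configuration nested inside the pipeline config."""
    max_retries: int = 3

@dataclass
class PipelineConfig:
    """Hypothetical wrapper around flow parameters and sub-configurations."""
    snapshot_date: str = "today"
    checkpoint_dirname: Optional[str] = None
    retries: RetryConfig = field(default_factory=RetryConfig)
```

A `build_pipeline_config(...)` helper would then construct one instance and hand it to every task's init.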
Kevin Kho:
It depends on what `params.checkpoint_dirname` is. As long as that evaluates correctly, I would personally do a one-liner task for that:

```python
snapshot_date = Parameter(name="snapshot_date", default=dt.datetime.today())
checkpoint_dirname = Parameter(name="checkpoint_dirname", default=params.checkpoint_dirname)
checkpoint_dirname = task(lambda x, y: (x or y))(checkpoint_dirname, snapshot_date)
```

Or you can use the `case` task to make an if-else and then assign it inside the Flow.
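One caveat worth noting about the one-liner: `x or y` is plain Python truthiness, so it falls back to `y` not only when `x` is `None` but for any falsy `x` (empty string, `0`). A sketch of both behaviors, outside any flow:

```python
def fallback(x, y):
    """Same logic as the lambda in the one-liner task: return x unless
    it is falsy, otherwise return y."""
    return x or y
```

If an empty string should count as "defined", compare against `None` explicitly instead: `x if x is not None else y`.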
Kevin Kho:
If `PipelineConfig` is a dictionary, you could pass that whole dictionary as a Parameter, or if it is a file, say on S3, you can load it in as a task and the path would be the Parameter.
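A minimal sketch of the file-backed variant, where only the path is the flow Parameter. The `load_config` helper is hypothetical, and a local JSON file stands in for S3 here:

```python
import json

def load_config(path):
    """Hypothetical task body: load the whole config dict from a JSON
    file; only the path itself would be a flow Parameter."""
    with open(path) as f:
        return json.load(f)
```

In a real flow this function body would live inside a task, with an S3 client read replacing the local `open`.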
Kevin Kho:
I think you can get `PipelineConfig` to work if you use script-based storage. It may be a global and you might not need to pass it into tasks, but I think the thing is that when you scale out to Dask, everything gets serialized together and sent to a worker, so I think being explicit about inputs to a task helps with mapping. (Not 100% sure.)