# ask-community
d
Hi! If we want to have a parameter within a flow like `snapshot_date = Parameter(name="snapshot_date", default=dt.datetime.today())` and pass that into another (class-based) task, how should we do that? Should we pass it to the task initialization (e.g., `my_task = MyTask(snapshot_date=snapshot_date)`) or the task call (i.e., `my_task(snapshot_date=snapshot_date)`)? This is assuming that `MyTask` uses `snapshot_date` within its `run` method.
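For concreteness, a minimal sketch of what I mean by a class-based task (the real `MyTask` does more than this):
```python
import datetime as dt

from prefect import Task

class MyTask(Task):
    """Illustrative class-based task; the real one does more than format a string."""

    def run(self, snapshot_date: dt.datetime) -> str:
        # The flow-level Parameter ends up here as a run() argument.
        return f"partition={snapshot_date:%Y-%m-%d}"
```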
k
Hey @Danny Vilela, is snapshot date something like a watermark for “last processed time”? Are you aware we have the KV Store, where you can persist this kind of stuff? So in the task you would retrieve it from there instead of using parameters.
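Roughly, that pattern looks like this in Prefect 1.x (the KV Store is a Prefect Cloud feature; the key name here is just illustrative):
```python
from prefect import task
from prefect.backend import get_key_value, set_key_value

@task
def get_last_processed_time() -> str:
    # Read the watermark persisted by a previous run (key name is illustrative).
    return get_key_value(key="last_processed_time")

@task
def update_last_processed_time(new_value: str) -> None:
    # Persist the watermark for the next run.
    set_key_value(key="last_processed_time", value=new_value)
```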
d
Hey @Kevin Kho! Not quite. You can think of the “flow” here as some ETL that writes to a table. Imagine that table has a `snapshot_date` column that serves as some partition. Hence, I need some way to know when the flow started running, the same way the weekly interval scheduler knows when to run the flow… does that make sense?
k
Gotcha. In my opinion, just pass it during the `run` method, right? There is one caveat here though. If you use a storage that serializes your Flow, `snapshot_date = Parameter(name="snapshot_date", default=dt.datetime.today())` will serialize today's datetime and hardcode the default for future runs. This will work as intended if you use script-based storage.
If you use pickle-based storage, you need the parameter to be something static like `"today"` and then pass that to an intermediate task that computes `dt.datetime.today()` for deferred execution.
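Something like this sketch, for example (the flow and task names are illustrative):
```python
import datetime as dt

from prefect import Flow, Parameter, task

@task
def resolve_snapshot_date(raw: str) -> dt.datetime:
    # Evaluated at run time, so a pickled flow does not freeze "today" at registration time.
    return dt.datetime.today() if raw == "today" else dt.datetime.fromisoformat(raw)

with Flow("snapshot-example") as flow:
    raw_snapshot_date = Parameter("snapshot_date", default="today")  # static default, safe to pickle
    snapshot_date = resolve_snapshot_date(raw_snapshot_date)
```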
You can pass it in the init like `my_task = MyTask(snapshot_date=snapshot_date)` if this exists in your flow, for example:
```python
with Flow(...) as flow:
    snapshot_date = Parameter("snapshot_date", ...)
    my_task = MyTask(snapshot_date=snapshot_date)()
```
d
Gotcha! Another question: let’s say I have some other parameter `checkpoint_dirname` that can be undefined or defined based on some parameter dataclass `params.checkpoint_dirname`. If undefined, I’d want that parameter to be `snapshot_date`. Is something like this valid within a `Flow` context?
```python
snapshot_date = Parameter(name="snapshot_date", default=dt.datetime.today())
checkpoint_dirname = Parameter(name="checkpoint_dirname", default=params.checkpoint_dirname or snapshot_date)
```
k
The first `()` is the init. The second one is the run… so at this point it doesn’t really matter, right? You can use the `run`. What you can’t do is:
```python
my_task = MyTask(snapshot_date=snapshot_date)()
with Flow(...) as flow:
    snapshot_date = Parameter("snapshot_date", ...)
```
d
Yeah, gotcha. I think what’s been somewhat painful is having to pass the exact parameters to each task, rather than passing a `PipelineConfig` that wraps all the parameters in a flow. Has that come up before? For example, if `PipelineConfig` is some dataclass that wraps a bunch of flow parameters and sub-configurations, each task in my flow would be initialized via something like:
```python
with Flow(...) as flow:
    config = build_pipeline_config(...)
    my_task = MyTask(config=config)
    my_task2 = MyTask2(config=config)(my_task=my_task)
```
k
I don’t think that is valid (and I’m not sure what `params.checkpoint_dirname` is). As long as that evaluates correctly, I would personally do a one-liner task for that:
```python
snapshot_date = Parameter(name="snapshot_date", default=dt.datetime.today())
checkpoint_dirname = Parameter(name="checkpoint_dirname", default=params.checkpoint_dirname)

# Fall back to snapshot_date when checkpoint_dirname is not set.
checkpoint_dirname = task(lambda x, y: (x or y))(checkpoint_dirname, snapshot_date)
```
Or you can use the `case` task to make an if-else and then assign it inside the Flow.
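A sketch of that `case` approach, assuming `checkpoint_dirname` defaults to `None` when undefined (task and flow names are illustrative):
```python
from prefect import Flow, Parameter, case, task
from prefect.tasks.control_flow import merge

@task
def is_defined(value) -> bool:
    return value is not None

@task
def passthrough(value):
    return value

with Flow("checkpoint-example") as flow:
    snapshot_date = Parameter("snapshot_date", default="today")
    checkpoint_dirname = Parameter("checkpoint_dirname", default=None)

    has_dirname = is_defined(checkpoint_dirname)
    with case(has_dirname, True):
        from_param = passthrough(checkpoint_dirname)
    with case(has_dirname, False):
        from_snapshot = passthrough(snapshot_date)

    # merge returns the result of whichever branch actually ran
    resolved_dirname = merge(from_param, from_snapshot)
```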
So I have seen that come up once, but if `PipelineConfig` is a dictionary, you could pass that whole dictionary as a Parameter, or if it is a file, say on S3, you can load it in as a task and the path would be the Parameter.
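For example, a sketch of the dictionary route (the config contents here are made up):
```python
from prefect import Flow, Parameter, Task

class MyTask(Task):
    def run(self, config: dict) -> str:
        # Individual settings are pulled out of the single config Parameter.
        return config["snapshot_date"]

with Flow("config-example") as flow:
    # The whole config travels as one Parameter and can be overridden per run.
    config = Parameter("config", default={"snapshot_date": "today", "checkpoint_dirname": None})
    result = MyTask()(config=config)
```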
Actually, you might get `PipelineConfig` to work if you use script-based storage? It may be a global and you might not need to pass it into tasks, but I think the thing is, when you scale out to Dask, everything gets serialized together and sent to a worker, so I think being explicit about inputs to a task helps with mapping. (Not 100% sure)
Pickle based versus script based storage docs