Danny Vilela

    1 year ago
    Hi! If we want to have a parameter within a flow like snapshot_date = Parameter(name="snapshot_date", default=dt.datetime.today()) and pass that into another (class-based) task, how should we do that? Should we pass it at task initialization (e.g., my_task = MyTask(snapshot_date=snapshot_date)) or at the task call (i.e., my_task(snapshot_date=snapshot_date))? This assumes that MyTask uses snapshot_date within its run method.
Kevin Kho

    1 year ago
    Hey @Danny Vilela, is snapshot_date something like a watermark for “last processed time”? Are you aware we have the KV Store, where you can persist this kind of thing? In the task, you would retrieve it from there instead of using parameters.
Danny Vilela

    1 year ago
    Hey @Kevin Kho! Not quite. You can think of the “flow” here as some ETL that writes to a table. Imagine that table has a snapshot_date column that serves as a partition. Hence, I need some way to know when the flow started running, the same way the weekly interval scheduler knows when to run the flow… does that make sense?
Kevin Kho

    1 year ago
    Gotcha. In my opinion, just pass it during the run method, right? There is one caveat here, though. If you use a storage that serializes your Flow, snapshot_date = Parameter(name="snapshot_date", default=dt.datetime.today()) will serialize today’s datetime and hardcode the default for future runs. This will work as intended if you use script-based storage.
    If you use pickle-based storage, you need the parameter to be something static like "today" and then pass that to an intermediate task that computes dt.datetime.today() for deferred execution.
    You can pass it in the init like my_task = MyTask(snapshot_date=snapshot_date) if this exists in your flow, for example:
    with Flow(...) as flow:
         snapshot_date = Parameter("snapshot_date", ...)
         my_task = MyTask(snapshot_date=snapshot_date)()
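    To illustrate the deferred-execution workaround for pickle-based storage, here is a minimal sketch assuming Prefect 1.x; the resolve_snapshot_date helper is an illustrative name, not part of Prefect’s API:

    import datetime as dt
    from prefect import Flow, Parameter, task

    @task
    def resolve_snapshot_date(raw):
        # Hypothetical helper: runs at flow-run time, so pickle-based
        # storage never bakes today's date into the stored flow.
        if raw == "today":
            return dt.datetime.today()
        return dt.datetime.fromisoformat(raw)

    with Flow("etl") as flow:
        raw_date = Parameter("snapshot_date", default="today")
        snapshot_date = resolve_snapshot_date(raw_date)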
Danny Vilela

    1 year ago
    Gotcha! Another question: let’s say I have some other parameter checkpoint_dirname that can be undefined or defined based on some parameter dataclass field params.checkpoint_dirname. If undefined, I’d want that parameter to be snapshot_date. Is something like this valid within a Flow context?
    snapshot_date = Parameter(name="snapshot_date", default=dt.datetime.today())
    checkpoint_dirname = Parameter(name="checkpoint_dirname", default=params.checkpoint_dirname or snapshot_date)
Kevin Kho

    1 year ago
    The first () is the init. The second one is the run… so at this point it doesn’t really matter, right? You can use the run. What you can’t do is:
    my_task = MyTask(snapshot_date=snapshot_date)()
    with Flow(...) as flow:
         snapshot_date = Parameter("snapshot_date", ...)
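    For reference, here is a minimal sketch of the class-based pattern under discussion, assuming Prefect 1.x (MyTask and its arguments are placeholder names):

    from prefect import Flow, Parameter, Task

    class MyTask(Task):
        def __init__(self, snapshot_date=None, **kwargs):
            # The init-time value is fixed when the flow is built
            self.snapshot_date = snapshot_date
            super().__init__(**kwargs)

        def run(self, snapshot_date=None):
            # A run-time argument (e.g., a Parameter) wins over the init value
            return snapshot_date or self.snapshot_date

    with Flow("etl") as flow:
        snapshot_date = Parameter("snapshot_date", default="today")
        # Both the init call and the run call happen inside the flow context
        result = MyTask()(snapshot_date=snapshot_date)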
Danny Vilela

    1 year ago
    Yeah, gotcha. I think what’s been somewhat painful is having to pass the exact parameters to each task, rather than passing a PipelineConfig that wraps all the parameters in a flow. Has that come up before? For example, if PipelineConfig is some dataclass that wraps a bunch of flow parameters and sub-configurations, each task in my flow would be initialized via something like:
    with Flow(...) as flow:
        config = build_pipeline_config(...)
        my_task = MyTask(config=config)
        my_task2 = MyTask2(config=config)(my_task=my_task)
Kevin Kho

    1 year ago
    I don’t think that is valid (and I’m not sure what params.checkpoint_dirname is). As long as that evaluates correctly, I would personally do a one-liner task for that:
    snapshot_date = Parameter(name="snapshot_date", default=dt.datetime.today())
    checkpoint_dirname = Parameter(name="checkpoint_dirname", default=params.checkpoint_dirname)

    checkpoint_dirname = task(lambda x, y: (x or y))(checkpoint_dirname, snapshot_date)
    Or you can use the case task to make an if-else and then assign it inside the Flow.
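    As a rough sketch of the case approach, assuming Prefect 1.x (the identity and is_missing tasks are hypothetical helpers; a task has to be created inside each case branch for the branch to take effect):

    from prefect import Flow, Parameter, case, task
    from prefect.tasks.control_flow import merge

    @task
    def identity(x):
        return x

    @task
    def is_missing(value):
        return value is None

    with Flow("etl") as flow:
        snapshot_date = Parameter("snapshot_date", default="today")
        checkpoint_dirname = Parameter("checkpoint_dirname", default=None)

        cond = is_missing(checkpoint_dirname)
        with case(cond, True):
            fallback = identity(snapshot_date)
        with case(cond, False):
            provided = identity(checkpoint_dirname)
        # merge returns the result of whichever branch actually ran
        resolved = merge(fallback, provided)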
    So I have seen that said once, but if PipelineConfig is a dictionary, you could pass that whole dictionary as a Parameter, or if it is a file, say on S3, you can load it in as a task and the path would be the Parameter.
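    A quick sketch of the dictionary-as-Parameter idea, assuming Prefect 1.x (my_step and the config keys are illustrative):

    from prefect import Flow, Parameter, task

    @task
    def my_step(config):
        # The config arrives as a plain dict at run time
        return config["snapshot_date"]

    with Flow("etl") as flow:
        config = Parameter(
            "config",
            default={"snapshot_date": "today", "checkpoint_dirname": None},
        )
        my_step(config)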
    Actually, you might get PipelineConfig to work if you use script-based storage? It may be a global, and you might not need to pass it into tasks, but I think the thing is that when you scale out to Dask, everything gets serialized together and sent to a worker, so I think being explicit about inputs to a task helps with mapping. (Not 100% sure)
    Pickle based versus script based storage docs