Aiden Price
03/24/2020, 2:30 AM
from typing import List
import pandas as pd
from prefect import task

@task(skip_on_upstream_skip=True)
def concat(histories: List[pd.DataFrame]):
    return pd.concat(histories)
But when I call it like `concatted = concat([task1, task2])` (note the manually built list from two branches of execution), nothing happens: the `concat()` function is just not called.
So, trying to be clever, I tried this instead:
@task(skip_on_upstream_skip=True)
def concat(*args: pd.DataFrame):
    return pd.concat(args)
But it seems I’m fighting the framework there, because I get this error when I try `concatted = concat(task1, task2)`:
ValueError: Tasks with variable positional arguments (*args) are not supported, because all Prefect arguments are stored as keywords. As a workaround, consider modifying the run() method to accept **kwargs and feeding the values to *args.
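The workaround the error message describes can be sketched in plain pandas, outside Prefect: a keyword-only entry point collects the frames and forwards them positionally to a helper that takes `*frames`. The names `concat_frames` and `concat_kwargs` are hypothetical, and the `@task` decorator is left off so the sketch runs on its own.

```python
import pandas as pd


def concat_frames(*frames: pd.DataFrame) -> pd.DataFrame:
    """Concatenate any number of DataFrames passed positionally."""
    return pd.concat(frames)


def concat_kwargs(**kwargs: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical keyword-only entry point.

    Per the error above, task inputs are stored as keywords, so the frames
    arrive as **kwargs and are fed to the *args helper here.
    """
    return concat_frames(*kwargs.values())


one = pd.DataFrame({"value": [1, 2]})
two = pd.DataFrame({"value": [3]})
combined = concat_kwargs(one=one, two=two)
print(combined["value"].tolist())  # [1, 2, 3]
```

Since Python 3.7 keyword arguments keep their call-site order, so the rows come out in the order the keywords were written.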
So is it best to go with the advice in the error message, or try a different way?
Say, something like:
@task(skip_on_upstream_skip=True)
def concat(**kwargs: pd.DataFrame):
    return pd.concat(list(kwargs.values()))
Then call with concatted = concat({"one": task1, "two": task2})
Chris White
03/24/2020, 2:34 AM
`Skipped` state?
As an FYI for the second pattern, if you choose to go with the `**kwargs` approach, you can then call your `concat` task like this:
concatted = concat(one=task1, two=task2)
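The difference between the two call styles is plain Python, independent of Prefect: a function whose only parameter is `**kwargs` accepts keyword arguments, while a dict passed positionally raises `TypeError` unless it is unpacked with `**`. A minimal sketch with a hypothetical, undecorated `concat`:

```python
import pandas as pd


def concat(**kwargs: pd.DataFrame) -> pd.DataFrame:
    # Every frame arrives as a keyword argument; order follows the call site.
    return pd.concat(list(kwargs.values()))


a = pd.DataFrame({"x": [1]})
b = pd.DataFrame({"x": [2]})

ok = concat(one=a, two=b)                  # keyword style
also_ok = concat(**{"one": a, "two": b})   # dict unpacked into keywords

try:
    concat({"one": a, "two": b})           # dict passed positionally
except TypeError as exc:
    error = exc
else:
    error = None

print(ok["x"].tolist(), also_ok["x"].tolist(), type(error).__name__)
```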
Aiden Price
03/24/2020, 2:37 AM
Task 'concat': Starting task run...
and none of the downstream effects (like the database writes) happen.
Chris White
03/24/2020, 2:38 AM
flow_state = flow.run()
flow_state.result[concatted]
Aiden Price
03/24/2020, 2:39 AM
`List` task, which I assume Prefect called automatically; I didn’t notice that last time:
downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.317867+0000 | INFO | RPM | DV | Task 'List': Starting task run...
downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.376805+0000 | INFO | RPM | DV | Task 'List': finished task run for task with final state: 'Skipped'
downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.391894+0000 | INFO | RPM | DV | Task 'concat_history': Starting task run...
downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.397777+0000 | INFO | RPM | DV | Task 'concat_history': finished task run for task with final state: 'Skipped'
downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.412296+0000 | INFO | RPM | DV | Task 'load_to_timescale': Starting task run...
downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.420421+0000 | INFO | RPM | DV | Task 'load_to_timescale': finished task run for task with final state: 'Skipped'
downstream-svc-dv-1585025400-bsswx downstream-svc <Skipped: "Upstream task was skipped; if this was not the intended behavior, consider changing `skip_on_upstream_skip=False` for this task.">
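The log shows the skip cascading: the `List` task that Prefect added for the literal list finished `Skipped`, and each downstream task with `skip_on_upstream_skip=True` followed. The rule stated in that final message can be modelled with a toy, pure-Python sketch. `run_task` and the string states are hypothetical simplifications, not Prefect's engine, and the assumption that `task2` was the skipped branch is ours; the log does not say which upstream was skipped.

```python
from typing import Dict, List


def run_task(
    upstream: List[str],
    states: Dict[str, str],
    skip_on_upstream_skip: bool = True,
) -> str:
    """Toy rule: skip if any upstream was skipped and the flag is set."""
    if skip_on_upstream_skip and any(states[u] == "Skipped" for u in upstream):
        return "Skipped"
    return "Success"


# Hypothetical scenario: one branch feeding the List task was skipped.
states = {"task1": "Success", "task2": "Skipped"}
pipeline = [
    ("List", ["task1", "task2"]),
    ("concat_history", ["List"]),
    ("load_to_timescale", ["concat_history"]),
]
for name, upstream in pipeline:
    states[name] = run_task(upstream, states)

print(states)
```

One skipped input is enough to skip the whole chain in this model, which matches the three `Skipped` lines in the log.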
Chris White
03/24/2020, 1:55 PM
`flow_state.result[concatted]`? Based on this I assume it was in a `Skipped` state?
Aiden Price
03/24/2020, 10:19 PM
`concat` task wasn’t required.