Aiden Price

03/24/2020, 2:30 AM
Hi Prefecters! I have an odd problem where I just need a bit of clarification. I’m trying to merge the results of two or more branches of execution, each of which should return a pandas DataFrame. So I made a task that looks like this:
from typing import List
import pandas as pd
from prefect import task
@task(skip_on_upstream_skip=True)
def concat(histories: List[pd.DataFrame]):
    return pd.concat(histories)
But when I call it like
concatted = concat([task1, task2])
(note the manually built list from two branches of execution) then nothing happens, the `concat()` function is just not called. So trying to be clever I tried this instead;
@task(skip_on_upstream_skip=True)
def concat(*args: pd.DataFrame):
    return pd.concat(args)
But it seems I’m fighting the framework there because I get this error when I try `concatted = concat(task1, task2)`;
ValueError: Tasks with variable positional arguments (*args) are not supported, because all Prefect arguments are stored as keywords. As a workaround, consider modifying the run() method to accept **kwargs and feeding the values to *args.
So is it best to go with the advice in the warning message or try a different way? Say something like;
@task(skip_on_upstream_skip=True)
def concat(**kwargs: pd.DataFrame):
    # Collect every keyword argument's frame into a list before concatenating
    return pd.concat(list(kwargs.values()))
Then call with
concatted = concat({"one": task1, "two": task2})

Chris White

03/24/2020, 2:34 AM
Hi @Aiden Price! Your first pattern should work --> could you describe a little more what you mean by “concat is just not called”? Does the task run end in a `Skipped` state? As an FYI for the second pattern, if you choose to go with the `**kwargs` approach, you can then call your `concat` task like this:
concatted = concat(one=task1, two=task2)
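The difference between the dict call and the keyword call can be seen in plain Python, without Prefect involved. This is only a sketch: `concat` here is an ordinary function rather than a Prefect task, and lists of integers stand in for DataFrames so that it has no dependencies. The names `branch_one` and `branch_two` are made up for illustration.

```python
from typing import List

Frame = List[int]  # stand-in for pd.DataFrame (assumption, keeps the sketch dependency-free)


def concat(**kwargs: Frame) -> Frame:
    """Combine every keyword argument's rows, in the order they were passed."""
    combined: Frame = []
    for rows in kwargs.values():
        combined.extend(rows)
    return combined


branch_one = [1, 2]  # hypothetical output of the first branch
branch_two = [3]     # hypothetical output of the second branch

# Keyword form: each branch arrives in kwargs under its own name.
merged = concat(one=branch_one, two=branch_two)

# Dict form: the dict is a single positional argument, which a
# **kwargs-only signature rejects with a TypeError.
try:
    concat({"one": branch_one, "two": branch_two})
    dict_call_error = None
except TypeError as exc:
    dict_call_error = exc

# Unpacking the dict with ** is equivalent to the keyword form.
merged_from_dict = concat(**{"one": branch_one, "two": branch_two})
```

So if the inputs are already in a dict, unpacking it with `**` gives the same call as spelling out `one=..., two=...`.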

Aiden Price

03/24/2020, 2:37 AM
Hi Chris, from the logs I can’t see any messages like `Task 'concat': Starting task run...` and none of the downstream effects (like the database writes) happen.

Chris White

03/24/2020, 2:38 AM
could you run the flow like this and report back the state:
flow_state = flow.run()
flow_state.result[concatted]

Aiden Price

03/24/2020, 2:39 AM
Will do
Sigh, of course it decided to skip downstream tasks after a really long execution, see below. At least I can see the `List` task which I assume Prefect called automatically, I didn’t notice that last time;
downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.317867+0000 | INFO | RPM | DV | Task 'List': Starting task run...
downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.376805+0000 | INFO | RPM | DV | Task 'List': finished task run for task with final state: 'Skipped'
downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.391894+0000 | INFO | RPM | DV | Task 'concat_history': Starting task run...
downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.397777+0000 | INFO | RPM | DV | Task 'concat_history': finished task run for task with final state: 'Skipped'
downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.412296+0000 | INFO | RPM | DV | Task 'load_to_timescale': Starting task run...
downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.420421+0000 | INFO | RPM | DV | Task 'load_to_timescale': finished task run for task with final state: 'Skipped'
downstream-svc-dv-1585025400-bsswx downstream-svc <Skipped: "Upstream task was skipped; if this was not the intended behavior, consider changing `skip_on_upstream_skip=False` for this task.">

Chris White

03/24/2020, 1:55 PM
What was the result of `flow_state.result[concatted]`? Based on this I assume it was in a `Skipped` state?

Aiden Price

03/24/2020, 10:19 PM
Yeah it skipped due to an upstream task finishing in a state where the `concat` task wasn’t required.
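The chain of `Skipped` states in the logs above can be reproduced with a toy model of the `skip_on_upstream_skip` rule: a task is skipped when any task directly upstream of it was skipped and the flag is left at `True`. This is a simplified sketch in plain Python, not Prefect's implementation. The task names `List`, `concat_history` and `load_to_timescale` come from the logs; `branch_a` and `branch_b`, and the choice of which branch skipped, are assumptions made for illustration.

```python
from typing import Dict, List

# Each task maps to its directly upstream tasks, listed in execution order.
UPSTREAM: Dict[str, List[str]] = {
    "List": ["branch_a", "branch_b"],  # hypothetical branch names
    "concat_history": ["List"],
    "load_to_timescale": ["concat_history"],
}


def resolve(initial: Dict[str, str], skip_flags: Dict[str, bool]) -> Dict[str, str]:
    """Return a final state per task under the toy skip-propagation rule."""
    states = dict(initial)
    for task, upstream in UPSTREAM.items():  # dict order is already topological
        upstream_skipped = any(states[u] == "Skipped" for u in upstream)
        # The flag defaults to True when a task is not listed in skip_flags.
        if upstream_skipped and skip_flags.get(task, True):
            states[task] = "Skipped"
        else:
            states[task] = "Success"
    return states


branches = {"branch_a": "Success", "branch_b": "Skipped"}

# Default flags everywhere: the skip travels all the way down the chain.
states_default = resolve(branches, {})

# Flag turned off on concat_history only: List is still skipped in this model.
states_override = resolve(branches, {"concat_history": False})
```

In this model, turning the flag off on `concat_history` alone lets it and `load_to_timescale` run, but the automatically created `List` task is still skipped. What value would then reach the concat task in a real flow run is not modelled here.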