Thread
#prefect-community
    Aiden Price

    2 years ago
    Hi Prefecters! I have an odd problem and just need a bit of clarification. I’m trying to merge the results of two or more branches of execution, each of which should return a pandas DataFrame. So I made a task that looks like this:
    from typing import List
    import pandas as pd
    from prefect import task
    @task(skip_on_upstream_skip=True)
    def concat(histories: List[pd.DataFrame]):
        return pd.concat(histories)
    But when I call it like
    concatted = concat([task1, task2])
    (note the manually built list from two branches of execution), nothing happens: the concat() function is just not called. So, trying to be clever, I tried this instead:
    @task(skip_on_upstream_skip=True)
    def concat(*args: pd.DataFrame):
        return pd.concat(args)
    But it seems I’m fighting the framework there, because I get this error when I try concatted = concat(task1, task2):
    ValueError: Tasks with variable positional arguments (*args) are not supported, because all Prefect arguments are stored as keywords. As a workaround, consider modifying the run() method to accept **kwargs and feeding the values to *args.
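The workaround the error message suggests can be sketched in plain Python. There are no Prefect imports here, so this only illustrates the argument plumbing: the entry point accepts keyword arguments (the form Prefect stores them in) and forwards their values positionally to a *args-style helper. The names concat_frames and run are hypothetical.

```python
import pandas as pd


def concat_frames(*frames: pd.DataFrame) -> pd.DataFrame:
    # *args-style helper: accepts any number of DataFrames positionally.
    return pd.concat(frames, ignore_index=True)


def run(**kwargs: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical task body: arguments arrive as keywords, and their values
    # are fed to the *args helper in insertion order (dicts preserve
    # insertion order in Python 3.7+).
    return concat_frames(*kwargs.values())


one = pd.DataFrame({"x": [1, 2]})
two = pd.DataFrame({"x": [3]})
combined = run(one=one, two=two)
print(combined["x"].tolist())  # [1, 2, 3]
```

Because the keyword names are only used as labels, any names work; the order in which the keywords are passed decides the row order of the result.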
    So is it best to go with the advice in the warning message, or to try a different way? Say something like:
    @task(skip_on_upstream_skip=True)
    def concat(**kwargs: pd.DataFrame):
        return pd.concat(list(kwargs.values()))
    Then call with
    concatted = concat({"one": task1, "two": task2})
    Chris White

    2 years ago
    Hi @Aiden Price! Your first pattern should work. Could you describe a little more what you mean by “concat is just not called”? Does the task run end in a Skipped state? As an FYI for the second pattern, if you choose to go with the **kwargs approach, you can then call your concat task like this:
    concatted = concat(one=task1, two=task2)
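To see why the keyword form matters, here is a plain-Python sketch of a **kwargs function called both ways. An ordinary function stands in for the Prefect task, which is an assumption; the point is Python's own calling rules. Keywords populate kwargs, while a single dict is one positional argument and is rejected.

```python
import pandas as pd


def concat(**kwargs: pd.DataFrame) -> pd.DataFrame:
    # Collect the keyword-argument DataFrames into a list, then concatenate.
    return pd.concat(list(kwargs.values()), ignore_index=True)


one = pd.DataFrame({"x": [1]})
two = pd.DataFrame({"x": [2]})

# Keyword form: each DataFrame lands in kwargs under its own name.
ok = concat(one=one, two=two)
print(len(ok))  # 2

# Dict form: the dict is a single positional argument, which a function
# taking only **kwargs cannot accept, so Python raises TypeError.
try:
    concat({"one": one, "two": two})
except TypeError as exc:
    print("TypeError:", exc)

# Unpacking an existing dict with ** is equivalent to the keyword form.
also_ok = concat(**{"one": one, "two": two})
```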
    Aiden Price

    2 years ago
    Hi Chris, from the logs I can’t see any messages like
    Task 'concat': Starting task run...
    and none of the downstream effects (like the database writes) happen.
    Chris White

    2 years ago
    Could you run the flow like this and report back the state:
    flow_state = flow.run()
    flow_state.result[concatted]
    Aiden Price

    2 years ago
    Will do
    Sigh, of course it decided to skip the downstream tasks after a really long execution; see below. At least I can see the List task, which I assume Prefect added automatically. I didn’t notice that last time:
    downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.317867+0000 | INFO | RPM | DV | Task 'List': Starting task run...
    downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.376805+0000 | INFO | RPM | DV | Task 'List': finished task run for task with final state: 'Skipped'
    downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.391894+0000 | INFO | RPM | DV | Task 'concat_history': Starting task run...
    downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.397777+0000 | INFO | RPM | DV | Task 'concat_history': finished task run for task with final state: 'Skipped'
    downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.412296+0000 | INFO | RPM | DV | Task 'load_to_timescale': Starting task run...
    downstream-svc-dv-1585025400-bsswx downstream-svc 2020-03-24T05:35:20.420421+0000 | INFO | RPM | DV | Task 'load_to_timescale': finished task run for task with final state: 'Skipped'
    downstream-svc-dv-1585025400-bsswx downstream-svc <Skipped: "Upstream task was skipped; if this was not the intended behavior, consider changing `skip_on_upstream_skip=False` for this task.">
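The log shows the skip cascading: the automatically added List task skips, so concat_history and then load_to_timescale skip too. Below is a minimal plain-Python model of that rule, assuming each task behaves as if skip_on_upstream_skip=True and so skips whenever any direct upstream task skipped. The downstream task names come from the log; the upstream wiring, the source_a/source_b tasks, and the propagate_skips helper are hypothetical, and this is not Prefect's implementation.

```python
from typing import Dict, List, Set


def propagate_skips(upstream: Dict[str, List[str]], skipped_sources: Set[str]) -> Dict[str, str]:
    # upstream maps each task to its direct upstream tasks; entries must be
    # listed so that every task appears after its dependencies.
    states: Dict[str, str] = {}
    for name, parents in upstream.items():
        if name in skipped_sources or any(states[p] == "Skipped" for p in parents):
            # skip_on_upstream_skip=True: a single skipped parent is enough.
            states[name] = "Skipped"
        else:
            states[name] = "Success"
    return states


dag = {
    "source_a": [],
    "source_b": [],
    "List": ["source_a", "source_b"],
    "concat_history": ["List"],
    "load_to_timescale": ["concat_history"],
}
states = propagate_skips(dag, skipped_sources={"source_b"})
for name, state in states.items():
    print(f"{name}: {state}")
```

With only source_b skipped, everything from List onward ends up Skipped while source_a still succeeds, which matches the pattern in the log above.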
    Chris White

    2 years ago
    What was the result of flow_state.result[concatted]? Based on this, I assume it was in a Skipped state?
    Aiden Price

    2 years ago
    Yeah, it skipped because an upstream task finished in a state where the concat task wasn’t required.
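If the goal is instead to concatenate whichever branches did produce data, the log message's suggestion of skip_on_upstream_skip=False is one option. The sketch below shows a possible task body under the assumption, not confirmed in this thread, that a skipped upstream branch reaches the task as None rather than as a DataFrame. The concat_available name is hypothetical.

```python
from typing import List, Optional

import pandas as pd


def concat_available(histories: List[Optional[pd.DataFrame]]) -> Optional[pd.DataFrame]:
    # Assumption: a skipped branch shows up as None, so drop those entries.
    frames = [df for df in histories if df is not None]
    if not frames:
        # Every branch was skipped; pd.concat([]) would raise ValueError.
        return None
    return pd.concat(frames, ignore_index=True)


a = pd.DataFrame({"x": [1, 2]})
print(concat_available([a, None]))
print(concat_available([None, None]))  # None
```

Returning None when every branch was skipped leaves it to downstream tasks such as load_to_timescale to decide what "no data" means.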