I have: - task1 mapped over list1 and task2 mapped...
# ask-community
n
I have:
• task1 mapped over list1 and task2 mapped over list2
• task2 isn't mapped over the output of task1
• I don't want task2 to start until after task1 has completed.
I tried adding upstream_tasks=[task1] to task2. My gut told me this probably wasn't going to do what I wanted, but it was worth a try. My gut was right: task2 only processes as many elements of list2 as there are elements in list1.
import prefect
from prefect import Flow, task


@task
def mapped(value: int) -> int:
    prefect.context.get("logger").info(f"{value}")
    return value


if __name__ == "__main__":
    with Flow("mapped_upstream") as flow:
        list1 = [1, 2]
        task1 = mapped.map(value=list1)

        list2 = [10, 20, 30]
        task2 = mapped.map(value=list2, upstream_tasks=[task1])
    flow.run()
What is the correct way to get mapped task2 to have an upstream dependency on task1?
[2021-04-28 14:38:31-0600] INFO - prefect.FlowRunner | Beginning Flow run for 'mapped_upstream'
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped': Starting task run...
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped': Finished task run for task with final state: 'Mapped'
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Starting task run...
[2021-04-28 14:38:31-0600] INFO - prefect.mapped[0] | 1
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Finished task run for task with final state: 'Success'
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Starting task run...
[2021-04-28 14:38:31-0600] INFO - prefect.mapped[1] | 2
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Finished task run for task with final state: 'Success'
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped': Starting task run...
[2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped': Finished task run for task with final state: 'Mapped'
[2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Starting task run...
[2021-04-28 14:38:32-0600] INFO - prefect.mapped[0] | 10
[2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Finished task run for task with final state: 'Success'
[2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Starting task run...
[2021-04-28 14:38:32-0600] INFO - prefect.mapped[1] | 20
[2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Finished task run for task with final state: 'Success'
[2021-04-28 14:38:32-0600] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
k
Hi @Nathan Atkins! Just a friendly reminder to move code blocks and logs to the thread so the main channel stays compact. I know what you are saying. The issue here is that consecutive mapping is designed to run iteratively, with each downstream task chained to the one before it. To get this to work, you probably need a reduce step as a workaround, like this:
import prefect
from prefect import Flow, task
@task
def mapped(value: int) -> int:
    prefect.context.get("logger").info(f"{value}")
    return value

@task
def reduced(x):
    return "out"

if __name__ == "__main__":
    with Flow("mapped_upstream") as flow:
        list1 = [1, 2]
        task1 = mapped.map(value=list1)
        reduced1 = reduced(task1)
        list2 = [10, 20, 30]
        task2 = mapped.map(value=list2, upstream_tasks=[reduced1])
    flow.run()
n
👍 on moving code and logs. I've been away for a bit, so hadn't really picked up on that best practice. I tried a reduce/gather task earlier. Let me give it another try now that I see what is going on.
k
No worries at all 🙂
n
This is what I had tried.
import prefect
from prefect import Flow, task


@task
def mapped(value: int) -> int:
    prefect.context.get("logger").info(f"{value}")
    return value

@task
def gather(_):
    return True


if __name__ == "__main__":
    with Flow("mapped_upstream") as flow:
        list1 = [1, 2]
        task1 = mapped.map(value=list1)

        gather1 = gather(task1)

        list2 = [10, 20, 30]
        task2 = mapped.map(value=list2, upstream_tasks=[gather1])
    flow.run()
task2 mapped fails. Not sure how to easily figure out what is going wrong here.
[2021-04-28 15:16:37-0600] INFO - prefect.FlowRunner | Beginning Flow run for 'mapped_upstream'
[2021-04-28 15:16:37-0600] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-04-28 15:16:37-0600] DEBUG - prefect.FlowRunner | Flow 'mapped_upstream': Handling state change from Scheduled to Running
[2021-04-28 15:16:37-0600] INFO - prefect.TaskRunner | Task 'mapped': Starting task run...
[2021-04-28 15:16:37-0600] DEBUG - prefect.TaskRunner | Task 'mapped': Handling state change from Pending to Mapped
[2021-04-28 15:16:37-0600] INFO - prefect.TaskRunner | Task 'mapped': Finished task run for task with final state: 'Mapped'
[2021-04-28 15:16:37-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Starting task run...
[2021-04-28 15:16:37-0600] DEBUG - prefect.TaskRunner | Task 'mapped[0]': Handling state change from Pending to Running
[2021-04-28 15:16:37-0600] DEBUG - prefect.TaskRunner | Task 'mapped[0]': Calling task.run() method...
[2021-04-28 15:16:37-0600] INFO - prefect.mapped[0] | 1
[2021-04-28 15:16:37-0600] DEBUG - prefect.TaskRunner | Task 'mapped[0]': Handling state change from Running to Success
[2021-04-28 15:16:37-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Finished task run for task with final state: 'Success'
[2021-04-28 15:16:37-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Starting task run...
[2021-04-28 15:16:37-0600] DEBUG - prefect.TaskRunner | Task 'mapped[1]': Handling state change from Pending to Running
[2021-04-28 15:16:37-0600] DEBUG - prefect.TaskRunner | Task 'mapped[1]': Calling task.run() method...
[2021-04-28 15:16:37-0600] INFO - prefect.mapped[1] | 2
[2021-04-28 15:16:37-0600] DEBUG - prefect.TaskRunner | Task 'mapped[1]': Handling state change from Running to Success
[2021-04-28 15:16:37-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Finished task run for task with final state: 'Success'
[2021-04-28 15:16:37-0600] INFO - prefect.TaskRunner | Task 'gather': Starting task run...
[2021-04-28 15:16:37-0600] DEBUG - prefect.TaskRunner | Task 'gather': Handling state change from Pending to Running
[2021-04-28 15:16:37-0600] DEBUG - prefect.TaskRunner | Task 'gather': Calling task.run() method...
[2021-04-28 15:16:37-0600] DEBUG - prefect.TaskRunner | Task 'gather': Handling state change from Running to Success
[2021-04-28 15:16:37-0600] INFO - prefect.TaskRunner | Task 'gather': Finished task run for task with final state: 'Success'
[2021-04-28 15:16:37-0600] INFO - prefect.TaskRunner | Task 'mapped': Starting task run...
[2021-04-28 15:16:37-0600] DEBUG - prefect.TaskRunner | Task 'mapped': Handling state change from Pending to Failed
[2021-04-28 15:16:37-0600] INFO - prefect.TaskRunner | Task 'mapped': Finished task run for task with final state: 'Failed'
[2021-04-28 15:16:37-0600] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
[2021-04-28 15:16:37-0600] DEBUG - prefect.FlowRunner | Flow 'mapped_upstream': Handling state change from Running to Failed
k
Oh, actually my example doesn't work well. It only appeared to work because the string I returned was being treated as an iterable, and it seems that passing something with more elements helps in this case.
Yeah, I guess the mapped task doesn't like a directly upstream reduced task. I believe this is a more accurate example:
import prefect
from prefect import Flow, task

@task
def mapped1(value: int) -> int:
    prefect.context.get("logger").info(f"{value}")
    return value

@task
def reduced1(x):
    return sum(x)

@task
def get_values2():
    return [1,2,3]

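with Flow("mapped_upstream") as flow:
    list1 = [1, 2]
    task1 = mapped1.map(list1)
    # Reduce step: runs once, after every mapped child of task1 has finished.
    reduc = reduced1(task1)
    # Unmapped task that produces list2 only after the reduce step completes.
    list2 = get_values2(upstream_tasks=[reduc])
    task2 = mapped1.map(list2)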

flow.run()
n
That does the trick! Not intuitive from a "how-to" or "error message" standpoint, but glad to have a way to make it work. Thanks!
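Editor's note: the three outcomes in this thread (only two children when the upstream was task1, three when it was the string "out", a failure when it was True) are all consistent with the upstream dependency itself being iterated over alongside list2. The following is a plain-Python model of that behaviour, not Prefect code: `Unmapped`, `model_map`, and `identity` are hypothetical names made up for illustration, and `zip` merely stands in for whatever Prefect does internally.

```python
class Unmapped:
    """Hypothetical marker: wait for this input, but do not iterate over it."""

    def __init__(self, value):
        self.value = value


def model_map(fn, value, upstream):
    """Toy model of a mapped call: one child run per zipped element."""
    if isinstance(upstream, Unmapped):
        # The dependency is broadcast, so only `value` sets the run count.
        return [fn(v) for v in value]
    # zip() stops at the shortest input and raises TypeError when
    # `upstream` is not iterable (for example, True).
    return [fn(v) for v, _ in zip(value, upstream)]


def identity(v):
    return v


list2 = [10, 20, 30]

runs_after_task1 = model_map(identity, list2, [1, 2])        # two children
runs_after_string = model_map(identity, list2, "out")        # three children
runs_unmapped = model_map(identity, list2, Unmapped([1, 2]))  # three children

try:
    model_map(identity, list2, True)
    non_iterable_failed = False
except TypeError:
    non_iterable_failed = True
```

In this model, a non-iterated dependency is what the original question was after. Prefect 1.x exposes `prefect.unmapped` for marking inputs that should not be mapped over; whether wrapping the dependency as `upstream_tasks=[unmapped(task1)]` behaves this way in a given version is an assumption worth checking against the Prefect documentation.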
While we are on the topic of mapped flows: when we have a mapped task that does consume the results of a previous mapped task, they are run breadth first. Is there a way to make this depth first, or have the mapped task run a "sub flow"? I blow Dask up if I have a large map and all the intermediate values hang around until the next task in the flow starts working on the mapped results.
k
It will be depth first execution by default, but it may sometimes pick up tasks along the same breadth. There is no way to coerce it to purely prioritize depth first execution.
You could use a “subflow” with the StartFlowRun task so it gets called all in one go.
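Editor's note: to make the memory concern concrete, here is a plain-Python model of the two orderings for a two-stage mapped pipeline. It is a sketch only and does not reflect how Prefect or Dask actually schedule work; `breadth_first`, `depth_first`, `stage1`, and `stage2` are hypothetical names, and "peak" simply counts how many stage-1 results are alive at once.

```python
def breadth_first(items, stage1, stage2):
    """Run stage1 on every item, then stage2 on every intermediate."""
    intermediates = [stage1(x) for x in items]
    peak = len(intermediates)  # all intermediates are alive at once
    results = [stage2(y) for y in intermediates]
    return results, peak


def depth_first(items, stage1, stage2):
    """Push each item through both stages before starting the next item."""
    results = []
    peak = 0
    for x in items:
        y = stage1(x)          # only this one intermediate is alive
        peak = max(peak, 1)
        results.append(stage2(y))
    return results, peak


def stage1(x):
    return x * 2


def stage2(y):
    return y + 1


items = list(range(1000))
bf_results, bf_peak = breadth_first(items, stage1, stage2)
df_results, df_peak = depth_first(items, stage1, stage2)
```

Both orderings produce the same results; only the number of intermediates held at once differs. Grouping the per-item stages into a single unit of work, as the subflow suggestion above does, corresponds to the `depth_first` shape in this model.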