Nathan Atkins
04/28/2021, 8:43 PMupstream_tasks=[task1]
to task2. My gut told me this probably wasn't going to do what I wanted, but it was worth a try. My gut was right as task2 only process the number of elements in list2 that are in list1.
import prefect
from prefect import Flow, task
@task
def mapped(value: int) -> int:
prefect.context.get("logger").info(f"{value}")
return value
if __name__ == "__main__":
with Flow("mapped_upstream") as flow:
list1 = [1, 2]
task1 = mapped.map(value=list1)
list2 = [10, 20, 30]
task2 = mapped.map(value=list2, upstream_tasks=[task1])
flow.run()
What is the correct way to get mapped task2 to have an upstream dependency on task1?
[2021-04-28 143831-0600] INFO - prefect.FlowRunner | Beginning Flow run for 'mapped_upstream'
[2021-04-28 143831-0600] INFO - prefect.TaskRunner | Task 'mapped': Starting task run...
[2021-04-28 143831-0600] INFO - prefect.TaskRunner | Task 'mapped': Finished task run for task with final state: 'Mapped'
[2021-04-28 143831-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Starting task run...
[2021-04-28 143831-0600] INFO - prefect.mapped[0] | 1
[2021-04-28 143831-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Finished task run for task with final state: 'Success'
[2021-04-28 143831-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Starting task run...
[2021-04-28 143831-0600] INFO - prefect.mapped[1] | 2
[2021-04-28 143831-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Finished task run for task with final state: 'Success'
[2021-04-28 143831-0600] INFO - prefect.TaskRunner | Task 'mapped': Starting task run...
[2021-04-28 143832-0600] INFO - prefect.TaskRunner | Task 'mapped': Finished task run for task with final state: 'Mapped'
[2021-04-28 143832-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Starting task run...
[2021-04-28 143832-0600] INFO - prefect.mapped[0] | 10
[2021-04-28 143832-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Finished task run for task with final state: 'Success'
[2021-04-28 143832-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Starting task run...
[2021-04-28 143832-0600] INFO - prefect.mapped[1] | 20
[2021-04-28 143832-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Finished task run for task with final state: 'Success'
[2021-04-28 143832-0600] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeededKevin Kho
import prefect
from prefect import Flow, task
@task
def mapped(value: int) -> int:
prefect.context.get("logger").info(f"{value}")
return value
@task
def reduced(x):
return "out"
if __name__ == "__main__":
with Flow("mapped_upstream") as flow:
list1 = [1, 2]
task1 = mapped.map(value=list1)
reduced1 = reduced(task1)
list2 = [10, 20, 30]
task2 = mapped.map(value=list2, upstream_tasks=[reduced1])
flow.run()
Nathan Atkins
04/28/2021, 9:11 PMKevin Kho
Nathan Atkins
04/28/2021, 9:17 PMimport prefect
from prefect import Flow, task
@task
def mapped(value: int) -> int:
prefect.context.get("logger").info(f"{value}")
return value
@task
def gather(_):
return True
if __name__ == "__main__":
with Flow("mapped_upstream") as flow:
list1 = [1, 2]
task1 = mapped.map(value=list1)
gather1 = gather(task1)
list2 = [10, 20, 30]
task2 = mapped.map(value=list2, upstream_tasks=[gather1])
flow.run()
task2 mapped fails. Not sure how to easily figure out what is going wrong here.
[2021-04-28 151637-0600] INFO - prefect.FlowRunner | Beginning Flow run for 'mapped_upstream'
[2021-04-28 151637-0600] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-04-28 151637-0600] DEBUG - prefect.FlowRunner | Flow 'mapped_upstream': Handling state change from Scheduled to Running
[2021-04-28 151637-0600] INFO - prefect.TaskRunner | Task 'mapped': Starting task run...
[2021-04-28 151637-0600] DEBUG - prefect.TaskRunner | Task 'mapped': Handling state change from Pending to Mapped
[2021-04-28 151637-0600] INFO - prefect.TaskRunner | Task 'mapped': Finished task run for task with final state: 'Mapped'
[2021-04-28 151637-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Starting task run...
[2021-04-28 151637-0600] DEBUG - prefect.TaskRunner | Task 'mapped[0]': Handling state change from Pending to Running
[2021-04-28 151637-0600] DEBUG - prefect.TaskRunner | Task 'mapped[0]': Calling task.run() method...
[2021-04-28 151637-0600] INFO - prefect.mapped[0] | 1
[2021-04-28 151637-0600] DEBUG - prefect.TaskRunner | Task 'mapped[0]': Handling state change from Running to Success
[2021-04-28 151637-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Finished task run for task with final state: 'Success'
[2021-04-28 151637-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Starting task run...
[2021-04-28 151637-0600] DEBUG - prefect.TaskRunner | Task 'mapped[1]': Handling state change from Pending to Running
[2021-04-28 151637-0600] DEBUG - prefect.TaskRunner | Task 'mapped[1]': Calling task.run() method...
[2021-04-28 151637-0600] INFO - prefect.mapped[1] | 2
[2021-04-28 151637-0600] DEBUG - prefect.TaskRunner | Task 'mapped[1]': Handling state change from Running to Success
[2021-04-28 151637-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Finished task run for task with final state: 'Success'
[2021-04-28 151637-0600] INFO - prefect.TaskRunner | Task 'gather': Starting task run...
[2021-04-28 151637-0600] DEBUG - prefect.TaskRunner | Task 'gather': Handling state change from Pending to Running
[2021-04-28 151637-0600] DEBUG - prefect.TaskRunner | Task 'gather': Calling task.run() method...
[2021-04-28 151637-0600] DEBUG - prefect.TaskRunner | Task 'gather': Handling state change from Running to Success
[2021-04-28 151637-0600] INFO - prefect.TaskRunner | Task 'gather': Finished task run for task with final state: 'Success'
[2021-04-28 151637-0600] INFO - prefect.TaskRunner | Task 'mapped': Starting task run...
[2021-04-28 151637-0600] DEBUG - prefect.TaskRunner | Task 'mapped': Handling state change from Pending to Failed
[2021-04-28 151637-0600] INFO - prefect.TaskRunner | Task 'mapped': Finished task run for task with final state: 'Failed'
[2021-04-28 151637-0600] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
[2021-04-28 151637-0600] DEBUG - prefect.FlowRunner | Flow 'mapped_upstream': Handling state change from Running to FailedKevin Kho
Kevin Kho
import prefect
from prefect import Flow, task
@task
def mapped1(value: int) -> int:
prefect.context.get("logger").info(f"{value}")
return value
@task
def reduced1(x):
return sum(x)
@task
def get_values2():
return [1,2,3]
with Flow("mapped_upstream") as flow:
list1 = [1,2]
task1 = mapped1.map(list1)
reduc = reduced1(task1)
list2 = get_values2(upstream_tasks=[reduc])
task2 = mapped1.map(list2)
flow.run()
Nathan Atkins
04/28/2021, 10:00 PMNathan Atkins
04/28/2021, 10:27 PMKevin Kho
Kevin Kho
StartFlowRun
task so it gets called all in one go.