Sanjay Patel

04/29/2020, 1:08 AM
Hi, can I please get some help with my flow runs? I have a process that runs a series of mapped tasks, and all of them need to complete before my final 'summarize' task runs. The problem is that the mapped tasks write their results to a database and the summarize task then reads from that database, so there's no dependency that can be inferred from the flow code. Should I restructure my tasks, or pass a dummy output from the last mapped task into the summarize task, or is there something simple I'm overlooking? Example code:
from prefect import Flow

simids = [1, 2, 3]
with Flow('example') as flow:
    x = task1.map(simids)
    y = task2.map(x)
    z = task3.map(x, y)
    # start a DB session
    write_session = create_db_session.map()
    task_write_to_db.map(write_session, x, z)
    # summarize task - should run after all mapped results are complete, but currently executes too early
    read_session = create_db_session()
    PostProcessResults()
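To make the problem concrete, here is a small, Prefect-free model in Python using the stdlib `graphlib` module (Python 3.9+). It's a sketch under the assumption that the scheduler only knows about edges created by passing one task's output into another; the node names mirror the variables above and are not Prefect objects. Because `PostProcessResults` receives nothing from the write step, it has no upstream edges and is ready to start immediately, alongside `task1`.

```python
from graphlib import TopologicalSorter

# Simplified model of the flow above (not Prefect internals):
# each node maps to the set of nodes whose outputs it receives as arguments.
data_edges = {
    "task1": set(),
    "task2": {"task1"},
    "task3": {"task1", "task2"},
    "write_session": set(),
    "task_write_to_db": {"write_session", "task1", "task3"},
    "read_session": set(),
    # Reads the database, but no argument ties it to the writes,
    # so the graph records no upstream edge at all.
    "PostProcessResults": set(),
}


def ready_at_start(graph):
    """Return the nodes a scheduler could start immediately (no upstream nodes)."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    return set(sorter.get_ready())


print(sorted(ready_at_start(data_edges)))
# ['PostProcessResults', 'read_session', 'task1', 'write_session']
```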

nicholas

04/29/2020, 1:15 AM
Hi @Sanjay Patel! You don't need a dummy output; instead, set the upstream dependencies of your summary task explicitly, like this:
from prefect import Flow

simids = [1, 2, 3]
with Flow('example') as flow:
    x = task1.map(simids)
    y = task2.map(x)
    z = task3.map(x, y)
    # start a DB session
    write_session = create_db_session.map()
    task_write_to_db.map(write_session, x, z)
    # summarize task: upstream_tasks makes it wait until every mapped run of x, y and z has finished
    read_session = create_db_session()
    PostProcessResults(upstream_tasks=[x, y, z])
You can read more about task args and dependencies here 😄
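A stdlib-only model of the flow's graph (Python 3.9+ `graphlib`) can show what the explicit dependency adds. In this sketch, passing `{"task1", "task2", "task3"}` stands in for `upstream_tasks=[x, y, z]`: the summary now waits for every mapped result, but in this simplified model it lands in the same batch as `task_write_to_db`, so a parallel executor could in principle start both together. Because the summary reads what `task_write_to_db` writes, a stricter variant (an assumption, not something suggested in the thread) would list the write task itself as the upstream dependency; in Prefect that would presumably mean capturing `written = task_write_to_db.map(...)` and passing `upstream_tasks=[written]`. The node names are illustrative, not Prefect objects.

```python
from graphlib import TopologicalSorter


def build_graph(summary_upstream):
    """Simplified model of the example flow: node -> set of upstream nodes.

    Only data-flow edges are included, plus whatever is passed as
    `summary_upstream` (standing in for upstream_tasks=[...])."""
    return {
        "task1": set(),
        "task2": {"task1"},
        "task3": {"task1", "task2"},
        "write_session": set(),
        "task_write_to_db": {"write_session", "task1", "task3"},
        "read_session": set(),
        "PostProcessResults": set(summary_upstream),
    }


def waves(graph):
    """Group nodes into batches; nodes in the same batch have no ordering between them."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = set(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


def batch_of(batches, node):
    """Index of the batch that contains `node`."""
    return next(i for i, batch in enumerate(batches) if node in batch)


# Models upstream_tasks=[x, y, z]
answer = waves(build_graph({"task1", "task2", "task3"}))
# Hypothetical stricter variant: depend on the write task itself
strict = waves(build_graph({"task_write_to_db"}))

print(batch_of(answer, "PostProcessResults") == batch_of(answer, "task_write_to_db"))  # True
print(strict[-1])  # {'PostProcessResults'}
```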

Sanjay Patel

04/29/2020, 2:18 AM
Thank you so much for the quick response! That makes sense and works as expected.

nicholas

04/29/2020, 2:27 AM
Great to hear! 😄