https://prefect.io logo
#prefect-community
Title
# prefect-community
p

Philip MacMenamin

04/19/2022, 7:41 PM
Strange behaviour with multiple
upstream_tasks
on a ShellTask This works:
Copy code
brt_commands = create_brt_command.map(adoc_fp=updated_adocs)
        brt_commands_logged = log(item=brt_commands, desc="BRT commands")
        brts = shell_task.map(
            command=brt_commands, upstream_tasks=[tomogram_fps]
        )
This fails:
Copy code
brt_commands = create_brt_command.map(adoc_fp=updated_adocs)
        brt_commands_logged = log(item=brt_commands, desc="BRT commands")
        brts = shell_task.map(
            command=brt_commands, upstream_tasks=[tomogram_fps, brt_commands_logged]
        )
discourse 1
k

Kyle McChesney

04/19/2022, 7:47 PM
Not 100% sure what the issue is, but the top example likely works if you add
unmapped(tomogram_fps)
to the upstream_task. Its kind of an interesting edge case between
.map
and
upstream_tasks
n

Nate

04/19/2022, 7:50 PM
can you describe the failure or share logs?
p

Philip MacMenamin

04/19/2022, 7:50 PM
Just to be clear - the top example appears to work already.
Copy code
[2022-04-19 12:56:09-0600] INFO - prefect.TaskRunner | Task 'create_brt_command[0]': Starting task run...
[2022-04-19 12:56:09-0600] INFO - prefect.TaskRunner | Task 'create_brt_command[0]': Finished task run for task with final state: 'Success'
[2022-04-19 12:56:10-0600] INFO - prefect.TaskRunner | Task 'log': Starting task run...
[2022-04-19 12:56:10-0600] INFO - prefect.log | BRT commands : ['/opt/rml/imod/bin/batchruntomo -di /gs1/home/macmenaminpe/tmp/tmpovrvi7nb/dirTemplate.adoc -cp 8 -gpu 1']
[2022-04-19 12:56:10-0600] INFO - prefect.TaskRunner | Task 'log': Finished task run for task with final state: 'Success'
[2022-04-19 12:56:10-0600] INFO - prefect.TaskRunner | Task 'ShellTask': Starting task run...
[2022-04-19 12:56:10-0600] INFO - prefect.TaskRunner | Task 'ShellTask': Finished task run for task with final state: 'Failed'
[2022-04-19 12:56:10-0600] INFO - prefect.TaskRunner | Task 'log': Starting task run...
[2022-04-19 12:56:10-0600] INFO - prefect.log | updated Adoc files : [PosixPath('/gs1/home/macmenaminpe/tmp/tmpovrvi7nb/dirTemplate.adoc')]
[2022-04-19 12:56:10-0600] INFO - prefect.TaskRunner | Task 'log': Finished task run for task with final state: 'Success'
[2022-04-19 12:56:10-0600] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
@Kyle McChesney - this idea of doing
unmapped()
stuff in the upstream_tasks is interesting. I think I have to get my head around that for a second.
k

Kyle McChesney

04/19/2022, 7:54 PM
@Kevin Kho we discussed this at somepoint, do you happen to have the link around
p

Philip MacMenamin

04/19/2022, 7:54 PM
ahh... ok, so the upstream_tasks should not be included within the map
n

Nate

04/19/2022, 7:54 PM
hold on, because if your first example works then I'm wrong, one sec
k

Kyle McChesney

04/19/2022, 7:55 PM
upstream_tasks=[unmnapped(tomogram_fps), unmapped(brt_commands_logged)]
?
upvote 2
does that work? If I recall the issue was that an upstream task parameter needed to get passed to each mapped instance. By specifying unmapped, you ensure that the same one is used.
p

Philip MacMenamin

04/19/2022, 7:56 PM
right - within the map? OK, hold on. I have to get into an annoying VPN and it kills Slack. BRB.
k

Kevin Kho

04/19/2022, 7:59 PM
Kyle is right
Copy code
mapped_task = task_one.map(.., upstream_tasks=[unmnapped(tomogram_fps), unmapped(brt_commands_logged)])
should work if
tomogram_fps
and
brt_commands_logged
are not mapped tasks
Making them mapped will map element 1 of the upstream list to element 1 of the downstream list as a dependency so you get weirdness is the lists are not the same length
p

Philip MacMenamin

04/19/2022, 8:01 PM
can confirm. @Kyle McChesney is correct. Nice
d

Dan Morris

04/25/2022, 8:35 PM
Needed this for my own project so thank you.
4 Views