# prefect-getting-started
b
Is the code below the best way to only pass a portion of a result to the next task? The only other way I thought up is passing the whole future to the task and then just taking a subset of it in the task, but I really don't want to do that. Example adapted from https://docs-3.prefect.io/3.0rc/develop/task-runners#access-results-from-submitted-tasks:
@flow(name="hello-flow")
def hello_world():
    future = say_hello.submit("Marvin")
    # in this case the task would return {"key": "Marvin"} instead of the original.
    print_result.submit(future.result()["key"])
n
hi @Ben I think the short answer to "Is the code below the best way to only pass a portion of a result to the next task?" is that you can't pass just a portion of an unresolved future: you'd have to resolve the whole future and pass on whatever portion of it you care about. but I'd be curious why you want to do that, ie why doesn't your say_hello task just return the part that you want to pass to the next task? perhaps there's a more convenient way to design your task
worth noting also that if you don't need concurrent work, you may not need .submit or .result() and you could just do
@flow(name="hello-flow")
def hello_world():
    result = say_hello("Marvin")
    # in this case the task would return {"key": "Marvin"} instead of the original.
    print_result(result["key"])
b
Thanks. Yeah, the real case is I have a complicated bioinformatics pipeline. Sometimes the same task will return say:
{'result1': link_to_side_effect_file1, 'result2': link_to_side_effect_file2}
But then that result feeds into 2 different tasks one of which uses result1, the other result2. I'm not completely certain I've got the best design. I've used luigi heavily in the past so there is the temptation to try to make Prefect look like luigi.
Toy example showing what I need to do, yeah it works great without parallel, but there are parts in there that need parallel:
from prefect import flow, task


@task
def task1():
    return {'result1': 'thing for task2',
            'result2': 'thing for task3'}


@task
def task2(input_value):
    return f'task2: {input_value}'


@task
def task3(input_value):
    return f'task3: {input_value}'


@flow
async def test():
    task1_result = task1()
    task2_result = task2(task1_result['result1'])
    task3_result = task3(task1_result['result2'])
    print([task1_result, task2_result, task3_result])
n
so here, you'd prefer task2 and task3 to run concurrently?
b
Yeah. The DAG for this type of bioinformatics pipeline can get very complex. Lots of branching, grabbing things from multiple steps etc. Sending results from one task to multiple. No one section has everything, but by the time you have a reasonably complex bioinformatics pipeline there will be examples of just about everything that can be represented by a DAG.
n
you can do that like this
@flow
def test():
    task1_result = task1()
    task2_future = task2.submit(task1_result["result1"])
    task3_future = task3.submit(task1_result["result2"])
    print([task1_result, task2_future.result(), task3_future.result()])
in prefect you're free to put these .submit or .map calls inside if / else branches etc because we build the graph as your code runs, ie you only get a node in your "DAG" once you call / submit the task
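a rough sketch of that "branch decides what gets submitted" shape, for anyone who wants to try it without a Prefect install. note this is NOT Prefect: it swaps in the standard library's concurrent.futures, plain functions stand in for the tasks from the toy example, and run_pipeline / run_task3 are made-up names for illustration only

```python
from concurrent.futures import ThreadPoolExecutor


def task1():
    # Stand-in for the upstream task: one result carrying two parts.
    return {"result1": "thing for task2", "result2": "thing for task3"}


def task2(input_value):
    return f"task2: {input_value}"


def task3(input_value):
    return f"task3: {input_value}"


def run_pipeline(run_task3=True):
    # run_task3 is a hypothetical switch, used only to show branching.
    task1_result = task1()  # resolve the whole upstream result first
    with ThreadPoolExecutor() as pool:
        # Each downstream call receives only the portion it needs.
        futures = {"task2": pool.submit(task2, task1_result["result1"])}
        if run_task3:
            # This unit of work only exists if the branch is taken.
            futures["task3"] = pool.submit(task3, task1_result["result2"])
        # Collect results while the pool is still open.
        return {name: future.result() for name, future in futures.items()}
```

calling run_pipeline(run_task3=False) never creates the task3 work at all, which is the same idea as a node only appearing once the task is called / submitted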
b
I think I understand reasonably well how to do what I want to do. It just works a bit differently. Thank you.
n
no problem! feel free to pop back in here if you have any questions