Hello, i'm new to prefect and i feel it very easy ...
# prefect-getting-started
s
Hello, i'm new to prefect and i feel it very easy to use 🙂. I'm launching several flows in parallel, and inside each of them the tasks are sequential, and it works. However, i don't succeed to make the overall task parallelism to more 2 tasks at same time as you can see in the attached image. Is there something i missed to do ?
Copy code
@task(task_run_name="task-{id}-1")
def task_1(id: str):
    time.sleep(1)

@task(task_run_name="task-{id}-2")
def task_2(id: str):
    time.sleep(1)

@flow(task_runner=ConcurrentTaskRunner(),
      flow_run_name="my_flow-{name}")
async def my_flow(name: str):
    return(task_1.submit(name).wait(), task_2.submit(name))

@flow(task_runner=DaskTaskRunner())
async def master_flow():
    futures = []

    for name in ['A', 'B', 'C', 'D', 'E']:
        futures.append(my_flow(name))

    await asyncio.gather(*futures)

if __name__ == "__main__":
    asyncio.run(master_flow())
I hope you can help me.
✅ 1
j
hey you're blocking for task1 to return with
.wait()
in
return(task_1.submit(name).wait(), task_2.submit(name))
If I run your code without it I get:
s
Yes, i already tried this, but i want the tasks inside each flow to be sequential. Nevertheless, what you tried shows that more than 2 tasks can run at the same time.
j
ah I see my mistake. I think what you're looking to do is make your tasks async and then call them sequentially
Copy code
@task(task_run_name="task-{id}-1")
async def task_1(id: str):
    time.sleep(1)


@task(task_run_name="task-{id}-2")
async def task_2(id: str):
    time.sleep(1)


@flow(task_runner=ConcurrentTaskRunner(), flow_run_name="my_flow-{name}")
async def my_flow(name: str):
    await task_1(name)
    await task_2(name)
s
Sorry, i already had this behavior, but as you can see, most of the second tasks are running with an arbitrary delay. Maybe you can explain me the reason.
j
Okay to be honest I don't understand the arbitrary delay in when only calling/waiting the flows. To do what you want you can accomplish it with something like this:
Copy code
@task(task_run_name="task-{name}-1")
def task_1(name: str):
    time.sleep(1)


@task(task_run_name="task-{name}-2")
def task_2(name: str):
    time.sleep(1)


@flow(flow_run_name="my_flow-{name}")
async def sub_flow(name: str):
    task1_future = task_1.submit(name=name)
    task_2.submit(name=name, wait_for=[task1_future])
✅ 1
will file an issue for the delay
s
P*er*fect ! Thank you so much, exactly what i want ! Now, i have to deal with the result of task1 which task2 needs 🙂
Ok, i don't know if it's a legit way, but i succeeded to make it working, giving future of task1 to task2 argument (i'm discovering 🙂 it's easy, but not well documented i think) :
Copy code
@task(task_run_name="task-{id}-2")
def task_2(id: str, task1_result):
    time.sleep(1)
    print(task1_result)

@flow
async def my_flow(name: str):
    task1_future = task_1.submit(name)
    return(task_2.submit(name, task1_future, wait_for=[task1_future]))
Thank you again for your time, i started to have a headache 😄