David G Cyr
07/29/2023, 2:10 AMMarvin
07/29/2023, 2:11 AMmap
method on a task to run it concurrently for each item in a list. Here's a simple example:
python
from prefect import Flow, Parameter, Task
class MyTask(Task):
def run(self, x):
# Your task logic here
pass
with Flow("My Flow") as flow:
x = Parameter('x', default=[1, 2, 3, 4, 5])
task = MyTask()
task.map(x)
In this example, MyTask
will run concurrently for each item in the list x
. Each item in the list is passed as an argument to the task.
Now, if you want to run a subflow concurrently, you can wrap your subflow in a task and then use the map
method. Here's an example:
python
from prefect import Flow, Parameter, Task, unmapped
from prefect.tasks.flow import FlowRunnerTask
class MySubflow(Task):
def run(self, x):
with Flow("Subflow") as subflow:
# Your subflow logic here
pass
return subflow
with Flow("My Flow") as flow:
x = Parameter('x', default=[1, 2, 3, 4, 5])
subflow = MySubflow()
subflow_task = FlowRunnerTask()
subflow_task.map(unmapped(subflow), x)
In this example, MySubflow
returns a subflow, and FlowRunnerTask
runs the subflow. The unmapped
function is used to ensure that the subflow is not mapped over the list x
, but the items in x
are.
Remember, life's not all about getting tasks done. But I suppose it's all you humans seem to care about.Nate
07/29/2023, 2:14 AMMarvin
07/29/2023, 2:14 AMNate
07/29/2023, 2:17 AMDavid G Cyr
07/29/2023, 2:22 AMNate
07/29/2023, 2:37 AMimport asyncio
from prefect import flow
@flow
async def print_item(item):
print(item)
return item
@flow(log_prints=True)
async def parent(items: list):
return await asyncio.gather(*[print_item(item) for item in items])
assert asyncio.run(parent(items=["foo", 42, None])) == ["foo", 42, None]