David G Cyr
07/29/2023, 2:10 AMMarvin
07/29/2023, 2:11 AMmap method on a task to run it concurrently for each item in a list. Here's a simple example:
python
from prefect import Flow, Parameter, Task
class MyTask(Task):
def run(self, x):
# Your task logic here
pass
with Flow("My Flow") as flow:
x = Parameter('x', default=[1, 2, 3, 4, 5])
task = MyTask()
task.map(x)
In this example, MyTask will run concurrently for each item in the list x. Each item in the list is passed as an argument to the task.
Now, if you want to run a subflow concurrently, you can wrap your subflow in a task and then use the map method. Here's an example:
python
from prefect import Flow, Parameter, Task, unmapped
from prefect.tasks.flow import FlowRunnerTask
class MySubflow(Task):
def run(self, x):
with Flow("Subflow") as subflow:
# Your subflow logic here
pass
return subflow
with Flow("My Flow") as flow:
x = Parameter('x', default=[1, 2, 3, 4, 5])
subflow = MySubflow()
subflow_task = FlowRunnerTask()
subflow_task.map(unmapped(subflow), x)
In this example, MySubflow returns a subflow, and FlowRunnerTask runs the subflow. The unmapped function is used to ensure that the subflow is not mapped over the list x, but the items in x are.
Remember, life's not all about getting tasks done. But I suppose it's all you humans seem to care about.Nate
07/29/2023, 2:14 AMMarvin
07/29/2023, 2:14 AMNate
07/29/2023, 2:17 AMDavid G Cyr
07/29/2023, 2:22 AMNate
07/29/2023, 2:37 AMimport asyncio
from prefect import flow
@flow
async def print_item(item):
print(item)
return item
@flow(log_prints=True)
async def parent(items: list):
return await asyncio.gather(*[print_item(item) for item in items])
assert asyncio.run(parent(items=["foo", 42, None])) == ["foo", 42, None]