Jack Roper
07/31/2024, 1:35 PM
Marvin
07/31/2024, 1:35 PM
wait_for parameter explicitly. Prefect picks up upstream dependencies either automatically, from the data you pass between tasks, or from ones you declare manually. Here's a simple guide:
1. Automatic Dependencies: When a result from a task is used as the input for another task, Prefect automatically recognizes this dependency.
python
from prefect import flow, task

@flow
async def async_flow():
    # Passing upstream_task's result into downstream_task is what creates the dependency.
    result = await upstream_task()
    await downstream_task(result)

@task
async def upstream_task():
    return "Hello, upstream!"

@task
async def downstream_task(data):
    print(data)
2. Manual Dependencies: You can manually specify upstream dependencies using the wait_for parameter.
python
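from prefect import flow, task

@flow
async def async_flow():
    # .submit() returns a future, not the task's return value.
    # Note: awaiting .submit() matches Prefect 2.x async flows; Prefect 3.x makes
    # .submit() synchronous, so check which version you are running.
    upstream_future = await upstream_task.submit()
    # downstream_task takes no data from upstream; wait_for only enforces ordering.
    await downstream_task.submit(wait_for=[upstream_future])

@task
async def upstream_task():
    return "Hello again, upstream!"

@task
async def downstream_task():
    print("Hello again, downstream!")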
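If it helps to see the ordering guarantee on its own, here is a rough sketch in plain asyncio rather than Prefect. It is only an analogy: the `wait_for` argument and the `events` list below are hypothetical names made up for this illustration, and this is not how Prefect implements the parameter. It shows the downstream step waiting for the upstream step to finish without consuming its return value.

```python
import asyncio


async def upstream_task(events):
    # Simulate some work before finishing.
    await asyncio.sleep(0.01)
    events.append("upstream done")
    return "Hello again, upstream!"


async def downstream_task(events, wait_for=()):
    # Hypothetical stand-in for wait_for: block until every listed
    # upstream has finished, then ignore their return values.
    if wait_for:
        await asyncio.gather(*wait_for)
    events.append("downstream ran")


async def run_flow():
    events = []
    # Start upstream in the background, roughly like submitting a task.
    upstream = asyncio.create_task(upstream_task(events))
    await downstream_task(events, wait_for=[upstream])
    return events


events = asyncio.run(run_flow())
print(events)
```

The downstream coroutine never reads the upstream return value, which is the same situation as the manual-dependency example: there is no data to pass, so the ordering has to be declared explicitly.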
For more detailed information, you can refer to the Prefect documentation on specifying upstream dependencies.
If you encounter any issues while implementing this, here are some previously discussed GitHub issues that might be relevant:
- Flow could not be retrieved from deployment with `s3fs==2023.3.0`
- Timeout Error when mapping with the `DaskKubernetesEnvironment`
Feel free to reach out if you have any other questions.