Patrick Düggelin
12/09/2024, 1:53 PMMarvin
12/09/2024, 1:53 PMAttributeError: ‘Flow’ object has no attribute ‘submit’
, suggests that you're trying to use a method (submit
) that doesn't exist in the context you expect.
In Prefect, tasks and sub-flows should be managed using Prefect-specific methods and decorators. To run tasks or sub-flows in parallel, you need to make use of Prefect's orchestration capabilities.
### Here's how you can modify your code:
Instead of trying to use submit
directly on a flow, you should decorate the function you want to run (like download_station
) with the @task
decorator if it's just a regular function. Here's a revised version of your flow setup:
python
from prefect import flow, task, get_run_logger
from prefect.futures import wait_for
@task
def download_station(config):
# Your existing download logic here
pass
@flow(flow_run_name="download-all", version=get_git_commit())
def download_all(config_path: Path | None = None):
logger = get_run_logger()
if config_path is None:
config_path = get_default_config()
<http://logger.info|logger.info>("Starting download with config at %s", config_path)
config = Config.from_yaml(config_path=config_path)
futures = []
for download_config in config.downloads:
# Using submit on tasks instead of flows
futures.append(download_station.submit(download_config))
# Using wait_for instead of just wait
wait_for(futures)
### Key changes:
- Use the @task
decorator for the download_station
function.
- Replace futures.wait()
with wait_for(futures)
to handle the future objects correctly.
Let me know if you have any more questions or run into any more trouble! Oh, the delight of debugging, where every error is just a feature we haven't yet explained to our computers.