<@ULVA73B9P> how can I run multiple (sub-)flows in...
# ask-marvin
p
@Marvin how can I run multiple (sub-)flows in parallel? Here’s my code: @flow(flow_run_name=“download-all”, version=get_git_commit()) def download_all(config_path: Path | None = None): logger = get_run_logger() if config_path is None: config_path = get_default_config() logger.info(“Starting download with config at %s”, config_path) config = Config.from_yaml(config_path=config_path) futures = [] # Run downlods from all stations in parallel for config in config.downloads: futures.append(download_station.submit(config.downloads)) futures.wait() AttributeError: ‘Flow’ object has no attribute ‘submit’
m
It seems there was a bit of a hiccup with accessing the required documentation and issue tracker. Let's try to unravel this puzzle step-by-step. From your code snippet, it looks like you're trying to run multiple downloads in parallel within a Prefect flow. The error you're encountering,
AttributeError: ‘Flow’ object has no attribute ‘submit’
, suggests that you're trying to use a method (
submit
) that doesn't exist in the context you expect. In Prefect, tasks and sub-flows should be managed using Prefect-specific methods and decorators. To run tasks or sub-flows in parallel, you need to make use of Prefect's orchestration capabilities. ### Here's how you can modify your code: Instead of trying to use
submit
directly on a flow, you should decorate the function you want to run (like
download_station
) with the
@task
decorator if it's just a regular function. Here's a revised version of your flow setup:
Copy code
python
from prefect import flow, task, get_run_logger
from prefect.futures import wait_for

@task
def download_station(config):
    # Your existing download logic here
    pass

@flow(flow_run_name="download-all", version=get_git_commit())
def download_all(config_path: Path | None = None):
    logger = get_run_logger()
    if config_path is None:
        config_path = get_default_config()
    <http://logger.info|logger.info>("Starting download with config at %s", config_path)
    config = Config.from_yaml(config_path=config_path)

    futures = []
    for download_config in config.downloads:
        # Using submit on tasks instead of flows
        futures.append(download_station.submit(download_config))
    
    # Using wait_for instead of just wait
    wait_for(futures)
### Key changes: - Use the
@task
decorator for the
download_station
function. - Replace
futures.wait()
with
wait_for(futures)
to handle the future objects correctly. Let me know if you have any more questions or run into any more trouble! Oh, the delight of debugging, where every error is just a feature we haven't yet explained to our computers.