Thread
#prefect-community
    Johnny Bravo

    2 years ago
    Hello. Noob question here. I'm trying to download a couple of files from a website. I want to run the jobs in parallel but limit the number of workers, so that at most 2 files are downloaded at a time. I've figured out how to limit the concurrent downloads with this run call:
    flow.run(executor=DaskExecutor(
            cluster_class=LocalCluster, cluster_kwargs={"n_workers": 2, "threads_per_worker": 1}))
    I'm not sure if this is the right way to do it. Now I have this flow:
    with Flow("Files downloader") as flow:
            files = get_files()
            downloaded_files = download_file.map(files)
            import_file.map(downloaded_files)
    The problem is that after the first two downloads, it moves on to the next download task instead of the import task. Because I'm limited to 2 workers at a time, I need to prioritize the import_file task over the download_file task. Is there a better way to do this?
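The ordering problem can be sketched without Prefect at all. Below is a minimal single-worker model in plain Python. It is an assumption for illustration and not how Prefect or Dask actually schedule tasks. A hypothetical `prefer_imports` flag decides whether a ready import step is picked before the next download step. With the flag off you get the breadth-first order (all downloads, then all imports); with it on you get the depth-first order the question asks for.

```python
from collections import deque


def run_order(files, prefer_imports):
    """Return the order in which a single worker would run the steps.

    Hypothetical model: each file needs a download step followed by an import
    step, and an import only becomes runnable once its own download is done.
    """
    pending_downloads = deque(files)
    ready_imports = deque()
    order = []
    while pending_downloads or ready_imports:
        # Pick an import if imports are preferred, or if no downloads are left.
        if ready_imports and (prefer_imports or not pending_downloads):
            f = ready_imports.popleft()
            order.append(f"import_file[{f}]")
        else:
            f = pending_downloads.popleft()
            order.append(f"download_file[{f}]")
            ready_imports.append(f)  # this file's import is now runnable
    return order


if __name__ == "__main__":
    print(run_order(range(3), prefer_imports=False))  # breadth-first
    print(run_order(range(3), prefer_imports=True))   # depth-first
```

The whole difference between the two orders comes down to which ready step the worker picks next, which is the "priority" the question is asking for.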
    Chris White

    2 years ago
    The simplest way would be to add a dummy task between the import map and the download map that depends on the imports and is upstream of the downloads. This way, the download tasks can’t begin until this “blocker” task succeeds, and the blocker only succeeds once both imports have succeeded.
    Johnny Bravo

    2 years ago
    I don't understand the dummy task idea. Could you give an example, please? That would be very helpful!
    Chris White

    2 years ago
    Sure; there might be a more elegant way of achieving this, but this is the idea:
    from prefect import Task, Flow
    
    # get_files, download_file and import_file are the @task functions from the question
    blocker = Task(name="blocker")
    
    with Flow("Files downloader") as flow:
        files = get_files()
        downloaded_files = download_file.map(files)
        blocker.set_upstream(downloaded_files)
        import_file.map(downloaded_files, upstream_tasks=[blocker])
    Because blocker depends on downloaded_files, it won’t run until all of the mapped download_file tasks are complete. And because import_file depends on blocker, it won’t run until all of the files have been downloaded as well.
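What this dependency structure actually allows can be checked with the standard library's graphlib (Python 3.9+). This minimal sketch models the same dependencies as plain node names rather than Prefect tasks, which is an illustrative assumption and not Prefect's own graph object. Since blocker waits on every download and each import waits on blocker, any valid execution order runs all downloads before the first import.

```python
from graphlib import TopologicalSorter


def blocker_graph(files):
    """Map each step to the set of steps it must wait for (its upstream tasks)."""
    files = list(files)
    graph = {"blocker": {f"download_file[{f}]" for f in files}}
    for f in files:
        graph[f"download_file[{f}]"] = set()
        graph[f"import_file[{f}]"] = {f"download_file[{f}]", "blocker"}
    return graph


def downloads_finish_before_imports(order):
    """True if every download step appears before the first import step."""
    last_download = max(i for i, n in enumerate(order) if n.startswith("download_file"))
    first_import = min(i for i, n in enumerate(order) if n.startswith("import_file"))
    return last_download < first_import


if __name__ == "__main__":
    order = list(TopologicalSorter(blocker_graph(range(3))).static_order())
    print(order)
    print(downloads_finish_before_imports(order))
```

In other words, a blocker that sits between the two maps enforces a batch boundary. It can never produce an interleaved download_file[0] -> import_file[0] -> download_file[1] order.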
    Johnny Bravo

    2 years ago
    I've tried your approach, but the download tasks still run first. I'm running without parallelization, with just flow.run(), and I get this:
    [2020-08-29 21:11:54] INFO - prefect.FlowRunner | Beginning Flow run for 'Files downloader'
    [2020-08-29 21:11:54] INFO - prefect.TaskRunner | Task 'get_files': Starting task run...
    [2020-08-29 21:11:54] INFO - prefect.TaskRunner | Task 'get_files': finished task run for task with final state: 'Success'
    [2020-08-29 21:11:54] INFO - prefect.TaskRunner | Task 'download_file': Starting task run...
    [2020-08-29 21:11:54] INFO - prefect.TaskRunner | Task 'download_file': finished task run for task with final state: 'Mapped'
    [2020-08-29 21:11:54] INFO - prefect.TaskRunner | Task 'download_file[0]': Starting task run...
    Download file 0. Sleeping for 10s
    [2020-08-29 21:12:04] INFO - prefect.TaskRunner | Task 'download_file[0]': finished task run for task with final state: 'Success'
    [2020-08-29 21:12:04] INFO - prefect.TaskRunner | Task 'download_file[1]': Starting task run...
    Download file 1. Sleeping for 15s
    [2020-08-29 21:12:19] INFO - prefect.TaskRunner | Task 'download_file[1]': finished task run for task with final state: 'Success'
    [2020-08-29 21:12:19] INFO - prefect.TaskRunner | Task 'download_file[2]': Starting task run...
    Download file 2. Sleeping for 5s
    [2020-08-29 21:12:24] INFO - prefect.TaskRunner | Task 'download_file[2]': finished task run for task with final state: 'Success'
    [2020-08-29 21:12:24] INFO - prefect.TaskRunner | Task 'download_file[3]': Starting task run...
    Download file 3. Sleeping for 7s
    As you can see, it runs download_file[0] -> download_file[1] -> etc.
    I'm mapping over two tasks, and I expect to get download_file[0] -> import_file[0] -> download_file[1] -> import_file[1] -> etc.
    from prefect import Flow, Task, task
    from prefect.engine.executors import DaskExecutor
    from distributed.deploy.local import LocalCluster
    from random import randint
    from time import sleep
    
    
    @task
    def get_files():
        return list(range(10))
    
    
    @task
    def download_file(file):
        timeout = randint(5, 15)
        print(f"Download file {file}. Sleeping for {timeout}s")
    
        sleep(timeout)
    
        return file
    
    
    @task
    def import_file(file):
        print(f"Importing file {file}")
    
    
    def main():
        blocker = Task(name="blocker")
    
        with Flow("Files downloader") as flow:
            files = get_files()
            downloaded_files = download_file.map(files)
            blocker.set_upstream(downloaded_files)
            import_file.map(downloaded_files, upstream_tasks=[blocker])
    
        # flow.run(executor=DaskExecutor(
            # cluster_class=LocalCluster, cluster_kwargs={"n_workers": 2, "threads_per_worker": 1}))
        # flow.run(executor=DaskExecutor())
        flow.run()
    
    
    if __name__ == "__main__":
        main()
    Chris White

    2 years ago
    Oh, apologies, I misunderstood your question. What version of Prefect are you running?
    Johnny Bravo

    2 years ago
    The latest, I guess:
    Requirement already satisfied: prefect in ./.venv/lib/python3.8/site-packages (0.13.4)
    Chris White

    2 years ago
    Interesting, I’m surprised that’s not already behaving as you expect. Are the download tasks very fast?
    Johnny Bravo

    2 years ago
    No, I have random sleeps from 5 to 15 seconds. See the source above.
    Chris White

    2 years ago
    Interesting, yeah. Ultimately, when using mapping, Dask is given the task graph and decides when to run each piece of it, and for some reason Dask is choosing to proceed breadth-first here. If you need a strict guarantee about execution order, you should use a static DAG instead of a dynamic pipeline, e.g., explicitly create a copy of each task for each filename. You might also consider combining the download and import steps into a single task.
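The last suggestion, combining the download and import steps into one unit of work, can be sketched outside Prefect with the standard library's ThreadPoolExecutor. In this minimal sketch, download_file and import_file are hypothetical stand-ins (a short sleep and a string), and max_workers=2 plays the role of the two-worker limit from the question. In a Prefect flow the equivalent would presumably be a single mapped task whose body calls both steps; that part is not shown here.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def download_file(file):
    """Hypothetical stand-in for a real download; sleeps to simulate I/O."""
    time.sleep(0.01)
    return file


def import_file(file):
    """Hypothetical stand-in for a real import step."""
    return f"imported {file}"


def run_all(files, max_workers=2):
    """Download and import each file as one unit, at most `max_workers` at a time."""
    events = []  # ("download" | "import", file) in the order they happened
    lock = threading.Lock()
    active = 0
    peak = 0

    def download_and_import(file):
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        try:
            downloaded = download_file(file)
            with lock:
                events.append(("download", file))
            result = import_file(downloaded)  # runs right after its own download
            with lock:
                events.append(("import", file))
            return result
        finally:
            with lock:
                active -= 1

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(download_and_import, files))
    return results, events, peak


if __name__ == "__main__":
    results, events, peak = run_all(range(6))
    print(results)
    print("peak concurrency:", peak)
```

Because each worker finishes a file's import before it picks up another file, no import waits behind the remaining downloads, and the pool never runs more than two files at once.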
    Johnny Bravo

    2 years ago
    Thanks! The last option seems like the best solution for now.