# prefect-community
j
Hello. Noob question here. I'm trying to download a couple of files from a website and want to run the jobs in parallel while limiting the workers (so I can download max 2 files at a time). I've figured out how to limit the number of concurrent downloads with this run call:
flow.run(executor=DaskExecutor(
        cluster_class=LocalCluster, cluster_kwargs={"n_workers": 2, "threads_per_worker": 1}))
Not sure if this is the right way to do it. Now I have this flow:
with Flow("Files downloader") as flow:
        files = get_files()
        downloaded_files = download_file.map(files)
        import_file.map(downloaded_files)
The problem is that after the first two downloads, it moves on to the next download task instead of starting the import task. So, because I'm limited to 2 workers at a time, I need to prioritize the `import_file` task over the `download_file` task. Is there a better way to do this?
c
The simplest way would be to add in a dummy task in between the import map and the download map that depends on the import and has a downstream dependency of the download file. This way, the download file task can’t begin until this “blocker” task succeeds, which is dependent on both imports succeeding
j
I don't get the dummy task part. Can you give an example please? That would be very helpful!
c
Sure; there might be a more elegant way of achieving this but this is the idea:
from prefect import Task, Flow

# A no-op task used only to add an ordering dependency between the two maps.
blocker = Task(name="blocker")

with Flow("Files downloader") as flow:
    files = get_files()
    downloaded_files = download_file.map(files)
    blocker.set_upstream(downloaded_files)
    import_file.map(downloaded_files, upstream_tasks=[blocker])
Because `blocker` depends on `download_file`, it won't run until both mapped tasks are complete. And because `import_file` depends on `blocker`, it won't run until after both files are complete as well.
j
I've tried your approach but the download tasks are still running first. I'm running without parallelization, with just `flow.run()`, and get this:
[2020-08-29 21:11:54] INFO - prefect.FlowRunner | Beginning Flow run for 'Files downloader'
[2020-08-29 21:11:54] INFO - prefect.TaskRunner | Task 'get_files': Starting task run...
[2020-08-29 21:11:54] INFO - prefect.TaskRunner | Task 'get_files': finished task run for task with final state: 'Success'
[2020-08-29 21:11:54] INFO - prefect.TaskRunner | Task 'download_file': Starting task run...
[2020-08-29 21:11:54] INFO - prefect.TaskRunner | Task 'download_file': finished task run for task with final state: 'Mapped'
[2020-08-29 21:11:54] INFO - prefect.TaskRunner | Task 'download_file[0]': Starting task run...
Download file 0. Sleeping for 10s
[2020-08-29 21:12:04] INFO - prefect.TaskRunner | Task 'download_file[0]': finished task run for task with final state: 'Success'
[2020-08-29 21:12:04] INFO - prefect.TaskRunner | Task 'download_file[1]': Starting task run...
Download file 1. Sleeping for 15s
[2020-08-29 21:12:19] INFO - prefect.TaskRunner | Task 'download_file[1]': finished task run for task with final state: 'Success'
[2020-08-29 21:12:19] INFO - prefect.TaskRunner | Task 'download_file[2]': Starting task run...
Download file 2. Sleeping for 5s
[2020-08-29 21:12:24] INFO - prefect.TaskRunner | Task 'download_file[2]': finished task run for task with final state: 'Success'
[2020-08-29 21:12:24] INFO - prefect.TaskRunner | Task 'download_file[3]': Starting task run...
Download file 3. Sleeping for 7s
So you can see, it runs download_file[0] -> download_file[1] -> etc...
I'm trying to do a mapping of two tasks but I expect to get download_file[0] -> import_file[0] -> download_file[1] -> import_file[1] -> etc..
from prefect import Flow, Task, task
from prefect.engine.executors import DaskExecutor
from distributed.deploy.local import LocalCluster
from random import randint
from time import sleep


@task
def get_files():
    return list(range(10))


@task
def download_file(file):
    timeout = randint(5, 15)
    print(f"Download file {file}. Sleeping for {timeout}s")

    sleep(timeout)

    return file


@task
def import_file(file):
    print(f"Importing file {file}")


def main():
    blocker = Task(name="blocker")

    with Flow("Files downloader") as flow:
        files = get_files()
        downloaded_files = download_file.map(files)
        blocker.set_upstream(downloaded_files)
        import_file.map(downloaded_files, upstream_tasks=[blocker])

    # flow.run(executor=DaskExecutor(
        # cluster_class=LocalCluster, cluster_kwargs={"n_workers": 2, "threads_per_worker": 1}))
    # flow.run(executor=DaskExecutor())
    flow.run()


if __name__ == "__main__":
    main()
c
Oh apologies I misunderstood your question; what version of Prefect are you running?
j
Latest I guess
Requirement already satisfied: prefect in ./.venv/lib/python3.8/site-packages (0.13.4)
c
Interesting I’m surprised that’s not already behaving as you expect; are the download tasks very fast?
j
No, I have random sleeps from 5 to 15 seconds. See the source above.
c
Interesting, yeah. Ultimately, when using mapping, Dask is given the task graph and decides when to run each piece of it, and for some reason Dask is choosing to proceed in a breadth-first manner here. If you need a strict guarantee about execution order, you should use a static DAG instead of a dynamic pipeline, e.g. by explicitly making task copies corresponding to each filename. You might also consider combining the download + import steps into a single task.
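A rough sketch of the static DAG idea, assuming the Prefect 0.13.x API used in this thread and a file list that is already known when the flow is built. `download_one`, `import_one` and `build_static_flow` are hypothetical names, not from the original code. Each download is made to wait on the previous file's import through `upstream_tasks`, which gives the download_file[0] -> import_file[0] -> download_file[1] order at the cost of running everything serially.

```python
def download_one(file):
    # Placeholder for the real download logic (hypothetical helper).
    print(f"Download file {file}")
    return file


def import_one(file):
    # Placeholder for the real import logic (hypothetical helper).
    print(f"Importing file {file}")
    return file


def build_static_flow(files):
    # Imported lazily so the helpers above stay usable without Prefect.
    # Assumes Prefect 0.13.x (Flow context manager, task() wrapper).
    from prefect import Flow, task

    download_task = task(download_one)
    import_task = task(import_one)

    with Flow("Files downloader (static)") as flow:
        previous_import = None
        for file in files:
            # Make each download wait for the previous file's import.
            upstream = [previous_import] if previous_import is not None else None
            downloaded = download_task(file, upstream_tasks=upstream)
            previous_import = import_task(downloaded)
    return flow
```

Something like `build_static_flow(list(range(10))).run()` would then execute the chain one file at a time.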
j
Thanks! The last one seems to be a better solution for now
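For reference, a minimal sketch of the combined-task approach, again assuming Prefect 0.13.x. `download_one`, `import_one`, `download_and_import` and `build_combined_flow` are hypothetical names standing in for the real logic. Since download and import happen inside one task, a worker always finishes importing a file before it picks up the next download.

```python
def get_files():
    return list(range(10))


def download_one(file):
    # Placeholder for the real download logic (hypothetical helper).
    print(f"Download file {file}")
    return file


def import_one(file):
    # Placeholder for the real import logic (hypothetical helper).
    print(f"Importing file {file}")
    return file


def download_and_import(file):
    # One unit of work per file: download it, then import it right away.
    return import_one(download_one(file))


def build_combined_flow():
    # Imported lazily so the helpers above stay usable without Prefect.
    # Assumes Prefect 0.13.x (Flow context manager, task() wrapper).
    from prefect import Flow, task

    with Flow("Files downloader") as flow:
        files = task(get_files)()
        task(download_and_import).map(files)
    return flow
```

Running the resulting flow with the `DaskExecutor` settings from the first message (2 workers, 1 thread each) should keep it to two files in flight at a time.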