Johnny Bravo
08/29/2020, 8:31 PM
flow.run(executor=DaskExecutor(
    cluster_class=LocalCluster, cluster_kwargs={"n_workers": 2, "threads_per_worker": 1}))
Not sure if this is the right way to do it. Now I have this flow:
with Flow("Files downloader") as flow:
    files = get_files()
    downloaded_files = download_file.map(files)
    import_file.map(downloaded_files)
The problem is that after the first two downloads finish, it moves on to the next download task instead of the import task. Because I'm limited to 2 workers at a time, I need to prioritize the import_file task over the download_file task. Is there a better way to do this?
Chris White
08/29/2020, 8:41 PM
Johnny Bravo
08/29/2020, 8:55 PM
Chris White
08/29/2020, 9:01 PM
from prefect import Task, Flow
blocker = Task(name="blocker")
with Flow("Files downloader") as flow:
    files = get_files()
    downloaded_files = download_file.map(files)
    blocker.set_upstream(downloaded_files)
    import_file.map(downloaded_files, upstream_tasks=[blocker])
Because blocker depends on downloaded_files, it won’t run until both mapped tasks are complete. And because import_file depends on blocker, it won’t run until after both files are complete as well.
Johnny Bravo
08/29/2020, 9:13 PM
flow.run() and get this
[2020-08-29 21:11:54] INFO - prefect.FlowRunner | Beginning Flow run for 'Files downloader'
[2020-08-29 21:11:54] INFO - prefect.TaskRunner | Task 'get_files': Starting task run...
[2020-08-29 21:11:54] INFO - prefect.TaskRunner | Task 'get_files': finished task run for task with final state: 'Success'
[2020-08-29 21:11:54] INFO - prefect.TaskRunner | Task 'download_file': Starting task run...
[2020-08-29 21:11:54] INFO - prefect.TaskRunner | Task 'download_file': finished task run for task with final state: 'Mapped'
[2020-08-29 21:11:54] INFO - prefect.TaskRunner | Task 'download_file[0]': Starting task run...
Download file 0. Sleeping for 10s
[2020-08-29 21:12:04] INFO - prefect.TaskRunner | Task 'download_file[0]': finished task run for task with final state: 'Success'
[2020-08-29 21:12:04] INFO - prefect.TaskRunner | Task 'download_file[1]': Starting task run...
Download file 1. Sleeping for 15s
[2020-08-29 21:12:19] INFO - prefect.TaskRunner | Task 'download_file[1]': finished task run for task with final state: 'Success'
[2020-08-29 21:12:19] INFO - prefect.TaskRunner | Task 'download_file[2]': Starting task run...
Download file 2. Sleeping for 5s
[2020-08-29 21:12:24] INFO - prefect.TaskRunner | Task 'download_file[2]': finished task run for task with final state: 'Success'
[2020-08-29 21:12:24] INFO - prefect.TaskRunner | Task 'download_file[3]': Starting task run...
Download file 3. Sleeping for 7s
So you can see, it runs download_file[0] -> download_file[1] -> etc...
from prefect import Flow, Task, task
from prefect.engine.executors import DaskExecutor
from distributed.deploy.local import LocalCluster
from random import randint
from time import sleep

@task
def get_files():
    return list(range(10))

@task
def download_file(file):
    timeout = randint(5, 15)
    print(f"Download file {file}. Sleeping for {timeout}s")
    sleep(timeout)
    return file

@task
def import_file(file):
    print(f"Importing file {file}")

def main():
    blocker = Task(name="blocker")
    with Flow("Files downloader") as flow:
        files = get_files()
        downloaded_files = download_file.map(files)
        blocker.set_upstream(downloaded_files)
        import_file.map(downloaded_files, upstream_tasks=[blocker])
    # flow.run(executor=DaskExecutor(
    #     cluster_class=LocalCluster, cluster_kwargs={"n_workers": 2, "threads_per_worker": 1}))
    # flow.run(executor=DaskExecutor())
    flow.run()

if __name__ == "__main__":
    main()
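For reference, the ordering the blocker task is described as enforcing (every download finishes before any import starts) can be sketched without Prefect. This is only an illustration under stated assumptions: it uses the standard library's concurrent.futures in place of Prefect's executors, the two-worker pool stands in for the two Dask workers, and run_with_barrier and events are hypothetical names that do not appear in the thread.

```python
from concurrent.futures import ThreadPoolExecutor, wait
from threading import Lock

events = []            # ordered record of which step ran for which file
events_lock = Lock()   # guards `events` across worker threads


def download_file(file):
    # Stand-in for the real download: record the event, return the input.
    with events_lock:
        events.append(("download", file))
    return file


def import_file(file):
    # Stand-in for the real import step.
    with events_lock:
        events.append(("import", file))
    return file


def run_with_barrier(files, max_workers=2):
    # Hypothetical helper: a plain thread pool, not a Prefect executor.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        downloads = [pool.submit(download_file, f) for f in files]
        # Barrier: plays the role of `blocker`, which waits on every download.
        wait(downloads)
        # Imports are only submitted once all downloads have finished.
        imports = [pool.submit(import_file, d.result()) for d in downloads]
        return [i.result() for i in imports]


if __name__ == "__main__":
    print(run_with_barrier(list(range(4))))
    print(events)
```

With this structure the pool size limits only how many downloads (or imports) run at once. The barrier is what keeps any import from starting early.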
Chris White
08/29/2020, 9:23 PM
Johnny Bravo
08/29/2020, 9:24 PM
Requirement already satisfied: prefect in ./.venv/lib/python3.8/site-packages (0.13.4)
Chris White
08/29/2020, 9:26 PM
Johnny Bravo
08/29/2020, 9:27 PM
Chris White
08/29/2020, 9:39 PM
Johnny Bravo
08/29/2020, 11:25 PM