bral

08/18/2020, 9:03 PM
Is there a right way to check task executions on a Dask cluster? My flow:
from prefect import task, Flow, case, Task
from random import randint
from prefect.schedules import IntervalSchedule
from datetime import timedelta
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment


class Preprocess:
    def __init__(self, number):
        print(number)
        self.number = number
        self.data = None

    def read_file(self):
        self.data = self.number

    def preprocess_file(self):
        self.data = self.data * 2

    def save_file(self):
        self.data = self.data / 3


@task
def is_running():
    return False

@task
def get_files():
    return [i for i in range(0, randint(0, 15))]

@task
def _print(lst):
    print(lst)
    print(len(lst))

@task
def etl(file):
    prep = Preprocess(file)
    prep.read_file()
    prep.preprocess_file()
    prep.save_file()


executor = DaskExecutor(address="tcp://localhost:8786")
schedule = IntervalSchedule(interval=timedelta(seconds=5))
local = LocalEnvironment(executor=executor)

with Flow("process", schedule=schedule, environment=local) as flow:
    condition = is_running()
    with case(condition, False) as cond:
        files = get_files()
        _print(files)
        etl.map(files)

flow.run()
Following the tutorial (https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html), I started a scheduler and 2 workers, then looked at the Bokeh web UI. But the Tasks view in the UI is empty.

Jim Crist-Harif

08/18/2020, 9:05 PM
Hi bral, when using `flow.run()`, the environment on your flow is ignored; configuring a flow environment is for usage with flow deployments via `flow.register()` (see https://docs.prefect.io/orchestration/tutorial/first.html for more info). If you're trying to use `flow.run` with a Dask executor, you need to pass in the executor explicitly as `flow.run(executor=executor)`.

bral

08/18/2020, 9:06 PM
@Jim Crist-Harif thanks again!