# prefect-community
b
Is there a right way to check executions on a Dask cluster? My flow:
from prefect import task, Flow, case, Task
from random import randint
from prefect.schedules import IntervalSchedule
from datetime import timedelta
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment


class Preprocess:
    def __init__(self, number):
        print(number)
        self.number = number
        self.data = None

    def read_file(self):
        self.data = self.number

    def preprocess_file(self):
        self.data = self.data * 2

    def save_file(self):
        self.data = self.data / 3


@task
def is_running():
    return False

@task
def get_files():
    return [i for i in range(0, randint(0, 15))]

@task
def _print(lst):
    print(lst)
    print(len(lst))

@task
def etl(file):
    prep = Preprocess(file)
    prep.read_file()
    prep.preprocess_file()
    prep.save_file()


executor = DaskExecutor(address="tcp://localhost:8786")
schedule = IntervalSchedule(interval=timedelta(seconds=5))
local = LocalEnvironment(executor=executor)

with Flow("process", schedule=schedule, environment=local) as flow:
    condition = is_running()
    with case(condition, False) as cond:
        files = get_files()
        _print(files)
        etl.map(files)

flow.run()
Following the tutorial at https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html, I started a scheduler and 2 workers and then looked at the Bokeh web UI, but the Tasks view in the UI is empty.
j
Hi bral, when you use `flow.run()`, the environment on your flow is ignored. Configuring a flow environment is for flow deployments via `flow.register()` (see https://docs.prefect.io/orchestration/tutorial/first.html for more info). If you're trying to use `flow.run` with a Dask executor, you need to pass the executor in explicitly: `flow.run(executor=executor)`.
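As a minimal sketch of that advice, the helper below builds a `DaskExecutor` for an already-running scheduler and passes it to `flow.run`. The names `run_on_dask` and `executor_cls` are hypothetical and only exist for this illustration; the import path and the scheduler address are taken from the flow in the question and assume the same Prefect version.

```python
def run_on_dask(flow, address="tcp://localhost:8786", executor_cls=None):
    """Run `flow` in this process while sending its tasks to a Dask scheduler.

    `run_on_dask` and `executor_cls` are hypothetical names for this sketch;
    `executor_cls` only exists so the helper can be tried without a cluster.
    """
    if executor_cls is None:
        # Assumption: same import path as in the question's flow.
        from prefect.engine.executors import DaskExecutor
        executor_cls = DaskExecutor

    executor = executor_cls(address=address)
    # flow.run() ignores flow.environment, so the executor is passed explicitly.
    return flow.run(executor=executor)
```

In the script from the question, this would amount to replacing the final `flow.run()` with `run_on_dask(flow)`, or simply with `flow.run(executor=executor)`.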
b
@Jim Crist-Harif thanks again!