Thread
#prefect-community
    bral

    bral

    2 years ago
    Is there right way to check executions on dask cluster? My flow:
    from prefect import task, Flow, case, Task
    from random import randint
    from prefect.schedules import IntervalSchedule
    from datetime import timedelta
    from prefect.engine.executors import DaskExecutor
    from prefect.environments import LocalEnvironment
    
    
    class Preprocess:
        def __init__(self, number):
            print(number)
            self.number = number
            self.data = None
    
        def read_file(self):
            self.data = self.number
    
        def preprocess_file(self):
            self.data = self.data * 2
    
        def save_file(self):
            self.data = self.data / 3
    
    
    @task
    def is_running():
        return False
    
    @task
    def get_files():
        return [i for i in range(0, randint(0, 15))]
    
    @task
    def _print(lst):
        print(lst)
        print(len(lst))
    
    @task
    def etl(file):
        prep = Preprocess(file)
        prep.read_file()
        prep.preprocess_file()
        prep.save_file()
    
    
    executor = DaskExecutor(address="<tcp://localhost:8786>")
    schedule = IntervalSchedule(interval=timedelta(seconds=5))
    local = LocalEnvironment(executor=executor)
    
    with Flow("process", schedule=schedule, environment=local) as flow:
        condition = is_running()
        with case(condition, False) as cond:
            files = get_files()
            _print(files)
            etl.map(files)
    
    flow.run()
    Accorging tuto : https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html I started scheduler and 2 workers , and after looked at the boker-web ui. But Tasks are empy on ui.
    Jim Crist-Harif

    Jim Crist-Harif

    2 years ago
    Hi bral, when using
    flow.run()
    your environment on the flow is ignored, configuring a flow environment is for usage with flow deployments via
    flow.register()
    (see https://docs.prefect.io/orchestration/tutorial/first.html for more info). If you're trying to use
    flow.run
    with a dask executor, you need to pass in the executor explicitly as
    flow.run(executor=executor)
    bral

    bral

    2 years ago
    @Jim Crist-Harif thanks again!