bral
08/18/2020, 9:03 PM
from prefect import task, Flow, case, Task
from random import randint
from prefect.schedules import IntervalSchedule
from datetime import timedelta
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment
class Preprocess:
    """Toy three-stage pipeline applied to a single value.

    ``data`` is ``None`` until ``read_file`` is called, so calling
    ``preprocess_file`` or ``save_file`` first raises ``TypeError``.
    """

    def __init__(self, number):
        """Print and store the input value; no data is loaded yet.

        Args:
            number: Value that ``read_file`` later copies into ``data``.
        """
        print(number)
        self.number = number
        self.data = None

    def read_file(self):
        """Copy the stored input value into ``data``."""
        self.data = self.number

    def preprocess_file(self):
        """Replace ``data`` with ``data * 2``."""
        self.data = self.data * 2

    def save_file(self):
        """Replace ``data`` with ``data / 3`` (true division)."""
        self.data = self.data / 3
@task
def is_running():
    """Return ``False`` unconditionally.

    Stub task: the flow in this file uses its result as the condition of
    a ``case(condition, False)`` branch.
    """
    return False
@task
def get_files():
    """Return ``[0, 1, ..., k - 1]`` for a random ``k`` between 0 and 15.

    Both bounds of ``randint`` are inclusive, so the list may be empty
    and holds at most 15 items.
    """
    return list(range(randint(0, 15)))
@task
def _print(lst):
    """Print ``lst`` and then its length.

    Args:
        lst: Any sized object; it is passed to ``print`` and ``len``.
    """
    print(lst)
    print(len(lst))
@task
def etl(file):
    """Run the three ``Preprocess`` stages in order for one item.

    Args:
        file: Value handed to ``Preprocess`` as its input.

    Nothing is returned; the computed value stays on the local
    ``Preprocess`` instance and is discarded when the task finishes.
    """
    prep = Preprocess(file)
    prep.read_file()
    prep.preprocess_file()
    prep.save_file()
# Connect to an already-running Dask scheduler. The address is a plain
# URL; the angle brackets that surrounded it were chat link markup, not
# part of a valid scheduler address.
executor = DaskExecutor(address="tcp://localhost:8786")
schedule = IntervalSchedule(interval=timedelta(seconds=5))
# The environment is only consulted for flows deployed via
# flow.register(); a local flow.run() ignores it.
local = LocalEnvironment(executor=executor)

with Flow("process", schedule=schedule, environment=local) as flow:
    condition = is_running()
    # Everything inside this branch runs only when is_running() is False.
    with case(condition, False) as cond:
        files = get_files()
        _print(files)
        # One mapped etl task per element of the list returned by get_files.
        etl.map(files)

# flow.run() does not use the flow's environment, so the executor must be
# passed explicitly for the tasks to be submitted to the Dask cluster.
flow.run(executor=executor)
According to the tutorial: https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html
I started a scheduler and 2 workers, and afterwards looked at the Bokeh web UI. But the Tasks are empty in the UI.
Jim Crist-Harif
08/18/2020, 9:05 PM
When using `flow.run()`, your environment on the flow is ignored; configuring a flow environment is for usage with flow deployments via `flow.register()`
(see https://docs.prefect.io/orchestration/tutorial/first.html for more info). If you're trying to use `flow.run`
with a Dask executor, you need to pass in the executor explicitly as `flow.run(executor=executor)`.
bral
08/18/2020, 9:06 PM