Omar Sultan
04/19/2022, 8:13 AMAnna Geller
Omar Sultan
04/20/2022, 12:14 AMclass RunMeFirst(Task):
def run(self):
print("I'm running first!")
class PlusOneTask(Task):
def run(self, x):
print(x + 1)
return x + 1
flow = Flow('My Imperative Flow')
plus_one = PlusOneTask()
run_me = RunMeFirst()
plus_one.log_stdout = True
run_me.log_stdout = True
flow.set_dependencies(
task=plus_one,
upstream_tasks=[run_me],
keyword_tasks=dict(x=10))
flow.storage = get_jigsaw_api_storage(1, f"Test Flow")
flow.storage.build()
flow.run_config = get_jigsaw_k8_config_run()
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=2)
flow.register("Utilities")
Omar Sultan
04/20/2022, 12:15 AMAnna Geller
Anna Geller
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor
@task
def generate_random_numbers():
return list(range(1, 200))
@task
def add_one(x):
return x + 1
@task(log_stdout=True)
def print_results(res):
print(res)
with Flow("mapping", executor=LocalDaskExecutor(), storage=get_jigsaw_api_storage(1, f"Test Flow")) as flow:
numbers = generate_random_numbers()
result = add_one.map(numbers)
print_results(result)
The issue you have is that you build the storage before you attach the executor. Since the executor information is retrieved from storage at runtime, your executor information won't be found. Switching the order may already help solve your issue:
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=2)
flow.run_config = get_jigsaw_k8_config_run()
flow.storage = get_jigsaw_api_storage(1, f"Test Flow")
flow.storage.build()
flow.register("Utilities")
Omar Sultan
04/21/2022, 10:52 PMOmar Sultan
04/21/2022, 10:52 PMAnna Geller
Omar Sultan
04/24/2022, 11:08 PM