Zach Jablons
03/10/2023, 1:33 AM
import time
import sys

from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, refresh_cache=True)
def a():
    print("executing a")
    time.sleep(1)
    return 2

@task(cache_key_fn=task_input_hash, refresh_cache=True)
def b(result_from_a):
    print("executing b")
    time.sleep(10)
    return result_from_a + 2

@task(cache_key_fn=task_input_hash, refresh_cache=True)
def c(result_from_a):
    print("executing c")
    return result_from_a + 4

@flow
def test1():
    print("Submitting a")
    res_a = a.submit()
    print("Submitting b")
    res_b = b.submit(res_a, wait_for=res_a)
    print("Submitting c")
    res_c = c.submit(res_a, wait_for=res_a)
    return {
        'b': res_b,
        'a': res_a,
        'c': res_c
    }

if __name__ == "__main__":
    total_tic = time.time()
    nodes = test1()
    print(nodes[sys.argv[1]].result())
    print(f"Took {time.time() - total_tic} seconds")
`test.py c` without invoking the `b` task, but also allowing a user to run `test.py b` (does that make sense?). To ground this, for example, `b` might be a deep analysis of a model that takes a while to run, and `c` might be a smaller analysis that's used for hyperparameter tuning or something like that.
test.py c
18:58:30.763 | INFO | prefect.engine - Created flow run 'gorgeous-capybara' for flow 'test1'
Submitting a
Submitting b
Submitting c
18:58:30.917 | INFO | Flow run 'gorgeous-capybara' - Created task run 'a-0' for task 'a'
18:58:30.918 | INFO | Flow run 'gorgeous-capybara' - Submitted task run 'a-0' for execution.
/home/zach/.pyenv/versions/prefect_test/lib/python3.8/site-packages/prefect/engine.py:1215: UserWarning: A 'PrefectFuture' from a task
call was cast to a boolean; did you mean to check the result of the task instead? e.g. `if my_task().result(): ...`
if wait_for:
18:58:30.961 | INFO | Flow run 'gorgeous-capybara' - Created task run 'c-0' for task 'c'
18:58:30.962 | INFO | Flow run 'gorgeous-capybara' - Submitted task run 'c-0' for execution.
executing a
18:58:31.011 | INFO | Flow run 'gorgeous-capybara' - Created task run 'b-0' for task 'b'
18:58:31.011 | INFO | Flow run 'gorgeous-capybara' - Submitted task run 'b-0' for execution.
18:58:32.048 | INFO | Task run 'a-0' - Finished in state Completed()
executing b
executing c
18:58:32.145 | INFO | Task run 'c-0' - Finished in state Completed()
18:58:42.167 | INFO | Task run 'b-0' - Finished in state Completed()
18:58:42.212 | INFO | Flow run 'gorgeous-capybara' - Finished in state Completed()
6
Took 11.928142786026001 seconds
`b` is executed - I expected that `submit` would create a Future and would not actually execute that until `result` is called on it, which I'm not doing - but I might be completely misunderstanding how that works.
Austin Weisgrau
03/10/2023, 5:20 PM
Zach Jablons
03/10/2023, 5:21 PM
Austin Weisgrau
03/10/2023, 5:22 PM
@flow
def test1(run_b: bool):
    print("Submitting a")
    res_a = a.submit()
    if run_b:
        print("Submitting b")
        res_b = b.submit(res_a, wait_for=res_a)
    print("Submitting c")
    res_c = c.submit(res_a, wait_for=res_a)
    result = {
        'a': res_a,
        'c': res_c
    }
    if run_b:
        result.update({'b': res_b})
    return result
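The `run_b` flag above can be generalized so that the requested target decides which tasks get submitted at all. The following is only a sketch in plain Python, without Prefect: `DEPENDENCIES`, `required_nodes`, and `run_selected` are hypothetical names, and the lambdas stand in for the thread's tasks `a`, `b`, and `c`.

```python
from typing import Callable, Dict, List, Set

# Hypothetical dependency map mirroring the thread's tasks: b and c both need a.
DEPENDENCIES: Dict[str, List[str]] = {"a": [], "b": ["a"], "c": ["a"]}


def required_nodes(target: str, deps: Dict[str, List[str]]) -> Set[str]:
    """Return the target plus everything it transitively depends on."""
    needed: Set[str] = set()
    stack = [target]
    while stack:
        node = stack.pop()
        if node not in needed:
            needed.add(node)
            stack.extend(deps[node])
    return needed


def run_selected(target: str) -> Dict[str, int]:
    """Run only the nodes needed for `target`, in dependency order."""
    # Stand-ins for the real tasks (assumption: same arithmetic as the thread).
    funcs: Dict[str, Callable[..., int]] = {
        "a": lambda: 2,
        "b": lambda a: a + 2,
        "c": lambda a: a + 4,
    }
    needed = required_nodes(target, DEPENDENCIES)
    results: Dict[str, int] = {}
    # "a", "b", "c" is already a valid topological order for this small graph.
    for name in ("a", "b", "c"):
        if name in needed:
            args = [results[dep] for dep in DEPENDENCIES[name]]
            results[name] = funcs[name](*args)
    return results


if __name__ == "__main__":
    print(run_selected("c"))  # b is never called for target "c"
```

Inside a flow, the same selection could presumably guard each `.submit()` call the way `if run_b:` does, so that nothing outside the needed set is ever submitted.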
Zach Jablons
03/10/2023, 5:23 PM
Austin Weisgrau
03/10/2023, 5:26 PM
Zach Jablons
03/10/2023, 5:26 PM
Austin Weisgrau
03/10/2023, 5:27 PM
Zach Jablons
03/10/2023, 5:28 PM
Austin Weisgrau
03/10/2023, 6:03 PM
Zach Jablons
03/10/2023, 6:08 PM
Bianca Hoch
04/03/2023, 7:05 PM
Zach Jablons
04/03/2023, 7:46 PM
import time
import sys

from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, refresh_cache=True)
def a():
    print("executing a")
    time.sleep(1)
    return 2

@task(cache_key_fn=task_input_hash, refresh_cache=True)
def b(result_from_a):
    print("executing b")
    time.sleep(10)
    return result_from_a + 2

@task(cache_key_fn=task_input_hash, refresh_cache=True)
def c(result_from_a):
    print("executing c")
    return result_from_a + 4

@flow
def flow_a():
    return a.submit()

@flow
def flow_b(result_from_a):
    return b.submit(result_from_a)

@flow
def flow_c(result_from_a):
    return c.submit(result_from_a)

@flow
def main_flow():
    res_a = flow_a()
    res_b = flow_b(res_a)
    res_c = flow_c(res_a)
    return {
        'a': res_a,
        'b': res_b,
        'c': res_c
    }

if __name__ == "__main__":
    total_tic = time.time()
    nodes = main_flow()
    print(nodes[sys.argv[1]].result())
    print(f"Took {time.time() - total_tic} seconds")
Taylor Curran
04/03/2023, 9:51 PM
Zach Jablons
04/03/2023, 10:01 PM
Taylor Curran
04/03/2023, 10:22 PM
Zach Jablons
04/03/2023, 10:26 PM
Taylor Curran
04/03/2023, 10:28 PM
@flow
def main_flow():
    res_a = flow_a()
    res_b = flow_b(res_a)
    res_c = flow_c(res_a)
    return {
        'a': res_a,
        'b': res_b,
        'c': res_c
    }
You would want `flow_b` to not run because its result is not used in a downstream subflow/task, and `'b': res_b` would ideally return a cached value, right?
Zach Jablons
04/04/2023, 2:00 PM
Zanie
04/04/2023, 6:07 PM
Zach Jablons
04/04/2023, 6:11 PM