
Zach Jablons

03/10/2023, 1:33 AM
I'm also trying to figure out how I can defer execution of potentially expensive tasks in a flow unless requested, or to only execute a given task and its parents. 🧵
import time                                                                      
import sys                                                                       
                                                                                 
from prefect import flow, task                                                   
from prefect.tasks import task_input_hash                                        
                                                                                 
@task(cache_key_fn=task_input_hash, refresh_cache=True)                          
def a():                                                                         
    print("executing a")                                                         
    time.sleep(1)                                                                
    return 2                                                                     
                                                                                 
@task(cache_key_fn=task_input_hash, refresh_cache=True)                          
def b(result_from_a):                                                            
    print("executing b")                                                         
    time.sleep(10)                                                               
    return result_from_a + 2                                                     
                                                                                 
@task(cache_key_fn=task_input_hash, refresh_cache=True)                          
def c(result_from_a):                                                            
    print("executing c")                                                         
    return result_from_a + 4                                                     
                                                                                 
@flow                                                                            
def test1():                                                                     
    print("Submitting a")                                                        
    res_a = a.submit()                                                           
    print("Submitting b")                                                        
    res_b = b.submit(res_a, wait_for=res_a)                                      
    print("Submitting c")                                                        
    res_c = c.submit(res_a, wait_for=res_a)                                      
    return {                                                                     
            'b':res_b,                                                           
            'a':res_a,                                                           
            'c':res_c                                                            
    }                                                                            
                                                                                 
if __name__ == "__main__":                                                       
    total_tic = time.time()                                                      
    nodes = test1()                                                              
    print(nodes[sys.argv[1]].result())                                           
    print(f"Took {time.time() - total_tic} seconds")
What I'm hoping to be able to do here is run `test.py c` without invoking the `b` task, while still allowing a user to run `test.py b` (does that make sense?). To ground this: `b` might be a deep analysis of a model that takes a while to run, and `c` might be a smaller analysis used for hyperparameter tuning or something like that.
This is the output of `test.py c`:
18:58:30.763 | INFO    | prefect.engine - Created flow run 'gorgeous-capybara' for flow 'test1'
Submitting a
Submitting b
Submitting c
18:58:30.917 | INFO    | Flow run 'gorgeous-capybara' - Created task run 'a-0' for task 'a'
18:58:30.918 | INFO    | Flow run 'gorgeous-capybara' - Submitted task run 'a-0' for execution.
/home/zach/.pyenv/versions/prefect_test/lib/python3.8/site-packages/prefect/engine.py:1215: UserWarning: A 'PrefectFuture' from a task call was cast to a boolean; did you mean to check the result of the task instead? e.g. `if my_task().result(): ...`
  if wait_for:
18:58:30.961 | INFO    | Flow run 'gorgeous-capybara' - Created task run 'c-0' for task 'c'
18:58:30.962 | INFO    | Flow run 'gorgeous-capybara' - Submitted task run 'c-0' for execution.
executing a
18:58:31.011 | INFO    | Flow run 'gorgeous-capybara' - Created task run 'b-0' for task 'b'
18:58:31.011 | INFO    | Flow run 'gorgeous-capybara' - Submitted task run 'b-0' for execution.
18:58:32.048 | INFO    | Task run 'a-0' - Finished in state Completed()
executing b
executing c
18:58:32.145 | INFO    | Task run 'c-0' - Finished in state Completed()
18:58:42.167 | INFO    | Task run 'b-0' - Finished in state Completed()
18:58:42.212 | INFO    | Flow run 'gorgeous-capybara' - Finished in state Completed()
6
Took 11.928142786026001 seconds
Note that `b` is executed. I expected that `submit` would create a future and not actually execute the task until `result` is called on it (which I'm not doing), but I might be completely misunderstanding how that works.

Austin Weisgrau

03/10/2023, 5:20 PM
A future is returned immediately and the task is sent to the runner for execution, but calling future.result() won't return the result until the task has finished executing
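Austin's description of eager futures can be illustrated with Python's standard-library `concurrent.futures`. This is only an analogy, assuming Prefect's task runner behaves like a thread pool in this respect; it is not Prefect code, and the function name `expensive_b` is hypothetical. The submitted function starts running even though `result()` is never called before it begins; `result()` only waits for it to finish.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()


def expensive_b() -> int:
    # Signal as soon as a worker thread begins running this function.
    started.set()
    return 6


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(expensive_b)  # returns a future immediately
    # result() has not been called, yet the function still starts running.
    ran_without_result = started.wait(timeout=5)
    print(f"ran without result(): {ran_without_result}")
    # result() merely blocks until the already-running work is done.
    print(future.result())
```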

Zach Jablons

03/10/2023, 5:21 PM
Is there any way to get the runner to delay execution until requested?

Austin Weisgrau

03/10/2023, 5:22 PM
I think if you don't want b to run unless requested, you could parametrize the flow:
@flow
def test1(run_b: bool):
    print("Submitting a")
    res_a = a.submit()
    if run_b:
        print("Submitting b")
        # wait_for expects a list of futures, not a bare future
        res_b = b.submit(res_a, wait_for=[res_a])
    print("Submitting c")
    res_c = c.submit(res_a, wait_for=[res_a])
    result = {
        'a': res_a,
        'c': res_c
    }
    if run_b:
        result.update({'b': res_b})
    return result

Zach Jablons

03/10/2023, 5:23 PM
Yeah, I'm hoping to avoid building that kind of logic for any arbitrary DAG

Austin Weisgrau

03/10/2023, 5:26 PM
So if I understand, you don't want to need to tell test1() up front if b is required to run, but have it be inferred by whether or not res_b is used downstream, is that right?

Zach Jablons

03/10/2023, 5:26 PM
Essentially, yes - I'm hoping to be able to construct a DAG in the flow and have it execute the portions of the DAG that are requested on invocation
(I've built something that does that in the past, but it doesn't have all the cool features Prefect has 🙂 )

Austin Weisgrau

03/10/2023, 5:27 PM
I'm not sure I understand the difference between what you mean by "requested on invocation" and adding a parameter to the dag

Zach Jablons

03/10/2023, 5:28 PM
Basically, the extension of your example solution to an arbitrary DAG is to have a dictionary of tasks and their dependencies, but then you have to recursively go through the dependencies and get their outcomes as well.
I think that's possible to do, but I was just wondering if there was a way to have Prefect handle that on its own
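The dictionary-of-dependencies approach Zach describes can be sketched in plain Python, outside of Prefect. `DEPS`, `FUNCS`, `required_order`, and `run` are hypothetical names, and the sketch assumes the graph has no cycles. It walks the dependencies recursively to find every ancestor of the requested targets, then executes only those nodes, parents first.

```python
from typing import Callable, Dict, List

# Hypothetical DAG spec: node name -> names of the parents it depends on.
DEPS: Dict[str, List[str]] = {"a": [], "b": ["a"], "c": ["a"]}

# Hypothetical node implementations; each takes its parents' results in order.
FUNCS: Dict[str, Callable[..., int]] = {
    "a": lambda: 2,
    "b": lambda a: a + 2,
    "c": lambda a: a + 4,
}


def required_order(targets: List[str], deps: Dict[str, List[str]]) -> List[str]:
    """Return the targets plus every ancestor, with parents before children."""
    order: List[str] = []
    seen = set()

    def visit(node: str) -> None:
        if node in seen:
            return
        seen.add(node)
        for parent in deps[node]:
            visit(parent)  # recurse into dependencies before the node itself
        order.append(node)

    for target in targets:
        visit(target)
    return order


def run(targets: List[str]) -> Dict[str, int]:
    """Execute only the nodes the requested targets need, in dependency order."""
    results: Dict[str, int] = {}
    for node in required_order(targets, DEPS):
        results[node] = FUNCS[node](*(results[p] for p in DEPS[node]))
    return {t: results[t] for t in targets}


print(required_order(["c"], DEPS))  # ['a', 'c']
print(run(["c"]))                   # {'c': 6}
```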

Austin Weisgrau

03/10/2023, 6:03 PM
like a lazy evaluation for task futures 🤔

Zach Jablons

03/10/2023, 6:08 PM
Something like that would be really neat, especially if it can work with asynchronous execution while maintaining the DAG ordering correctly.
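A minimal sketch of what "lazy evaluation for task futures" could look like, written in plain Python rather than Prefect: the `LazyFuture` class and the `tracked` helper are hypothetical and not part of Prefect's API. Nothing executes while the graph is being built; calling `.result()` on a node runs that node and its ancestors at most once each, so requesting `c` never runs `b`.

```python
from typing import Any, Callable, List


class LazyFuture:
    """Hypothetical lazy future: nothing runs until .result() is called."""

    _UNSET = object()

    def __init__(self, fn: Callable[..., Any], *upstream: "LazyFuture") -> None:
        self.fn = fn
        self.upstream = upstream
        self._value: Any = LazyFuture._UNSET

    def result(self) -> Any:
        if self._value is LazyFuture._UNSET:
            # Resolve parents on demand, then run this node once and memoize it.
            args = [u.result() for u in self.upstream]
            self._value = self.fn(*args)
        return self._value


calls: List[str] = []


def tracked(name: str, fn: Callable[..., Any]) -> Callable[..., Any]:
    def wrapped(*args: Any) -> Any:
        calls.append(name)  # record which nodes actually executed
        return fn(*args)
    return wrapped


res_a = LazyFuture(tracked("a", lambda: 2))
res_b = LazyFuture(tracked("b", lambda x: x + 2), res_a)
res_c = LazyFuture(tracked("c", lambda x: x + 4), res_a)
nodes = {"a": res_a, "b": res_b, "c": res_c}

print(nodes["c"].result())  # 6
print(calls)                # ['a', 'c'] (b never ran)
```

This sketch is single-threaded and resolves parents one at a time, so it does not address the concurrent-execution half of the request.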

Bianca Hoch

04/03/2023, 7:05 PM
Hey @Zach Jablons! It was good talking to you earlier. Feel free to drop the example we reviewed here if it's more up to date.

Zach Jablons

04/03/2023, 7:46 PM
Sure thing. I think the one I posted above is along the same lines, but the example I was showing is below:
import time                                                                    
import sys                                                                     
                                                                               
from prefect import flow, task                                                 
from prefect.tasks import task_input_hash                                      
                                                                               
@task(cache_key_fn=task_input_hash, refresh_cache=True)                        
def a():                                                                       
    print("executing a")                                                       
    time.sleep(1)                                                              
    return 2                                                                   
                                                                               
@task(cache_key_fn=task_input_hash, refresh_cache=True)                        
def b(result_from_a):                                                          
    print("executing b")                                                       
    time.sleep(10)                                                             
    return result_from_a + 2                                                   
                                                                               
@task(cache_key_fn=task_input_hash, refresh_cache=True)                        
def c(result_from_a):                                                          
    print("executing c")                                                       
    return result_from_a + 4                                                   
                                                                               
@flow                                                                          
def flow_a():                                                                  
    return a.submit()                                                          
                                                                               
@flow                                                                          
def flow_b(result_from_a):                                                     
    return b.submit(result_from_a)                                             
                                                                               
@flow                                                                          
def flow_c(result_from_a):                                                     
    return c.submit(result_from_a)                                             
                                                                               
@flow                                                                          
def main_flow():                                                               
    res_a = flow_a()                                                           
    res_b = flow_b(res_a)                                                      
    res_c = flow_c(res_a)                                                      
                                                                               
    return {                                                                   
        'a': res_a,                                                            
        'b': res_b,                                                            
        'c': res_c                                                             
    }                                                                          
                                                                               
if __name__ == "__main__":                                                     
    total_tic = time.time()                                                    
    nodes = main_flow()                                                        
    print(nodes[sys.argv[1]].result())                                         
    print(f"Took {time.time() - total_tic} seconds")

Taylor Curran

04/03/2023, 9:51 PM
Hi @Zach Jablons, on the topic of caching, I wonder if this custom caching example might help: https://github.com/taylor-curran/flow-patterns/blob/main/flows/caching/custom_cache_key_function.py
A list of tasks is passed as a parameter, and those tasks are skipped thanks to the function `static_cache_key`.

Zach Jablons

04/03/2023, 10:01 PM
Oh, that's interesting, but it doesn't quite solve my issue. Ideally my user would specify the tasks they want outputs for, and Prefect would figure out which tasks need to be executed to achieve that, rather than the user having to determine which tasks (potentially a lot of them) should be skipped and then specify that in the hash key.

Taylor Curran

04/03/2023, 10:22 PM
Do you think it might work if the cache function checked for conditions instead of checked for user input?

Zach Jablons

04/03/2023, 10:26 PM
What do you mean by conditions?

Taylor Curran

04/03/2023, 10:28 PM
Conditions that would indicate whether or not the task needs to run. But hmm, I'm jumping to conclusions; let me read through this thread a little more thoroughly first 😅
I see that the condition would be whether or not the result is used, like:
@flow                                                                          
def main_flow():                                                               
    res_a = flow_a()                                                           
    res_b = flow_b(res_a)                                                      
    res_c = flow_c(res_a)                                                      
                                                                               
    return {                                                                   
        'a': res_a,                                                            
        'b': res_b,                                                            
        'c': res_c                                                             
    }
You would want `flow_b` not to run because its result is not used in a downstream subflow/task, and `'b': res_b` would ideally return a cached value, right?
To my knowledge, Prefect does not have something like that out of the box, but I'll ask around; someone might have an idea. Also, this page in the docs is a good overview of results and futures in Prefect; our very own @Zanie wrote it.

Zach Jablons

04/04/2023, 2:00 PM
Yeah, I think the key thing I want to be able to do here is define the DAG separately from executing it, and this is an example of why. The ideal interface I'm trying to achieve is that our team would only have to define a) what configuration they want the DAG to run with and b) which nodes they want results from, and then Prefect would take care of the rest without executing anything unnecessarily.

Zanie

04/04/2023, 6:07 PM
👋 Hey Zach, I’m going to think about this feature but it’s probably not something that can be hacked together using our existing interfaces.
You could, of course, write a generic flow that walks a DAG and executes it dynamically, and then define the DAG separately; but to use Prefect itself for lazy execution, we'd need to make some substantial changes to how submission for execution happens.
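Zanie's workaround (a generic walker over a separately defined DAG), combined with the interface Zach describes (a run configuration plus a list of requested nodes), can be sketched in plain Python with `concurrent.futures` standing in for a task runner. The DAG spec format and the `needed` and `execute` helpers are hypothetical; this is not Prefect's API, and wiring it into a Prefect flow is not shown. It assumes an acyclic graph. Independent nodes may run concurrently, while each node still waits for its parents' results.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical DAG spec, defined separately from execution:
# node name -> (function, parent names). Each function receives the run
# config first, then its parents' results in the listed order.
Node = Tuple[Callable[..., Any], List[str]]
DAG: Dict[str, Node] = {
    "a": (lambda cfg: cfg["base"], []),
    "b": (lambda cfg, a: a + 2, ["a"]),  # stands in for the slow analysis
    "c": (lambda cfg, a: a + 4, ["a"]),  # stands in for the quick analysis
}


def needed(targets: List[str], dag: Dict[str, Node]) -> List[str]:
    """Targets plus all their ancestors, parents first (assumes no cycles)."""
    order: List[str] = []

    def visit(name: str) -> None:
        if name in order:
            return
        for parent in dag[name][1]:
            visit(parent)
        order.append(name)

    for t in targets:
        visit(t)
    return order


def execute(
    dag: Dict[str, Node],
    config: Dict[str, Any],
    targets: List[str],
    max_workers: int = 4,
) -> Dict[str, Any]:
    """Run only the nodes needed for `targets`, letting independent ones overlap."""
    futures: Dict[str, Future] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit parents before children so each parent is queued ahead of
        # any node that will block waiting on its result.
        for name in needed(targets, dag):
            fn, parents = dag[name]
            upstream = [futures[p] for p in parents]
            futures[name] = pool.submit(
                lambda fn=fn, upstream=upstream: fn(
                    config, *(f.result() for f in upstream)
                )
            )
        return {t: futures[t].result() for t in targets}


print(execute(DAG, {"base": 2}, ["c"]))  # {'c': 6}; "b" is never submitted
```

Submitting in parents-first order is the design choice that keeps this from deadlocking: `ThreadPoolExecutor` workers take queued work in FIFO order, so by the time a child starts and blocks on its parents, those parents have already been picked up by a worker, even with `max_workers=1`.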

Zach Jablons

04/04/2023, 6:11 PM
Ok, that's helpful to know - thanks