# ask-community
l
Hello Prefect community! I'm starting out with Prefect, and I have many similar tasks that have been a pleasure to automate. I'll attach a sample code snippet of my flow in the thread 🧵 to avoid spamming here. My questions are: 1. I couldn't figure out whether, instead of the three nested for loops, I could/should be using map/iterated mapping (I've made .map() work well with the equivalent of one for loop, but not with the three nested ones that go over all my parameters). 2. I couldn't figure out how to execute 'depth first' (i.e. execute the task_inference task as soon as each long task_training task is complete). I've read in the Prefect docs that depth-first execution comes with the Dask executors, but in my case I don't want parallel execution, since each task takes the entire available GPU memory of the local machine I'm running these on. Many thanks for any pointers and corrections of my misunderstandings!
The code snippet shows the basic flow with the many parameters I want to try out and the resulting many simple trees, which works well for me. The Prefect UI screenshot shows that at the beginning mostly only the task_train tasks are executed. I would like a way to have each task_inference executed as soon as its parent task_train succeeds, so that I can monitor intermediate results.
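On question 1, a common pattern is to collapse the three nested for loops into a single flat list of parameter combinations with itertools.product, which one `.map()` call can then iterate over. A minimal, Prefect-free sketch of just the flattening step (the parameter names and values here are made up for illustration):

```python
from itertools import product

# Hypothetical parameter grids; substitute the real ones from the flow.
learning_rates = [1e-3, 1e-4]
batch_sizes = [16, 32]
models = ["resnet", "unet"]

# Flatten the three nested loops into one list of combinations,
# one dict per train/inference pair.
param_grid = [
    {"lr": lr, "batch_size": bs, "model": m}
    for lr, bs, m in product(learning_rates, batch_sizes, models)
]

print(len(param_grid))   # 2 * 2 * 2 = 8 combinations
print(param_grid[0])     # {'lr': 0.001, 'batch_size': 16, 'model': 'resnet'}
```

In a Prefect 1.x flow this list could then be fed to something like `task_train.map(param_grid)`, with `unmapped(...)` wrapping any shared, non-iterated arguments.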
k
Hi @Louis Burtz! Nice to see some deep learning with Prefect. I think in this case you have to group inference and training into a single task, and then chain your tasks with upstream dependencies. I have a sample code snippet below for looping and setting the upstream dependencies. This will run sequentially.
with Flow("schedule flow") as schedule_flow:
    tasks = [
        sub_flow_task(parameters={"date": date})
        for date in dates
    ]
    for i in range(1, len(tasks)):
        tasks[i].set_upstream(tasks[i - 1])
l
Thanks! The main reason to separate training and inference into separate tasks is GPU memory management: when the training task finishes, all the GPU memory is released and made available for the inference task. This task encapsulation works seamlessly for that purpose; that was a nice 'aha' moment thanks to Prefect. From your code snippet, I think I can still keep the training and inference tasks separate but chain each inference task to the next training task ( train[i+1].set_upstream(inference[i]) ). I'll try that, thank you! (Note: now that all tasks will be chained together, I also need to make sure that even if a task fails, the others in the chain continue: 1. if training task i fails -> skip inference task i and continue to training task i+1; 2. if inference task i fails -> continue to training task i+1.)
k
Nice job figuring out how to chain train and inference! The 1 and 2 you outlined have varying degrees of difficulty, I think. First, I think you want to set the training task to always run.
👍 1
For inference, you can try `any_successful`, but I'm not entirely sure it'll work. I'd have to check whether it applies only to the direct upstream tasks or to all degrees of upstream tasks. If it's all degrees, we'll need to think of something else.
👍 1
l
yes! the key question seems to be what degrees of upstream states are considered for the `upstream_states` argument in https://github.com/PrefectHQ/prefect/blob/1e63886d3def4a5fae048dd36c10762d19cb9bca/src/prefect/triggers.py#L138
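For context on the linked triggers.py: a trigger in Prefect 1.x is a function that receives an `upstream_states` mapping and returns whether the task should run. A toy sketch of that shape (ToyState and toy_any_successful are made-up stand-ins, not Prefect's actual State class or trigger implementation):

```python
# Toy stand-ins for illustration only: not Prefect's real classes,
# just the shape of the trigger logic.

class ToyState:
    def __init__(self, successful):
        self._successful = successful

    def is_successful(self):
        return self._successful

def toy_any_successful(upstream_states):
    """Run if at least one upstream state is successful (or there are none)."""
    if not upstream_states:
        return True  # tasks with no upstream dependencies always pass the trigger
    return any(state.is_successful() for state in upstream_states.values())

# One upstream succeeded, one failed: an any_successful-style trigger
# still lets the task run.
states = {"train[0]": ToyState(True), "train[1]": ToyState(False)}
print(toy_any_successful(states))  # True
```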
k
Yes, I will try tomorrow and get back to you.
🙏 1
l
Thanks, much appreciated!
k
I have a working code example for you
from prefect import Flow, task
from prefect.engine import signals
import prefect
from prefect.triggers import always_run, any_successful

# Always run, even if the upstream inference was skipped or failed.
@task(trigger=always_run, skip_on_upstream_skip=False)
def train(x):
    try:
        logger = prefect.context.get("logger")
        logger.info(x)
        if x == 3:
            raise ValueError  # simulate a failed training run
    except Exception:
        raise signals.SKIP
    return

# Run only if the upstream train succeeded; skip if it was skipped.
@task(trigger=any_successful, skip_on_upstream_skip=True)
def inference(x):
    logger = prefect.context.get("logger")
    logger.info(x)
    return

with Flow("test") as flow:
    samples = [1,2,3,4,5,6]
    all_tasks = []
    for i in samples:
        all_tasks.append(train(i))
        all_tasks.append(inference(i))

    for i in range(1, len(samples)*2):
        all_tasks[i].set_upstream(all_tasks[i - 1])

flow.run()
l
Oh very nice, I hadn't spotted the skip_on_upstream_skip argument! I was raising signals.FAIL when training runs failed, but SKIP will work too. THANK YOU for this code example!
To fully close the loop on my questions: can you confirm that any_successful, all_successful, etc. triggers consider the state of all upstream tasks, not just the immediately upstream tasks?
k
Yes, they consider all upstream tasks, but I think skip_on_upstream_skip only applies to the immediate upstream. SKIP will propagate to all downstream tasks, but we can intercept that with the always_run trigger.
l
makes sense. Thanks for that too
k
No problem!
l
@Kevin Kho sorry for bothering you again with this. As shown in my initial screenshot above, I need to pass the string 'run_id' from the output of task_train() to task_inference(). That part was trivial in my screenshot, but now I'm trying to reconcile it with the approach from your working code snippet: now that the tasks are not assigned to variables but appended to a list, how do I pass the output of the first task as input to the second task? I can't get the task_args or key arguments to work; I feel I'm missing a Prefect concept here!
from prefect import Flow, flatten, task
from prefect.engine import signals
import prefect
from prefect.triggers import all_successful, all_failed, always_run, any_failed, any_successful
from prefect.executors import LocalDaskExecutor, LocalExecutor

string = 'prefectisawesome'
print(string[0])

@task(trigger = always_run, skip_on_upstream_skip = False)
def train(x):
    try:
        s = string[x]
        logger = prefect.context.get("logger")
        logger.info(x)
        if x == 3:
            raise ValueError
    except:
        raise signals.SKIP
    return s


@task(trigger = any_successful, skip_on_upstream_skip = True)
def inference(s):
    logger = prefect.context.get("logger")
    logger.info(s)
    return


with Flow("test") as flow:
    samples = [1,2,3,4,5,6]
    all_tasks = []
    for x in samples:
        all_tasks.append(train(x))
        i = len(all_tasks)
        all_tasks.append(inference(all_tasks[i-1]))
#         all_tasks.append(inference(x))
    for i in range(1, len(all_tasks)):
        all_tasks[i].set_upstream(all_tasks[i - 1])

flow.executor = LocalExecutor()
flow.visualize()
(Sorry for the formatting.) Here is my attempt to better explain ☝️ It works, but the flow.visualize() output is strange, so I'm afraid something is wrong under the hood. Or does that extra branch between train and inference make sense? Is that the correct way to access the result of a task in a list?
k
I know what you mean. I'll look into this.
I think what you're seeing is because the dependency is set twice: the first is set when you pass the output of train over to inference, and the second is in the loop with set_upstream. I suggest setting the upstream only for every inference -> train pair, and not train -> inference, because those are already set. You can do something like this:
# start at index 2 so we only add the missing inference -> train edges
for i in range(2, len(all_tasks), 2):
    all_tasks[i].set_upstream(all_tasks[i - 1])

flow.executor = LocalExecutor()
and hopefully it sets the upstream while skipping the ones already set
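The index arithmetic of the stride-2 loop can be sanity-checked in plain Python, assuming the flow appends train and inference alternately as in the snippet above (labels and chained_edges here are illustrative stand-ins for the real tasks):

```python
# Build the alternating list [train0, inference0, train1, inference1, ...]
labels = []
for k in range(3):
    labels.append(f"train{k}")
    labels.append(f"inference{k}")

def chained_edges(start):
    """(upstream, downstream) pairs created by a stride-2 set_upstream loop."""
    return [(labels[i - 1], labels[i]) for i in range(start, len(labels), 2)]

# Starting at 1 re-links the train -> inference pairs that passing the
# run_id already created; starting at 2 adds the missing inference -> train edges.
print(chained_edges(1))  # [('train0', 'inference0'), ('train1', 'inference1'), ('train2', 'inference2')]
print(chained_edges(2))  # [('inference0', 'train1'), ('inference1', 'train2')]
```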
l
Thank you, that works perfectly. Much appreciated. I now understand better what creates task dependencies, and that there is no check for already-existing dependencies. Glad I asked!
👍 1