Thread
#prefect-community

    Louis Burtz

    1 year ago
    Hello Prefect community! I'm starting out with Prefect, and I have many similar tasks that have been a pleasure to automate. I've attached a sample code snippet of my flow in the thread 🧵 to avoid spamming here. My questions are: 1. I couldn't figure out whether, instead of the three for loops, I could/should be using map/iterated mapping (I've made .map() work well with the equivalent of one for loop, but not with the three nested ones needed to go over all my parameters). 2. I couldn't figure out how to execute 'depth first' (= execute the task_inference task after each long task_train is complete). I've read in the Prefect docs that Depth First Execution comes with Dask executors, but in my case I don't want parallel execution, since each task takes the entire available GPU memory of the local machine I'm running these on. Many thanks for any pointers and corrections of my misunderstandings!
    The code snippet shows the basic flow with the many parameters I want to try out and the resulting many simple trees, which works well for me. The Prefect UI screenshot shows that mostly only the task_train runs are executed at the beginning. I would like a way to have each task_inference executed as soon as its parent task_train succeeds, so I can monitor intermediate results.
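The three nested for loops can be collapsed into one flat parameter list suitable for a single .map() call. A minimal plain-Python sketch (no Prefect needed; the parameter names are hypothetical stand-ins) using itertools.product:

```python
from itertools import product

# Hypothetical hyperparameters standing in for the real flow parameters.
learning_rates = [0.1, 0.01]
batch_sizes = [16, 32]
seeds = [0, 1, 2]

# One flat list of (lr, batch, seed) tuples instead of three nested loops;
# this list can then be handed to a single mapped task.
param_grid = list(product(learning_rates, batch_sizes, seeds))
print(len(param_grid))  # 2 * 2 * 3 = 12 combinations
```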
    Kevin Kho

    1 year ago
    Hi @Louis Burtz! Nice to see some deep learning with Prefect. I think in this case you have to group inference and training into a single task, and then chain your tasks with upstream dependencies. Here is a sample code snippet for looping and setting the upstream dependencies; this will run sequentially:
    with Flow("schedule flow") as schedule_flow:
        tasks = [
            sub_flow_task(parameters={"date": date})
            for date in dates
        ]
        for i in range(1, len(tasks)):
            tasks[i].set_upstream(tasks[i - 1])
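The chaining loop above can be illustrated in plain Python (no Prefect required; task names are placeholders): each task's upstream is simply the previous list element, which forces strictly sequential execution.

```python
# Mirror of the set_upstream loop: record which task each one waits on.
tasks = ["task_0", "task_1", "task_2", "task_3"]
upstream_of = {}
for i in range(1, len(tasks)):
    upstream_of[tasks[i]] = tasks[i - 1]  # mirrors tasks[i].set_upstream(tasks[i-1])
print(upstream_of)  # {'task_1': 'task_0', 'task_2': 'task_1', 'task_3': 'task_2'}
```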

    Louis Burtz

    1 year ago
    Thanks! The main reason to keep training and inference as separate tasks is GPU memory management: when the training task finishes, all the GPU memory is released and made available for the inference task. This task encapsulation works seamlessly for this purpose! That was a nice 'aha' moment thanks to Prefect. From your code snippet, I think I can still keep the training and inference tasks separate but chain each inference task to the next training task ( train[i+1].set_upstream(inference[i]) ). I'll try that, thank you! (Note: now that all tasks will be chained together, I also need to make sure that even if a task fails, the others in the chain still run: 1. if training task i fails -> skip inference task i and continue to training task i+1; 2. if inference task i fails -> continue to training task i+1.)
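The chaining Louis describes can be sketched as dependency edges in plain Python (no Prefect; names are placeholders): each inference depends on its train, and each next train depends on the previous inference, so the whole chain runs one GPU job at a time.

```python
# Build the (upstream, downstream) edge list for n train/inference pairs.
n = 3
edges = []
for i in range(n):
    edges.append((f"train_{i}", f"inference_{i}"))      # data dependency
    if i + 1 < n:
        edges.append((f"inference_{i}", f"train_{i+1}"))  # cross-pair chaining
print(edges)
```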
    Kevin Kho

    1 year ago
    Nice figuring out how to chain train and inference! Points 1 and 2 you outlined have varying degrees of difficulty, I think. First, you want to set the training task to always run. For inference, you can try any_successful, but I'm not entirely sure it'll work; I'd have to check whether it only applies to the direct upstream tasks or to all degrees of upstream tasks. If it's all degrees, we'll need to think of something else.

    Louis Burtz

    1 year ago
    Yes! The key question seems to be which degrees of upstream states are considered in the argument `upstream_states` in https://github.com/PrefectHQ/prefect/blob/1e63886d3def4a5fae048dd36c10762d19cb9bca/src/prefect/triggers.py#L138
    Kevin Kho

    1 year ago
    Yes, I will try tomorrow and get back to you.

    Louis Burtz

    1 year ago
    Thanks, much appreciated!
    Kevin Kho

    1 year ago
    I have a working code example for you:
    from prefect import Flow, task
    from prefect.engine import signals
    import prefect
    from prefect.triggers import always_run, any_successful
    
    @task(trigger=always_run, skip_on_upstream_skip=False)
    def train(x):
        try:
            logger = prefect.context.get("logger")
            logger.info(x)
            if x == 3:
                raise ValueError
        except ValueError:
            # convert the failure into a SKIP so downstream triggers can react to it
            raise signals.SKIP
        return
    
    @task(trigger=any_successful, skip_on_upstream_skip=True)
    def inference(x):
        logger = prefect.context.get("logger")
        logger.info(x)
        return
    
    with Flow("test") as flow:
        samples = [1, 2, 3, 4, 5, 6]
        all_tasks = []
        for i in samples:
            all_tasks.append(train(i))
            all_tasks.append(inference(i))
    
        for i in range(1, len(samples) * 2):
            all_tasks[i].set_upstream(all_tasks[i - 1])
    
    flow.run()

    Louis Burtz

    1 year ago
    Oh very nice, I hadn't spotted the skip_on_upstream_skip argument! I was raising signals.FAIL when training runs failed, but SKIP will work too. Thank you for this code example!
    To fully close the loop on my questions: can you confirm that any_successful, all_successful, and similar triggers consider the state of all upstream tasks, not just the immediately upstream tasks?
    Kevin Kho

    1 year ago
    Yes, they consider all upstream tasks, but I think skip_on_upstream_skip applies only to the immediate upstream. SKIP will propagate to all downstream tasks, but we can intercept that with the always_run trigger.
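Kevin's description can be illustrated with a toy simulation in plain Python (this is not Prefect's internal logic, just a sketch of the described behaviour): a SKIP flows down the chain until a task with an always_run-style trigger intercepts it.

```python
def run_chain(outcomes, triggers):
    """Simulate a linear chain. outcomes[i] is what task i would produce if it
    runs ('success' or 'skip'); triggers[i] is 'default' (inherit an upstream
    skip) or 'always_run' (execute regardless of upstream state)."""
    final = []
    upstream = "success"
    for outcome, trigger in zip(outcomes, triggers):
        if upstream == "skip" and trigger != "always_run":
            final.append("skip")    # SKIP propagates downstream
        else:
            final.append(outcome)   # task actually runs
        upstream = final[-1]
    return final

# train_1 raises SKIP, inference_1 (default trigger) inherits the skip,
# but train_2 (always_run) still executes, and so does inference_2.
print(run_chain(["skip", "success", "success", "success"],
                ["always_run", "default", "always_run", "default"]))
# ['skip', 'skip', 'success', 'success']
```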

    Louis Burtz

    1 year ago
    Makes sense. Thanks for that too!
    Kevin Kho

    1 year ago
    No problem!

    Louis Burtz

    1 year ago
    @Kevin Kho sorry for bothering you again with this. As shown in my initial screenshot above, I need to pass the string 'run_id' from the output of task_train() to task_inference(). That part was trivial in my screenshot, but now I'm trying to reconcile it with the method in your working code snippet. Now that the tasks are not 'assigned' to variables but 'appended to a list', how do I pass the output of the first task as input to the second task? I can't get the task_args or key arguments to work; I feel I'm missing a Prefect concept here!
    from prefect import Flow, task
    from prefect.engine import signals
    import prefect
    from prefect.triggers import always_run, any_successful
    from prefect.executors import LocalExecutor
    
    string = 'prefectisawesome'
    print(string[0])
    
    @task(trigger=always_run, skip_on_upstream_skip=False)
    def train(x):
        try:
            s = string[x]
            logger = prefect.context.get("logger")
            logger.info(x)
            if x == 3:
                raise ValueError
        except ValueError:
            raise signals.SKIP
        return s
    
    @task(trigger=any_successful, skip_on_upstream_skip=True)
    def inference(s):
        logger = prefect.context.get("logger")
        logger.info(s)
        return
    
    with Flow("test") as flow:
        samples = [1, 2, 3, 4, 5, 6]
        all_tasks = []
        for x in samples:
            all_tasks.append(train(x))
            all_tasks.append(inference(all_tasks[-1]))  # pass train's output to inference
        for i in range(1, len(all_tasks)):
            all_tasks[i].set_upstream(all_tasks[i - 1])
    
    flow.executor = LocalExecutor()
    flow.visualize()
    (Sorry for the formatting.) Here is my attempt to explain it better: it works, but the flow.visualize() output is strange, so I'm afraid something is wrong under the hood. Does that extra branch between train and inference make sense? Is that the correct way to access the result of a task in a list?
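The list-indexing question can be sketched in plain Python (strings stand in for task objects): all_tasks[-1] always refers to the element just appended, and is equivalent to the i = len(all_tasks); all_tasks[i - 1] pattern, with no index arithmetic needed.

```python
# Interleave stand-in "train" and "inference" entries; each inference
# receives the element appended immediately before it.
all_tasks = []
for x in [1, 2, 3]:
    all_tasks.append(f"train({x})")
    all_tasks.append(f"inference({all_tasks[-1]})")  # last element just appended
print(all_tasks[1])  # inference(train(1))
```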
    Kevin Kho

    1 year ago
    I know what you mean; I'll look into this. I think what you're seeing is because the dependency is set twice: the first is set when you pass the output of train over to inference, and the second is in the loop with set_upstream. I suggest setting the upstream only for each inference -> train pair and not train -> inference, because those are already set. You can do something like this:
    for i in range(2, len(all_tasks), 2):
        all_tasks[i].set_upstream(all_tasks[i - 1])
    and hopefully it sets the upstream while skipping the ones already set.
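Kevin's prose asks for only the inference -> next-train edges. A plain-Python check (placeholder names) shows which pairs a stride-2 loop starting at index 2 links when train and inference tasks alternate in one list, leaving the existing train -> inference data dependencies alone:

```python
# Even indices past 0 are the trains after the first; each one gets the
# preceding inference as its upstream.
all_tasks = ["train_1", "inf_1", "train_2", "inf_2", "train_3", "inf_3"]
extra_edges = []
for i in range(2, len(all_tasks), 2):
    extra_edges.append((all_tasks[i - 1], all_tasks[i]))  # (upstream, downstream)
print(extra_edges)  # [('inf_1', 'train_2'), ('inf_2', 'train_3')]
```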

    Louis Burtz

    1 year ago
    Thank you, that works perfectly! Much appreciated. I now understand better what creates task dependencies, and that there is no check for already-existing dependencies. Glad I asked!