a

    Alberto de Santos

    1 year ago
    Hi community! I have been exploring the
    with case
    possibilities and have the following question: • Using
    with case
    only worked under the
    with Flow(...) as flow:
    statement. When using
    with case
    within a
    Task
    , it didn’t work, does it make sense?
    j

    josh

    1 year ago
    Hey @Alberto de Santos the prefect
    case
    is not intended to work inside a task because the
    case
    is a task itself https://docs.prefect.io/api/latest/tasks/control_flow.html#case
    a

    Alberto de Santos

    1 year ago
    Thanks, this is very useful.
    I have observed also that, within that
    with case
    if I have a Task planned in two days, the task is Scheduled, which makes sense. But I assume (as with those not planned or immediately executed), that when Running that Task will check that it should SKIP, am I right?
    j

    josh

    1 year ago
    What do you mean that the task is planned in two days?
    a

    Alberto de Santos

    1 year ago
    def add_scheduled_task(self, ID_jornada):
       c = Client()
       c.create_flow_run(version_group_id=self.version_group_id,
                     parameters={'ID_jornada': str(ID_jornada)},
                     run_name='_'.join([str(ID_jornada), self.version_group_id]),
                     scheduled_start_time=pendulum.instance(self.get_update_partido_time(str(ID_jornada)), tz='Europe/Madrid')
                     )
    I setup the
    scheduled_start_time
    at a specific time
    j

    josh

    1 year ago
    Ah yeah that is scheduling a flow run, not a task run. If you surround that task in a case and the condition is not met then that task will be skipped
    a

    Alberto de Santos

    1 year ago
    sorry
    Ok, that task will be skipped, but it should appear as scheduled
    My question is then: why are they scheduled then?
    j

    josh

    1 year ago
    When scheduling a flow run the flow run should be in a Scheduled state but the task runs should be Pending. Where are you seeing that the task run is in a scheduled state?
    a

    Alberto de Santos

    1 year ago
    It is probably my language, sorry. Look.
    trigger_criteria    = self.check_acceptance_micro_apuestas(total_micro_apuestas)
    
    with case(trigger_criteria, True):
    
       print(trigger_criteria)
       olimpo     = self.olimpo(ID_jornada, total_micro_apuestas)
    
       update_partido      = UpdatePartido()
       task_update_partido = update_partido.add_scheduled_task(self.ID_jornada)
    trigger_criteria
    is
    False
    Then why to Schedule a flow if it will be skipped?
    j

    josh

    1 year ago
    I still don’t follow. If the condition in the
    with case
    evaluates to false then any tasks inside that block will be skipped. Not sure if you have seen it but this page in the docs may explain it more: https://docs.prefect.io/core/idioms/conditional.html
    a

    Alberto de Santos

    1 year ago
    Yes, I read it
    I think I understand now what happened. I have two agents: one to attend general flow, the otherone to attend UPdatePartido-like flows.
    The second one was down. Once I’ve restarted, all the “scheduled” flows have disappeared.
    Coming back again, I need to understand this
    So this is the situation:
    trigger_criteria    = self.check_acceptance_micro_apuestas(total_micro_apuestas)
    
    with case(trigger_criteria, True):
    
       print(trigger_criteria)
       olimpo     = self.olimpo(ID_jornada, total_micro_apuestas)
    
       update_partido      = UpdatePartido()
       task_update_partido = update_partido.add_scheduled_task(self.ID_jornada)
    However, this
    add_scheduled_task
    , launches a new Flow, attended by a different Agent. Since it is scheduled, it will run.
    Blue one is the current one, the one with the
    case
    . Scheduled one is the UpdatePartido one. So how to avoid this? Is not enough with the
    case
    ?
    j

    josh

    1 year ago
    You need an agent running with the same labels as that flow whose run you are creating in order to pick it up and run it. It doesn’t have anything to do with the case statement
    a

    Alberto de Santos

    1 year ago
    Yes, I have that agent running and picking it up and running it. That’s not the problem.
    j

    josh

    1 year ago
    I don’t understand the problem
    a

    Alberto de Santos

    1 year ago
    The problem is that if within the
    with case
    I call that new Flow, it will run!! despite of being in a
    with case
    j

    josh

    1 year ago
    That must mean that the case statement is evaluating to True
    a

    Alberto de Santos

    1 year ago
    No
    It is evaluating to False
    Promised
    Look, in summary it is this way:
    with case: # Returning False
    do_next_task() # This is skipped, great
    call_another_new_flow() # this is scheduled and later, executed, as a different agent is taking care of it
    j

    josh

    1 year ago
    What do the logs of the case task say from the run?
    a

    Alberto de Santos

    1 year ago
    probably, I should not use
    with case
    for this situation but a standard
    if-else
    16:29:52 lens INFO CloudTaskRunner Task ‘case(True)’: Starting task run... link 16:29:53 lens INFO CloudTaskRunner SKIP signal raised: SKIP(‘Provided value “False” did not match “True”’,) link 16:29:54 lens INFO CloudTaskRunner Task ‘case(True)’: finished task run for task with final state: ‘Skipped’
    j

    josh

    1 year ago
    I recommend opening an issue on the GitHub repo with a reproducible flow example because I am unable to see why that task would be running if the case block is skipping
    a

    Alberto de Santos

    1 year ago
    Probably because it belongs to a different Flow
    When a Task is within the case, it is run but as the status is SKIP, it is skipped. However, if what you execute is a Task from a different, new Flow, …
    It is again run, but the SKIP status is lost
    makes sense?
    j

    josh

    1 year ago
    Nope
    a

    Alberto de Santos

    1 year ago
    Ok, then I am sorry, I can’t explain myself better
    j

    josh

    1 year ago
    No worries 👍
    a

    Alberto de Santos

    1 year ago
    ok, let’s say this way
    with Flow(...) as flow_a:
       
       # do stuff
    
       with case(False, True):
          # This is Skipped
          run_tasks_from_flow_a()
          
          # This is Scheduled
          # This flow_B is imported from a different file
          schedule_flow_B()
    And now, before the case task runs, the Flow B is already scheduled.
    So, even before the
    case
    assess wether it’s True or False, the Flow B is already Scheduled, so nothing can be done after
    case
    is evaluated
    Did I explained myself better?
    j

    josh

    1 year ago
    I suggest opening an issue on the repo with more information because I am not observing the same behavior. Flow:
    with Flow("conditional-branches") as flow:
        cond = check_condition()
    
        with case(cond, True):
            val = action_if_true()
            another_action(val)
    
            t = testtask()
    Importing
    testtask
    from another file. When
    cond
    is False all of those tasks inside the case block enter a skipped state.
    a

    Alberto de Santos

    1 year ago
    Understood