Hi community! I have been exploring the `with cas...
# prefect-community
a
Hi community! I have been exploring the
with case
possibilities and have the following question: • Using
with case
only worked under the
with Flow(...) as flow:
statement. When using
with case
within a
Task
, it didn’t work, does it make sense?
🎉 1
j
Hey @Alberto de Santos the prefect
case
is not intended to work inside a task because the
case
is a task itself https://docs.prefect.io/api/latest/tasks/control_flow.html#case
🙌 1
a
Thanks, this is very useful.
I have observed also that, within that
with case
if I have a Task planned in two days, the task is Scheduled, which makes sense. But I assume (as with those not planned or immediately executed), that when Running that Task will check that it should SKIP, am I right?
j
What do you mean that the task is planned in two days?
a
Copy code
def add_scheduled_task(self, ID_jornada):
   c = Client()
   c.create_flow_run(version_group_id=self.version_group_id,
                 parameters={'ID_jornada': str(ID_jornada)},
                 run_name='_'.join([str(ID_jornada), self.version_group_id]),
                 scheduled_start_time=pendulum.instance(self.get_update_partido_time(str(ID_jornada)), tz='Europe/Madrid')
                 )
I setup the
scheduled_start_time
at a specific time
j
Ah yeah that is scheduling a flow run, not a task run. If you surround that task in a case and the condition is not met then that task will be skipped
a
sorry
Ok, that task will be skipped, but it should appear as scheduled
My question is then: why are they scheduled then?
j
When scheduling a flow run the flow run should be in a Scheduled state but the task runs should be Pending. Where are you seeing that the task run is in a scheduled state?
a
It is probably my language, sorry. Look.
Copy code
trigger_criteria    = self.check_acceptance_micro_apuestas(total_micro_apuestas)

with case(trigger_criteria, True):

   print(trigger_criteria)
   olimpo     = self.olimpo(ID_jornada, total_micro_apuestas)

   update_partido      = UpdatePartido()
   task_update_partido = update_partido.add_scheduled_task(self.ID_jornada)
trigger_criteria
is
False
Then why to Schedule a flow if it will be skipped?
j
I still don’t follow. If the condition in the
with case
evaluates to false then any tasks inside that block will be skipped. Not sure if you have seen it but this page in the docs may explain it more: https://docs.prefect.io/core/idioms/conditional.html
a
Yes, I read it
I think I understand now what happened. I have two agents: one to attend general flow, the otherone to attend UPdatePartido-like flows.
The second one was down. Once I’ve restarted, all the “scheduled” flows have disappeared.
Coming back again, I need to understand this
So this is the situation:
Copy code
trigger_criteria    = self.check_acceptance_micro_apuestas(total_micro_apuestas)

with case(trigger_criteria, True):

   print(trigger_criteria)
   olimpo     = self.olimpo(ID_jornada, total_micro_apuestas)

   update_partido      = UpdatePartido()
   task_update_partido = update_partido.add_scheduled_task(self.ID_jornada)
However, this
add_scheduled_task
, launches a new Flow, attended by a different Agent. Since it is scheduled, it will run.
Blue one is the current one, the one with the
case
. Scheduled one is the UpdatePartido one. So how to avoid this? Is not enough with the
case
?
j
You need an agent running with the same labels as that flow whose run you are creating in order to pick it up and run it. It doesn’t have anything to do with the case statement
a
Yes, I have that agent running and picking it up and running it. That’s not the problem.
j
I don’t understand the problem
a
The problem is that if within the
with case
I call that new Flow, it will run!! despite of being in a
with case
j
That must mean that the case statement is evaluating to True
a
No
It is evaluating to False
Promised
Look, in summary it is this way:
with case: # Returning False
do_next_task() # This is skipped, great
call_another_new_flow() # this is scheduled and later, executed, as a different agent is taking care of it
j
What do the logs of the case task say from the run?
a
probably, I should not use
with case
for this situation but a standard
if-else
162952 lens INFO CloudTaskRunner Task ‘case(True)’: Starting task run... link 162953 lens INFO CloudTaskRunner SKIP signal raised: SKIP(‘Provided value “False” did not match “True”’,) link 162954 lens INFO CloudTaskRunner Task ‘case(True)’: finished task run for task with final state: ‘Skipped’
j
I recommend opening an issue on the GitHub repo with a reproducible flow example because I am unable to see why that task would be running if the case block is skipping
a
Probably because it belongs to a different Flow
When a Task is within the case, it is run but as the status is SKIP, it is skipped. However, if what you execute is a Task from a different, new Flow, …
It is again run, but the SKIP status is lost
makes sense?
j
Nope
a
Ok, then I am sorry, I can’t explain myself better
j
No worries 👍
a
ok, let’s say this way
Copy code
with Flow(...) as flow_a:
   
   # do stuff

   with case(False, True):
      # This is Skipped
      run_tasks_from_flow_a()
      
      # This is Scheduled
      # This flow_B is imported from a different file
      schedule_flow_B()
And now, before the case task runs, the Flow B is already scheduled.
So, even before the
case
assess wether it’s True or False, the Flow B is already Scheduled, so nothing can be done after
case
is evaluated
Did I explained myself better?
j
I suggest opening an issue on the repo with more information because I am not observing the same behavior. Flow:
Copy code
with Flow("conditional-branches") as flow:
    cond = check_condition()

    with case(cond, True):
        val = action_if_true()
        another_action(val)

        t = testtask()
Importing
testtask
from another file. When
cond
is False all of those tasks inside the case block enter a skipped state.
a
Understood