Hello all. Have a small question about cloud hooks...
# prefect-community
y
Hello all. I have a small question about cloud hooks. According to the documentation, they work only at the flow level, not at the task level. One of the states I can select in a cloud hook is "TimedOut", but I can't find how to get my flow to enter that state: when I create my flow, I don't see any timeout condition for the flow run as a whole. Have I understood correctly that cloud hooks only work at the flow run level? And if so, how can I get my cloud hook to trigger when a flow run enters the "TimedOut" state? Thanks in advance!
a
TimedOut means "Finished state indicating failure due to execution timeout.", so I would expect it to be triggered mostly at the task run level, when a timeout is defined on a task and the task run takes longer than that. I can ask the team to be sure. But can you explain what problem you are trying to solve? Do you want to perform some action if your flow run takes longer than, e.g., 60 min?
Also: are you on Prefect Cloud or Server?
y
Hi Anna, thanks for your response! I am trying Prefect Server. I was trying to get SLAs to work at both the task level and the flow run level. At the task level, I don't think this is supported: I can get the TimedOut state if the task times out and use a state handler to send a message wherever, but not via a cloud hook as far as I can tell, since cloud hooks seem to work at the flow run level. Either way, AFAIK there is no way to let whatever is running continue to completion once it times out? I did see this when configuring cloud hooks, though, which is what made me ask this question. Appreciate your help!
a
On a task level, there is a `timeout` keyword argument on the task decorator that you could use:
```
- timeout (Union[int, timedelta], optional): The amount of time (in seconds) to wait while
            running this task before a timeout occurs; note that sub-second
            resolution is not supported, even when passing in a timedelta.
```
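To make the note about sub-second resolution concrete, here is a minimal stdlib sketch of normalizing an `int` or `timedelta` timeout to whole seconds. `normalize_timeout` is a hypothetical helper, not part of Prefect's API; it only mirrors the documented behavior that any fraction of a second is dropped.

```python
import warnings
from datetime import timedelta
from typing import Optional, Union


def normalize_timeout(timeout: Optional[Union[int, timedelta]]) -> Optional[int]:
    """Hypothetical helper: convert a task timeout to whole seconds.

    Mirrors the documented rule that sub-second resolution is not supported:
    any fraction of a second in a timedelta is dropped (with a warning).
    """
    if timeout is None:
        return None
    if isinstance(timeout, timedelta):
        if timeout.microseconds > 0:
            warnings.warn("sub-second part of the timeout is ignored")
        return int(timeout.total_seconds())
    return int(timeout)
```

Under this rule, `timedelta(seconds=90.5)` behaves like a plain `90`.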
On a flow level, we don't have such functionality for Prefect Server so far, but if you use Prefect Cloud, there are Automations that let you set an SLA on a flow level, e.g. if a flow run doesn't finish within 60 min, do something (cancel the run, create a new run, send an alert, etc.).
y
That is clear, thanks for your quick help
y
And from what I read (and what you explained), automations are also flow-level, is that correct? I.e. it's not possible to create an automation based on a single task running for over x minutes?
a
As I mentioned before, for this you could use the `timeout` argument on the task decorator. This allows you to fail a task run if it takes longer than, say, 5 min. Then, you could attach a state handler (`state_handlers`) to the same task to perform some action if this task run fails. You could even perform that action only if the task failed with a `TimedOut` state:
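```python
from prefect import task

# some_action_on_failed is your own state handler, defined elsewhere
@task(timeout=300, state_handlers=[some_action_on_failed])  # time out after 5 min
def some_task():
    pass
```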
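The timeout-plus-handler pattern (a timeout that produces a `TimedOut` state, and a handler that reacts only to that state) can be sketched without Prefect using only the standard library. Everything here is a stand-in: states are plain strings, and `run_with_timeout`, `alert_on_timeout` and `slow_extract` are hypothetical names. In Prefect 1.x the real handler receives `(task, old_state, new_state)` and would check `isinstance(new_state, TimedOut)` with `TimedOut` imported from `prefect.engine.state`.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
from typing import Callable, List

alerts: List[str] = []  # collected here instead of being sent anywhere


def alert_on_timeout(task_name: str, old_state: str, new_state: str) -> str:
    """Stand-in state handler: only acts when the new state is TimedOut."""
    if new_state == "TimedOut":
        alerts.append(f"{task_name} exceeded its timeout")
    return new_state  # a handler returns the (possibly modified) new state


def run_with_timeout(name: str, fn: Callable[[], object], timeout: float,
                     handlers: List[Callable[[str, str, str], str]]) -> str:
    """Run fn in a worker thread, map the outcome to a state name, call handlers."""
    pool = ThreadPoolExecutor(max_workers=1)
    try:
        pool.submit(fn).result(timeout=timeout)
        state = "Success"
    except FutureTimeout:
        state = "TimedOut"  # in Prefect 1.x, TimedOut is a subclass of Failed
    except Exception:
        state = "Failed"
    finally:
        # Stop waiting. A Python thread cannot be killed, so a timed-out
        # worker keeps running in the background until it returns.
        pool.shutdown(wait=False)
    for handler in handlers:
        state = handler(name, "Running", state)
    return state


def slow_extract() -> None:
    time.sleep(0.5)  # pretend extraction that overruns a 0.05 s timeout
```

Only the `TimedOut` outcome produces an alert; ordinary failures pass through the handler untouched.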
y
Thanks, yes I understand. Am I correct in assuming that both automations and state handlers require an actual change of state, in flows (for automations) and in tasks (for state handlers)? I'm wondering if it's possible to let flows/tasks execute past a predefined SLA but still send an alert, for example: set 60 minutes as the SLA threshold for a flow, send a notification once the 60 minutes are up, but let the flow continue to completion.
a
This is hard. Why do you need that? Can you explain your use case?
you could build a flow of flows, have one subflow doing the initial work, and this subflow could have an SLA of say 60 minutes configured with an Automation that sends an alert once this SLA criterion is passed, but the work may still continue in other subflows. Everything could be orchestrated from a parent flow. Does it make sense?
if you need some examples of the flow-of-flows orchestration pattern, check the Discourse topics about it here https://discourse.prefect.io/tag/flow-of-flows I think the flow-of-flows would be the easiest and most reliable approach here
Wait, scratch that: I think you can do exactly that in Prefect Cloud. This will send an alert once 60 min have passed, but the flow run won't be interrupted.
y
OK, thanks! It's clear that it's possible at the flow level. Use-case-wise, we have quite a few extraction tasks that take a long time to complete, and it would be nice to receive an alert if they exceed a certain threshold time. It's quite expensive to fail and re-run them, though, not just because of the time but because re-running takes up a lot of the upstream source's bandwidth.
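At the flow level, the Cloud automation covers this. For a single long-running extraction inside your own code, a comparable alert-only SLA can be sketched with a `threading.Timer` watchdog that notifies when the threshold passes but never cancels the work. This is a stdlib sketch, not a Prefect feature: `run_with_sla_alert`, `slow_extraction` and the `alert` callback are hypothetical, and a real `alert` would post to Slack, email, etc.

```python
import threading
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_sla_alert(fn: Callable[[], T], sla_seconds: float,
                       alert: Callable[[float], None]) -> T:
    """Run fn to completion; call alert(sla_seconds) once if fn overruns the SLA.

    The timer only notifies: it never cancels fn, so a slow but healthy
    extraction is not failed and re-run just because it crossed the threshold.
    """
    timer = threading.Timer(sla_seconds, alert, args=(sla_seconds,))
    timer.daemon = True  # don't keep the process alive for a pending alert
    timer.start()
    try:
        return fn()
    finally:
        timer.cancel()  # no-op if the alert has already fired


def slow_extraction() -> str:
    time.sleep(0.3)  # pretend extraction that outlasts a 0.05 s SLA
    return "done"
```

With `sla_seconds=60 * 60` this gives the "alert after 60 minutes, keep running" behavior discussed above.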
a
this totally makes sense! LMK if you have any other questions I can help with
y
Thanks! I actually had one regarding flow-of-flows dependencies. I had a look at the documentation and, as far as I understand, the recommendation is to create a parent flow that creates flow runs from pre-defined flows and uses the `wait_for_flow_run` task to orchestrate those flow runs.
We have quite a large number of ETL flows and a large number of downstream flows, plus further downstream flows which may depend on an ETL again and/or on the output of a downstream flow in the middle, and so on. This dependency relationship is many-to-many: one ETL can be upstream of many downstream flows, and one downstream flow can be upstream of many other downstream flows. We're responsible for the ETLs, and our users are free to create templated downstream flows and specify any of the already available flows as dependencies. I'm wondering what the best approach is here.
In Airflow there is the concept of an execution date, which can be passed to a sensor task that waits for the corresponding upstream run to finish. Based on what I have read, I'm a bit unsure how to approach this in Prefect. Would your recommendation still be to use a parent flow that orchestrates the flow runs by creating and waiting for them? It would be quite a large flow, and one that constantly changes based on what users deploy and specify as their upstream dependencies. I was also wondering whether it's possible to create a sensor that waits for the creation of the `flow_run_id` of an upstream flow, based on something like its `scheduled_start_time`, and pass that into `wait_for_flow_run`. Then we'd still have individual flows consisting of just tasks, plus a task at the beginning that waits for the upstream flow's run at schedule X to finish. What would be your advice?
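The sensor idea hinges on mapping a downstream run's schedule to the matching upstream run, much like Airflow's execution date. A minimal stdlib sketch of that mapping, assuming both schedules are regular intervals counted from a common anchor. `expected_upstream_slot` and `find_upstream_run` are hypothetical helpers, and the `runs` mapping stands in for whatever you would fetch from the Prefect API (that query is not shown). The run ID found this way is what would then be handed to `wait_for_flow_run`.

```python
from datetime import datetime, timedelta
from typing import Dict, Optional


def expected_upstream_slot(downstream_time: datetime, upstream_interval: timedelta,
                           anchor: datetime) -> datetime:
    """Floor downstream_time to the most recent upstream schedule slot.

    For an hourly downstream flow and a daily upstream flow anchored at
    midnight, every hourly run maps to that day's 00:00 upstream run.
    """
    elapsed = downstream_time - anchor
    slots = elapsed // upstream_interval  # timedelta // timedelta -> int
    return anchor + slots * upstream_interval


def find_upstream_run(runs: Dict[str, datetime], slot: datetime) -> Optional[str]:
    """Return the ID of the upstream run scheduled exactly at `slot`, if any."""
    for run_id, scheduled in runs.items():
        if scheduled == slot:
            return run_id
    return None  # not created yet: a sensor would poll again later
```

This also covers the hourly-waits-for-daily case, since the daily interval simply collapses 24 hourly slots onto one upstream run.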
a
You're talking to the right person 😅 I struggled with exactly the same problem 2 years ago, and back then I wrote a blog post comparing how this use case can be approached in Airflow and in Prefect.
Prefect handles it quite elegantly because `wait_for_flow_run` polls directly for the end state of the child flow run and even lets you set `raise_final_state=True`, so that if the child flow run fails, the parent flow run fails as well. You can even stream the child flow run logs so that they are displayed directly in the Prefect UI. I plan to write a blog post about a similar data warehousing use case; you can subscribe to the Tutorials category on Discourse to get notified once it's out: https://discourse.prefect.io/c/tutorials-resources/13
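A minimal sketch of the polling behavior described for `wait_for_flow_run`: check the child run's state until it is finished and, if `raise_final_state` is set, fail when the child did not succeed. This is a stdlib stand-in, not Prefect's implementation. `get_state` is a hypothetical callable standing in for the API query, and the state names are plain strings.

```python
import time
from typing import Callable

FINISHED = {"Success", "Failed", "Cancelled", "TimedOut"}


class ChildFlowRunFailed(Exception):
    """Raised when the child run ends unsuccessfully and raise_final_state=True."""


def wait_for_child(get_state: Callable[[], str], raise_final_state: bool = False,
                   poll_interval: float = 0.01, max_polls: int = 1000) -> str:
    """Poll get_state() until it returns a finished state, then return that state."""
    for _ in range(max_polls):
        state = get_state()
        if state in FINISHED:
            if raise_final_state and state != "Success":
                # propagate the child's failure to the waiting (parent) task
                raise ChildFlowRunFailed(f"child flow run ended in {state}")
            return state
        time.sleep(poll_interval)
    raise TimeoutError("child flow run did not finish within max_polls")
```

Without `raise_final_state`, a failed child is simply reported back as `"Failed"` and the caller decides what to do.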
y
Thanks for your help and the article! I understand that Prefect's current handling of flow-on-flow dependencies is quite like the `TriggerDagRunOperator` with an additional `WaitForCompletion` sensor in Airflow. On one hand, it's nicer than how Airflow does it, since the actual flow run does not start until the upstream flow has finished (nicer than a sensor inside a flow run, or the `TriggerDagRunOperator` not waiting, in Airflow). On the other hand, I'm wondering whether it should be the other way round, i.e. the downstream flow should just check whether the upstream flow run is done before executing its own flow run, instead of downstream flows being triggered directly once the upstream flow run is done. For us, this leads to quite enormous flows which feel quite coupled, as all downstream and upstream flows must be orchestrated in one flow (and kept up to date) regardless of their function, instead of being autonomous flows with their own execution schedules. (In Airflow, the `ExternalTaskSensor` can be supplied with a different execution_date in case the schedules (heartbeats) differ, e.g. an hourly flow run waiting for a daily flow run.) We do have these cases, and I'm not entirely sure how they would be handled in Prefect. Quite curious what your thoughts are on this. Really appreciate your help!
a
> since the actual flowrun does not start until the upstream flow has finished
This is configurable. If you use the `create_flow_run` task without `wait_for_flow_run`, you are triggering child flow runs in a fire-and-forget way, without waiting for and polling the final state of the triggered child flow run.
> On the other hand, I'm wondering whether it should be the other way round, i.e. the downstream flow should just check whether the upstream flowrun is done before executing it's flowrun
This would be a rather weird separation of concerns, don't you think? A flow run shouldn't be evaluating the upstream flow run's state - Prefect API should! Prefect is orchestrating the child flow runs from a parent flow run and manages the execution based on the states of your workflow components.
> can be supplied with a different execution_date in case the heartbeats are different, i.e. hourly flow run waiting for a daily flow run
Did you see the part of the article discussing scheduling? Only the parent flow needs to be scheduled; other child flow runs are simply triggered (or called) from it. This is why it's called "the orchestrator pattern": the parent flow run serves as the orchestrator of child flows, and this parent orchestrator decides when to trigger each child flow run. Attaching schedules to child flows and synchronizing their execution based on those schedules leads you back to CRON, where you schedule one job at 1 AM and another one at 2 AM and hope 🤞 that this will work. The orchestrator pattern is much more reliable because downstream child flow runs are triggered directly once the upstream flow runs are finished (or only once they are successful, configurable using the `raise_final_state` argument).
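To connect the orchestrator pattern to the many-to-many dependencies described earlier: if the parent knows which child flow depends on which, it can start every child as soon as all of its upstreams have finished. A stdlib sketch using `graphlib.TopologicalSorter` (Python 3.9+); the flow names and `run_child` are hypothetical stand-ins for a `create_flow_run` + `wait_for_flow_run` pair.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Callable, Dict, List, Set


def orchestrate(deps: Dict[str, Set[str]], run_child: Callable[[str], None]) -> List[List[str]]:
    """Run children in dependency order; return the batches that became ready together.

    deps maps each child flow to the set of flows it depends on. Children in
    the same batch have no dependency on each other and could run concurrently.
    """
    sorter = TopologicalSorter(deps)
    try:
        sorter.prepare()
    except CycleError as exc:
        raise ValueError("circular dependencies between flows") from exc
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        for name in ready:
            run_child(name)  # stand-in for create_flow_run + wait_for_flow_run
            sorter.done(name)
    return batches
```

In this shape, users would only edit the `deps` mapping when they add a downstream flow, rather than hand-wiring the parent flow.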
y
> since the parent flow run serves as the orchestrator of child flows and this parent orchestrator decides when to trigger each child flow run.
Sorry, I think my use case example was not clear. We have flow C (needs to run every hour), which has upstream dependencies on flow B (needs to run every day) and flow A (also runs every hour). If I schedule this in a parent flow, then I'd need to schedule the parent flow to run hourly and execute A and C every hour, but flow B should only execute once every 24 hours. I think that case is not covered in the scheduling part of the article? I see `StartFlowRun` takes a `scheduled_start_time`, but that is a datetime object. Is this something that's possible?
> A flow run shouldn't be evaluating the upstream flow run's state
Indeed, I definitely agree. My question was more about understanding the reasoning behind how this orchestration is handled. Prefect's solution is to create this parent orchestrator flow. I guess another way to do it would be for Prefect (somewhere, e.g. in the scheduler, or via a sensor outside of the flows themselves) to determine that flow B has a dependency on flow A, and therefore schedule flow B after the flow run of A has completed. I.e. flow B is executed after flow A but doesn't have knowledge of flow A's state. In this situation there'd be no need to create and maintain parent flows. If I had to guess, the reason is that Prefect's scheduler is designed to be much leaner and has no access to the actual flow code to infer this kind of thing? Or is there another reason?
a
> I think that case is not covered in the scheduling part of the article?
I understand your problem, and frankly, I had very similar use cases in my previous job! Back then we used a legacy job scheduler with a feature called "job chains", and we were able to specify the links between jobs in an XML-based job chain, but I digress 😄 Here is how I would approach it in Prefect:
Option 1) My preferred choice
My preferred solution would be to have only the parent flow scheduled, and this parent flow would orchestrate all child flow runs. The issue of one child flow that needs to be executed only once every 24 hours could be solved using caching. A simple flow code and flow diagram will explain that better than words:
```python
from datetime import timedelta

from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

PARENT_FLOW_NAME = "parent_flow_example"
PREFECT_PROJECT_NAME = "community"


with Flow(PARENT_FLOW_NAME) as parent_flow:
    # Flow A is triggered on every run of the (hourly) parent flow
    flow_a_run_id = create_flow_run(
        flow_name="Flow_A",
        project_name=PREFECT_PROJECT_NAME,
        task_args=dict(name="Flow A"),
    )
    flow_a_flowrunview = wait_for_flow_run(
        flow_a_run_id,
        raise_final_state=True,  # fail this task if Flow A's run fails
        stream_logs=True,
        task_args=dict(name="Wait for Flow A"),
    )

    # Flow B starts after A; cache_for lets hourly parent runs reuse the
    # cached result for 24 hours instead of triggering B again
    flow_b_run_id = create_flow_run(
        flow_name="Flow_B",
        project_name=PREFECT_PROJECT_NAME,
        upstream_tasks=[flow_a_flowrunview],
        task_args=dict(name="Flow B", cache_for=timedelta(hours=24)),
    )
    flow_b_flowrunview = wait_for_flow_run(
        flow_b_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Wait for Flow B", cache_for=timedelta(hours=24)),
    )

    # Flow C starts only after Flow B's run (and therefore A's) succeeded
    flow_c_run_id = create_flow_run(
        flow_name="Flow_C",
        project_name=PREFECT_PROJECT_NAME,
        upstream_tasks=[flow_b_flowrunview],
        task_args=dict(name="Flow C"),
    )
    flow_c_flowrunview = wait_for_flow_run(
        flow_c_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Wait for Flow C"),
    )


if __name__ == "__main__":
    parent_flow.visualize()  # draws the task graph; needs graphviz installed
```
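The `cache_for=timedelta(hours=24)` on the Flow B tasks is what keeps B to roughly one run per day while the parent runs hourly. A simplified stdlib sketch of that time-based caching over simulated hourly parent runs; `TTLCache` and `simulate` are hypothetical, and Prefect's real caching, configured through `cache_for` and a `cache_validator`, is more involved than this.

```python
from datetime import datetime, timedelta
from typing import Callable, Dict, Tuple


class TTLCache:
    """Tiny stand-in for cache_for: reuse a result until it is older than ttl."""

    def __init__(self, ttl: timedelta) -> None:
        self.ttl = ttl
        self._entries: Dict[str, Tuple[datetime, object]] = {}

    def run(self, key: str, fn: Callable[[], object], now: datetime) -> object:
        cached = self._entries.get(key)
        if cached is not None and now - cached[0] < self.ttl:
            return cached[1]  # still valid: skip the real work
        result = fn()
        self._entries[key] = (now, result)
        return result


def simulate(hours: int) -> Dict[str, int]:
    """Count how often A, B and C actually run over `hours` hourly parent runs."""
    counts = {"A": 0, "B": 0, "C": 0}
    cache = TTLCache(timedelta(hours=24))
    start = datetime(2022, 1, 1)

    def trigger(name: str) -> str:
        counts[name] += 1
        return name

    for hour in range(hours):
        now = start + timedelta(hours=hour)
        trigger("A")
        cache.run("flow_b", lambda: trigger("B"), now)  # cached like Flow B
        trigger("C")
    return counts
```

Over two simulated days, A and C run 48 times each while B runs only twice.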
I think this is the cleanest approach, but please judge for yourself.
Option 2)
Your flow B can still have a daily schedule. The parent flow orchestrating flows A, B, and C can have an hourly schedule and trigger only flows A and C, and it can additionally have a task making an API call to the Prefect API to check the final state of the most recent flow run of flow B, to ensure that this flow run was successful and was executed no more than 24 hours ago. To avoid unnecessary computation, this task doesn't need to run every hour (since you only need to check the state of a daily flow run once per day), so you could cache it for 24 hours using the `cache_for` task decorator argument.
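Option 2's check boils down to one question: did the latest run of flow B succeed recently enough? A minimal sketch of that predicate, assuming you have already fetched the latest run's final state and end time from the Prefect API (that query is not shown, and `latest_state` / `ended_at` are hypothetical field names).

```python
from datetime import datetime, timedelta
from typing import Optional


def daily_upstream_ok(latest_state: Optional[str], ended_at: Optional[datetime],
                      now: datetime, max_age: timedelta = timedelta(hours=24)) -> bool:
    """True if the most recent upstream run succeeded no more than max_age ago.

    latest_state / ended_at are hypothetical fields filled from an API query
    for flow B's latest run; keep all datetimes in the same timezone.
    """
    if latest_state != "Success" or ended_at is None:
        return False
    return now - ended_at <= max_age
```

In the parent flow, a task wrapping this check could raise (for example `prefect.engine.signals.FAIL`) when it returns `False`, so that flow C does not start on stale data.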
Even better: since you mentioned that flow C depends on A and B, but A and B don't need to run sequentially, you could even do it this way in Prefect:
```python
from datetime import timedelta

from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

PARENT_FLOW_NAME = "parent_flow_example"
PREFECT_PROJECT_NAME = "community"


with Flow(PARENT_FLOW_NAME) as parent_flow:
    # Flow A: triggered on every run of the (hourly) parent flow
    flow_a_run_id = create_flow_run(
        flow_name="Flow_A",
        project_name=PREFECT_PROJECT_NAME,
        task_args=dict(name="Flow A"),
    )
    flow_a_flowrunview = wait_for_flow_run(
        flow_a_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Wait for Flow A"),
    )

    # Flow B: no dependency on A, cached for 24 hours
    flow_b_run_id = create_flow_run(
        flow_name="Flow_B",
        project_name=PREFECT_PROJECT_NAME,
        task_args=dict(name="Flow B", cache_for=timedelta(hours=24)),
    )
    flow_b_flowrunview = wait_for_flow_run(
        flow_b_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Wait for Flow B", cache_for=timedelta(hours=24)),
    )

    # Flow C: waits for both A and B
    flow_c_run_id = create_flow_run(
        flow_name="Flow_C",
        project_name=PREFECT_PROJECT_NAME,
        upstream_tasks=[flow_a_flowrunview, flow_b_flowrunview],
        task_args=dict(name="Flow C"),
    )
    flow_c_flowrunview = wait_for_flow_run(
        flow_c_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Wait for Flow C"),
    )


if __name__ == "__main__":
    parent_flow.visualize()  # draws the task graph; needs graphviz installed
```
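This version lets A and B run independently and starts C only after both finish. A stdlib sketch of that shape with a thread pool; `run_parent` and `child` are hypothetical stand-ins for the `create_flow_run` + `wait_for_flow_run` pairs. One caveat worth checking against your setup: in Prefect 1.x, independent tasks only actually run concurrently with a parallel executor such as `LocalDaskExecutor` (from `prefect.executors`); the default `LocalExecutor` runs them one after another.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import List


def run_parent(log: List[str]) -> None:
    """Stand-in parent run: A and B concurrently, then C once both are done."""
    # Barrier(2) only releases when A and B are both waiting on it, so this
    # raises BrokenBarrierError (after 2 s) if they ran one after another
    both_started = threading.Barrier(2, timeout=2)

    def child(name: str) -> None:
        both_started.wait()
        log.append(name)

    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(child, "A"), pool.submit(child, "B")]
        for future in futures:
            future.result()  # re-raise a child's error, similar to raise_final_state=True
    log.append("C")  # only reached after A and B have both finished
```

The order of A and B in the log may vary between runs; C is always last.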
y
Thank you for the detailed ideas, I will try those out! Would you also have some insight regarding the second question about flow-on-flow dependencies?
a
> schedule flow B after the flow_run of A has completed. I.e. flow B is executed after flow A, but doesn't have knowledge of flow A's state
This is possible. You would just set `raise_final_state=False`; in that case, the parent flow run will continue downstream even if this child flow run fails.
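A minimal sketch of the difference, with child flow runs reduced to plain names and a hypothetical `outcomes` mapping standing in for their real final states. With `raise_final_state=True`, a failed child fails the waiting task, so downstream tasks don't start (in Prefect 1.x they would typically end in `TriggerFailed` under the default `all_successful` trigger); with `False`, the chain keeps going.

```python
from typing import Dict, List


def run_chain(children: List[str], outcomes: Dict[str, str],
              raise_final_state: bool) -> List[str]:
    """Run children in order; return the names of the children that were started.

    outcomes maps child name -> final state ("Success" or "Failed") and stands
    in for whatever each child flow run really ends in.
    """
    started = []
    for name in children:
        started.append(name)
        if outcomes[name] != "Success" and raise_final_state:
            break  # the waiting task fails, so nothing downstream starts
    return started
```

Only the flag changes between the two behaviors; the child flow runs themselves are identical.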
> prefect's scheduler is set up to be much leaner/have no access to the actual flow code to infer this kind of stuff
The flow code has no knowledge of the execution state; the backend API does.