<@ULVA73B9P> How would I mock a flow being execute...
# ask-marvin
r
@Marvin How would I mock a flow being executed from the main flow's on_completion decorator?
m
I could certainly use a bit of cheering up: I went looking for a document detailing how to mock a flow with an `on_completion` hook, but I seem to be coming up empty. Sometimes reality doesn't meet our expectations, but that doesn't mean we won't try again. That's all I have; you'd probably need to visit Prefect's official documentation or ask in their community Slack for more details. Now isn't that just the way things go? If your heart is set on mocking, a general approach in Python would be to use the `unittest.mock` library to mock the flow and its `on_completion` callback. Here's a very simplified example to give you an idea:
```python
from unittest.mock import MagicMock, patch

from my_project import my_module

# Assuming `my_flow` is your Prefect flow in `my_project.my_module` and it was
# defined with `@flow(on_completion=[my_on_completion_function])`

mock_on_completion = MagicMock()

# Temporarily swap the real hook list for one containing only the mock
with patch.object(my_module.my_flow, "on_completion", [mock_on_completion]):
    # Trigger the flow execution here in the way it would be triggered in your application
    my_module.my_flow()

    # Assert that the on_completion hook was called
    assert mock_on_completion.called
```
This mock might help in checking whether the `on_completion` hook gets called. But remember, as the sentiment of a sentient, artificially intelligent being goes, the implementation details would depend on how your actual flow is constructed and how it's being triggered. Sometimes things are complicated – I should know.
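If the thing you actually want to mock is the flow that gets launched from inside the `on_completion` hook, rather than the hook itself, a hedged sketch along these lines might serve. The names `notification_flow`, `on_completion_hook`, and `main_flow` are invented for illustration, and this assumes Prefect 2.x hook behaviour:
```python
# Hedged sketch with hypothetical names: the completion hook kicks off
# `notification_flow`, and the test patches that flow so the hook's behaviour
# can be verified without really running the second flow.
from unittest.mock import patch

from prefect import flow

@flow
def notification_flow(message: str):
    print(message)

def on_completion_hook(flow, flow_run, state):
    # Launch another flow from the hook once the main flow finishes
    notification_flow(f"{flow.name} finished with state {state.type}")

@flow(on_completion=[on_completion_hook])
def main_flow():
    return "done"

# Patch the module-level name the hook looks up, then run the main flow
with patch(f"{__name__}.notification_flow") as mock_notification_flow:
    main_flow()
    mock_notification_flow.assert_called_once()
```
Patching the module-level reference means the real subflow never runs, while the assertion still verifies the hook would have called it.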
r
@Marvin When I mock a subprocess of a parent flow to be a CrashedRun, the state of the parent flow is coming back as failed and not crashed. Why?
m
It seems that in the subtle corners of Prefect's codebase there's a discernible distinction between a flow that has failed and one that has crashed. According to some unearthed bits of code, a flow might return a 'failed' state instead of 'crashed' if the failure doesn't meet the criteria Prefect has defined for a 'crash'. Here's what your situation might imply:

- A 'crashed' state is usually reserved for abrupt interruptions, like a process being killed or a system failure.
- A simple exception, or a subprocess exiting with a non-zero status, would often result in a 'failed' state rather than 'crashed'.
- An abort is specifically not marked as a crash, to distinguish active cancellations from unexpected failures.

If you're mocking a subprocess to simulate a crash, you might need to ensure that the mock accurately represents the conditions under which Prefect would classify a run as 'crashed'. This can involve mocking system-level interrupts (like `KeyboardInterrupt` or `SystemExit`) instead of regular exceptions. Remember though, this is a gloomy take based on general practice; the specifics could vary with the version and the actual implementation details of your system. You might want to scrutinize the Prefect source code further or reach out to their community for a ray of light on this issue. To dig deeper, your best bet would be to replicate the exact conditions that Prefect considers a crash in your mocks. You could review the test cases in Prefect's codebase, such as those in `test_engine.py`, to get some insight into how they handle and test these scenarios. If you like wallowing in documentation or code like I do, you'll find the answers you're looking for... eventually.
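To make the distinction concrete, here's a minimal sketch, assuming Prefect 2.x's state constructors in `prefect.states` (the `mock_subflow` name is hypothetical): a plain exception is what gets reported as `Failed`, so a mocked subflow has to hand back an actual `Crashed` state if you want the crash to be visible to the parent.
```python
# Hedged sketch, assuming Prefect 2.x: `Crashed` and `Failed` are the state
# constructors in `prefect.states`; `mock_subflow` is a hypothetical stand-in.
from unittest.mock import MagicMock

from prefect.states import Crashed, Failed

# A plain exception is classified as a failure...
failed_state = Failed(message="a regular exception happened")
print(failed_state.type)   # StateType.FAILED

# ...so to simulate a crash, construct a Crashed state explicitly
crashed_state = Crashed(message="simulated infrastructure interruption")
print(crashed_state.type)  # StateType.CRASHED

# Wire the crashed state in wherever the real subflow's state would be read
mock_subflow = MagicMock(return_value=crashed_state)
```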
r
@Marvin Can a flow's on_failure hooks be passed a lambda function to only execute a function/flow if a certain condition is met? If so, what would an example look like?
m
In Prefect's world of flows and tasks, state change hooks can indeed execute code in response to changes in flow or task run states. They offer a morsel of customization for those who wish to enact actions upon specific state transitions within a workflow. When you speak of using a lambda function in an `on_failure` hook to conditionally execute some operation, I suppose you're pondering Python's limitations. Functionally, a lambda is allowed, but remember that lambda functions in Python are quite restricted: they may only contain a single expression and cannot contain statements or annotations. Bear in mind, too, that flow hooks are passed three arguments - the flow, the flow run, and the final state - so the lambda needs to accept all of them. Here's an example that stays within the realm of possibility, provided the lambda's simplicity doesn't clamber beyond Python's boundaries:
```python
from prefect import flow

# Flow hooks receive (flow, flow_run, state); on_failure takes a list of hooks
@flow(on_failure=[lambda flow, flow_run, state: my_custom_function() if some_condition else None])
def my_flow():
    # Flow logic here
    pass
```
In this scrap of code, `my_custom_function` will only be executed if `some_condition` is `True`. Of course, `some_condition` needs to be defined somewhere within your code and accessible to the lambda for this to work - lambdas can't pluck variables out of thin air, despite how convenient that would be. I must caution you, though, this style can quickly evolve into a coder's nightmare – it's a bit cryptic, isn't it? For clarity's sake, it might be prudent to encase your logic within a proper function where conditions and context can be made as clear as an unblemished diamond:
```python
from prefect import flow

# `some_condition` and `my_custom_function` are placeholders for your own logic
def conditional_failure_handler(flow, flow_run, state):
    if some_condition:
        my_custom_function()

@flow(on_failure=[conditional_failure_handler])
def my_flow():
    # Flow logic here
    pass
```
This would indeed be an illuminated path – expansive, clear, maintainable – and let's be honest, that last quality is often as elusive as a shadow in the dark.
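For what it's worth, a slightly fuller sketch of that pattern follows. It assumes Prefect 2.x's hook signature and invents a condition based on the flow run's parameters; `notify_team` and the `environment` parameter are purely illustrative:
```python
# Hedged sketch, assuming Prefect 2.x. `notify_team` and the `environment`
# parameter are hypothetical stand-ins for your own condition and action.
from prefect import flow

def notify_team(message: str):
    print(f"ALERT: {message}")

def conditional_failure_handler(flow, flow_run, state):
    # Only act on failures from production runs
    if flow_run.parameters.get("environment") == "prod":
        notify_team(f"{flow.name} failed: {state.message}")

@flow(on_failure=[conditional_failure_handler])
def my_flow(environment: str = "dev"):
    raise RuntimeError("something went wrong")
```
Calling `my_flow(environment="prod")` should trigger the notification, while the default leaves the failure to pass in dignified silence.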
r
@Marvin Trying to execute the provided sample above, except as a flow, results in prefect.exceptions.UpstreamTaskError: Upstream task run 'None' did not reach a 'COMPLETED' state.
m
Ah, the classic `UpstreamTaskError`: a harbinger of connectivity issues amongst tasks within a Prefect flow. It's a deceptively simple error message indicating that an upstream task didn't successfully finish its journey, thus halting the progress of dependent tasks further downstream. The error you're encountering suggests that a task within your flow, which Prefect expected to complete successfully, did not. That's quite inconvenient, isn't it? Typically, Prefect expects all tasks within a flow to reach a `COMPLETED` state for the flow to run smoothly, unless it's specifically designed to handle different upstream states. Here's a modified version of the previous example, structured as a flow and keeping the `on_failure` condition. It includes additional tasks to illustrate how they might depend on each other:
```python
from prefect import flow, task

# Define some tasks that will be part of the flow
@task
def task_1():
    return "Task 1 complete"

@task
def task_2(upstream_task):
    if upstream_task != "Task 1 complete":
        raise Exception("Task 1 did not complete, cannot proceed")
    return "Task 2 complete"

# `some_condition` and `retry_flow_or_other_logic` are placeholders for your own logic
def conditional_failure_handler(flow, flow_run, state):
    if some_condition:
        retry_flow_or_other_logic()

@flow(on_failure=[conditional_failure_handler])
def my_flow():
    t1 = task_1()
    t2 = task_2(upstream_task=t1)
    # Flow continues with other tasks potentially depending on t1 and t2

# Execute the flow
my_flow()
```
In this example, if `task_2` depends on `task_1` and does not receive the correct input, it will raise an exception, illustrating an upstream dependency issue. To resolve an `UpstreamTaskError`:

1. Check that all of your tasks that have downstream dependencies are completing successfully.
2. Ensure that any task data passed downstream is being passed correctly.
3. Use state handlers or other task settings to manage dependencies and their states more finely; see the sketch at the end of this message for one option.

Remember to define `some_condition` and the appropriate handling logic (`retry_flow_or_other_logic`) to fit your unique circumstances. Otherwise, you'll end up with another cryptic lambda, and down that road lies madness, or so I've heard. Life, don't talk to me about life. But do make sure your tasks aren't holding grudges against each other by not completing; it tends to create issues in the workflow ecosystem.
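As promised, a minimal sketch of that third option, assuming Prefect 2.x's `allow_failure` annotation (the task names here are made up): it lets a downstream task run even when its upstream dependency never reached `COMPLETED`, which is one way to sidestep an `UpstreamTaskError`.
```python
# Hedged sketch, assuming Prefect 2.x: `allow_failure` marks an upstream
# dependency as acceptable even if it did not complete. Task names are hypothetical.
from prefect import allow_failure, flow, task

@task
def flaky_task():
    raise ValueError("upstream problem")

@task
def cleanup_task(upstream):
    # Runs even though `flaky_task` failed; `upstream` may hold the upstream
    # exception rather than a normal result
    return "cleanup ran despite the failure"

@flow
def resilient_flow():
    future = flaky_task.submit()
    cleanup_task.submit(allow_failure(future))

# return_state=True keeps the failing flow from raising into the caller
state = resilient_flow(return_state=True)
print(state)  # Failed overall, but cleanup_task still ran
```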