# prefect-community
welcome back Sang 👋 could you move the code blocks to the thread?
Hi all. I’m currently running a deployment with a flow like:
def my_flow():
    ctx: FlowRunContext = get_run_context()
    seconds_late = (
        pendulum.now().diff(ctx.flow_run.expected_start_time, abs=False).in_seconds()
        * -1

    if seconds_late > 10:
        return Failed(message=f"Flow started {seconds_late} seconds late.")

so that if a run is past a certain time, automatically return a failure with a message. However, when the cli, when I try to execute a simplified example:
def my_flow():
    ctx: FlowRunContext = get_run_context()
    return Failed(message=f"Flow started {seconds_late} seconds late.")

(So the second one should automatically fail) this one seems to run with a successful flow without failing. Is this meant to be happening? So I get something like:
Loading flow from deployed location...
Running flow...
14:04:04.075 | INFO    | prefect.engine - Created flow run 'taupe-coua' for flow 'ng'
14:04:04.076 | INFO    | Flow run 'taupe-coua' - Using task runner 'ConcurrentTaskRunner'
14:04:04.355 | INFO    | Flow run 'taupe-coua' - Flow can run - still current
14:04:04.500 | INFO    | Flow run 'taupe-coua' - Finished in state Completed()
I was expecting something like Failed() with some error messages
basically, the main message should be a problem description without the code thanks so much 🙏
Sorry yeah haha
Yeah, so I’m currently trying to design a controlled failure for a flow, but running
prefect deployment execute ...
for this particular flow doesnt seem to reproduce the Failure
execute is equivalent to:
python yourflow.py
can you try to describe the problem from a business perspective in your main message? I didn't fully understand yet what you are trying to do
and for the code, any chance you could make it to a full flow I could reproduce?
Sure, I will to write
def late_polling(ctx, maxlate=datetime.timedelta(hours=12)):
    delibrately kill the flow if the we have gone past the maxlate threshold
    logger = get_run_logger()   
    return Cancelled(
            message=f"Flow is too late - it went past the max_late threshold."
    if ctx.flow_run.expected_start_time + maxlate < datetime.datetime.now(
    #        tz=pytz.utc):
        return Cancelled(
            message=f"Flow is too late - it went past the max_late threshold."
        <http://logger.info|logger.info>("Flow can run - still current")
@flow(name = "ng")
def ng_daily_polling():
    """flow to call the NG daily polling function"""
    ctx: FlowRunContext = get_run_context()
    # late_polling is the equivalent of the task handler we have made
    print("This is a random flow running that should not print if we fail successfully")
So I’m trying to ensure that the flow doesnt work after a certain threshold of time when deployed, which in the case is after 12 hours or so. I was planning on using a conditional failure return and test it with a random deployment, but I figured that if I removed the time conditional (12 hours) and just returned Failure state immediately, it would at least confirm the logic works out. As you see, I couldn’t quite produce the Failure state
I see - so this is your solution to the problem of: "Run it real-time all the time in a while-loop fashion but only between 8 and 22"?
Well, run it in a schedule, but if we have a deployment that is out of bounds in terms of time (12 hours) then automatically fail
But again, Im literally failing at failing this haha
let me think about it, I'll get back to you later today or tomorrow thanks for sharing your use case
Sure no worries
If you want a flow to run every hour between 8 and 22, can’t you use a CronSchedule like this?
0 8-22 * * *
this is for every hour, but you can change it to the frequency you need.
No its no quite that, Ive currently got a deployment schedule using rrules, but after its deployed, I wish to make it fail after a certain number of hours (in this case 12) has past after it has been deployed. The deploymentschedule at the moment is:
FlowSchedule = rrule(
        dtstart=dt(2020, 1, 1, 8, 31, 0),
        byhour=range(8, 9),
        byminute=range(30, 52),

    return DeploymentSpec(
        # parameters=parameters,
I may not have explained the problem properly so my apologies on my side
Ah you want a Deployment to only last for 12 hours? Or you really want to fail flows?
So, I wish to bake in every flow in a deployment that if for some reason, if it is delayed beyond 12 hours, then even if it runs fail it
Oh delayed? I understand now. Then I think your code above is how I’d do the same. Why does it fail? If it’s because of the Cancellation:
return Cancelled(
            message=f"Flow is too late - it went past the max_late threshold."
Instead you can do something like:
Copy code
def fail_task():
    raise TimeoutError()

def myflow():
     fail_task().result() # will be raised and end execution
So after the state of ‘cancelled’ has been returned, if I call upin the fail_task().result(), the flow should be cancelled? So I am thinking:
Copy code
@flow(name = "ng")
def ng_daily_polling():
    """flow to call the NG daily polling function"""
    ctx: FlowRunContext = get_run_context()
    # late_polling is the equivalent of the task handler we have made

    if late_polling(ctx):
    print("This is a random flow running that should not print if we fail successfully")
something like this?
I wouldn’t rely on Cancellation, I would just return True/False I think from
. And then fail my flow.
Hmm ok, I will give it a try. Thank you for the advice
Just another question - no rush on this - I’ve been running deployments of different flows with the same tags
Copy code
tags = ['run_type_1']
which creates a work-queue of the following:
Copy code
          created='27 seconds ago', updated='27 seconds ago', 
          filter=QueueFilter(tags=['run_type_1']), name='work_queue_type_1')
From my interpretation of it, this workqueue should therefore be able to run both flows, correct? I did a preview of the work-queue however, and I only seem to getting the id of a single type of deployment: So just wondering, are the reployments and the tags only supportive of a single flow? Or can it support multiple flows?
It can support multiple. Can you
your work queue through the CLI?
ok I’ll have a look
I suspect you specified a deployment in there
Copy code
prefect work-queue ls
I get:
Copy code
0bd02e1e-523a-4151-9d92-ea3d902e8a44 │ work_queue_type_1 │ None  

7b09b84f-ce57-42f2-90c1-a42b1dc16999 │ newworkqueue      │ None
work_queue_type_1 is the one with tags. From that, I do:
Copy code
prefect work-queue inspect 0bd02e1e-523a-4151-9d92-ea3d902e8a44
and I get:
Copy code
WorkQueue(id='0bd02e1e-523a-4151-9d92-ea3d902e8a44', created='6 hours ago', updated='6 hours ago', filter=QueueFilter(tags=['run_type_1']), name='work_queue_type_1')
I think I did something wrong, but its good to hear that work-queues does indeed support multiple deployments
Yeah that looks good and yes it should support