# prefect-community
s
Hi all. I’m currently running a deployment with a flow like: .. (sorry, please follow the thread for the original message!)
a
welcome back Sang 👋 could you move the code blocks to the thread?
s
Sure
Hi all. I’m currently running a deployment with a flow like:
Copy code
@flow
def my_flow():
    ctx: FlowRunContext = get_run_context()
    seconds_late = (
        pendulum.now().diff(ctx.flow_run.expected_start_time, abs=False).in_seconds()
        * -1
    )

    if seconds_late > 10:
        return Failed(message=f"Flow started {seconds_late} seconds late.")

    my_task()
so that if a run starts past a certain time, it automatically returns a failure with a message. However, when I try to execute a simplified example from the CLI:
Copy code
@flow
def my_flow():
    ctx: FlowRunContext = get_run_context()
    
    return Failed(message=f"Flow started {seconds_late} seconds late.")

    my_task()
(So the second one should automatically fail.) This one seems to finish as a successful flow run without failing. Is this meant to happen? I get something like:
Copy code
Loading flow from deployed location...
Running flow...
14:04:04.075 | INFO    | prefect.engine - Created flow run 'taupe-coua' for flow 'ng'
14:04:04.076 | INFO    | Flow run 'taupe-coua' - Using task runner 'ConcurrentTaskRunner'
14:04:04.355 | INFO    | Flow run 'taupe-coua' - Flow can run - still current
14:04:04.500 | INFO    | Flow run 'taupe-coua' - Finished in state Completed()
I was expecting something like Failed() with some error messages
a
Basically, the main message should be a problem description without the code. Thanks so much 🙏
s
Sorry yeah haha
Yeah, so I’m currently trying to design a controlled failure for a flow, but running
Copy code
prefect deployment execute ...
for this particular flow doesn’t seem to reproduce the failure
a
execute is equivalent to:
python yourflow.py
can you try to describe the problem from a business perspective in your main message? I didn't fully understand yet what you are trying to do
and for the code, any chance you could make it to a full flow I could reproduce?
s
Sure, I will write it up
a
awesome
s
Copy code
def late_polling(ctx, maxlate=datetime.timedelta(hours=12)):
    """
    Deliberately kill the flow if we have gone past the maxlate threshold
    date.
    """
    logger = get_run_logger()   
    return Cancelled(
            message=f"Flow is too late - it went past the max_late threshold."
    )
    if ctx.flow_run.expected_start_time + maxlate < datetime.datetime.now(tz=pytz.utc):
        return Cancelled(
            message=f"Flow is too late - it went past the max_late threshold."
        )
    else:
        logger.info("Flow can run - still current")
Copy code
@flow(name = "ng")
def ng_daily_polling():
    """flow to call the NG daily polling function"""
    ctx: FlowRunContext = get_run_context()
    # late_polling is the equivalent of the task handler we have made
    late_polling(ctx)
    print("This is a random flow running that should not print if we fail successfully")
So I’m trying to ensure that the flow doesn’t run after a certain time threshold when deployed, which in this case is about 12 hours. I was planning on using a conditional failure return and testing it with a random deployment, but I figured that if I removed the time conditional (12 hours) and just returned a Failed state immediately, it would at least confirm the logic works. As you can see, I couldn’t quite produce the Failed state.
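One likely reason, sketched in plain Python with no Prefect imports: `late_polling` hands the state back to its caller, but `ng_daily_polling` never returns it, so the flow function itself still returns `None` and carries on to the `print`. The `FakeState` class and the function names below are hypothetical stand-ins for illustration, not Prefect APIs.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeState:
    """Hypothetical stand-in for a state object such as Failed or Cancelled."""
    message: str


def late_check() -> FakeState:
    # Mirrors late_polling: the state is returned to the caller only.
    return FakeState(message="Flow is too late")


def flow_ignoring_result() -> Optional[FakeState]:
    late_check()  # the returned state is discarded here
    return None   # so the function finishes normally and returns None


def flow_propagating_result() -> Optional[FakeState]:
    state = late_check()
    if state is not None:
        return state  # the caller of the flow now sees the state
    return None
```

In the second variant the state actually leaves the function, which is roughly what the first `my_flow` example does by returning `Failed(...)` directly from the flow body.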
a
I see - so this is your solution to the problem of: "Run it real-time all the time in a while-loop fashion but only between 8 and 22"?
s
yes
Well, run it on a schedule, but if we have a deployment that is out of bounds in terms of time (12 hours) then automatically fail
But again, I’m literally failing at failing this haha
a
Let me think about it, I'll get back to you later today or tomorrow. Thanks for sharing your use case
s
Sure no worries
k
If you want a flow to run every hour between 8 and 22, can’t you use a CronSchedule like this?
Copy code
0 8-22 * * *
this is for every hour, but you can change it to the frequency you need.
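As a quick sanity check of what that expression covers, here is a small sketch that expands only the minute and hour fields. The helper `expand_hour_field` is made up for this example and handles just a single value or a simple `a-b` range, not full cron syntax.

```python
from typing import List


def expand_hour_field(field: str) -> List[int]:
    """Expand a simple cron hour field like '8-22' or '8' into a list of hours."""
    if "-" in field:
        start, end = (int(part) for part in field.split("-"))
        return list(range(start, end + 1))  # cron ranges are inclusive
    return [int(field)]


# Only the first two fields matter here; the rest are '*' (every day).
minute, hour, *_ = "0 8-22 * * *".split()
fire_times = [f"{h:02d}:{int(minute):02d}" for h in expand_hour_field(hour)]
```

`fire_times` runs from `08:00` through `22:00` inclusive, so one run per hour, 15 per day.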
s
No, it’s not quite that. I’ve currently got a deployment schedule using rrules, but I wish to make it fail once a certain number of hours (in this case 12) has passed after it has been deployed. The deployment schedule at the moment is:
Copy code
FlowSchedule = rrule(
        freq=MINUTELY,
        interval=5,
        dtstart=dt(2020, 1, 1, 8, 31, 0),
        byhour=range(8, 9),
        byminute=range(30, 52),
    )

    return DeploymentSpec(
        flow=ng_daily_polling,
        name=f"test_2",
        schedule=RRuleSchedule.from_rrule(FlowSchedule),
        tags=deploy_tags,
        # parameters=parameters,
        flow_runner=SubprocessFlowRunner(),
    )
I may not have explained the problem properly so my apologies on my side
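For reference, a rough sketch of the run times this rule should produce. It emulates the rule with plain `datetime` arithmetic rather than calling dateutil, assuming the rule steps every 5 minutes from `dtstart` and keeps only times that match `byhour` and `byminute`. The function name `emulate_schedule` is made up for this example.

```python
import datetime as dt
from itertools import islice


def emulate_schedule(start, step_minutes=5, hours=range(8, 9), minutes=range(30, 52)):
    """Yield times every `step_minutes` from `start`, keeping only matching ones."""
    current = start
    while True:
        if current.hour in hours and current.minute in minutes:
            yield current
        current += dt.timedelta(minutes=step_minutes)


# Same dtstart as the rrule above: 2020-01-01 08:31:00
first_six = list(islice(emulate_schedule(dt.datetime(2020, 1, 1, 8, 31, 0)), 6))
```

Under that assumption there are five runs a day (08:31, 08:36, 08:41, 08:46, 08:51), and the sixth entry is 08:31 on the following day.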
k
Ah you want a Deployment to only last for 12 hours? Or you really want to fail flows?
s
So, I wish to bake into every flow in a deployment that if, for some reason, it is delayed beyond 12 hours, it fails even if it does run
k
Oh delayed? I understand now. Then I think your code above is how I’d do the same. Why does it fail? If it’s because of the Cancellation:
Copy code
return Cancelled(
            message=f"Flow is too late - it went past the max_late threshold."
    )
Instead you can do something like:
Copy code
@task
def fail_task():
    raise TimeoutError()

@flow
def myflow():
    fail_task().result()  # will be raised and end execution
maybe?
s
So after the state of ‘cancelled’ has been returned, if I call fail_task().result(), the flow should be cancelled? So I am thinking:
Copy code
@flow(name = "ng")
def ng_daily_polling():
    """flow to call the NG daily polling function"""
    ctx: FlowRunContext = get_run_context()
    # late_polling is the equivalent of the task handler we have made

    if late_polling(ctx):
        fail_task().result()
    print("This is a random flow running that should not print if we fail successfully")
something like this?
k
I wouldn’t rely on Cancellation. I think I would just return True/False from late_polling, and then fail my flow.
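A minimal sketch of that shape in plain Python, with no Prefect imports so it runs on its own. The names `is_too_late`, `FlowTooLateError` and `guarded_body` are made up for this example. Inside a real flow, the expectation is that an uncaught exception like this ends the run in a failed state, but treat that as an assumption to verify on your version.

```python
import datetime as dt


class FlowTooLateError(RuntimeError):
    """Hypothetical error raised when a run starts past the allowed delay."""


def is_too_late(expected_start, now, max_late=dt.timedelta(hours=12)):
    """Return True if `now` is more than `max_late` after `expected_start`."""
    return now > expected_start + max_late


def guarded_body(expected_start, now):
    # The check only returns a bool; the caller decides to fail.
    if is_too_late(expected_start, now):
        raise FlowTooLateError(
            "Flow is too late - it went past the max_late threshold."
        )
    return "ran"
```

Both datetimes should be timezone-aware (or both naive), otherwise the comparison raises a `TypeError`.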
s
Hmm ok, I will give it a try. Thank you for the advice
Just another question - no rush on this - I’ve been running deployments of different flows with the same tags
Copy code
tags = ['run_type_1']
which creates a work-queue of the following:
Copy code
WorkQueue(id='0bd02e1e-523a-4151-9d92-ea3d902e8a44', 
          created='27 seconds ago', updated='27 seconds ago', 
          filter=QueueFilter(tags=['run_type_1']), name='work_queue_type_1')
From my interpretation, this work queue should therefore be able to run both flows, correct? I did a preview of the work queue, however, and I only seem to be getting the id of a single type of deployment. So just wondering, do deployments and tags only support a single flow, or can they support multiple flows?
k
It can support multiple. Can you inspect your work queue through the CLI?
s
ok I’ll have a look
k
I suspect you specified a deployment in there
s
Running:
Copy code
prefect work-queue ls
I get:
Copy code
0bd02e1e-523a-4151-9d92-ea3d902e8a44 │ work_queue_type_1 │ None  

7b09b84f-ce57-42f2-90c1-a42b1dc16999 │ newworkqueue      │ None
work_queue_type_1 is the one with tags. From that, I do:
Copy code
prefect work-queue inspect 0bd02e1e-523a-4151-9d92-ea3d902e8a44
and I get:
Copy code
WorkQueue(id='0bd02e1e-523a-4151-9d92-ea3d902e8a44', created='6 hours ago', updated='6 hours ago', filter=QueueFilter(tags=['run_type_1']), name='work_queue_type_1')
I think I did something wrong, but it’s good to hear that work queues do indeed support multiple deployments
k
Yeah, that looks good, and yes, it should support multiple deployments