https://prefect.io logo
#prefect-community
Title
# prefect-community
s

Sang Young Noh

06/14/2022, 1:06 PM
Hi all. I’m currently running a deployment with a flow like: .. (sorry, please follow the thread for the original message!)
1
a

Anna Geller

06/14/2022, 1:11 PM
welcome back Sang 👋 could you move the code blocks to the thread?
s

Sang Young Noh

06/14/2022, 1:11 PM
Sure
Hi all. I’m currently running a deployment with a flow like:
Copy code
@flow
def my_flow():
    ctx: FlowRunContext = get_run_context()
    seconds_late = (
        pendulum.now().diff(ctx.flow_run.expected_start_time, abs=False).in_seconds()
        * -1
    )

    if seconds_late > 10:
        return Failed(message=f"Flow started {seconds_late} seconds late.")

    my_task()
so that if a run is past a certain time, automatically return a failure with a message. However, when the cli, when I try to execute a simplified example:
Copy code
@flow
def my_flow():
    ctx: FlowRunContext = get_run_context()
    
    return Failed(message=f"Flow started {seconds_late} seconds late.")

    my_task()
(So the second one should automatically fail) this one seems to run with a successful flow without failing. Is this meant to be happening? So I get something like:
Copy code
Loading flow from deployed location...
Running flow...
14:04:04.075 | INFO    | prefect.engine - Created flow run 'taupe-coua' for flow 'ng'
14:04:04.076 | INFO    | Flow run 'taupe-coua' - Using task runner 'ConcurrentTaskRunner'
14:04:04.355 | INFO    | Flow run 'taupe-coua' - Flow can run - still current
14:04:04.500 | INFO    | Flow run 'taupe-coua' - Finished in state Completed()
I was expecting something like Failed() with some error messages
a

Anna Geller

06/14/2022, 1:12 PM
basically, the main message should be a problem description without the code thanks so much 🙏
s

Sang Young Noh

06/14/2022, 1:13 PM
Sorry yeah haha
Yeah, so I’m currently trying to design a controlled failure for a flow, but running
Copy code
prefect deployment execute ...
for this particular flow doesnt seem to reproduce the Failure
a

Anna Geller

06/14/2022, 1:14 PM
execute is equivalent to:
python yourflow.py
can you try to describe the problem from a business perspective in your main message? I didn't fully understand yet what you are trying to do
and for the code, any chance you could make it to a full flow I could reproduce?
s

Sang Young Noh

06/14/2022, 1:16 PM
Sure, I will to write
a

Anna Geller

06/14/2022, 1:16 PM
awesome
s

Sang Young Noh

06/14/2022, 1:20 PM
Copy code
def late_polling(ctx, maxlate=datetime.timedelta(hours=12)):
    """
    delibrately kill the flow if the we have gone past the maxlate threshold
    date 
    """
    logger = get_run_logger()   
    return Cancelled(
            message=f"Flow is too late - it went past the max_late threshold."
    )
    if ctx.flow_run.expected_start_time + maxlate < datetime.datetime.now(
    #        tz=pytz.utc):
        return Cancelled(
            message=f"Flow is too late - it went past the max_late threshold."
        )
    else:
        <http://logger.info|logger.info>("Flow can run - still current")
Copy code
@flow(name = "ng")
def ng_daily_polling():
    """flow to call the NG daily polling function"""
    ctx: FlowRunContext = get_run_context()
    # late_polling is the equivalent of the task handler we have made
    late_polling(ctx)
    print("This is a random flow running that should not print if we fail successfully")
So I’m trying to ensure that the flow doesnt work after a certain threshold of time when deployed, which in the case is after 12 hours or so. I was planning on using a conditional failure return and test it with a random deployment, but I figured that if I removed the time conditional (12 hours) and just returned Failure state immediately, it would at least confirm the logic works out. As you see, I couldn’t quite produce the Failure state
a

Anna Geller

06/14/2022, 1:27 PM
I see - so this is your solution to the problem of: "Run it real-time all the time in a while-loop fashion but only between 8 and 22"?
s

Sang Young Noh

06/14/2022, 1:36 PM
yes\
Well, run it in a schedule, but if we have a deployment that is out of bounds in terms of time (12 hours) then automatically fail
But again, Im literally failing at failing this haha
💯 1
😂 1
a

Anna Geller

06/14/2022, 1:37 PM
let me think about it, I'll get back to you later today or tomorrow thanks for sharing your use case
s

Sang Young Noh

06/14/2022, 1:38 PM
Sure no worries
k

Kevin Kho

06/14/2022, 1:46 PM
If you want a flow to run every hour between 8 and 22, can’t you use a CronSchedule like this?
Copy code
0 8-22 * * *
this is for every hour, but you can change it to the frequency you need.
upvote 1
s

Sang Young Noh

06/14/2022, 1:50 PM
No its no quite that, Ive currently got a deployment schedule using rrules, but after its deployed, I wish to make it fail after a certain number of hours (in this case 12) has past after it has been deployed. The deploymentschedule at the moment is:
Copy code
FlowSchedule = rrule(
        freq=MINUTELY,
        interval=5,
        dtstart=dt(2020, 1, 1, 8, 31, 0),
        byhour=range(8, 9),
        byminute=range(30, 52),
    )

    return DeploymentSpec(
        flow=ng_daily_polling,
        name=f"test_2",
        schedule=RRuleSchedule.from_rrule(FlowSchedule),
        tags=deploy_tags,
        # parameters=parameters,
        flow_runner=SubprocessFlowRunner(),
    )
I may not have explained the problem properly so my apologies on my side
k

Kevin Kho

06/14/2022, 2:04 PM
Ah you want a Deployment to only last for 12 hours? Or you really want to fail flows?
s

Sang Young Noh

06/14/2022, 2:06 PM
So, I wish to bake in every flow in a deployment that if for some reason, if it is delayed beyond 12 hours, then even if it runs fail it
k

Kevin Kho

06/14/2022, 2:10 PM
Oh delayed? I understand now. Then I think your code above is how I’d do the same. Why does it fail? If it’s because of the Cancellation:
Copy code
return Cancelled(
            message=f"Flow is too late - it went past the max_late threshold."
    )
Instead you can do something like:
Copy code
@task
def fail_task():
    raise TimeoutError()

@flow
def myflow():
     fail_task().result() # will be raised and end execution
maybe?
🙌 1
s

Sang Young Noh

06/14/2022, 2:13 PM
So after the state of ‘cancelled’ has been returned, if I call upin the fail_task().result(), the flow should be cancelled? So I am thinking:
Copy code
@flow(name = "ng")
def ng_daily_polling():
    """flow to call the NG daily polling function"""
    ctx: FlowRunContext = get_run_context()
    # late_polling is the equivalent of the task handler we have made

    if late_polling(ctx):
       fail_task().result() 
    print("This is a random flow running that should not print if we fail successfully")
something like this?
k

Kevin Kho

06/14/2022, 2:30 PM
I wouldn’t rely on Cancellation, I would just return True/False I think from
late_polling
. And then fail my flow.
🙌 1
s

Sang Young Noh

06/14/2022, 2:46 PM
Hmm ok, I will give it a try. Thank you for the advice
Just another question - no rush on this - I’ve been running deployments of different flows with the same tags
Copy code
tags = ['run_type_1']
which creates a work-queue of the following:
Copy code
WorkQueue(id='0bd02e1e-523a-4151-9d92-ea3d902e8a44', 
          created='27 seconds ago', updated='27 seconds ago', 
          filter=QueueFilter(tags=['run_type_1']), name='work_queue_type_1')
From my interpretation of it, this workqueue should therefore be able to run both flows, correct? I did a preview of the work-queue however, and I only seem to getting the id of a single type of deployment: So just wondering, are the reployments and the tags only supportive of a single flow? Or can it support multiple flows?
k

Kevin Kho

06/14/2022, 3:36 PM
It can support multiple. Can you
inspect
your work queue through the CLI?
s

Sang Young Noh

06/14/2022, 3:37 PM
ok I’ll have a look
k

Kevin Kho

06/14/2022, 3:38 PM
I suspect you specified a deployment in there
s

Sang Young Noh

06/14/2022, 3:41 PM
Running:
Copy code
prefect work-queue ls
I get:
Copy code
0bd02e1e-523a-4151-9d92-ea3d902e8a44 │ work_queue_type_1 │ None  

7b09b84f-ce57-42f2-90c1-a42b1dc16999 │ newworkqueue      │ None
work_queue_type_1 is the one with tags. From that, I do:
Copy code
prefect work-queue inspect 0bd02e1e-523a-4151-9d92-ea3d902e8a44
and I get:
Copy code
WorkQueue(id='0bd02e1e-523a-4151-9d92-ea3d902e8a44', created='6 hours ago', updated='6 hours ago', filter=QueueFilter(tags=['run_type_1']), name='work_queue_type_1')
I think I did something wrong, but its good to hear that work-queues does indeed support multiple deployments
k

Kevin Kho

06/14/2022, 3:47 PM
Yeah that looks good and yes it should support
6 Views