Thread
#prefect-community
    Sang Young Noh

    3 months ago
    Hi all. I’m currently running a deployment with a flow like: .. (sorry, please follow the thread for the original message!)
    Anna Geller

    3 months ago
    welcome back Sang 👋 could you move the code blocks to the thread?
    Sang Young Noh

    3 months ago
    Sure
    Hi all. I’m currently running a deployment with a flow like:
    @flow
    def my_flow():
        ctx: FlowRunContext = get_run_context()
        seconds_late = (
            pendulum.now().diff(ctx.flow_run.expected_start_time, abs=False).in_seconds()
            * -1
        )
    
        if seconds_late > 10:
            return Failed(message=f"Flow started {seconds_late} seconds late.")
    
        my_task()
    so that if a run starts too late, it automatically returns a failure with a message. However, when I try to execute a simplified example from the CLI:
    @flow
    def my_flow():
        ctx: FlowRunContext = get_run_context()
        
        return Failed(message=f"Flow started {seconds_late} seconds late.")
    
        my_task()
    (so this second one should fail automatically), the flow seems to run successfully without failing. Is this meant to happen? I get something like:
    Loading flow from deployed location...
    Running flow...
    14:04:04.075 | INFO    | prefect.engine - Created flow run 'taupe-coua' for flow 'ng'
    14:04:04.076 | INFO    | Flow run 'taupe-coua' - Using task runner 'ConcurrentTaskRunner'
    14:04:04.355 | INFO    | Flow run 'taupe-coua' - Flow can run - still current
    14:04:04.500 | INFO    | Flow run 'taupe-coua' - Finished in state Completed()
    I was expecting something like Failed() with some error messages
    Anna Geller

    3 months ago
    basically, the main message should be a problem description without the code. thanks so much 🙏
    Sang Young Noh

    3 months ago
    Sorry yeah haha
    Yeah, so I’m currently trying to design a controlled failure for a flow, but running
    prefect deployment execute ...
    for this particular flow doesn't seem to reproduce the failure
    Anna Geller

    3 months ago
    execute is equivalent to:
    python yourflow.py
    can you try to describe the problem from a business perspective in your main message? I didn't fully understand yet what you are trying to do
    and for the code, any chance you could make it to a full flow I could reproduce?
    Sang Young Noh

    3 months ago
    Sure, I will write it up
    Anna Geller

    3 months ago
    awesome
    Sang Young Noh

    3 months ago
    def late_polling(ctx, maxlate=datetime.timedelta(hours=12)):
        """
        Deliberately kill the flow if we have gone past the maxlate threshold.
        """
        logger = get_run_logger()
        # Test: return Cancelled unconditionally, so the time check below is never reached
        return Cancelled(
            message="Flow is too late - it went past the max_late threshold."
        )
        if ctx.flow_run.expected_start_time + maxlate < datetime.datetime.now(
            tz=pytz.utc
        ):
            return Cancelled(
                message="Flow is too late - it went past the max_late threshold."
            )
        else:
            logger.info("Flow can run - still current")

    @flow(name="ng")
    def ng_daily_polling():
        """flow to call the NG daily polling function"""
        ctx: FlowRunContext = get_run_context()
        # late_polling is the equivalent of the task handler we have made
        late_polling(ctx)
        print("This is a random flow running that should not print if we fail successfully")
    So I’m trying to ensure that a deployed flow doesn’t run past a certain time threshold, which in this case is 12 hours or so. I was planning on using a conditional failure return and testing it with a random deployment, but I figured that if I removed the time conditional (12 hours) and just returned a failure state immediately, it would at least confirm that the logic works. As you can see, I couldn’t quite produce the failure state.
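    One possible reading of the code above: late_polling returns a state object, but ng_daily_polling never uses that return value, so the flow body simply carries on. The sketch below shows that effect in plain Python. FakeState, check_lateness and both flow functions are hypothetical stand-ins for illustration, not Prefect APIs.

```python
from dataclasses import dataclass


@dataclass
class FakeState:
    """Hypothetical stand-in for a state object such as Cancelled or Failed."""

    name: str
    message: str


def check_lateness() -> FakeState:
    # Always signals "too late", mirroring the unconditional return in the thread.
    return FakeState("Cancelled", "Flow is too late.")


def flow_ignoring_result() -> str:
    check_lateness()  # the returned state is discarded, so nothing stops here
    return "body ran"


def flow_returning_result():
    state = check_lateness()
    if state is not None:
        return state  # the caller hands the state back as its own result
    return "body ran"


print(flow_ignoring_result())
print(flow_returning_result().name)
```

    In this sketch only the second function ends with the state as its result; the first one runs its body regardless of what the helper returned.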
    Anna Geller

    3 months ago
    I see - so this is your solution to the problem of: "Run it real-time all the time in a while-loop fashion but only between 8 and 22"?
    Sang Young Noh

    3 months ago
    yes
    Well, run it on a schedule, but if a run from the deployment is out of bounds in terms of time (12 hours), then fail it automatically
    But again, I’m literally failing at failing this haha
    Anna Geller

    3 months ago
    let me think about it, I'll get back to you later today or tomorrow. thanks for sharing your use case
    Sang Young Noh

    3 months ago
    Sure no worries
    Kevin Kho

    3 months ago
    If you want a flow to run every hour between 8 and 22, can’t you use a CronSchedule like this?
    0 8-22 * * *
    this is for every hour, but you can change it to the frequency you need.
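    To make the cron expression concrete, here is a small standard-library sketch that lists when "0 8-22 * * *" would fire on a single day. It does not parse cron syntax; matches_hourly_window is a hypothetical helper that hard-codes the rule "minute 0 of every hour from 8 through 22", and the date is arbitrary.

```python
from datetime import datetime, timedelta


def matches_hourly_window(ts: datetime, first_hour: int = 8, last_hour: int = 22) -> bool:
    """True when ts falls on minute 0 of an hour in [first_hour, last_hour]."""
    return ts.minute == 0 and first_hour <= ts.hour <= last_hour


day_start = datetime(2022, 1, 1)  # arbitrary example date
every_minute = (day_start + timedelta(minutes=m) for m in range(24 * 60))
fire_times = [ts for ts in every_minute if matches_hourly_window(ts)]

print(len(fire_times))
print(fire_times[0].strftime("%H:%M"), fire_times[-1].strftime("%H:%M"))
```

    Note that the hour range in cron is inclusive, so the last run of the day is at 22:00.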
    Sang Young Noh

    3 months ago
    No, it’s not quite that. I’ve currently got a deployment schedule using rrules, but after it’s deployed, I wish to make it fail once a certain number of hours (in this case 12) has passed. The deployment schedule at the moment is:
    FlowSchedule = rrule(
        freq=MINUTELY,
        interval=5,
        dtstart=dt(2020, 1, 1, 8, 31, 0),
        byhour=range(8, 9),
        byminute=range(30, 52),
    )

    return DeploymentSpec(
        flow=ng_daily_polling,
        name="test_2",
        schedule=RRuleSchedule.from_rrule(FlowSchedule),
        tags=deploy_tags,
        # parameters=parameters,
        flow_runner=SubprocessFlowRunner(),
    )
    I may not have explained the problem properly so my apologies on my side
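    For readers unfamiliar with rrules, the sketch below emulates the schedule above using only the standard library, rather than calling dateutil itself: step every 5 minutes from dtstart and keep the times whose hour is 8 and whose minute is between 30 and 51. The occurrences helper is hypothetical, and the result is assumed (not verified here) to match what dateutil would yield.

```python
from datetime import datetime, timedelta

dtstart = datetime(2020, 1, 1, 8, 31, 0)
step = timedelta(minutes=5)          # freq=MINUTELY, interval=5
allowed_hours = set(range(8, 9))     # byhour=range(8, 9)    -> {8}
allowed_minutes = set(range(30, 52)) # byminute=range(30, 52) -> 30..51


def occurrences(start: datetime, end: datetime):
    """Yield every stepped time in [start, end) that passes both filters."""
    current = start
    while current < end:
        if current.hour in allowed_hours and current.minute in allowed_minutes:
            yield current
        current += step


first_two_days = list(occurrences(dtstart, dtstart + timedelta(days=2)))
for ts in first_two_days:
    print(ts.isoformat())
```

    Under these assumptions the schedule produces five runs per day, at 08:31, 08:36, 08:41, 08:46 and 08:51.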
    Kevin Kho

    3 months ago
    Ah you want a Deployment to only last for 12 hours? Or you really want to fail flows?
    Sang Young Noh

    3 months ago
    So, I wish to bake into every flow in a deployment that if, for some reason, a run is delayed beyond 12 hours, it fails even if it does get to run
    Kevin Kho

    3 months ago
    Oh delayed? I understand now. Then I think your code above is how I’d do the same. Why does it fail? If it’s because of the Cancellation:
    return Cancelled(
                message=f"Flow is too late - it went past the max_late threshold."
        )
    Instead you can do something like:
    @task
    def fail_task():
        raise TimeoutError()
    
    @flow
    def myflow():
        fail_task().result()  # the error will be raised and end execution
    maybe?
    Sang Young Noh

    3 months ago
    So after the ‘cancelled’ state has been returned, if I call fail_task().result(), the flow should be cancelled? So I am thinking:
    @flow(name = "ng")
    def ng_daily_polling():
        """flow to call the NG daily polling function"""
        ctx: FlowRunContext = get_run_context()
        # late_polling is the equivalent of the task handler we have made
    
        if late_polling(ctx):
            fail_task().result()
        print("This is a random flow running that should not print if we fail successfully")
    something like this?
    Kevin Kho

    3 months ago
    I wouldn’t rely on Cancellation; I think I would just return True/False from late_polling, and then fail my flow.
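    A minimal sketch of that suggestion, written in plain Python so it runs without Prefect: the lateness check returns a boolean, and the caller raises to stop execution. is_too_late and run_body are hypothetical names, and the now parameter is an assumption added only to make the check testable; in a real flow the expected start would come from the run context as in the earlier messages.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_too_late(
    expected_start: datetime,
    max_late: timedelta = timedelta(hours=12),
    now: Optional[datetime] = None,
) -> bool:
    """Return True when more than max_late has passed since expected_start."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now - expected_start > max_late


def run_body(expected_start: datetime, now: Optional[datetime] = None) -> str:
    if is_too_late(expected_start, now=now):
        # Raising stops execution, so nothing after this point runs.
        raise TimeoutError("Flow is too late - it went past the max_late threshold.")
    return "body ran"


expected = datetime(2022, 1, 1, 8, 0, tzinfo=timezone.utc)  # arbitrary example time
print(run_body(expected, now=expected + timedelta(hours=1)))
```

    Using timezone-aware datetimes on both sides of the comparison avoids the error Python raises when aware and naive values are mixed.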
    Sang Young Noh

    3 months ago
    Hmm ok, I will give it a try. Thank you for the advice
    Just another question - no rush on this - I’ve been running deployments of different flows with the same tags
    tags = ['run_type_1']
    which creates a work-queue of the following:
    WorkQueue(id='0bd02e1e-523a-4151-9d92-ea3d902e8a44', 
              created='27 seconds ago', updated='27 seconds ago', 
              filter=QueueFilter(tags=['run_type_1']), name='work_queue_type_1')
    From my interpretation, this work queue should therefore be able to run both flows, correct? However, when I previewed the work queue, I only seemed to get the ID of a single deployment. So just wondering: do deployments and tags only support a single flow, or can they support multiple flows?
    Kevin Kho

    3 months ago
    It can support multiple. Can you inspect your work queue through the CLI?
    Sang Young Noh

    3 months ago
    ok I’ll have a look
    Kevin Kho

    3 months ago
    I suspect you specified a deployment in there
    Sang Young Noh

    3 months ago
    Running:
    prefect work-queue ls
    I get:
    0bd02e1e-523a-4151-9d92-ea3d902e8a44 │ work_queue_type_1 │ None  
    
    7b09b84f-ce57-42f2-90c1-a42b1dc16999 │ newworkqueue      │ None
    work_queue_type_1 is the one with tags. From that, I do:
    prefect work-queue inspect 0bd02e1e-523a-4151-9d92-ea3d902e8a44
    and I get:
    WorkQueue(id='0bd02e1e-523a-4151-9d92-ea3d902e8a44', created='6 hours ago', updated='6 hours ago', filter=QueueFilter(tags=['run_type_1']), name='work_queue_type_1')
    I think I did something wrong, but it’s good to hear that work queues do indeed support multiple deployments
    Kevin Kho

    3 months ago
    Yeah, that looks good, and yes, it should support multiple deployments