Sang Young Noh
06/14/2022, 1:06 PM
Anna Geller
06/14/2022, 1:11 PM
Sang Young Noh
06/14/2022, 1:11 PM
@flow
def my_flow():
    ctx: FlowRunContext = get_run_context()
    seconds_late = (
        pendulum.now().diff(ctx.flow_run.expected_start_time, abs=False).in_seconds()
        * -1
    )
    if seconds_late > 10:
        return Failed(message=f"Flow started {seconds_late} seconds late.")
    my_task()
so that if a run starts past a certain time, it automatically returns a failure with a message. However, when I use the CLI to execute a simplified example:
@flow
def my_flow():
    ctx: FlowRunContext = get_run_context()
    return Failed(message=f"Flow started {seconds_late} seconds late.")
    my_task()
(So the second one should automatically fail.) This one seems to run as a successful flow without failing. Is this meant to happen? I get something like:
Loading flow from deployed location...
Running flow...
14:04:04.075 | INFO | prefect.engine - Created flow run 'taupe-coua' for flow 'ng'
14:04:04.076 | INFO | Flow run 'taupe-coua' - Using task runner 'ConcurrentTaskRunner'
14:04:04.355 | INFO | Flow run 'taupe-coua' - Flow can run - still current
14:04:04.500 | INFO | Flow run 'taupe-coua' - Finished in state Completed()
I was expecting something like Failed() with some error messages.
Anna Geller
06/14/2022, 1:12 PM
Sang Young Noh
06/14/2022, 1:13 PM
prefect deployment execute ...
for this particular flow doesn't seem to reproduce the failure.
Anna Geller
06/14/2022, 1:14 PM
python yourflow.py
Sang Young Noh
06/14/2022, 1:16 PM
Anna Geller
06/14/2022, 1:16 PM
Sang Young Noh
06/14/2022, 1:20 PM
def late_polling(ctx, maxlate=datetime.timedelta(hours=12)):
    """
    Deliberately kill the flow if we have gone past the maxlate threshold
    date.
    """
    logger = get_run_logger()
    # Unconditional return used to test the failure path; the check below is unreachable.
    return Cancelled(
        message="Flow is too late - it went past the max_late threshold."
    )
    if ctx.flow_run.expected_start_time + maxlate < datetime.datetime.now(
        # tz=pytz.utc
    ):
        return Cancelled(
            message="Flow is too late - it went past the max_late threshold."
        )
    else:
        logger.info("Flow can run - still current")


@flow(name="ng")
def ng_daily_polling():
    """Flow to call the NG daily polling function."""
    ctx: FlowRunContext = get_run_context()
    # late_polling is the equivalent of the task handler we have made
    late_polling(ctx)
    print("This is a random flow running that should not print if we fail successfully")
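One thing worth checking in the snippet above: ng_daily_polling calls late_polling(ctx) but discards whatever it returns, so a state object returned by the helper never becomes the return value of the flow function. The sketch below shows that plain-Python behaviour without Prefect. FakeState, check_late and both flow functions are hypothetical stand-ins, not Prefect APIs, and the second variant only follows the pattern of the first snippet in this thread, where the flow itself returns the state.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class FakeState:
    """Hypothetical stand-in for a state object such as Cancelled(...)."""
    name: str
    message: str


def check_late(expected_start: datetime, now: datetime,
               max_late: timedelta = timedelta(hours=12)) -> Optional[FakeState]:
    """Return a 'Cancelled' state when the run is past the threshold, else None."""
    # Both datetimes must be timezone-aware (or both naive) to be comparable.
    if expected_start + max_late < now:
        return FakeState("Cancelled", "Flow is too late - it went past the max_late threshold.")
    return None


def flow_discarding_result(expected_start: datetime, now: datetime) -> FakeState:
    # The helper's return value is dropped, so execution simply continues.
    check_late(expected_start, now)
    return FakeState("Completed", "ran to the end")


def flow_returning_result(expected_start: datetime, now: datetime) -> FakeState:
    # Propagate the helper's state so the flow function itself returns it.
    state = check_late(expected_start, now)
    if state is not None:
        return state
    return FakeState("Completed", "ran to the end")


now = datetime(2022, 6, 14, 14, 0, tzinfo=timezone.utc)
expected = now - timedelta(hours=13)  # one hour past the 12-hour threshold

print(flow_discarding_result(expected, now).name)  # Completed
print(flow_returning_result(expected, now).name)   # Cancelled
```

Raising an exception inside the helper would be another way to stop the caller, since an uncaught exception propagates without the caller having to inspect a return value.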
So I’m trying to ensure that the flow doesn’t run after a certain time threshold when deployed, which in this case is about 12 hours. I was planning on using a conditional failure return and testing it with a random deployment, but I figured that if I removed the time conditional (12 hours) and just returned a Failure state immediately, it would at least confirm the logic works. As you can see, I couldn’t quite produce the Failure state.
Anna Geller
06/14/2022, 1:27 PM
Sang Young Noh
06/14/2022, 1:36 PM
Anna Geller
06/14/2022, 1:37 PM
Sang Young Noh
06/14/2022, 1:38 PM
Kevin Kho
06/14/2022, 1:46 PM
0 8-22 * * *
This runs every hour from 8:00 to 22:00, but you can change it to the frequency you need.
Sang Young Noh
06/14/2022, 1:50 PM
FlowSchedule = rrule(
    freq=MINUTELY,
    interval=5,
    dtstart=dt(2020, 1, 1, 8, 31, 0),
    byhour=range(8, 9),
    byminute=range(30, 52),
)
return DeploymentSpec(
    flow=ng_daily_polling,
    name=f"test_2",
    schedule=RRuleSchedule.from_rrule(FlowSchedule),
    tags=deploy_tags,
    # parameters=parameters,
    flow_runner=SubprocessFlowRunner(),
)
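To see which run times a rule like the one above describes, the filters can be mimicked with the standard library. This is only a rough sketch of how the MINUTELY / interval=5 / byhour / byminute combination is assumed to behave (step every 5 minutes from dtstart, keep only matching hours and minutes); it does not call dateutil, and preview_schedule is a hypothetical helper.

```python
from datetime import datetime, timedelta
from itertools import islice


def preview_schedule(dtstart, interval_minutes, hours, minutes):
    """Step forward from dtstart and yield only datetimes matching the filters."""
    current = dtstart
    while True:
        if current.hour in hours and current.minute in minutes:
            yield current
        current += timedelta(minutes=interval_minutes)


# Same values as the rule above: start 08:31, every 5 minutes,
# hour 8 only, minutes 30 through 51.
runs = list(islice(
    preview_schedule(datetime(2020, 1, 1, 8, 31, 0), 5, range(8, 9), range(30, 52)),
    6,
))
for run in runs:
    print(run.isoformat())
```

Under these assumptions the rule gives five runs per day (08:31, 08:36, 08:41, 08:46, 08:51), and the sixth value printed is 08:31 on the following day.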
Kevin Kho
06/14/2022, 2:04 PM
Sang Young Noh
06/14/2022, 2:06 PM
Kevin Kho
06/14/2022, 2:10 PM
return Cancelled(
    message=f"Flow is too late - it went past the max_late threshold."
)
Instead you can do something like:
@task
def fail_task():
    raise TimeoutError()

@flow
def myflow():
    fail_task().result()  # will be raised and end execution
maybe?
Sang Young Noh
06/14/2022, 2:13 PM
@flow(name="ng")
def ng_daily_polling():
    """flow to call the NG daily polling function"""
    ctx: FlowRunContext = get_run_context()
    # late_polling is the equivalent of the task handler we have made
    if late_polling(ctx):
        fail_task().result()
    print("This is a random flow running that should not print if we fail successfully")
something like this?
Kevin Kho
06/14/2022, 2:30 PM
late_polling. And then fail my flow.
Sang Young Noh
06/14/2022, 2:46 PM
tags = ['run_type_1']
which creates a work queue like the following:
WorkQueue(id='0bd02e1e-523a-4151-9d92-ea3d902e8a44',
created='27 seconds ago', updated='27 seconds ago',
filter=QueueFilter(tags=['run_type_1']), name='work_queue_type_1')
From my interpretation of it, this work queue should therefore be able to run both flows, correct? However, I did a preview of the work queue, and I only seem to be getting the ID of a single type of deployment:
So, just wondering, do the deployments and tags only support a single flow? Or can they support multiple flows?
Kevin Kho
06/14/2022, 3:36 PM
inspect your work queue through the CLI?
Sang Young Noh
06/14/2022, 3:37 PM
Kevin Kho
06/14/2022, 3:38 PM
Sang Young Noh
06/14/2022, 3:41 PM
prefect work-queue ls
I get:
0bd02e1e-523a-4151-9d92-ea3d902e8a44 │ work_queue_type_1 │ None
7b09b84f-ce57-42f2-90c1-a42b1dc16999 │ newworkqueue │ None
work_queue_type_1 is the one with tags. From that, I do:
prefect work-queue inspect 0bd02e1e-523a-4151-9d92-ea3d902e8a44
and I get:
WorkQueue(id='0bd02e1e-523a-4151-9d92-ea3d902e8a44', created='6 hours ago', updated='6 hours ago', filter=QueueFilter(tags=['run_type_1']), name='work_queue_type_1')
Kevin Kho
06/14/2022, 3:47 PM