Andy Dienes
05/10/2023, 2:13 PM

from functools import wraps

from prefect import flow
from prefect import runtime as prt  # assuming prt refers to prefect.runtime
from prefect.blocks.notifications import SlackWebhook


def Flow(*flow_args, **flow_kwargs):
    # basically the same as @prefect.flow, but pings a Slack webhook when the flow raises
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                slack_webhook_block = SlackWebhook.load("alerts-prefect")
                msg = f"Your job {func.__name__} failed with exception {e}.\n"
                msg += f"See the flow run at: <https://prefect.myinstance.net/flow-runs/flow-run/{prt.flow_run.id}>\n"
                msg += f"Tags: {prt.flow_run.tags}\n"
                msg += f"Scheduled start time = {prt.flow_run.scheduled_start_time}"
                slack_webhook_block.notify(msg)
                raise  # re-raise so the flow run still fails
        return flow(*flow_args, **flow_kwargs)(wrapper)
    return decorator
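(For illustration only, not part of the original message: assuming the Flow decorator defined just above, it is applied the same way as the stock @prefect.flow decorator; the flow name and retry count here are made up.)

@Flow(name="nightly-job", retries=1)
def nightly_job():
    print("doing work")


if __name__ == "__main__":
    nightly_job()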
it works, but it pings for each retry of the flow. any tips to make my flow ping only when the whole thing finally fails, not on each retry?

Nate
05/10/2023, 3:18 PM
on_failure hook!
from prefect import flow
from prefect.blocks.core import Block
from prefect.server.schemas.core import Flow, FlowRun
from prefect.server.schemas.states import State
from prefect.settings import PREFECT_API_URL


def notify_slack(flow: Flow, flow_run: FlowRun, state: State):
    slack_webhook_block = Block.load("slack-webhook/test")

    slack_webhook_block.notify(
        f"Your job {flow_run.name} entered {state.name} with message:\n\n>{state.message}\n\n"
        f"See <https://{PREFECT_API_URL.value()}/flow-runs/flow-run/{flow_run.id}|the flow run in the UI>\n\n"
        f"Tags: {flow_run.tags}\n\n"
        f"Scheduled start time = {flow_run.expected_start_time}\n"
    )


@flow(on_failure=[notify_slack], retries=1)
def noisy_flow():
    raise ValueError("oops!")


if __name__ == "__main__":
    noisy_flow()
10:18:10.817 | INFO | Flow run 'radical-oarfish' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...

the flow will enter AwaitingRetry and then try again before entering Failed, so it won't run the hook until it enters Failed at the end
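(An aside for illustration, not from the thread: a minimal sketch that counts hook invocations, to show that with retries the hook only fires once, when the run finally reaches Failed. The flow and hook names are hypothetical.)

from prefect import flow

hook_calls = []


def record_failure(flow, flow_run, state):
    # on_failure hooks run only on the final Failed state,
    # not on the intermediate AwaitingRetry states
    hook_calls.append(state.name)


@flow(on_failure=[record_failure], retries=2)
def always_fails():
    raise ValueError("boom")


if __name__ == "__main__":
    try:
        always_fails()
    except ValueError:
        pass
    print(hook_calls)  # expected: ['Failed'] -- a single entry despite the retries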
Andy Dienes
05/10/2023, 3:24 PM
on_failure does not ping slack. is there a certain version I need to be on? I think I'm currently on 2.10.4
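(Side note, not from the thread: one quick way to confirm which Prefect version is installed.)

import prefect

print(prefect.__version__)

The prefect version CLI command prints the same information along with environment details.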
Nate
05/12/2023, 1:37 PM

Andy Dienes
05/12/2023, 1:38 PM
I changed expected_start_time to flow_run.scheduled_start_time, and it doesn't have that field I suppose. this is a little confusing to me, since I had previously been using prefect.runtime.flow_run.scheduled_start_time without issue. does prefect.runtime.flow_run not return a FlowRun type?
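(For illustration, not from the thread: prefect.runtime.flow_run is a module-level accessor over the current run context rather than a FlowRun object, so the two expose different attribute names. A sketch with hypothetical flow and hook names, showing where each is typically read:)

from prefect import flow
from prefect.runtime import flow_run as current_run


def notify_on_failure(flow, flow_run, state):
    # the hook receives a FlowRun schema object, which carries expected_start_time
    print("expected_start_time:", flow_run.expected_start_time)


@flow(on_failure=[notify_on_failure])
def doomed_flow():
    # prefect.runtime.flow_run is a module that reads the active run context;
    # it exposes scheduled_start_time (along with id, name, tags, ...)
    print("scheduled_start_time:", current_run.scheduled_start_time)
    raise RuntimeError("fail on purpose")


if __name__ == "__main__":
    try:
        doomed_flow()
    except RuntimeError:
        pass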
Nate
05/12/2023, 1:43 PM

Andy Dienes
05/12/2023, 1:54 PM
my schedule is 0 9 * * *, and I want the timestamp it was supposed to run at to be logged. if I use expected_start_time, it gives me whatever timestamp the worker actually started at, which is usually only a few millis off of the correct timestamp, but in the worst case, if my pool is unhealthy and the job is late, it can be many hours off of the logical time I wanted.
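(An aside for illustration, not from the thread: a minimal sketch, assuming a hypothetical flow deployed on that 0 9 * * * schedule, of logging the logical run time from prefect.runtime inside the flow, rather than whenever the worker actually picked the run up.)

from prefect import flow, get_run_logger
from prefect.runtime import flow_run


@flow
def daily_report():
    logger = get_run_logger()
    # the timestamp the schedule intended this run for (09:00 from "0 9 * * *"),
    # not the moment the worker actually started executing it
    logger.info("logical run time: %s", flow_run.scheduled_start_time)


if __name__ == "__main__":
    daily_report()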
@flow(on_failure=[notify_slack], retries=1)
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 703, in _watch_job
    for event in watch.stream(
File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 624, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 816, in read_chunked
    with self._error_catcher():
File "/usr/local/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 461, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
maybe that is causing more retries than intended?

Nate
05/12/2023, 3:48 PM