# prefect-cloud
z
Hi guys - I'm running into this error while using a Prefect Cloud automation to send Slack messages on different flow run states:
```
"The notification block was invalid: KeyError(\"No class found for dispatch key 'slack-incoming-webhook' in registry for type 'Block'.\")"
```
Has anyone run into this as well? Below is the full log:
```json
{
  "id": "77b5b58d-e9fa-4fff-bb5b-5c2157314dba",
  "account": "94a1563f-c82c-4049-854c-df6f4547b850",
  "event": "prefect-cloud.automation.action.failed",
  "occurred": "2023-05-29T21:52:20.401Z",
  "payload": {
    "reason": "The notification block was invalid: KeyError(\"No class found for dispatch key 'slack-incoming-webhook' in registry for type 'Block'.\")",
    "invocation": "8ff1536d-55b2-4170-9af4-10d6dd13243f",
    "action_type": "send-notification",
    "action_index": 0
  },
  "received": "2023-05-29T21:52:20.401Z",
  "related": [],
  "resource": {
    "prefect.resource.id": "prefect-cloud.automation.fcac6a63-01bb-4ade-94f0-e27664583542",
    "prefect-cloud.posture": "Reactive",
    "prefect.resource.name": "prod-wf-state"
  },
  "workspace": "1e3683bb-8412-4b7f-a2a8-c858d613af44"
}
```
c
Hi @Zhang David, thanks for your question! We've had a bit of confusion around this, sorry you ran into it. There are currently two Slack webhook blocks:
1. `prefect.blocks.notifications.SlackWebhook` from the core `prefect` library (https://github.com/PrefectHQ/prefect/blob/main/src/prefect/blocks/notifications.py#L78), which has the slug `slack-webhook`.
2. `prefect_slack.credentials.SlackWebhook` from the `prefect-slack` collection (https://github.com/PrefectHQ/prefect-slack/blob/main/prefect_slack/credentials.py#L51), which has the slug `slack-incoming-webhook`.
For Prefect Cloud automations, we currently only support notification blocks that come from the core `prefect` library. We aim to eventually support blocks from collection libraries as well, but for now that means you can only use `slack-webhook` with automations. Very sorry for the confusion; we do have an internal issue to improve this situation.
@Luis Cebrián, I believe you encountered this same issue ^
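For reference, a minimal sketch of creating and saving the core block so it can be selected in an automation - the webhook URL and block document name below are placeholders, not anything from this thread:
```python
# Sketch: save the *core* Slack webhook notification block (slug "slack-webhook"),
# which is the one Cloud automations can currently use.
from prefect.blocks.notifications import SlackWebhook

# Placeholder webhook URL and block name - substitute your own.
SlackWebhook(
    url="https://hooks.slack.com/services/T000/B000/XXXX"
).save("prod-flow-alerts", overwrite=True)
```
Once saved, a block document like this should show up under the "Slack Webhook" type in the UI and be selectable in the automation's send-notification action.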
z
@Chris Guidry thank you for the quick response! So is `slack-webhook` a separate app under Blocks if I were to use the cloud GUI?
c
Yes, it's the one titled "Slack Webhook" (and not the one titled "Slack Incoming Webhook"):
👍 1
l
Got it. Thanks @Chris Guidry for pinging me.
z
Thank you! Let me try that!
c
Excellent, thanks for your patience!
z
@Chris Guidry on a separate topic, and apologies for the digression - I'm trying to set up my production workflow entirely on Prefect. One thing I haven't worked out yet is: say I have two Flows, A and B, where A is scheduled to run every 30 minutes and B is scheduled to run every hour, but B depends on the successful completion of A. In Prefect 2.0, how do I deploy the flows so that both the scheduling and the dependency are honored?
Appreciate it if you can share some insight!
c
Great questions! I've got a few ideas, but there may be more sophisticated ways to do this 😄 I'll make sure some other folks see this as well to help bring more ideas.

First, the scheduling part should be pretty natural with Prefect 2.0: you'd create two deployments from the two flows and attach their half-hourly and hourly schedules directly to the deployments. That will get you the timing you're looking for.

The dependency part is a little more involved. You should start by deciding what the dependency actually is in detail. Here are some ideas:
- Does Flow A produce an artifact when it runs, and you want to make sure Flow B is seeing the latest one of those artifacts? You can use Prefect to create artifacts during your flows and then query them back in Flow B to find whether there's been a new version of that artifact. You may need to do a little bookkeeping on your end depending on what the artifact(s) are.
- If the hourly timing of Flow B is flexible, maybe remove Flow B's schedule and create a new higher-level flow that runs Flow A and then Flow B as subflows?
- It sounds like you are a Prefect Cloud user (thank you!), so you may also be able to configure an Automation that does something like: "after Flow A has Completed 2 times within 90 minutes, run Flow B". The way you'd spell that trigger is something like:
```json
{
  "match_related": {
    "prefect.resource.name": "name of deployment A",
    "prefect.resource.role": "deployment"
  },
  "expect": [
    "prefect.flow-run.Completed"
  ],
  "threshold": 2,
  "within": 5400,
  "posture": "Reactive",
  "actions": [
    {
      "type": "run-deployment",
      "source": "selected",
      "deployment_id": "...id of flow b deployment"
    }
  ]
}
```
This is saying: "expect 2 prefect.flow-run.Completed events for deployment A within 90 minutes, then as soon as you've seen the second event, run deployment B"
@Jeff Hale Can you think of other ways @Zhang David can accomplish this? I think there are a lot of paths to take
z
@Chris Guidry really appreciate the fast reply and suggestions!
> First, the scheduling part should be pretty natural with Prefect 2.0: you'd create two deployments from the two flows and attach their half-hourly and hourly schedules directly to the deployments. That will get you the timing you're looking for.
Yes! This is what I'm currently doing via the YAML file.
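(For reference, a rough Python equivalent of attaching those two schedules - the flow names, deployment names, and module below are placeholders, and import paths vary a bit across 2.x versions:)
```python
# Sketch: one deployment per flow, each with its own schedule
# (equivalent to setting `schedule:` in the deployment YAML).
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule  # prefect.orion.* on older 2.x

from my_flows import run_A, run_B  # placeholder module holding the two flows

Deployment.build_from_flow(
    flow=run_A,
    name="flow-a-every-30-min",
    schedule=CronSchedule(cron="*/30 * * * *"),
    apply=True,  # same effect as `prefect deployment apply`
)
Deployment.build_from_flow(
    flow=run_B,
    name="flow-b-hourly",
    schedule=CronSchedule(cron="0 * * * *"),
    apply=True,
)
```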
> Does Flow A produce an artifact when it runs, and you want to make sure Flow B is seeing the latest one of those artifacts? You can use Prefect to create artifacts during your flows and then query them back in Flow B to find whether there's been a new version of that artifact. You may need to do a little bookkeeping on your end depending on what the artifact(s) are.
This is very interesting! For my actual production use case, I take care of all the checkpointing etc. for each task on my end, so that when the job completes everything is indeed completed, and I'm using the Prefect task/flow (well, task in this case) as a wrapper so that I delegate the workflow scheduling to Prefect. Are you suggesting something like writing out a success token as an artifact, so that when B gets triggered - say it's 10:00 - and A happens to take > 30 minutes to run, B would wait until A completes and then run? That's exactly what I'm trying to achieve, i.e. if A takes longer than it should (in this example > 30 minutes) to finish, B would wait until A completes and then run, instead of just running at 10:00.
c
I think that approach would work. The only tricky thing is making B wait for the latest run of A to complete. I think you'd be able to do that by polling the API for an Artifact or the flow run's status. It does sound like you might want a higher-level flow that coordinates Flow A and Flow B as subflows to make sure the dependencies are met.
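To make that a bit more concrete, here's a rough sketch of both halves: Flow A records a "success token" artifact, and a helper that Flow B could call polls the API for the latest run of Flow A's deployment. The deployment name, artifact key, and polling interval are placeholders, and the exact import paths can differ across 2.x versions:
```python
# Sketch only: Flow A leaves a success-token artifact; Flow B waits for the
# most recent run of Flow A's deployment to reach a Completed state.
import asyncio
from datetime import datetime, timezone

from prefect import flow, get_client
from prefect.artifacts import create_markdown_artifact
from prefect.client.schemas.filters import DeploymentFilter, DeploymentFilterName
from prefect.client.schemas.sorting import FlowRunSort


@flow
def run_A():
    ...  # real work here
    # Placeholder artifact key; Flow B (or a human) can look this up later.
    create_markdown_artifact(
        key="flow-a-success",
        markdown=f"Flow A finished at {datetime.now(timezone.utc).isoformat()}",
    )


async def wait_for_latest_a(deployment_name: str = "flow-a-every-30-min",
                            poll_seconds: int = 30):
    """Poll the API until the most recent run of Flow A's deployment completes."""
    async with get_client() as client:
        while True:
            runs = await client.read_flow_runs(
                deployment_filter=DeploymentFilter(
                    name=DeploymentFilterName(any_=[deployment_name])
                ),
                sort=FlowRunSort.START_TIME_DESC,
                limit=1,
            )
            if runs and runs[0].state_name == "Completed":
                return runs[0]
            await asyncio.sleep(poll_seconds)
```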
z
> I think that approach would work. The only tricky thing is making B wait for the latest run of A to complete. I think you'd be able to do that by polling the API for an Artifact or the flow run's status.
Ha - I was just typing to ask if there's a `wait_for` kind of interface exposed... looks like no?
> It does sound like you might want a higher-level flow that coordinates Flow A and Flow B as subflows to make sure the dependencies are met
Yes - I thought about this, but my question is for the example below:
```python
from prefect import flow

@flow
def run_A():
    pass

@flow
def run_B():
    pass

@flow
def run_C():
    a = run_A()
    b = run_B(wait_for=[a])
```
How can I `prefect deployment apply` A and B with two different schedules? If I naively do two separate applies on `run_A` and `run_B`, when I run C, wouldn't A be run twice?
c
Ah okay, so one thing that may not have been clear is that each flow may have as many deployments as you like, each with a different schedule. However, I'm not sure that entirely covers what you'd like to do. I think if you can make it so that Flow B is able to exit early if it doesn't have the prerequisites it needs, and perhaps also reschedule itself to run again in a few minutes, that may be an option. I have to admit, I'm not as familiar with the options you have for managing dependencies across time like this as my other colleagues, so take what I'm saying with a grain of salt 😄
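A bare-bones sketch of that early-exit idea - the readiness check here is just a stand-in (e.g. the hypothetical polling helper from above), and the state message is only illustrative:
```python
# Sketch: Flow B bails out cleanly when its prerequisite isn't met yet,
# rather than failing; it simply tries again on its next scheduled run.
from prefect import flow
from prefect.states import Completed


def upstream_is_ready() -> bool:
    # Placeholder: e.g. check the success-token artifact or the latest
    # Flow A run, as sketched earlier in the thread.
    return False


@flow
def run_B():
    if not upstream_is_ready():
        return Completed(message="Skipping: latest Flow A run hasn't finished yet")
    ...  # actual work
```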
👍 1
z
Thank you! Really appreciate the help! Since this is a universal pattern across all my workflows, I'd like to figure out a systematic way to handle it. By the way, I picked Prefect over Airflow for two reasons: 1. it's definitely more Pythonic; 2. most importantly, I've been a long-term Ray user, and it was actually the Ray/Anyscale people who recommended you guys 🙂 Coming back to this topic - are there docs/readings you can point me to on this specific flow deployment practice? Or better yet, could you point me to your other colleagues who might have additional insights on this? Thanks again!
c
Sure! I think the section on Composing flows is very good to get you started, and then the docs on Final state determination give you a lot of options for what you might use to communicate from Flow A to Flow B. It sounds like you're pretty comfortable with the scheduling side of things. If you're a Cloud user, give Automations a look as well. They might help to bridge some of the dependencies-over-time gaps that you're trying to fill here. The example trigger I gave above is a good starting point.
👍 1
z
Thank you!!