Mac Gréco Péralte Chéry
07/16/2020, 8:09 PM
from prefect import Flow
from prefect.engine.state import State

def notify_on_success(flow: Flow, old_state: State, new_state: State) -> State:
    if new_state.is_successful():
        # Read the SMS text back from the location stored on the task's result
        getSmsTextTask = flow.get_tasks(name="Retrieve SMS Text")[0]
        smsMessageLocation = new_state.result[getSmsTextTask]._result.location
        smsMessages3Result = flow.result.read(smsMessageLocation)
        smsMessage = smsMessages3Result.value
        # scheduleSmsId is a Parameter; take its value straight from the result
        scheduledSmsIdTask = flow.get_tasks(name="scheduleSmsId")[0]
        scheduleSmsId = new_state.result[scheduledSmsIdTask]._result.value
        link = "https://myurl/" + scheduleSmsId
        mailText = build_sms_success_email_notification_text(sms_text=smsMessage, call_to_action_link=link)
        send_email(["myemail"], message=mailText, subject="SMS SENDING COMPLETE")
    # A state handler should hand the (possibly modified) state back to the runner
    return new_state
(I should mention that scheduleSmsId is a Parameter.)
This state handler works fine on my local machine, but once I test it on my EC2 instance I get this error:
Exception raised while calling state handlers: KeyError(<Task: Retrieve SMS Text>,)
Chris White
07/16/2020, 8:55 PM
The result dictionary on the flow state is only populated after the final state transition has occurred (that is, after this state handler gets called).
If you want to react to the result of an individual task, I recommend writing a task state handler instead.
Mac Gréco Péralte Chéry
07/16/2020, 10:17 PM
with Flow(
    "SMS FLOW",
    storage=S3(bucket="prefect-bucket"),
    state_handlers=[slack_notifier(only_states=[Failed]), notify_on_success],
    result=S3Result(bucket="prefect-flows"),
) as flow:
    scheduleSmsId = Parameter(name="scheduleSmsId", required=True)
    sender = Parameter(name="sender", default='xxxxxxx')
    connexion = check_db_connection()
    smsMessage = get_sms_text(upstream_tasks=[connexion], scheduledSmsId=scheduleSmsId)
    smsRecipients = get_sms_recipients(upstream_tasks=[smsMessage], scheduledSmsId=scheduleSmsId)
    # Send the selected message to the identified numbers
    status = send_twilio_sms.map(smsRecipients, unmapped(smsMessage), unmapped(scheduleSmsId), unmapped(sender))
    # Save the results of the SMS message to the database
    saved_status = save_sms_status(upstream_tasks=[smsMessage], status=status)
I also have another option: create an additional task after the save_sms_status task that sends the email to the stakeholders.
    # Define a task that sends an email to stakeholders once the Twilio SMS logs are saved to the database (we consider the flow successful at this point)
    notify_sms_was_sent(upstream_tasks=[saved_status], scheduled_sms_id=scheduleSmsId, sms_message=smsMessage)
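The task state handler suggested earlier in the thread could look roughly like the sketch below. It is only an illustration under stated assumptions: a task state handler is called as (task, old_state, new_state), and on a task state the result attribute is that task's own return value, so no lookup in the flow-level result dictionary is needed. The names notify_when_saved, FakeTask, FakeState and sent_emails are hypothetical, and send_email here is a stand-in for the helper from the thread so that the snippet runs without Prefect or a mail server.

```python
from dataclasses import dataclass
from typing import Any, Dict, List

# Stand-in outbox so the sketch runs without a mail server (hypothetical).
sent_emails: List[Dict[str, Any]] = []


def send_email(recipients: List[str], message: str, subject: str) -> None:
    """Hypothetical stand-in for the send_email helper used in the thread."""
    sent_emails.append({"to": recipients, "message": message, "subject": subject})


def notify_when_saved(task, old_state, new_state):
    """Task state handler, called as (task, old_state, new_state) on each transition."""
    if new_state.is_successful():
        # On a task state, `result` is that task's own return value.
        body = f"Task '{task.name}' succeeded with result: {new_state.result}"
        send_email(["myemail"], message=body, subject="SMS SENDING COMPLETE")
    # Always hand the state back so the run continues with it.
    return new_state


# Minimal stand-ins for a task and its state (assumptions, not Prefect classes).
@dataclass
class FakeTask:
    name: str


@dataclass
class FakeState:
    result: Any
    successful: bool

    def is_successful(self) -> bool:
        return self.successful


task = FakeTask(name="save_sms_status")
notify_when_saved(task, None, FakeState(result=None, successful=False))  # no email
notify_when_saved(task, None, FakeState(result="saved", successful=True))  # one email
print(len(sent_emails))
```

With the Prefect version used in this thread, such a handler would presumably be attached to the task itself, for example through the state_handlers argument of the task decorator on save_sms_status, rather than to the Flow.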
Chris White
07/16/2020, 10:22 PM
Mac Gréco Péralte Chéry
07/16/2020, 10:31 PM
Chris White
07/16/2020, 10:32 PM