Kyle Combs
07/14/2020, 9:15 PM
Zachary Hughes
07/14/2020, 9:43 PM
Jim Crist-Harif
07/14/2020, 10:09 PM
import prefect
from prefect import task, Flow, Parameter

def handler(task, old, new):
    print(prefect.context["parameters"])

@task(state_handlers=[handler])
def inc(x):
    return x + 1

with Flow('test', state_handlers=[handler]) as flow:
    a = Parameter('a')
    b = inc(a)

flow.run(a=1)

$ python test.py
[2020-07-14 22:09:10] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-07-14 22:09:10] INFO - prefect.FlowRunner | Starting flow run.
{'a': 1}
[2020-07-14 22:09:10] INFO - prefect.TaskRunner | Task 'a': Starting task run...
[2020-07-14 22:09:10] INFO - prefect.TaskRunner | Task 'a': finished task run for task with final state: 'Success'
[2020-07-14 22:09:10] INFO - prefect.TaskRunner | Task 'inc': Starting task run...
{'a': 1}
{'a': 1}
[2020-07-14 22:09:10] INFO - prefect.TaskRunner | Task 'inc': finished task run for task with final state: 'Success'
[2020-07-14 22:09:10] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
{'a': 1}
Kyle Combs
07/15/2020, 1:05 PM
def jobFailed(task, old_state, new_state):
    # grabbing email parameter
    emails = prefect.context.get("parameters", {}).get(stakeholderEmail).split(", ")
    print("EMAIL", emails)

with Flow("ComputationWS", state_handlers=[jobFailed]) as flow:
    # stakeholder = Parameter('stakeholder', default="John Doe")
    stakeholderEmail = Parameter("Emails separated by comma space", default="kyle.combs@email.com, nick.he@email.com")

(base) $ python prefectComputeWS.py
[2020-07-15 13:07:06] INFO - prefect.FlowRunner | Beginning Flow run for 'ComputationWS'
[2020-07-15 13:07:06] INFO - prefect.FlowRunner | Starting flow run.
[2020-07-15 13:07:06] ERROR - prefect.FlowRunner | Unexpected error while calling state handlers: AttributeError("'NoneType' object has no attribute 'split'")
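
The AttributeError above is what a dictionary miss followed by `.split` looks like: `.get()` returns None when the key is not found, and None has no `split` method. One way to get such a miss is to look up by an object instead of by the parameter's string name. A minimal sketch of that behaviour, using a plain dict in place of prefect.context["parameters"]; `FakeParameter` is a hypothetical stand-in for illustration only, not Prefect's Parameter class.

```python
class FakeParameter:
    """Hypothetical stand-in for a Parameter object; it only carries a name."""

    def __init__(self, name):
        self.name = name


# Stand-in for prefect.context["parameters"]: keys are parameter names (strings).
parameters = {"stakeholder_emails": "kyle.combs@email.com, nick.he@email.com"}

stakeholder_emails = FakeParameter("stakeholder_emails")

# Looking up by the object misses, so .get() returns None.
# Calling .split on that result would raise AttributeError.
by_object = parameters.get(stakeholder_emails)

# Looking up by the string name finds the value, which can then be split.
by_name = parameters.get("stakeholder_emails")
emails = by_name.split(", ")
```

The same lookup by `stakeholder_emails.name` would also succeed here, since the name attribute holds the string key.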
Jim Crist-Harif
07/15/2020, 1:09 PM
stakeholderEmail, instead you want to index with the string name of the parameter ("Emails separated by comma space")
def jobFailed(task, old_state, new_state):
    # grabbing email parameter
    emails = prefect.context["parameters"][string_name_of_your_parameter].split(", ")
    print("EMAIL", emails)
Parameter names to be some short memorable name (like "stakeholder_emails"), not a description of their purpose. There's nothing wrong with using a longer name of course.
Kyle Combs
07/15/2020, 1:22 PM
emails = prefect.context["parameters"]["stakeholderEmail"].split(", ")
KeyError: 'parameters'
[2020-07-15 13:19:15] ERROR - prefect.ComputationWS | Unexpected error occured in FlowRunner: KeyError('parameters')
with Flow("ComputationWS", state_handlers=[jobFailed]) as flow:
    stakeholderEmail = Parameter("stakeholderEmail", default="kyle.combs@email, nick.he@email")
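
The traceback does not say why "parameters" is missing from the context in this run, so the following is not a diagnosis. It is only a sketch of a lookup that degrades gracefully instead of raising KeyError inside a state handler. The helper names `get_parameter` and `split_emails` are made up for this example; the only assumption about the context object is that it offers a dict-style `.get`, which the earlier `prefect.context.get("parameters", {})` call in this thread also relies on.

```python
def get_parameter(context, name, default=None):
    """Return a run parameter by its string name, or `default` if unavailable."""
    # `or {}` also covers the case where the key exists but holds None.
    params = context.get("parameters") or {}
    return params.get(name, default)


def split_emails(raw):
    """Split a comma-separated address string into a clean list."""
    if not raw:
        return []
    return [part.strip() for part in raw.split(",") if part.strip()]


# A context without parameters yields an empty list rather than an exception.
missing = split_emails(get_parameter({}, "stakeholder_emails"))

# A populated context yields the individual addresses.
present = split_emails(
    get_parameter(
        {"parameters": {"stakeholder_emails": "kyle.combs@email, nick.he@email"}},
        "stakeholder_emails",
    )
)
```

Inside a handler, the first argument would be prefect.context rather than a literal dict.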
Jim Crist-Harif
07/15/2020, 1:28 PM
import prefect
from prefect import task, Flow, Parameter

def handler(task, old, new):
    emails = prefect.context["parameters"]["stakeholder_emails"]
    print(f"Emails are {emails}")

@task(state_handlers=[handler])
def echo(x):
    return print(x)

with Flow('test', state_handlers=[handler]) as flow:
    stakeholder_emails = Parameter('stakeholder_emails', default="alice@google.com")
    b = echo(stakeholder_emails)

flow.run()

$ python test.py
[2020-07-15 13:28:17] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-07-15 13:28:17] INFO - prefect.FlowRunner | Starting flow run.
Emails are alice@google.com
[2020-07-15 13:28:17] INFO - prefect.TaskRunner | Task 'stakeholder_emails': Starting task run...
[2020-07-15 13:28:18] INFO - prefect.TaskRunner | Task 'stakeholder_emails': finished task run for task with final state: 'Success'
[2020-07-15 13:28:18] INFO - prefect.TaskRunner | Task 'echo': Starting task run...
Emails are alice@google.com
alice@google.com
Emails are alice@google.com
[2020-07-15 13:28:18] INFO - prefect.TaskRunner | Task 'echo': finished task run for task with final state: 'Success'
[2020-07-15 13:28:18] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Emails are alice@google.com
flow.add_task(parameter) to add it to your flow manually.
Kyle Combs
07/15/2020, 1:51 PM
# State handler to notify if flow has failed
def jobFailed(task, old_state, new_state):
    # grabbing email parameter
    emails = prefect.context["parameters"]["stakeholder_emails"]
    print("EMAIL", emails)

with Flow("ComputationWS", state_handlers=[jobFailed]) as flow:
    stakeholderEmail = Parameter("stakeholder_emails", default="kyle.combs@email, nick.he@email")
    rundate = Parameter('rundate', default='date')
    datadir = Parameter('datadir', default='xyz')
    ref = Parameter('ref', default='abc')
    refpb = Parameter('refpb', default='adx')
    callComp = callComputeService(rundate, datadir, ref, refpb)
    checkStat = checkStatus(callComp)
    buildResults = createTable(checkStat[0], checkStat[1], stakeholderEmail)

I pass in the stakeholderEmail parameter when I call my createTable function at the bottom of the code block.
As before, I am still getting the following key error:
  File "prefectComputeWS.py", line 143, in jobFailed
    emails = prefect.context["parameters"]["stakeholder_emails"]
  File "/home/user/anaconda3/lib/python3.7/site-packages/prefect/utilities/collections.py", line 79, in __getitem__
    return self.__dict__[key]  # __dict__ expects string keys
KeyError: 'parameters'
Sorry for the confusion!
Jim Crist-Harif
07/15/2020, 1:57 PM
import prefect
from prefect import Flow, task, Parameter

# State handler to notify if flow has failed
def jobFailed(task, old_state, new_state):
    # grabbing email parameter
    emails = prefect.context["parameters"]["stakeholder_emails"]
    print("EMAIL", emails)

@task
def callComputeService(rundate, datadir, ref, refpb):
    pass

@task
def checkStatus(callComp):
    return (0, 1)  # fake data

@task
def createTable(checkStat0, checkStat1, stakeholderEmail):
    pass

with Flow("ComputationWS", state_handlers=[jobFailed]) as flow:
    stakeholderEmail = Parameter("stakeholder_emails", default="kyle.combs@email, nick.he@email")
    rundate = Parameter('rundate', default='date')
    datadir = Parameter('datadir', default='xyz')
    ref = Parameter('ref', default='abc')
    refpb = Parameter('refpb', default='adx')
    callComp = callComputeService(rundate, datadir, ref, refpb)
    checkStat = checkStatus(callComp)
    buildResults = createTable(checkStat[0], checkStat[1], stakeholderEmail)

flow.run()

$ python test.py
[2020-07-15 13:58:10] INFO - prefect.FlowRunner | Beginning Flow run for 'ComputationWS'
[2020-07-15 13:58:10] INFO - prefect.FlowRunner | Starting flow run.
EMAIL kyle.combs@email, nick.he@email
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'ref': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'ref': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'rundate': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'rundate': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'datadir': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'datadir': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'refpb': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'refpb': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'stakeholder_emails': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'stakeholder_emails': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'callComputeService': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'callComputeService': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'checkStatus': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'checkStatus': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'createTable': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'createTable': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
EMAIL kyle.combs@email, nick.he@email
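
In the run above, "EMAIL ..." is printed twice because the handler fires on every flow state change, not only on failure. If the goal is to notify stakeholders only when something fails, the handler can check the new state first. A minimal sketch, assuming the state object offers `is_failed()` as Prefect's state classes of this era do; `make_failure_notifier`, `send`, and `FakeState` are hypothetical names introduced for this example, and the context is passed in explicitly so the snippet runs without Prefect installed.

```python
def make_failure_notifier(context, send):
    """Build a state handler that calls `send` only when the new state is failed."""

    def handler(obj, old_state, new_state):
        if new_state.is_failed():
            # Parameters are keyed by their string name.
            raw = context.get("parameters", {}).get("stakeholder_emails", "")
            recipients = [e.strip() for e in raw.split(",") if e.strip()]
            send(recipients, new_state)
        # Hand the state back unchanged so the run proceeds as usual.
        return new_state

    return handler


class FakeState:
    """Hypothetical stand-in for a Prefect state; only implements is_failed()."""

    def __init__(self, failed):
        self._failed = failed

    def is_failed(self):
        return self._failed


# Demo: record what would have been sent instead of emailing anyone.
sent = []
context = {"parameters": {"stakeholder_emails": "kyle.combs@email, nick.he@email"}}
handler = make_failure_notifier(context, lambda to, state: sent.append(to))

handler(None, None, FakeState(failed=False))  # not failed: nothing is recorded
handler(None, None, FakeState(failed=True))   # failed: recipients are recorded
```

With Prefect itself, the handler would be built with prefect.context and passed in `state_handlers=[...]` the same way `jobFailed` is above.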
Kyle Combs
07/15/2020, 1:59 PM
Jim Crist-Harif
07/15/2020, 2:00 PM
Kyle Combs
07/15/2020, 2:09 PM
Jim Crist-Harif
07/15/2020, 2:09 PM