https://prefect.io logo
#prefect-community
Title
# prefect-community
k

Kyle Combs

07/14/2020, 9:15 PM
Hello everyone. I am trying to access a Parameter from within a state handler using prefect.context.get("parameters", {}).get(stakeholderEmail). When I run my code I get no errors, but it does result in my variable containing the emails to be a nonetype. Is my approach the proper method for retrieving a parameter from inside a state handler, or is there a better way to do this?
z

Zachary Hughes

07/14/2020, 9:43 PM
Hi @Kyle Combs! Asking around with the team for best practices, will be back with an answer as soon as I have one!
j

Jim Crist-Harif

07/14/2020, 10:09 PM
Hi @Kyle Combs, can you provide a reproducible example? I'm unable to reproduce:
Copy code
import prefect
from prefect import task, Flow, Parameter

def handler(task, old, new):
    print(prefect.context["parameters"])


@task(state_handlers=[handler])
def inc(x):
    return x + 1


with Flow('test', state_handlers=[handler]) as flow:
    a = Parameter('a')
    b = inc(a)

flow.run(a=1)
Copy code
$python test.py
[2020-07-14 22:09:10] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-07-14 22:09:10] INFO - prefect.FlowRunner | Starting flow run.
{'a': 1}
[2020-07-14 22:09:10] INFO - prefect.TaskRunner | Task 'a': Starting task run...
[2020-07-14 22:09:10] INFO - prefect.TaskRunner | Task 'a': finished task run for task with final state: 'Success'
[2020-07-14 22:09:10] INFO - prefect.TaskRunner | Task 'inc': Starting task run...
{'a': 1}
{'a': 1}
[2020-07-14 22:09:10] INFO - prefect.TaskRunner | Task 'inc': finished task run for task with final state: 'Success'
[2020-07-14 22:09:10] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
{'a': 1}
k

Kyle Combs

07/15/2020, 1:05 PM
Copy code
def jobFailed(task, old_state, new_state):
    #grabbing email parameter
    emails = prefect.context.get("parameters", {}).get(stakeholderEmail).split(", ")
    print("EMAIL",emails)
Copy code
with Flow("ComputationWS", state_handlers=[jobFailed]) as flow:
    #stakeholder = Parameter('stakeholder', default="John Doe")
    stakeholderEmail = Parameter("Emails separated by comma space", default = "<mailto:kyle.combs@email.com|kyle.combs@email.com>, <mailto:nick.he@email.com|nick.he@email.com>
Copy code
(base) $ python prefectComputeWS.py
[2020-07-15 13:07:06] INFO - prefect.FlowRunner | Beginning Flow run for 'ComputationWS'
[2020-07-15 13:07:06] INFO - prefect.FlowRunner | Starting flow run.
[2020-07-15 13:07:06] ERROR - prefect.FlowRunner | Unexpected error while calling state handlers: AttributeError("'NoneType' object has no attribute 'split'")
j

Jim Crist-Harif

07/15/2020, 1:09 PM
Looks like you're indexing with the parameter object
stakeholderEmail
, instead you want to index with the string name of the parameter (
"Emails separated by comma space"
)
Copy code
def jobFailed(task, old_state, new_state):
    #grabbing email parameter
    emails = prefect.context["parameters"][string_name_of_your_parameter].split(", ")
    print("EMAIL",emails)
Generally you want
Parameter
names to be some short memorable name (like
"stakeholder_emails"
), not a description of their purpose. There's nothing wrong with using a longer name of course.
k

Kyle Combs

07/15/2020, 1:22 PM
Hi, I altered my code and changed the string name of the parameter to be stakeholderEmail. I tried running my code again and am now getting the following error
Copy code
emails = prefect.context["parameters"]["stakeholderEmail"].split(", ")

KeyError: 'parameters'
[2020-07-15 13:19:15] ERROR - prefect.ComputationWS | Unexpected error occured in FlowRunner: KeyError('parameters')
Copy code
with Flow("ComputationWS", state_handlers=[jobFailed]) as flow:
    stakeholderEmail = Parameter("stakeholderEmail", default = "kyle.combs@email, nick.he@email")
j

Jim Crist-Harif

07/15/2020, 1:28 PM
Copy code
import prefect
from prefect import task, Flow, Parameter

def handler(task, old, new):
    emails = prefect.context["parameters"]["stakeholder_emails"]
    print(f"Emails are {emails}")


@task(state_handlers=[handler])
def echo(x):
    return print(x)


with Flow('test', state_handlers=[handler]) as flow:
    stakeholder_emails = Parameter('stakeholder_emails', default="<mailto:alice@google.com|alice@google.com>")
    b = echo(stakeholder_emails)

flow.run()
Copy code
$ python test.py
[2020-07-15 13:28:17] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-07-15 13:28:17] INFO - prefect.FlowRunner | Starting flow run.
Emails are <mailto:alice@google.com|alice@google.com>
[2020-07-15 13:28:17] INFO - prefect.TaskRunner | Task 'stakeholder_emails': Starting task run...
[2020-07-15 13:28:18] INFO - prefect.TaskRunner | Task 'stakeholder_emails': finished task run for task with final state: 'Success'
[2020-07-15 13:28:18] INFO - prefect.TaskRunner | Task 'echo': Starting task run...
Emails are <mailto:alice@google.com|alice@google.com>
<mailto:alice@google.com|alice@google.com>
Emails are <mailto:alice@google.com|alice@google.com>
[2020-07-15 13:28:18] INFO - prefect.TaskRunner | Task 'echo': finished task run for task with final state: 'Success'
[2020-07-15 13:28:18] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Emails are <mailto:alice@google.com|alice@google.com>
Since your parameter was never used in your flow, it wasn't actually added to the flow. You either need to pass the parameter to a task, or use
flow.add_task(parameter)
to add it to your flow manually.
k

Kyle Combs

07/15/2020, 1:51 PM
Hi, sorry but I am still unable to get this to work. I end up eventually passing the parameter to a task in my flow. Here is the entirety of it:
Copy code
#State handler to notify is flow has failed
def jobFailed(task, old_state, new_state):
    #grabbing email parameter
    emails = prefect.context["parameters"]["stakeholder_emails"]
    print("EMAIL",emails)

with Flow("ComputationWS", state_handlers=[jobFailed]) as flow:
    stakeholderEmail = Parameter("stakeholder_emails", default = "kyle.combs@email, nick.he@email)
    
    rundate = Parameter('rundate', default ='date')
    datadir = Parameter('datadir', default = 'xyz')
    ref = Parameter('ref', default = 'abc')
    refpb = Parameter('refpb',default = 'adx')

    callComp = callComputeService(rundate,datadir,ref,refpb)
    checkStat = checkStatus(callComp)
    buildResults = createTable(checkStat[0],checkStat[1], stakeholderEmail)
I pass in the stakeholderEmail parameter when I call my createTable function at the bottom of the code block. As before, I am still getting the following key error:
Copy code
File "prefectComputeWS.py", line 143, in jobFailed
    emails = prefect.context["parameters"]["stakeholder_emails"]
  File "/home/user/anaconda3/lib/python3.7/site-packages/prefect/utilities/collections.py", line 79, in __getitem__
    return self.__dict__[key]  # __dict__ expects string keys
KeyError: 'parameters'
Sorry for the confusion!
j

Jim Crist-Harif

07/15/2020, 1:57 PM
Copy code
import prefect
from prefect import Flow, task, Parameter

# State handler to notify is flow has failed
def jobFailed(task, old_state, new_state):
    # grabbing email parameter
    emails = prefect.context["parameters"]["stakeholder_emails"]
    print("EMAIL",emails)


@task
def callComputeService(rundate, datadir, ref, refpb):
    pass


@task
def checkStatus(callComp):
    return (0, 1)  # fake data


@task
def createTable(checkStat0, checkStat1, stakeholderEmail):
    pass


with Flow("ComputationWS", state_handlers=[jobFailed]) as flow:
    stakeholderEmail = Parameter("stakeholder_emails", default = "kyle.combs@email, nick.he@email")
    rundate = Parameter('rundate', default ='date')
    datadir = Parameter('datadir', default = 'xyz')
    ref = Parameter('ref', default = 'abc')
    refpb = Parameter('refpb',default = 'adx')
    callComp = callComputeService(rundate,datadir,ref,refpb)
    checkStat = checkStatus(callComp)
    buildResults = createTable(checkStat[0],checkStat[1], stakeholderEmail)


flow.run()
Copy code
$ python test.py
[2020-07-15 13:58:10] INFO - prefect.FlowRunner | Beginning Flow run for 'ComputationWS'
[2020-07-15 13:58:10] INFO - prefect.FlowRunner | Starting flow run.
EMAIL kyle.combs@email, nick.he@email
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'ref': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'ref': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'rundate': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'rundate': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'datadir': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'datadir': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'refpb': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'refpb': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'stakeholder_emails': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'stakeholder_emails': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'callComputeService': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'callComputeService': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'checkStatus': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'checkStatus': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'createTable': Starting task run...
[2020-07-15 13:58:10] INFO - prefect.TaskRunner | Task 'createTable': finished task run for task with final state: 'Success'
[2020-07-15 13:58:10] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
EMAIL kyle.combs@email, nick.he@email
What version of prefect are you using?
k

Kyle Combs

07/15/2020, 1:59 PM
0.11.4
j

Jim Crist-Harif

07/15/2020, 2:00 PM
Try upgrading to the most recent release perhaps. The latest release is 0.12.4.
k

Kyle Combs

07/15/2020, 2:09 PM
That was it. Thank you for all of your help!
j

Jim Crist-Harif

07/15/2020, 2:09 PM
No problem, glad you got things working!