Sagun Garg
01/04/2021, 3:55 AMwiretrack
01/04/2021, 4:00 AMSagun Garg
01/04/2021, 7:46 AM
from prefect import task, Flow
from prefect.tasks.prefect import StartFlowRun
import json
# Event-driven flow triggering via the Prefect GraphQL API
# Resource 1: https://medium.com/the-prefect-blog/event-driven-workflows-with-aws-lambda-2ef9d8cc8f1a
class PrefectClientException(Exception):
    """Error type for failures in the Prefect API client helpers of this script."""
class PrefectClient(object):
    """Small helper that triggers a Prefect flow run over the GraphQL HTTP API.

    Args:
        api_url: URL the GraphQL request is POSTed to.
        api_token: Token sent as the ``Authorization: Bearer`` header.
    """

    def __init__(self, api_url, api_token):
        self.__api_url = api_url
        # A runner token can be reused here for the GraphQL API client as well.
        self.__api_token = api_token

    def get_flowId(self, input):
        """Return the leading token of ``input.state.message``.

        State messages for the task are expected in the form
        ``<flow run id> finished in state <state>``, so the text before the
        first space is the id.

        Args:
            input: Object exposing ``state.message`` as a string.

        Returns:
            The substring of the message before the first space.
        """
        flow_run_id = input.state.message.split(" ", 1)[0]
        print(flow_run_id)
        return flow_run_id

    def execute_prefectflow(self, event, context, flow_name, project_name):
        """Start ``flow_name`` and POST a ``createFlowRun`` mutation for it.

        Args:
            event: Triggering event; forwarded to the flow as the ``event``
                parameter and therefore must be JSON-serialisable.
            context: Invocation context; only logged.
            flow_name: Name of the flow handed to ``StartFlowRun``.
            project_name: Prefect project handed to ``StartFlowRun``.

        Returns:
            The decoded JSON body of the API response.
        """
        # Imported locally so the method does not depend on the file-level
        # imports, which do not include urllib.
        import urllib.request

        # default=str keeps these debug prints from raising on values json
        # cannot serialise (for example a context that is not a plain dict).
        print("Received event: " + json.dumps(event, indent=2, default=str))
        print("Received context: " + json.dumps(context, indent=2, default=str))
        print("For Project:", project_name)
        print("For Flow:", flow_name)

        # prep the data
        create_mutation = """
        mutation($input: createFlowRunInput!){
            createFlowRun(input: $input){
                flow_run{
                    id
                }
            }
        }
        """

        # NOTE(review): StartFlowRun is called here outside any visible Flow
        # context, and the id parsed from its state message is described as a
        # flow *run* id while the mutation input key is ``flowId`` -- confirm
        # this is the intended sequence before relying on it.
        a = StartFlowRun(project_name=project_name, wait=True)(flow_name=flow_name)
        flowId = self.get_flowId(a)

        inputs = dict(flowId=flowId)
        # Information about the triggering event is passed under the
        # ``parameters`` key as a PARAMETER_NAME -> PARAMETER_VALUE mapping;
        # here the full event is forwarded.
        inputs["parameters"] = dict(event=event)
        variables = dict(input=inputs)
        data = json.dumps(
            dict(query=create_mutation, variables=json.dumps(variables))
        ).encode("utf-8")

        # prep the request
        req = urllib.request.Request(self.__api_url, data=data)
        req.add_header("Content-Type", "application/json")
        req.add_header("Authorization", "Bearer {}".format(self.__api_token))

        # send the request and return the response; the context manager
        # closes the connection once the body has been read
        with urllib.request.urlopen(req) as resp:
            return json.loads(resp.read().decode())
# The URL must be a bare URL: the surrounding "<" ">" were chat auto-link
# markup and make urllib reject it as an unknown URL type.
prefect_api_client = PrefectClient("https://api.prefect.io", "PREFECT_CLOUD_RUNNER_API_KEY")
# NOTE(review): event, context, flow_name and project_name are not defined in
# the visible snippet -- they must be supplied by the surrounding code.
status = prefect_api_client.execute_prefectflow(event, context, flow_name, project_name)
print('Results returned after triggerring the flow', status)
Sagun Garg
01/04/2021, 7:48 AMSagun Garg
01/04/2021, 8:24 AMSagun Garg
01/07/2021, 4:27 AM
import json
import prefect, urllib, requests
# Event-driven flow triggering via the Prefect GraphQL API
# Resource 1: https://medium.com/the-prefect-blog/event-driven-workflows-with-aws-lambda-2ef9d8cc8f1a
class PrefectClientException(Exception):
    """Error type for failures in the Prefect API client helpers of this script."""
class PrefectClient(object):
    """Look up a Prefect flow by name and trigger an ad-hoc run of it.

    Args:
        api_url: URL the raw ``createFlowRun`` HTTP request is POSTed to.
            It is not passed to ``prefect.Client``, which keeps its own
            server configuration.
        api_token: Token used both for ``prefect.Client`` and for the
            ``Authorization: Bearer`` header (a runner token can be reused).
    """

    def __init__(self, api_url="https://api.prefect.io",
                 api_token="YOUR_PREFECT_RUNNER_KEY"):
        # The URL is stored bare: chat auto-link "<...>" markup around it
        # would make urllib reject it as an unknown URL type.
        self.__api_url = api_url
        self.__api_token = api_token
        self.__client = prefect.Client(api_token=api_token)

    def getFlowId(self, flowName):
        """Return the id of the first flow returned for ``flowName``.

        The name is bound to the query's ``_ilike`` filter (presumably a
        case-insensitive pattern match -- verify against the API schema), so
        more than one flow may match; the first one returned is used.

        Args:
            flowName: Flow name to search for.

        Returns:
            The ``id`` field of the first matching flow.

        Raises:
            PrefectClientException: If the query returns no flow.
        """
        query = """
        query GetFlowId($flow_name: String!)
        {
            flow(where: { name: { _ilike: $flow_name } }) {
                id
                name
                flow_runs(order_by: { start_time: desc }, limit: 1) {
                    id
                    state
                }
            }
        }
        """
        variables = dict(flow_name=flowName)
        # See https://docs.prefect.io/api/latest/client/client.html#client-2
        # for further parameters accepted by Client.graphql.
        flows = self.__client.graphql(query=query, variables=variables)

        matches = flows.data.flow
        if not matches:
            raise PrefectClientException(
                "No flow found matching name {!r}".format(flowName)
            )

        flow = matches[0]
        print("Matched flow:", flow)
        # Read the id by name rather than taking whichever value happens to
        # come first when iterating the record.
        print("Flow Id:", flow.id)
        return flow.id

    def adhocFlowRun(self, flowId, event, context):
        """POST a ``createFlowRun`` mutation for ``flowId``.

        Args:
            flowId: Id of the flow to run.
            event: Forwarded to the flow as the ``event`` parameter; must be
                JSON-serialisable.
            context: Forwarded to the flow as the ``context`` parameter; must
                be JSON-serialisable.

        Returns:
            The decoded JSON body of the API response.
        """
        # Imported locally: the file only does ``import urllib``, which does
        # not by itself guarantee that the ``request`` submodule is loaded.
        import urllib.request

        # prep the data
        mutation = """
        mutation($input: createFlowRunInput!){
            createFlowRun(input: $input){
                flow_run{
                    id
                }
            }
        }
        """
        print("Received event: " + json.dumps(event, indent=2))
        print("Received context: " + json.dumps(context, indent=2))
        print("This is the flow Id:", flowId)

        inputs = dict(flowId=flowId)
        # Flow parameters go under the ``parameters`` key as a
        # PARAMETER_NAME -> PARAMETER_VALUE mapping; the full event and
        # context are passed through.
        inputs["parameters"] = dict(event=event, context=context)
        variables = dict(input=inputs)
        data = json.dumps(
            dict(query=mutation, variables=json.dumps(variables))
        ).encode("utf-8")

        # prep the request
        req = urllib.request.Request(self.__api_url, data=data)
        req.add_header("Content-Type", "application/json")
        req.add_header("Authorization", "Bearer {}".format(self.__api_token))

        # send the request and return the response; the context manager
        # closes the connection once the body has been read
        with urllib.request.urlopen(req) as resp:
            return json.loads(resp.read().decode())
# Payload handed to the flow. Any payload can be sent; this sample mirrors
# an event generated by AWS (a single SQS record).
context = dict(champion="Sagun")
event = dict(
    Records=[
        dict(
            messageId="19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
            receiptHandle="MessageReceiptHandle",
            body="Hello from SQS!",
            attributes=dict(
                ApproximateReceiveCount="1",
                SentTimestamp="1523232000000",
                SenderId="123456789012",
                ApproximateFirstReceiveTimestamp="1523232000001",
            ),
            messageAttributes=dict(),
            md5OfBody="7b270e59b47ff90a553787216d55d91d",
            eventSource="aws:sqs",
            eventSourceARN="arn:aws:sqs:us-west-2:123456789012:MyQueue",
            awsRegion="us-west-2",
        ),
    ],
)

# Resolve the flow id by name, then trigger an ad-hoc run with the payload.
pc = PrefectClient()
flowId = pc.getFlowId("vault:fetch_vessel_schedule")
response = pc.adhocFlowRun(flowId, event, context)
print('Results returned after triggerring the flow', response)
Sagun Garg
01/07/2021, 4:27 AMSagun Garg
01/07/2021, 4:27 AM