Requesting Guidance for a Code Sample :pray: For ...
# prefect-community
s
Requesting guidance for a code sample 🙏 For event-based triggers, I believe we have to use the GraphQL API and the Prefect Client. Use case: based on a certain event, I wish to spawn parametrised flows on my Prefect Cloud in an ad hoc fashion. The code that reacts to the event and triggers the flow lives outside the dockerized repo of the flows. [FlowId vs FlowName] in the GraphQL mutation / Prefect Client: the mutation listed in the examples demands a flow id. Is there a better-supported syntax for triggering a flow by its flow name? I went through the following example: https://medium.com/the-prefect-blog/event-driven-workflows-with-aws-lambda-2ef9d8cc8f1a
w
Interested to see what you come up with. I have a similar use case that I would like to explore (flows triggered by webhooks, for example). I would have to double-check, but you could probably just make it a two-step process: (1) search for the flow id, (2) trigger the flow. A bit tedious, but it should work, I guess.
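The two-step process suggested above (look up the flow id by name, then trigger a run) could be sketched roughly as follows. This is a minimal sketch, not an official client: the helper names `build_lookup_payload`, `build_run_payload` and `post_graphql` are hypothetical, the `flow(where: ...)` filter is assumed to follow the Hasura-style schema of the Prefect GraphQL API, and the variables are sent as a plain JSON object (the usual GraphQL-over-HTTP convention) rather than as a pre-serialized string.

```python
import json
import urllib.request

# Assumed query shape: a Hasura-style filter on the flow name.
FLOW_ID_QUERY = """
query GetFlowId($flow_name: String!) {
  flow(where: { name: { _eq: $flow_name } }) {
    id
    name
  }
}
"""

# Same mutation shape as in the linked blog post.
CREATE_RUN_MUTATION = """
mutation($input: createFlowRunInput!) {
  createFlowRun(input: $input) {
    flow_run {
      id
    }
  }
}
"""


def build_lookup_payload(flow_name):
    """Step 1: request body that searches for a flow by its name."""
    return {"query": FLOW_ID_QUERY, "variables": {"flow_name": flow_name}}


def build_run_payload(flow_id, parameters=None):
    """Step 2: request body that creates a run for the given flow id."""
    run_input = {"flowId": flow_id}
    if parameters:
        # PARAMETER_NAME -> PARAMETER_VALUE, e.g. {"event": {...}}
        run_input["parameters"] = parameters
    return {"query": CREATE_RUN_MUTATION, "variables": {"input": run_input}}


def post_graphql(api_url, api_token, payload):
    """POST a GraphQL payload and return the decoded JSON response."""
    req = urllib.request.Request(
        api_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": "Bearer {}".format(api_token),
        },
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode())
```

Keeping payload construction separate from the HTTP call means the request bodies can be inspected or unit-tested without touching the API; only `post_graphql` needs network access and a valid token.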
s
import json
import os
import urllib.request

from prefect import task, Flow
from prefect.tasks.prefect import StartFlowRun

# Event Driven Flow Calling using GraphQL API of prefect
# Resource 1: https://medium.com/the-prefect-blog/event-driven-workflows-with-aws-lambda-2ef9d8cc8f1a

class PrefectClientException(Exception):
    pass

class PrefectClient(object):
    
    def __init__(self, api_url, api_token):
        self.__api_url = api_url
        self.__api_token = api_token  # you can reuse the runner token here as well for the GraphQL API client
        
    
    def get_flowId(self, input):
        # State messages for this task are returned in the format:
        # <<flow run id>> finished in state << state >>
        # so we can grab the id easily from that message
        id = input.state.message.split(" ", 1)[0]
        print(id)
        return id

    
    def execute_prefectflow(self,event, context, flow_name, project_name):
        print("Received event: " + json.dumps(event, indent=2))
        print("Received context: " + json.dumps(context, indent=2))
        print("For Project:", project_name)
        print("For Flow:", flow_name)

        ## prep the data
        create_mutation = """
        mutation($input: createFlowRunInput!){
            createFlowRun(input: $input){
                flow_run{
                    id
                }
            }
        }
        """


        task_duration = """
        query TaskRunDuration {
              flow_run(where: { flow_id: {_eq: "your_flow_id"}, state: {_eq: "Success"} }) {
                id
                name
                task_runs(where: {task_id: {_eq: "your_task_id"}}) {
                  start_time
                  end_time
                  id
                }
              }
            }
        """



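        # NOTE: StartFlowRun is a Prefect task, so calling it like this requires an active Flow context
        a = StartFlowRun(project_name=project_name, wait=True)(flow_name=flow_name)
        flowId = self.get_flowId(a)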

        # Indicate the flowId based on flow_name by th
        inputs = dict(flowId=flowId)

        # pass the full event
        # if you wish to pass information about the triggering event as a parameter,
        # simply add that to the inputs dictionary under the parameters key,
        # whose value should be a dictionary of PARAMETER_NAME -> PARAMETER_VALUE
        inputs['parameters'] = dict(event=event)

        variables = dict(input=inputs)
        data = json.dumps(
            dict(query=create_mutation, variables=json.dumps(variables))
        ).encode("utf-8")

        ## prep the request
        req = urllib.request.Request(self.__api_url, data=data)
        req.add_header("Content-Type", "application/json")
        req.add_header(
            "Authorization", "Bearer {}".format(self.__api_token)
        )

        ## send the request and return the response
        resp = urllib.request.urlopen(req)
        return json.loads(resp.read().decode())

prefect_api_client = PrefectClient("https://api.prefect.io", "PREFECT_CLOUD_RUNNER_API_KEY")
status = prefect_api_client.execute_prefectflow(event, context, flow_name, project_name)
print('Results returned after triggering the flow', status)
@wiretrack @Chris White This is what I have been able to gather. Apologies for my rookiness on Prefect. From what I understand, StartFlowRun, used in the code above, can only be called inside a Flow. My requirement, however, is that the caller of the flow run (the equivalent of flow.run(), but event-driven) lives outside the flows and should not define any flows in Prefect. I am explicitly trying to write a client that is independent of Prefect but is able to trigger a flow by its name.
@Chris White @wiretrack I don't want to call the flow by its id because I anticipate the following issue: how will I get the flow_id every time I upgrade the version of the flow during incremental development?
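One way to cope with the flow id changing on every re-registration might be to resolve the id at trigger time, restricting the lookup to the newest non-archived version of the named flow. The sketch below assumes the `flow` table exposes `archived` and `version` fields (an assumption about the schema, not something confirmed in this thread); `LATEST_FLOW_QUERY` and `latest_flow_id` are hypothetical names.

```python
# Assumption: the flow table has `archived` and `version` columns and
# supports Hasura-style `where`, `order_by` and `limit` arguments.
LATEST_FLOW_QUERY = """
query LatestFlow($flow_name: String!) {
  flow(
    where: { name: { _eq: $flow_name }, archived: { _eq: false } }
    order_by: { version: desc }
    limit: 1
  ) {
    id
    name
    version
  }
}
"""


def latest_flow_id(response, flow_name):
    """Return the id of the highest-version flow in a decoded GraphQL response."""
    flows = (response.get("data") or {}).get("flow") or []
    if not flows:
        raise LookupError("No active flow named {!r} was found".format(flow_name))
    # Defensive: pick the highest version even if the server did not sort.
    newest = max(flows, key=lambda flow: flow.get("version", 0))
    return newest["id"]
```

With a lookup like this, the triggering client only needs to know the flow name; the id of whichever version is currently active is fetched just before each run is created.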
@wiretrack @Chris White This is what I have been able to run. My flows are separately dockerized and can be run serverless-style on AWS EKS Fargate using this external Python client, with two payloads (event, context) passed to the flow at runtime. This gives a much cleaner, three-line, flow.run()-like capability that is triggered externally.
import json
import urllib.request

import prefect

# Event Driven Flow Calling using GraphQL API of prefect
# Resource 1: https://medium.com/the-prefect-blog/event-driven-workflows-with-aws-lambda-2ef9d8cc8f1a

class PrefectClientException(Exception):
    pass


class PrefectClient(object):
    
    def __init__(self):
        self.__api_url = "https://api.prefect.io"
        self.__api_token = "YOUR_PREFECT_RUNNER_KEY"  # you can reuse the runner token here as well for the GraphQL API client
        self.__client = prefect.Client(api_token=self.__api_token)
    
    def getFlowId(self,flowName):

        query = """
            query GetFlowId($flow_name: String!) 
                {
                    flow(where: { name: { _ilike: $flow_name } }) {
                        id
                        name
                        flow_runs(order_by: { start_time: desc }, limit: 1) {
                           id
                           state
                        }
                        
                      }
                    }            
            """   

        variables = dict(flow_name=flowName)

        # See https://docs.prefect.io/api/latest/client/client.html#client-2 for further parameters accepted by the client
        flows = self.__client.graphql(query=query, variables=variables)

        print(flows)

        print('#########BREAKER#########\n')

        print(flows.data)
        print('#########BREAKER#########\n')

        print(flows.data.flow[0].id)
        print('#########BREAKER#########\n')

        flowsDict = flows.data.flow[0]

        print("Flow record returned by the query: \n" + str(flowsDict))
        # Return the flow id explicitly instead of relying on the key order of the result
        return flowsDict.id


    def adhocFlowRun(self,flowId, event, context):

        ## prep the data
        mutation = """
            mutation($input: createFlowRunInput!){
                createFlowRun(input: $input){
                    flow_run{
                        id
                    }
                }
            }
            """

        print("Received event: " + json.dumps(event, indent=2))
        print("Received context: " + json.dumps(context, indent=2))
        print("This is the flow Id:", flowId)

        # Indicate the flowId
        inputs = dict(flowId=flowId)

        # if you wish to pass information about the triggering event as a parameter,
        # simply add that to the inputs dictionary under the parameters key,
        # whose value should be a dictionary of PARAMETER_NAME -> PARAMETER_VALUE
        # pass the full event and context as Prefect Flow Parameters
        inputs['parameters'] = dict(event=event, context=context)
        
        variables = dict(input=inputs)
        
        data = json.dumps(
            dict(query=mutation, variables=json.dumps(variables))).encode("utf-8")

        #print("Data: ", data)
        ## prep the request
        req = urllib.request.Request(self.__api_url, data=data)
        
        #print("Req: ", req)

        req.add_header("Content-Type", "application/json")
        req.add_header(
            "Authorization", "Bearer {}".format((self.__api_token))
        )

        ## send the request and return the response
        
        resp = urllib.request.urlopen(req)
        return json.loads(resp.read().decode())


    
pc = PrefectClient()

flowId = pc.getFlowId("vault:fetch_vessel_schedule")

# prep the data
# This can be any Payload that you want to send to the Flow
# One sample payload is event generated from AWS 
event={
      
      "Records": [
        {
          "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
          "receiptHandle": "MessageReceiptHandle",
          "body": "Hello from SQS!",
          "attributes": {
            "ApproximateReceiveCount": "1",
            "SentTimestamp": "1523232000000",
            "SenderId": "123456789012",
            "ApproximateFirstReceiveTimestamp": "1523232000001"
          },
          "messageAttributes": {},
          "md5OfBody": "7b270e59b47ff90a553787216d55d91d",
          "eventSource": "aws:sqs",
          "eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:MyQueue",
          "awsRegion": "us-west-2"
        }
      ]
    }

context = {"champion": "Sagun"}

response = pc.adhocFlowRun(flowId,event,context)

print('Results returned after triggering the flow', response)
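The snippets above define `PrefectClientException` but never raise it, and the decoded response is printed as-is. A minimal sketch of how the response could be checked is shown below. It assumes the response follows the standard GraphQL layout (a top-level `errors` list on failure) and the `createFlowRun { flow_run { id } }` selection used in the mutation above; `extract_flow_run_id` is a hypothetical helper name.

```python
class PrefectClientException(Exception):
    pass


def extract_flow_run_id(response):
    """Return the new flow run id, or raise if the API reported a problem."""
    errors = response.get("errors")
    if errors:
        # GraphQL errors are normally dicts with a "message" key
        messages = "; ".join(
            str(err.get("message", err)) if isinstance(err, dict) else str(err)
            for err in errors
        )
        raise PrefectClientException("GraphQL request failed: " + messages)
    try:
        return response["data"]["createFlowRun"]["flow_run"]["id"]
    except (KeyError, TypeError) as exc:
        raise PrefectClientException(
            "Unexpected response shape: {!r}".format(response)
        ) from exc
```

Called as `extract_flow_run_id(response)` on the value returned by `adhocFlowRun`, this would surface authentication or schema errors as exceptions rather than leaving them buried in a printed dictionary.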