Thread
#prefect-community
    Sagun Garg

    1 year ago
    Requesting guidance on a code sample 🙏 For event-based triggers, I believe we have to use the GraphQL API and the Prefect Client. Use case: when a certain event occurs, I want to spawn parametrised flows on my Prefect Cloud in an ad hoc fashion. The code that reacts to the event and triggers the flow lives outside the dockerized repo of the flows. [FlowId vs FlowName in the GraphQL mutation] The mutation listed in the examples requires a flow id. Is there a better-supported syntax for triggering a flow by its flow name? I went through the following example: https://medium.com/the-prefect-blog/event-driven-workflows-with-aws-lambda-2ef9d8cc8f1a
    wiretrack

    1 year ago
    Interested to see what you come up with. I have a similar use case that I would like to explore (flows triggered by webhooks, for example). I would have to double-check, but you could probably just make it a two-step process: (1) look up the flow id, (2) trigger the flow. A bit tedious, but it should work, I guess.
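The two-step process suggested here (look up the flow id, then trigger the flow) could be sketched roughly as below. This is only a sketch under assumptions: `post` is a hypothetical callable that sends a GraphQL payload and returns the decoded JSON response, the `_eq` name filter mirrors the filter style used elsewhere in this thread, and the response shapes are assumed to follow the queries as written.

```python
from typing import Any, Callable, Dict

# Hypothetical transport: takes a GraphQL payload and returns the decoded JSON
# response. In practice it would POST to the Prefect GraphQL endpoint.
Post = Callable[[Dict[str, Any]], Dict[str, Any]]

# Assumed lookup query: filter flows by exact name.
FLOW_ID_QUERY = """
query($name: String!) {
    flow(where: { name: { _eq: $name } }) {
        id
    }
}
"""

# Mutation taken from the example referenced in this thread.
CREATE_RUN_MUTATION = """
mutation($input: createFlowRunInput!) {
    createFlowRun(input: $input) {
        flow_run { id }
    }
}
"""


def find_flow_id(post: Post, flow_name: str) -> str:
    """Step 1: resolve a flow name to a flow id."""
    response = post({"query": FLOW_ID_QUERY, "variables": {"name": flow_name}})
    flows = response["data"]["flow"]
    if not flows:
        raise LookupError("no flow named {!r}".format(flow_name))
    return flows[0]["id"]


def trigger_flow(post: Post, flow_id: str, parameters: Dict[str, Any]) -> str:
    """Step 2: create a flow run for the given flow id and return the run id."""
    variables = {"input": {"flowId": flow_id, "parameters": parameters}}
    response = post({"query": CREATE_RUN_MUTATION, "variables": variables})
    return response["data"]["createFlowRun"]["flow_run"]["id"]


def trigger_flow_by_name(post: Post, flow_name: str, parameters: Dict[str, Any]) -> str:
    """Both steps chained together."""
    return trigger_flow(post, find_flow_id(post, flow_name), parameters)
```

Keeping the transport as a plain callable means the lookup and trigger logic can be exercised with a stub before any token or endpoint is involved.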
    Sagun Garg

    1 year ago
    import json
    import os
    import urllib.request
    
    from prefect import task, Flow
    from prefect.tasks.prefect import StartFlowRun
    
    # Event-driven flow calling using the GraphQL API of Prefect
    # Resource 1: https://medium.com/the-prefect-blog/event-driven-workflows-with-aws-lambda-2ef9d8cc8f1a
    
    class PrefectClientException(Exception):
        pass
    
    class PrefectClient(object):
        
        def __init__(self, api_url, api_token):
            self.__api_url =  api_url
            self.__api_token =  api_token #you can reuse runner token here as well for GraphQL API Client
            
        
        def get_flowId(self, task_result):
            # State messages for this task are returned in the format:
            # <<flow run id>> finished in state << state >>
            # so we can grab the id easily from that message.
            # Note: per that format, this is a flow run id rather than a flow id.
            run_id = task_result.state.message.split(" ", 1)[0]
            print(run_id)
            return run_id
    
        
        def execute_prefectflow(self,event, context, flow_name, project_name):
            print("Received event: " + json.dumps(event, indent=2))
            print("Received context: " + json.dumps(context, indent=2))
            print("For Project:", project_name)
            print("For Flow:", flow_name)
    
            ## prep the data
            create_mutation = """
            mutation($input: createFlowRunInput!){
                createFlowRun(input: $input){
                    flow_run{
                        id
                    }
                }
            }
            """
    
    
            task_duration = """
            query TaskRunDuration {
                  flow_run(where: { flow_id: {_eq: "your_flow_id"}, state: {_eq: "Success"} }) {
                    id
                    name
                    task_runs(where: {task_id: {_eq: "your_task_id"}}) {
                      start_time
                      end_time
                      id
                    }
                  }
                }
            """
    
    
    
            a = StartFlowRun(project_name=project_name, wait=True)(flow_name=flow_name)
            flowId = self.get_flowId(a)
    
            # Indicate the flowId based on flow_name
            inputs = dict(flowId=flowId)
    
            # pass the full event
            # if you wish to pass information about the triggering event as a parameter,
            # simply add that to the inputs dictionary under the parameters key,
            # whose value should be a dictionary of PARAMETER_NAME -> PARAMETER_VALUE
            inputs['parameters'] = dict(event=event)
    
            variables = dict(input=inputs)
            data = json.dumps(
                dict(query=create_mutation, variables=json.dumps(variables))
            ).encode("utf-8")
    
            ## prep the request
            req = urllib.request.Request(self.__api_url, data=data)
            req.add_header("Content-Type", "application/json")
            req.add_header(
                "Authorization", "Bearer {}".format(self.__api_token)
            )
    
            ## send the request and return the response
            resp = urllib.request.urlopen(req)
            return json.loads(resp.read().decode())
    
    # event, context, flow_name and project_name are supplied by the calling code
    prefect_api_client = PrefectClient("https://api.prefect.io", "PREFECT_CLOUD_RUNNER_API_KEY")
    status = prefect_api_client.execute_prefectflow(event, context, flow_name, project_name)
    print('Results returned after triggering the flow', status)
    @wiretrack @Chris White This is what I have been able to gather so far. Apologies for being a rookie with Prefect. From what I understand, StartFlowRun in the code above is a task that can only be called inside a Flow. My requirement is different: the caller (the event-driven equivalent of flow.run()) lives outside the flows and should not define any Prefect flows itself. I am explicitly trying to write a client that is independent of Prefect but can still trigger a flow by its name.
    @Chris White @wiretrack I don't want to call the flow by its id because I anticipate the following issue: how will I get the flow_id every time I upgrade the version of the flow during incremental development?
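One way to cope with flow ids changing between versions is to resolve the id at trigger time and pick the newest active version from whatever the name lookup returns. The sketch below assumes each returned flow record is a dict carrying `id`, `version` and `archived` keys; those field names are assumptions for illustration and should be checked against the actual API schema.

```python
from typing import Any, Dict, List


def latest_flow_id(flows: List[Dict[str, Any]]) -> str:
    """Return the id of the highest-versioned, non-archived flow record.

    The `version` and `archived` keys are assumed field names, not confirmed
    by this thread.
    """
    # Ignore versions that have been archived (superseded or retired).
    active = [flow for flow in flows if not flow.get("archived", False)]
    if not active:
        raise LookupError("no active flow version found")
    # Among the remaining records, the largest version number is the newest.
    return max(active, key=lambda flow: flow["version"])["id"]


# Example records, as a name lookup might return after two re-registrations.
records = [
    {"id": "aaa", "version": 1, "archived": True},
    {"id": "bbb", "version": 2, "archived": True},
    {"id": "ccc", "version": 3, "archived": False},
]
current_id = latest_flow_id(records)
```

With this approach the caller only ever stores the flow name, so re-registering the flow does not require touching the triggering code.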
    import json
    import urllib.request
    
    import prefect
    
    # Event-driven flow calling using the GraphQL API of Prefect
    # Resource 1: https://medium.com/the-prefect-blog/event-driven-workflows-with-aws-lambda-2ef9d8cc8f1a
    
    class PrefectClientException(Exception):
        pass
    
    
    class PrefectClient(object):
        
        def __init__(self):
            self.__api_url = "https://api.prefect.io"
            self.__api_token = "YOUR_PREFECT_RUNNER_KEY"  # you can reuse the runner token here for the GraphQL API client
            self.__client = prefect.Client(api_token=self.__api_token)
        
        def getFlowId(self, flowName):
            # Look up a flow by name and return its id.
            # Note: _ilike is a case-insensitive pattern match, so several flows
            # (or several versions of one flow) can match; the first is returned.
            query = """
                query GetFlowId($flow_name: String!) {
                    flow(where: { name: { _ilike: $flow_name } }) {
                        id
                        name
                        flow_runs(order_by: { start_time: desc }, limit: 1) {
                            id
                            state
                        }
                    }
                }
                """
    
            variables = dict(flow_name=flowName)
    
            # See https://docs.prefect.io/api/latest/client/client.html#client-2
            # for the other parameters that Client.graphql accepts.
            flows = self.__client.graphql(query=query, variables=variables)
            print(flows.data)
    
            # Fail loudly if nothing matched instead of raising an IndexError below.
            if not flows.data.flow:
                raise PrefectClientException("No flow found with name: " + flowName)
    
            flowsDict = flows.data.flow[0]
            print("Flow record returned for this name:\n" + str(flowsDict))
            return flowsDict.id
    
    
        def adhocFlowRun(self,flowId, event, context):
    
            ## prep the data
            mutation = """
                mutation($input: createFlowRunInput!){
                    createFlowRun(input: $input){
                        flow_run{
                            id
                        }
                    }
                }
                """
    
            print("Received event: " + json.dumps(event, indent=2))
            print("Received context: " + json.dumps(context, indent=2))
            print("This is the flow Id:", flowId)
    
            # Indicate the flowId
            inputs = dict(flowId=flowId)
    
            # if you wish to pass information about the triggering event as a parameter,
            # simply add that to the inputs dictionary under the parameters key,
            # whose value should be a dictionary of PARAMETER_NAME -> PARAMETER_VALUE
            # pass the full event and context as Prefect Flow Parameters
            inputs['parameters'] = dict(event=event, context=context)
            
            variables = dict(input=inputs)
            
            data = json.dumps(
                dict(query=mutation, variables=json.dumps(variables))
            ).encode("utf-8")
    
            ## prep the request
            req = urllib.request.Request(self.__api_url, data=data)
            req.add_header("Content-Type", "application/json")
            req.add_header(
                "Authorization", "Bearer {}".format(self.__api_token)
            )
    
            ## send the request and return the response
            resp = urllib.request.urlopen(req)
            return json.loads(resp.read().decode())
    
    
        
    pc = PrefectClient()
    
    flowId = pc.getFlowId("vault:fetch_vessel_schedule")
    
    # prep the data
    # This can be any Payload that you want to send to the Flow
    # One sample payload is event generated from AWS 
    event={
          
          "Records": [
            {
              "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
              "receiptHandle": "MessageReceiptHandle",
              "body": "Hello from SQS!",
              "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1523232000000",
                "SenderId": "123456789012",
                "ApproximateFirstReceiveTimestamp": "1523232000001"
              },
              "messageAttributes": {},
              "md5OfBody": "7b270e59b47ff90a553787216d55d91d",
              "eventSource": "aws:sqs",
              "eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:MyQueue",
              "awsRegion": "us-west-2"
            }
          ]
        }
    
    context = {"champion": "Sagun"}
    
    response = pc.adhocFlowRun(flowId,event,context)
    
    print('Results returned after triggering the flow', response)
    @wiretrack @Chris White This is what I have been able to run. My flows are dockerized separately and run serverless-style on AWS EKS Fargate; this external Python client triggers them and passes two payloads (event, context) to the flow at runtime. It gives a much cleaner, three-line equivalent of flow.run() that is triggered externally.
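One caveat if this client is moved into an AWS Lambda handler: there the `context` argument is an object rather than a plain dict like the sample above, so `json.dumps(context)` would raise a `TypeError`. A minimal sketch of a workaround follows, assuming only a few of the context attributes are wanted as flow parameters; the helper name `context_to_dict` is hypothetical, and the fake context stands in for the real Lambda object.

```python
import json
from types import SimpleNamespace
from typing import Any, Dict

# Attributes copied from the Lambda context object; extend as needed.
CONTEXT_FIELDS = (
    "aws_request_id",
    "function_name",
    "function_version",
    "invoked_function_arn",
)


def context_to_dict(context: Any) -> Dict[str, Any]:
    """Return a JSON-serialisable view of a handler context.

    Plain dicts (like the sample context in this thread) are copied as-is;
    for any other object only the attributes in CONTEXT_FIELDS are read,
    with None for attributes that are missing.
    """
    if isinstance(context, dict):
        return dict(context)
    return {name: getattr(context, name, None) for name in CONTEXT_FIELDS}


# Stand-in for a real Lambda context object, for illustration only.
fake_context = SimpleNamespace(
    aws_request_id="req-123",
    function_name="trigger-prefect-flow",
    function_version="$LATEST",
)
serialised = json.dumps(context_to_dict(fake_context))
```

The resulting dict can then be passed as the `context` parameter in place of the raw object.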