Shuchita Tripathi
03/09/2022, 8:01 PM
Kevin Kho
Use prefect.context.get("flow_run_id") to get it inside a task or flow run. Is that what you want?
Kevin Kho
Kyle McChesney
03/09/2022, 8:13 PM
LATEST_FLOW_BY_NAME = gql(
    '''
    query LatestFlowByName($name: String) {
        flow(
            where: {name: {_eq: $name}},
            order_by: {version: desc},
            limit: 1,
        )
        {
            id
        }
    }
    ''',
)
You pass in the flow name, which should be known/constant, and it gives you the ID of the latest flow version.
Kyle McChesney
03/09/2022, 8:14 PM
from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport

# PREFECT_API_ROOT is the GraphQL endpoint URL, defined elsewhere
transport = RequestsHTTPTransport(
    url=PREFECT_API_ROOT,
    verify=True,
    retries=3,
)
client = Client(
    transport=transport,
    fetch_schema_from_transport=True,
)
# name is the flow name to look up
result = client.execute(
    LATEST_FLOW_BY_NAME,
    variable_values={'name': name},
)
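The query above selects only `id` from a `flow` list capped at one entry, so the result still has to be unwrapped. A minimal sketch of that step, assuming the executed query returns a plain dict keyed by the top-level field (`flow`); the helper name `latest_flow_id` and the example ID are hypothetical, not from the thread:

```python
from typing import Any, Mapping, Optional


def latest_flow_id(result: Mapping[str, Any]) -> Optional[str]:
    """Return the id of the first flow in a LatestFlowByName result, or None."""
    flows = result.get("flow") or []
    # The query orders by version descending with limit: 1, so the first
    # entry (if any) is the latest registered version of that flow name.
    return flows[0]["id"] if flows else None


# Hypothetical response, shaped like the selection set of the query.
example = {"flow": [{"id": "00000000-0000-0000-0000-000000000000"}]}
print(latest_flow_id(example))
print(latest_flow_id({"flow": []}))  # no flow registered under that name
```

Returning `None` for an empty list keeps the "flow name not found" case explicit instead of raising an `IndexError`.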
Shuchita Tripathi
03/09/2022, 8:18 PM
Anna Geller
from prefect.client import Client
client = Client()
query = '''
query LatestFlowByName($name: String) {
    flow(
        where: {name: {_eq: $name}},
        order_by: {version: desc},
        limit: 1,
    )
    {
        id
    }
}
'''
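# The query declares $name, so pass the flow name as a variable
# ("your-flow-name" is a placeholder)
client.graphql(query, variables={"name": "your-flow-name"})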
Kyle McChesney
03/09/2022, 8:35 PM
Anna Geller
import json
import logging
import os
import urllib.parse
import urllib.request
os.environ["API_KEY"] = "xxx"
os.environ["FLOW_VERSION_GROUP_ID"] = "339c86be-5c1c-48f0-b8d3-fe57654afe22"
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def handler(event, context):
    logger.info("Received event: " + json.dumps(event))
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]
    s3_path = f"s3://{bucket}/{key}"
    create_mutation = """
    mutation($input: createFlowRunInput!){
        createFlowRun(input: $input){
            flow_run{
                id
            }
        }
    }
    """
    inputs = dict(
        versionGroupId=os.environ["FLOW_VERSION_GROUP_ID"],
        parameters=dict(s3_path=s3_path),
        idempotencyKey=event["Records"][0]["eventTime"],
    )
    variables = dict(input=inputs)
    data = json.dumps(
        dict(query=create_mutation, variables=json.dumps(variables))
    ).encode("utf-8")
    ## prep the request
    req = urllib.request.Request("https://api.prefect.io", data=data)
    req.add_header("Content-Type", "application/json")
    req.add_header("Authorization", "Bearer {}".format(os.environ["API_KEY"]))
    ## send the request and return the response
    resp = urllib.request.urlopen(req)
    print(json.loads(resp.read().decode()))
In this case it is a mutation, but queries work the same way, since GraphQL requires a POST request anyway.
Anna Geller
os.environ["API_KEY"] = "xxx"
os.environ["FLOW_VERSION_GROUP_ID"] = "339c86be-5c1c-48f0-b8d3-fe57654afe22"
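The remark that queries work the same way as the mutation can be sketched with the same standard-library approach. This is an illustration under assumptions, not a tested recipe from the thread: the helper name `build_graphql_request` and the flow name `"my-flow"` are hypothetical, and the variables are sent as a plain JSON object (the common GraphQL-over-HTTP convention), whereas the Lambda example sends them as a JSON-encoded string. The request is only built here, not sent.

```python
import json
import urllib.request

# Same query text as earlier in the thread.
LATEST_FLOW_BY_NAME = """
query LatestFlowByName($name: String) {
  flow(where: {name: {_eq: $name}}, order_by: {version: desc}, limit: 1) {
    id
  }
}
"""


def build_graphql_request(query, variables, api_key, url="https://api.prefect.io"):
    """Build (but do not send) a POST request for a GraphQL query or mutation."""
    body = json.dumps({"query": query, "variables": variables}).encode("utf-8")
    # Supplying data makes urllib issue a POST instead of a GET.
    req = urllib.request.Request(url, data=body)
    req.add_header("Content-Type", "application/json")
    req.add_header("Authorization", "Bearer {}".format(api_key))
    return req


req = build_graphql_request(LATEST_FLOW_BY_NAME, {"name": "my-flow"}, api_key="xxx")
# Sending it would mirror the Lambda example:
# resp = urllib.request.urlopen(req)
# print(json.loads(resp.read().decode()))
```

Only the query text and the variables differ between a query and a mutation; the URL, headers, and HTTP method stay the same.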
Kyle McChesney
03/09/2022, 8:42 PM
Zanie
FlowView.from_flow_name
Zanie