# Leela Surya Teja Mangamuri
# 04/27/2023, 3:16 AM
# Serina
# 04/27/2023, 3:11 PM
from prefect import flow
from prefect.logging import get_run_logger
from prefect.context import FlowRunContext
import requests
# Base URL of the Prefect Cloud REST API for a single workspace; request paths
# such as "/task_runs/<id>" are appended to it. The "abc" account and "xyz"
# workspace segments look like placeholders — replace with real IDs.
# The literal must not be wrapped in "<...>": that was chat link markup and
# would otherwise end up inside every URL built from this constant.
PREFECT_API_URL = "https://api.prefect.cloud/api/accounts/abc/workspaces/xyz"
# API key sent as a Bearer token in the Authorization header.
# NOTE(review): value looks like a placeholder; never commit a real key —
# consider loading it from the environment or a secret store instead.
PREFECT_API_KEY = "pnu_ghijk"
@flow
def my_subflow():
    """Return the flow run ID that owns this subflow's parent task run.

    Reads ``parent_task_run_id`` from the current flow run context, fetches
    that task run from the Prefect REST API at
    ``{PREFECT_API_URL}/task_runs/{id}``, and returns the ``flow_run_id``
    field of the JSON response.

    Returns:
        The ``flow_run_id`` value from the task-run response.

    Raises:
        RuntimeError: If the current flow run has no parent task run
            (presumably because it was started directly rather than from
            inside another flow — confirm against Prefect docs).
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        KeyError: If the response body has no ``flow_run_id`` field.
    """
    logger = get_run_logger()
    flow_run_ctx = FlowRunContext.get()

    parent_task_run_id = flow_run_ctx.flow_run.parent_task_run_id
    if parent_task_run_id is None:
        # Without this guard str(None) would be sent as ".../task_runs/None".
        raise RuntimeError(
            "my_subflow has no parent task run; call it from inside another flow"
        )

    headers = {"Authorization": f"Bearer {PREFECT_API_KEY}"}
    endpoint = f"{PREFECT_API_URL}/task_runs/{parent_task_run_id}"
    logger.info(f"Endpoint: {endpoint}")

    # A timeout keeps the flow run from hanging forever on a stalled connection.
    response = requests.get(endpoint, headers=headers, timeout=30)
    logger.info(f"Status Code: {response.status_code}")
    # Fail with the real HTTP error instead of a confusing KeyError below.
    response.raise_for_status()

    # Named `payload` rather than `json` to avoid shadowing the stdlib module name.
    payload = response.json()
    logger.info(f"response.json(): {payload}")
    return payload["flow_run_id"]
@flow
def my_flow():
    """Run ``my_subflow`` and verify it resolves back to this flow run.

    Calls ``my_subflow`` to obtain the flow run ID it looked up through the
    API, compares it with the ID of the current flow run taken from the flow
    run context, and returns the current flow run ID when they match.

    Returns:
        The current flow run's ID as a string.

    Raises:
        AssertionError: If the ID reported by ``my_subflow`` differs from the
            current flow run's ID.
    """
    expected_parent_flow_run_id = my_subflow()
    flow_run_ctx = FlowRunContext.get()
    actual_parent_flow_run_id = str(flow_run_ctx.flow_run.id)
    # Raised explicitly (not via `assert`) so the check still runs under
    # `python -O`; the exception type is unchanged for callers.
    if expected_parent_flow_run_id != actual_parent_flow_run_id:
        raise AssertionError(
            f"parent flow run id mismatch: subflow reported "
            f"{expected_parent_flow_run_id!r}, "
            f"current flow run is {actual_parent_flow_run_id!r}"
        )
    return actual_parent_flow_run_id
if __name__ == "__main__":
    # When executed as a script, run the parent flow and print the value it
    # returns (the current flow run ID).
    print(my_flow())