Bo Lin
07/19/2024, 6:53 AM
Marvin
07/19/2024, 6:53 AM
You can use the prefect.runtime module to access context information about the deployment, flow run, or task run.
Here's an example of how to access runtime information in a flow and task:
from prefect import flow, task
from prefect import runtime

@flow(log_prints=True)
def my_flow(x):
    print("My name is", runtime.flow_run.name)
    print("I belong to deployment", runtime.deployment.name)
    my_task(2)

@task
def my_task(y):
    print("My name is", runtime.task_run.name)
    print("Flow run parameters:", runtime.flow_run.parameters)

my_flow(1)
Running this code will provide logs showing the runtime context information, such as flow run name, deployment name, and task run details.
For more details, you can refer to the Runtime Context Documentation.
Bo Lin
07/19/2024, 6:55 AM
import httpx
import time
from datetime import timedelta
from prefect import flow, task
from prefect.results import LiteralResult
from prefect.deployments.deployments import run_deployment
from prefect.states import raise_state_exception
from typing import Optional

@flow
def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100):
    return ['1', '2', '3']

@flow(log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    repo_stats = get_url(f"https://api.github.com/repos/{repo_name}")
    issues = run_deployment(name="k8s-deployment-subflow")
    raise_state_exception(issues.state)

if __name__ == "__main__":
    get_repo_info.deploy(
        name="k8s-deployment",
        work_pool_name="my-work-pool",
        image="k8s-deployment-image:tutorial",
        push=False
    )
    get_open_issues.deploy(
        name="k8s-deployment-subflow",
        work_pool_name="my-work-pool",
        image="k8s-deployment-image:tutorial2",
        push=False
    )
And I need to get the result of the issues @Marvin
Marvin
07/19/2024, 6:58 AM
Here is my code
```
import httpx
import time
from datetime import timedelta
from prefect import flow, task
from prefect.results import LiteralResult
from prefect.deployments.deployments import run_deployment
from prefect.states import raise_state_exception
from typing import Optional

@task
def get_url(url: str, params: Optional[dict[str, any]] = None):
    response = httpx.get(url, params=params)
    response.raise_for_status()
    return response.json()

@flow
def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100):
    issues = []
    pages = range(1, -(open_issues_count // -per_page) + 1)
    for page in pages:
        issues.append(
            get_url.submit(
                f"https://api.github.com/repos/{repo_name}/issues",
                params={"page": page, "per_page": per_page, "state": "open"},
            )
        )
    return [i for p in issues for i in p.result()]

@flow(log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    repo_stats = get_url(f"https://api.github.com/repos/{repo_name}")
    issues = run_deployment(name="k8s-deployment-subflow", parameters={
        'repo_name': repo_name,
        'open_issues_count': repo
```
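The page range in get_open_issues relies on negated floor division as a ceiling division, so a partially filled last page is still requested. A minimal sketch of that arithmetic on its own follows; page_count and page_numbers are hypothetical helper names, not part of the original code or of Prefect.

```python
def page_count(total_items: int, per_page: int = 100) -> int:
    """Return how many pages are needed to hold total_items.

    -(a // -b) is integer ceiling division for positive b: floor division
    of the negated divisor rounds toward negative infinity, and negating
    the quotient turns that into rounding up.
    """
    if per_page <= 0:
        raise ValueError("per_page must be positive")
    return -(total_items // -per_page)


def page_numbers(total_items: int, per_page: int = 100) -> range:
    """Return the 1-based page numbers to request."""
    return range(1, page_count(total_items, per_page) + 1)


# 250 open issues at 100 per page need three requests
print(list(page_numbers(250, 100)))  # [1, 2, 3]
```

With zero items the range is empty, so no requests are made.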
Marvin
07/19/2024, 6:59 AM
Here is my code
import httpx
import time
from datetime import timedelta
from prefect import flow, task
from prefect.results import LiteralResult
from prefect.deployments.deployments import run_deployment
from prefect.states import raise_state_exception
from typing import Optional

@flow
def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100):
    return ['1', '2', '3']

@flow(log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    repo_stats = get_url(f"https://api.github.com/repos/{repo_name}")
    issues = run_deployment(name="k8s-deployment-subflow", parameters={
        'repo_name': repo_name,
        'open_issues_count': repo_stats["open_issues_count"]
    })
    raise_state_exception(issues.state)

if __name__ == "__main__":
    get_repo_info.deploy(
        name="k8s-deployment",
        work_pool_name="my-work-pool",
        image="k8s-deployment-image:tutorial",
        push=False
    )
    get_open_issues.deploy(
        name="k8s-deployment-subflow",
        work_pool_name="my-work-pool",
        image="k8s-deployment-image:tutorial2",
        push=False
    )
And I need to get the
Marvin
07/19/2024, 7:00 AM
To get the result of a flow run started with the run_deployment function in Prefect, you can use the state of the returned flow run, which holds the output of the deployed flow once it finishes executing. Here's how you can modify your flow to capture the results:
import httpx
from typing import Any, Optional

from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.states import raise_state_exception

@task
def get_url(url: str, params: Optional[dict[str, Any]] = None):
    # Fetch a URL and return the decoded JSON body
    response = httpx.get(url, params=params)
    response.raise_for_status()
    return response.json()

# Assumption: the parent can only load the return value if the subflow
# persists its result to storage that the parent run can also reach.
@flow(persist_result=True)
def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100):
    return ['1', '2', '3']

@flow(log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    repo_stats = get_url(f"https://api.github.com/repos/{repo_name}")
    # Deployments are looked up as "<flow name>/<deployment name>"; the flow
    # name here assumes the default derived from the function name.
    deployment_execution = run_deployment(
        name="get-open-issues/k8s-deployment-subflow",
        parameters={
            "repo_name": repo_name,
            "open_issues_count": repo_stats["open_issues_count"],
        },
    )
    # Raise an exception if the deployment run failed
    raise_state_exception(deployment_execution.state)
    # Retrieve the result from the deployment run state
    deployment_result = deployment_execution.state.result(raise_on_failure=True)
    print(f"Deployment result: {deployment_result}")
    return deployment_result

if __name__ == "__main__":
    get_repo_info.deploy(
        name="k8s-deployment",
        work_pool_name="my-work-pool",
        image="k8s-deployment-image:tutorial",
        push=False
    )
    get_open_issues.deploy(
        name="k8s-deployment-subflow",
        work_pool_name="my-work-pool",
        image="k8s-deployment-image:tutorial2",
        push=False
    )
In this example, deployment_execution.state.result(raise_on_failure=True) retrieves the result from the deployment run state. By using raise_state_exception(deployment_execution.state), you ensure that any failures in the deployment run are raised as exceptions.
This will allow you to fetch and utilize the results of your deployed flow.
For more details, please refer to the Runtime Context Documentation and the provided code example.
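One detail worth checking when calling run_deployment by name: as far as I know, it looks the deployment up by a string of the form "flow name/deployment name" (or by a deployment ID), and a flow's default name is its function name with underscores replaced by hyphens. Both points are assumptions about Prefect's behavior, so verify them against the version you run. A minimal sketch of building that string, using hypothetical helper names that are not part of Prefect:

```python
def default_flow_name(function_name: str) -> str:
    """Mimic the assumed default flow naming: underscores become hyphens."""
    return function_name.replace("_", "-")


def deployment_reference(function_name: str, deployment_name: str) -> str:
    """Build the assumed "<flow name>/<deployment name>" lookup string."""
    return f"{default_flow_name(function_name)}/{deployment_name}"


# Reference for the subflow deployment used in this thread
print(deployment_reference("get_open_issues", "k8s-deployment-subflow"))
# get-open-issues/k8s-deployment-subflow
```

If the flow was given an explicit name in its decorator, that name would replace the derived one.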