Bo Lin
07/19/2024, 6:53 AM
Marvin
07/19/2024, 6:53 AM
`prefect.runtime`
from prefect import flow, task
from prefect import runtime
# Flow: report this run's identity via prefect.runtime, then call my_task.
@flow(log_prints=True)
def my_flow(x):
    # `x` is accepted as a flow parameter but is not read in the body.
    flow_run_name = runtime.flow_run.name
    print("My name is", flow_run_name)

    deployment_name = runtime.deployment.name
    print("I belong to deployment", deployment_name)

    # The task is always invoked with the constant 2, independent of `x`.
    my_task(2)
# Task: print this task run's name and the flow run's parameters,
# both read from prefect.runtime.
@task
def my_task(y):
    # `y` is accepted but is not read in the body.
    task_run_name = runtime.task_run.name
    print("My name is", task_run_name)

    flow_parameters = runtime.flow_run.parameters
    print("Flow run parameters:", flow_parameters)
my_flow(1)
Bo Lin
07/19/2024, 6:55 AM
import httpx
import time
from datetime import timedelta
from prefect import flow, task
from prefect.results import LiteralResult
from prefect.deployments.deployments import run_deployment
from prefect.states import raise_state_exception
from typing import Optional
# Placeholder sub-flow: ignores all of its arguments and returns a fixed
# three-item list of strings.
@flow
def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100):
    stub_issues = ["1", "2", "3"]
    return stub_issues
@flow(log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    """Fetch repository stats from GitHub, then run the open-issues deployment.

    Args:
        repo_name: GitHub repository in ``owner/name`` form.

    Raises:
        Exception: whatever ``raise_state_exception`` raises for the state
            of the triggered flow run (presumably when that run did not
            finish successfully -- confirm against the Prefect docs).
    """
    # Plain URL: the angle brackets that wrapped it were chat-link markup and
    # made the request target invalid.
    # NOTE: get_url is not defined in this snippet; it has to be provided
    # elsewhere in the script.
    repo_stats = get_url(f"https://api.github.com/repos/{repo_name}")

    # run_deployment looks deployments up as "<flow name>/<deployment name>"
    # (or by UUID). The flow name defaults to the function name with
    # underscores replaced by hyphens, hence "get-open-issues".
    # get_open_issues declares repo_name and open_issues_count without
    # defaults, so both must be supplied here.
    issues = run_deployment(
        name="get-open-issues/k8s-deployment-subflow",
        parameters={
            "repo_name": repo_name,
            "open_issues_count": repo_stats["open_issues_count"],
        },
    )
    raise_state_exception(issues.state)
if __name__ == "__main__":
    get_repo_info.deploy(
        name="k8s-deployment", 
        work_pool_name="my-work-pool", 
        image="k8s-deployment-image:tutorial",
        push=False
    )
    get_open_issues.deploy(
        name="k8s-deployment-subflow", 
        work_pool_name="my-work-pool", 
        image="k8s-deployment-image:tutorial2",
        push=False
    )
Marvin
07/19/2024, 6:58 AMHere is my code```import httpx import time from datetime import timedelta from prefect import flow, task from prefect.results import LiteralResult from prefect.deployments.deployments import run_deployment from prefect.states import raise_state_exception from typing import Optional @task def get_url(url: str, params: Optional[dict[str, any]] = None): response = httpx.get(url, params=params) response.raise_for_status() return response.json() @flow def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100): issues = [] pages = range(1, -(open_issues_count // -per_page) + 1) for page in pages: issues.append( get_url.submit( f"https://api.github.com/repos/{repo_name}/issues", params={"page": page, "per_page": per_page, "state": "open"}, ) ) return [i for p in issues for i in p.result()] @flow(log_prints=True) def get_repo_info(repo_name: str = "PrefectHQ/prefect"): repo_stats = get_url(f"https://api.github.com/repos/{repo_name}") issues = run_deployment(name="k8s-deployment-subflow", parameters={ 'repo_name': repo_name, 'open_issues_count': repo
Marvin
07/19/2024, 6:59 AM
Here is my code
import httpx
import time
from datetime import timedelta
from prefect import flow, task
from prefect.results import LiteralResult
from prefect.deployments.deployments import run_deployment
from prefect.states import raise_state_exception
from typing import Optional
# Stub version of the sub-flow: none of the parameters are read; the
# return value is always the same list of three strings.
@flow
def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100):
    fixed_result = ["1", "2", "3"]
    return fixed_result
@flow(log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    """Fetch repository stats from GitHub, then run the open-issues deployment.

    Args:
        repo_name: GitHub repository in ``owner/name`` form.

    Raises:
        Exception: whatever ``raise_state_exception`` raises for the state
            of the triggered flow run (presumably when that run did not
            finish successfully -- confirm against the Prefect docs).
    """
    # Plain URL: the angle brackets that wrapped it were chat-link markup and
    # made the request target invalid.
    # NOTE: get_url is not defined in this snippet; it has to be provided
    # elsewhere in the script.
    repo_stats = get_url(f"https://api.github.com/repos/{repo_name}")

    # run_deployment looks deployments up as "<flow name>/<deployment name>"
    # (or by UUID). The flow name defaults to the function name with
    # underscores replaced by hyphens, hence "get-open-issues".
    issues = run_deployment(
        name="get-open-issues/k8s-deployment-subflow",
        parameters={
            "repo_name": repo_name,
            "open_issues_count": repo_stats["open_issues_count"],
        },
    )
    raise_state_exception(issues.state)
if __name__ == "__main__":
    get_repo_info.deploy(
        name="k8s-deployment", 
        work_pool_name="my-work-pool", 
        image="k8s-deployment-image:tutorial",
        push=False
    )
    get_open_issues.deploy(
        name="k8s-deployment-subflow", 
        work_pool_name="my-work-pool", 
        image="k8s-deployment-image:tutorial2",
        push=False
    )
Marvin
07/19/2024, 7:00 AM
`run_deployment`
python
import httpx
import time
from datetime import timedelta
from prefect import flow, task
from prefect.results import LiteralResult
from prefect.deployments.deployments import run_deployment
from prefect.states import raise_state_exception
from typing import Optional
# Placeholder implementation: the arguments are unused and the flow always
# returns the same three string values.
@flow
def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100):
    canned_issues = ["1", "2", "3"]
    return canned_issues
@flow(log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    """Run the open-issues deployment for a repository and return its result.

    Args:
        repo_name: GitHub repository in ``owner/name`` form.

    Returns:
        The value obtained from the triggered flow run's final state via
        ``state.result(raise_on_failure=True)``.

    Raises:
        Exception: whatever ``raise_state_exception`` or ``state.result``
            raise for the state of the triggered flow run (presumably when
            that run did not finish successfully -- confirm against the
            Prefect docs).
    """
    # Plain URL: the angle brackets that wrapped it were chat-link markup and
    # made the request target invalid.
    # NOTE: get_url is not defined in this snippet; it has to be provided
    # elsewhere in the script.
    repo_stats = get_url(f"https://api.github.com/repos/{repo_name}")

    # run_deployment looks deployments up as "<flow name>/<deployment name>"
    # (or by UUID). The flow name defaults to the function name with
    # underscores replaced by hyphens, hence "get-open-issues".
    # get_open_issues declares repo_name and open_issues_count without
    # defaults, so both must be supplied here.
    deployment_execution = run_deployment(
        name="get-open-issues/k8s-deployment-subflow",
        parameters={
            "repo_name": repo_name,
            "open_issues_count": repo_stats["open_issues_count"],
        },
    )

    # Raise an exception if the deployment run failed
    raise_state_exception(deployment_execution.state)

    # Retrieve the result from the deployment run state
    deployment_result = deployment_execution.state.result(raise_on_failure=True)
    print(f"Deployment result: {deployment_result}")
    return deployment_result
if __name__ == "__main__":
    get_repo_info.deploy(
        name="k8s-deployment", 
        work_pool_name="my-work-pool", 
        image="k8s-deployment-image:tutorial",
        push=False
    )
    get_open_issues.deploy(
        name="k8s-deployment-subflow", 
        work_pool_name="my-work-pool", 
        image="k8s-deployment-image:tutorial2",
        push=False
    )
`deployment_execution.state.result(raise_on_failure=True)`
`raise_state_exception(issues.state)`