Sang Young Noh
04/27/2022, 11:40 AMAnna Geller
04/27/2022, 11:44 AMfrom prefect import flow
@flow
def say_hi(user_name: str):
print(f"Hi {user_name}!")
if __name__ == "__main__":
say_hi("Marvin")
Sang Young Noh
04/27/2022, 11:54 AMAnna Geller
04/29/2022, 1:04 AMfrom prefect.deployments import DeploymentSpec
where did you get this wrong import from? I'd love to correct those docsSang Young Noh
04/29/2022, 8:14 AMAnna Geller
04/29/2022, 3:01 PMSang Young Noh
04/29/2022, 3:48 PMAnna Geller
04/29/2022, 3:48 PMSang Young Noh
05/03/2022, 7:53 AMAnna Geller
05/03/2022, 12:33 PMdeploy_specs = [
dict(deployment_name="flow1", flow_path="./flow1.py"),
dict(deployment_name="flow2", flow_path="./flow2.py"),
dict(deployment_name="flow3", flow_path="./flow3.py"),
]
for ds in deploy_specs:
config = DeploymentSpec(name=ds["deployment_name"], flow_location=ds["flow_path"])
But my personal favorite would be a more functional approach - here is an example I was using:
from prefect.deployments import DeploymentSpec
from prefect.flows import Flow
from prefect.orion.schemas.schedules import SCHEDULE_TYPES
# from prefect.flow_runners import DockerFlowRunner
from typing import Any, Dict, List
from flows.async_flow import async_flow
from flows.crypto_prices_etl import crypto_prices_etl
from flows.repo_trending_check import repo_trending_check
def set_deployment_spec(
flow: Flow,
environment: str = "dev",
schedule: SCHEDULE_TYPES = None,
parameters: Dict[str, Any] = None,
tags: List[str] = None,
) -> DeploymentSpec:
deploy_tags = [environment] if tags is None else [environment, *tags]
return DeploymentSpec(
flow=flow,
name=f"{flow.name}_{environment}",
schedule=schedule,
tags=deploy_tags,
parameters=parameters,
# flow_runner=DockerFlowRunner()
)
set_deployment_spec(async_flow)
set_deployment_spec(crypto_prices_etl)
set_deployment_spec(repo_trending_check)
Sang Young Noh
05/03/2022, 1:44 PMreturn DeploymentSpec(
flow=flow,
name=f"{flow.name}_{environment}",
schedule=schedule,
tags=deploy_tags,
parameters=parameters,
# flow_runner=DockerFlowRunner()
)
The flow we wish to tag the deployment spec with, with the input variable flow = flow ; for example, if I were to construct a flow like this:
@flow(name="leonardo_dicapriflow")
def leonardo_dicapriflow(name: str):
log_message(name)
return
would flow be the decorator name, or the function name?
2. The functional file you put, for example, if I were wanting to deploy onto the prefect cloud, would it be the following command?
prefect deployment create functionalDeployment.py
which should show the async_flow, crypto_prices_etl, and repo_trending_check deployments scheduled on the prefect cloud ui?
Thanks!Anna Geller
05/04/2022, 11:09 AM#1 would flow be the decorator name, or the function name?neither nor - this need to be the function object - so in this case:
flow=leonardo_dicapriflow
#2 Exactly!Sang Young Noh
05/04/2022, 11:19 AMAnna Geller
05/04/2022, 11:22 AMSang Young Noh
05/04/2022, 11:22 AMAnna Geller
05/04/2022, 11:23 AMSang Young Noh
05/04/2022, 11:24 AMAnna Geller
05/09/2022, 3:16 PMSang Young Noh
05/09/2022, 3:17 PMAnna Geller
05/10/2022, 12:10 PMprefect work-queue preview --hours 12 'acffbcc8-ae65-4c83-a38a-96e2e5e5b441'
it returns: scheduled start time, flow run ID, flow run name, deployment IDSang Young Noh
05/10/2022, 12:21 PMAnna Geller
05/11/2022, 1:22 PMbuild a class that creates the 'flow' for meI wouldn't recommend that since classes are stateful and flow is supposed to be a dynamic concept
Sang Young Noh
05/11/2022, 1:25 PMAnna Geller
05/11/2022, 1:26 PMSang Young Noh
05/11/2022, 1:26 PMAnna Geller
05/11/2022, 1:27 PMSang Young Noh
05/11/2022, 1:28 PMAnna Geller
05/11/2022, 1:30 PMSang Young Noh
05/11/2022, 1:31 PMAnna Geller
05/11/2022, 3:36 PMSang Young Noh
05/12/2022, 1:14 PMAnna Geller
05/12/2022, 1:20 PMprefect deployment run
to run it on the agentSang Young Noh
05/12/2022, 1:57 PMAnna Geller
05/30/2022, 9:57 AMSang Young Noh
05/30/2022, 9:58 AMAnna Geller
05/30/2022, 10:39 AM