davzucky
03/16/2022, 6:21 AM
Anna Geller
03/16/2022, 11:44 AM
… `flow` decorator rather than on the `DeploymentSpec`. And the more I think about it, the more it actually makes sense.
The problem you are describing is not "How can I override the task runner used by a flow on a `DeploymentSpec`?"; that is one possible solution. The actual problem is: "How can I use a different task runner for development and production deployments?" The intended solution to that problem is to have two different deployments: one for dev and one for prod. The entire dev vs. prod story is not fully established yet; we are working on adding, e.g., GitHub storage. A likely good solution to your issue (one that is not possible yet) would be to eventually have two different branches in the same repo:
• the `dev` branch may have flow code with, say, `DaskTaskRunner`,
• the `main` or `prod` branch may have flow code with a different task runner, e.g. `ConcurrentTaskRunner`,
and then you can have two different deployments, one for dev and one for prod, each referencing the flow code on its branch. This would give a clear separation of code and environments and would make building CI/CD pipelines much easier.
Having said that, it could be worth exploring some sort of override on the `DeploymentSpec` (exactly as you mentioned); I will open an issue so we can discuss it.
The only problem I currently see is that it goes a bit against the runtime discoverability in Orion. The way I understand deployments, you should be able to create a single deployment for `your_flow.py` and first run your deployed flow using, say, `DaskTaskRunner`. Then you can modify the flow code in `your_flow.py` (referenced as `flow_location` on the `DeploymentSpec`), change the task runner to, e.g., `ConcurrentTaskRunner`, and create a flow run of this deployed flow with the new task runner without recreating the deployment, because Orion allows for runtime discoverability and doesn't force you to preregister any DAG metadata.
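To make the runtime-discoverability point concrete, here is a toy model in plain Python, not Prefect's actual implementation. The hypothetical `ToyDeployment` class stores only the path to the flow file, and each run executes that file afresh, so editing the file changes the task runner without recreating the deployment. The `TASK_RUNNER` module variable and the class are assumptions made for illustration; runner names are plain strings so the sketch runs without Prefect.

```python
import pathlib
import runpy
import tempfile
from dataclasses import dataclass


@dataclass
class ToyDeployment:
    """Hypothetical stand-in for a deployment: it stores only where the flow code lives."""

    flow_location: str

    def run(self) -> str:
        # Execute the flow file fresh on every run, mimicking runtime discovery:
        # the task runner is read from the code, never stored on the deployment.
        flow_globals = runpy.run_path(self.flow_location)
        return flow_globals["TASK_RUNNER"]


with tempfile.TemporaryDirectory() as tmp:
    flow_file = pathlib.Path(tmp) / "your_flow.py"
    deployment = ToyDeployment(flow_location=str(flow_file))

    flow_file.write_text('TASK_RUNNER = "DaskTaskRunner"\n')
    first = deployment.run()

    # Edit the flow code; the same deployment picks up the change on the next run.
    flow_file.write_text('TASK_RUNNER = "ConcurrentTaskRunner"\n')
    second = deployment.run()

print(first, second)  # DaskTaskRunner ConcurrentTaskRunner
```

The deployment object never changes between the two runs; only the file it points to does, which is the property a deployment-level task-runner override would partly give up.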
Frankly, even in Prefect 1.0, the executor (effectively the same as the task runner in 2.0) is not stored in the backend, for privacy reasons: it may contain private information such as your Dask/Ray cluster address. Instead, Prefect retrieves this information from storage at runtime. This is another reason I lean towards two different versions of this flow in dev and prod branches as the solution to this problem.
For now, you could introduce a hack: a custom parameter (whose value can be set on your `DeploymentSpec`) that determines which subflow to call:
```python
from prefect import flow
from prefect.task_runners import DaskTaskRunner, SequentialTaskRunner

@flow(task_runner=DaskTaskRunner())
def dask_flow():
    pass  # your flow logic here - it may be some extra function

@flow(task_runner=SequentialTaskRunner())
def sequential_flow():
    pass  # your flow logic here - it may be some extra function

@flow
def parent_flow(environment: str = "dev"):
    if environment == "dev":
        dask_flow()
    else:
        sequential_flow()
```
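One caveat with the `if`/`else` dispatch: any value other than `"dev"`, including a typo such as `"prd"`, silently falls through to the sequential subflow. A minimal sketch of a stricter variant looks the environment up in a dictionary and fails loudly on unknown values. The two functions are hypothetical plain-Python stand-ins for the Prefect subflows so the sketch runs on its own, and the `"prod"` key is an assumed environment name (the example only names `"dev"`).

```python
from typing import Callable, Dict


def dask_flow() -> str:
    # Hypothetical stand-in for the Dask-backed subflow.
    return "ran dask_flow"


def sequential_flow() -> str:
    # Hypothetical stand-in for the sequential subflow.
    return "ran sequential_flow"


# Assumed environment names; only "dev" appears in the original example.
SUBFLOWS: Dict[str, Callable[[], str]] = {
    "dev": dask_flow,
    "prod": sequential_flow,
}


def parent_flow(environment: str = "dev") -> str:
    try:
        subflow = SUBFLOWS[environment]
    except KeyError:
        # Reject unknown values instead of silently picking a task runner.
        raise ValueError(
            f"unknown environment {environment!r}; expected one of {sorted(SUBFLOWS)}"
        ) from None
    return subflow()


print(parent_flow("dev"))   # ran dask_flow
print(parent_flow("prod"))  # ran sequential_flow
```

In the Prefect version, the same dictionary would map to the decorated subflows, and `parent_flow` would stay a `@flow` whose `environment` parameter is set per deployment.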
@Marvin open "Orion: as a user, how can I use a different task runner for development and production deployments?"
Marvin
03/16/2022, 11:44 AM
davzucky
03/16/2022, 1:31 PM
Anna Geller
03/16/2022, 1:38 PM
… `flow` vs. `DeploymentSpec` is something we think about a lot, and we need to approach it carefully to allow various flexible deployment patterns without risking the loss of the runtime discoverability that Orion makes possible.
davzucky
03/16/2022, 1:40 PM
Zanie
03/16/2022, 3:21 PM
```python
import os

from prefect import flow
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import UniversalFlowRunner
from prefect.task_runners import ConcurrentTaskRunner, DaskTaskRunner

# The task runner is chosen when this module is imported, so the MY_ENV value
# that each deployment's flow runner sets in the flow-run process selects it.
@flow(
    task_runner=DaskTaskRunner()
    if os.environ.get("MY_ENV") == "prod"
    else ConcurrentTaskRunner(),
)
def my_flow():
    ...

prod = DeploymentSpec(
    flow=my_flow, flow_runner=UniversalFlowRunner(env={"MY_ENV": "prod"})
)
dev = DeploymentSpec(
    flow=my_flow, flow_runner=UniversalFlowRunner(env={"MY_ENV": "dev"})
)
```
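This approach relies on the decorator expression being evaluated when the flow module is imported, inside the process that the flow runner starts with `env={"MY_ENV": ...}`. A sketch of that mechanism in plain Python, assuming each flow run imports the flow code in a fresh process: a child interpreter evaluates the same selection expression (returning names instead of Prefect task-runner objects) and reports which runner it picked. The helper `runner_chosen_in_fresh_process` is hypothetical.

```python
import os
import subprocess
import sys

# Same selection logic as the decorator above, but returning names so this
# sketch runs without Prefect installed.
CHILD_CODE = (
    "import os\n"
    "runner = 'DaskTaskRunner' if os.environ.get('MY_ENV') == 'prod' "
    "else 'ConcurrentTaskRunner'\n"
    "print(runner)\n"
)


def runner_chosen_in_fresh_process(my_env: str) -> str:
    """Start a new interpreter with MY_ENV set, as a flow runner's env would."""
    env = {**os.environ, "MY_ENV": my_env}
    result = subprocess.run(
        [sys.executable, "-c", CHILD_CODE],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


print(runner_chosen_in_fresh_process("prod"))  # DaskTaskRunner
print(runner_chosen_in_fresh_process("dev"))   # ConcurrentTaskRunner
```

The same reasoning explains a limitation: changing `MY_ENV` inside a process that has already imported the flow module would not switch the runner, because the decorator expression has already run.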
davzucky
03/16/2022, 11:00 PM
```python
from prefect import flow
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import UniversalFlowRunner
from prefect.task_runners import ConcurrentTaskRunner, DaskTaskRunner

@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    ...

# Proposed API, not available at the time of this thread:
# override the flow's default task runner on the flow runner...
prod = DeploymentSpec(
    flow=my_flow,
    flow_runner=UniversalFlowRunner(env={"MY_ENV": "prod"}, task_runner=DaskTaskRunner()),
)
# ...or on the DeploymentSpec itself
other_prod = DeploymentSpec(
    flow=my_flow,
    flow_runner=UniversalFlowRunner(env={"MY_ENV": "prod"}),
    task_runner=DaskTaskRunner(),
)
dev = DeploymentSpec(
    flow=my_flow, flow_runner=UniversalFlowRunner(env={"MY_ENV": "dev"})
)
```
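The proposal boils down to a precedence rule: a task runner set on the deployment, if present, wins over the one declared on the `@flow` decorator. A toy sketch of that rule with hypothetical `ToyFlow` and `ToyDeployment` dataclasses (not Prefect classes and not a Prefect API), using runner names as strings:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ToyFlow:
    name: str
    task_runner: str  # default declared in flow code, e.g. on the decorator


@dataclass(frozen=True)
class ToyDeployment:
    flow: ToyFlow
    task_runner: Optional[str] = None  # hypothetical deployment-level override

    def effective_task_runner(self) -> str:
        # The deployment override wins when present; otherwise use the flow's default.
        if self.task_runner is not None:
            return self.task_runner
        return self.flow.task_runner


my_flow = ToyFlow(name="my_flow", task_runner="ConcurrentTaskRunner")
prod = ToyDeployment(flow=my_flow, task_runner="DaskTaskRunner")
dev = ToyDeployment(flow=my_flow)

print(prod.effective_task_runner())  # DaskTaskRunner
print(dev.effective_task_runner())   # ConcurrentTaskRunner
```

The trade-off raised earlier in the thread shows up here: the override has to live on the deployment record, which is the kind of runner configuration (for example a cluster address) that Prefect avoids storing in the backend.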
Anna Geller
03/17/2022, 9:45 AM
Zanie
03/17/2022, 2:46 PM
davzucky
03/17/2022, 3:06 PM
Zanie
03/17/2022, 3:08 PM
`my_flow.with_options(task_runner=…)(call_args, …)`
`with_options` is now added to flows.
davzucky
03/17/2022, 11:35 PM
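The `with_options` approach mentioned above returns a copy of the flow with some settings overridden, leaving the original untouched, so the task runner can be chosen at call time, as in `my_flow.with_options(task_runner=…)(call_args, …)`. A toy model of that copy-with-overrides idea in plain Python; `ToyFlow` and its fields are hypothetical and this is not Prefect's implementation:

```python
import dataclasses
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class ToyFlow:
    fn: Callable[..., Any]
    task_runner: str = "ConcurrentTaskRunner"

    def with_options(self, **overrides: Any) -> "ToyFlow":
        # Return a new flow object with the given fields replaced;
        # the original flow keeps its own settings.
        return dataclasses.replace(self, **overrides)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        print(f"running {self.fn.__name__} with {self.task_runner}")
        return self.fn(*args, **kwargs)


def add(x: int, y: int) -> int:
    return x + y


my_flow = ToyFlow(fn=add)
prod_flow = my_flow.with_options(task_runner="DaskTaskRunner")

result = prod_flow(1, 2)  # prints: running add with DaskTaskRunner
```

Because the copy is made at call time, the same flow code can run with different task runners without touching the deployment, which keeps the runtime discoverability discussed earlier.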