How can I setup in Orion the task runner at deploy...
# prefect-community
d
How can I set up the task runner at deployment in Orion? I want to be able to switch the mode between local dev and prod. I can only see how to do that for flow runners at the deployment level.
a
It's not possible to do that directly atm since currently the task runner must be specified on the `flow` decorator rather than on the `DeploymentSpec`. And the more I think about it, the more it actually makes sense. The problem you are describing is not "How can I override the task runner used by a flow on a `DeploymentSpec`" - this is one possible solution. The actual problem is: "How can I use a different task runner for development and production deployments". And to solve that problem, the intended solution is to have two different deployments: one for dev, and one for prod.
The entire dev vs. prod story is not yet fully established - we are working on adding e.g. GitHub storage. What would likely be a good solution to your issue (that is currently not possible yet) is that you may have in the end two different branches on the same repo:
• the "`dev`" branch may have flow code with, say, `DaskTaskRunner`,
• the "`main`" or "`prod`" branch may have flow code with a different task runner, e.g. `ConcurrentTaskRunner`,
and then you can have two different deployments - one for dev and one for prod - each of which references the flow code on a given branch. This would give a clear separation of code and environments and would make building CI/CD pipelines much easier.
Having said that, it could be worth exploring adding some sort of override on the `DeploymentSpec` (exactly as you mentioned) - I will open an issue to open this up for discussion. The only problem I currently see with that is that it goes a bit against the runtime discoverability in Orion - the way I understand how deployments work is that you should be able to create a single deployment for `your_flow.py` - then, you can run your deployed flow first using, say, `DaskTaskRunner`. But then you can modify the flow code in `your_flow.py`, referenced as `flow_location` on the `DeploymentSpec`, change the task runner to e.g. `ConcurrentTaskRunner`, and you can create a flow run of this deployed flow with this new task runner without having to recreate the deployment, as Orion allows for runtime discoverability and doesn't force you to preregister any DAG's metadata.
Frankly, even in Prefect 1.0, Prefect doesn't store the executor (effectively the same as task runner in 2.0) information in the backend for privacy reasons, because it may contain private information such as your Dask/Ray cluster address. Instead, Prefect retrieves this information at runtime from storage - this is another reason I would be leaning more towards two different versions of this flow in dev and prod branches as a solution to this problem.
For now, you could certainly introduce a hack by setting some custom parameter value (that can be set on your `DeploymentSpec`) that determines which subflow to call:
from prefect import flow
from prefect.task_runners import DaskTaskRunner, SequentialTaskRunner


@flow(task_runner=DaskTaskRunner())
def dask_flow():
    pass  # your flow logic here - it may be some extra function


@flow(task_runner=SequentialTaskRunner())
def sequential_flow():
    pass  # your flow logic here - it may be some extra function


@flow
def parent_flow(environment: str = "dev"):
    if environment == "dev":
        dask_flow()
    else:
        sequential_flow()
@Marvin open "Orion: as a user, how can I use a different task runner for development and production deployments?"
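The parameter-driven workaround can also be written as a lookup table, which makes an unexpected environment value fail loudly instead of silently taking the `else` branch. A minimal sketch in plain Python, so it runs without Prefect installed: the two functions are placeholders for the decorated subflows, `SUBFLOWS` is a hypothetical name, and the `"prod"` key is an assumption (the workaround itself treats every value other than `"dev"` the same way).

```python
from typing import Callable, Dict


def dask_flow() -> str:
    # Placeholder for the subflow decorated with DaskTaskRunner.
    return "ran with dask"


def sequential_flow() -> str:
    # Placeholder for the subflow decorated with SequentialTaskRunner.
    return "ran sequentially"


# Hypothetical lookup table: one entry per supported environment.
SUBFLOWS: Dict[str, Callable[[], str]] = {
    "dev": dask_flow,
    "prod": sequential_flow,  # assumption: "prod" is the only non-dev value
}


def parent_flow(environment: str = "dev") -> str:
    try:
        subflow = SUBFLOWS[environment]
    except KeyError:
        # Unknown values raise instead of falling through to a default.
        raise ValueError(f"unknown environment: {environment!r}") from None
    return subflow()
```

Calling `parent_flow("prod")` picks the sequential placeholder, while a typo such as `parent_flow("prd")` raises a `ValueError`.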
m
d
Thank you for opening a discussion, I will follow it and add comments to it as well. In Prefect 1.0 we created our own command-line tool that we use to run and register our flows. This allows us to configure the run config however we want when we register or run a flow. As the `DeploymentSpec` allows you to change the flow runner, I was hoping the task runner could be set there as well. The way all our setup is done is using Docker: all our workflows and dependencies are packaged in a container that we use to instantiate the Prefect agent and the Dask cluster. This ensures that the whole setup is consistent. In Orion, I used the same concept in the current PoC, and use the flow python name referenced. I'm looking at moving to the next idea, and this is what I'm currently missing.
a
Do you happen to develop your PoC in a public repo you could share? Documenting what specifically you would like to see is certainly helpful feedback, but deciding what should be set on the `flow` vs. the `DeploymentSpec` is something we think a lot about, and we need to approach it carefully to allow various flexible deployment patterns without risking that we lose the runtime discoverability that is possible with Orion.
d
Not at the moment, but I can replicate the idea in a public repo. Let me do that. That will be the best way to have something concrete to talk about 👍
z
We’ll probably allow task runners to be set at the deployment level eventually
What do you think of
import os

from prefect import flow, task
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import UniversalFlowRunner
from prefect.task_runners import ConcurrentTaskRunner, DaskTaskRunner


# The task runner is an option of the decorator, not a flow parameter.
@flow(
    task_runner=DaskTaskRunner()
    if os.environ.get("MY_ENV") == "prod"
    else ConcurrentTaskRunner(),
)
def my_flow():
    ...


prod = DeploymentSpec(
    flow=my_flow, flow_runner=UniversalFlowRunner(env={"MY_ENV": "prod"})
)

dev = DeploymentSpec(
    flow=my_flow, flow_runner=UniversalFlowRunner(env={"MY_ENV": "dev"})
)
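The selection logic in this workaround can be pulled into a small function, which also makes it easy to check in isolation. A minimal sketch in plain Python: the two classes are empty stand-ins for Prefect's `DaskTaskRunner` and `ConcurrentTaskRunner` so the snippet runs without Prefect installed, and `pick_task_runner` is a hypothetical helper name, not part of Prefect.

```python
import os
from typing import Mapping, Optional


class DaskTaskRunner:
    """Stand-in for prefect.task_runners.DaskTaskRunner (illustration only)."""


class ConcurrentTaskRunner:
    """Stand-in for prefect.task_runners.ConcurrentTaskRunner (illustration only)."""


def pick_task_runner(env: Optional[Mapping[str, str]] = None):
    """Return a Dask runner when MY_ENV is "prod", otherwise a concurrent runner."""
    env = os.environ if env is None else env
    if env.get("MY_ENV") == "prod":
        return DaskTaskRunner()
    return ConcurrentTaskRunner()


# With the real classes, the result would be passed to the decorator:
#     @flow(task_runner=pick_task_runner())
# The expression is evaluated once, when the flow module is imported, so
# MY_ENV has to be set in the environment of the process that loads the flow.
prod_runner = pick_task_runner({"MY_ENV": "prod"})
dev_runner = pick_task_runner({"MY_ENV": "dev"})
print(type(prod_runner).__name__, type(dev_runner).__name__)
```

Passing the environment as an argument, rather than reading `os.environ` directly inside the function body, is only a convenience for testing; the default still reads the process environment.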
d
That is what I was thinking of doing, but at deployment time. Why can't we set the task runner on the flow runner to override the one set on the flow? Why does the task runner have to be attached to the flow definition? I haven't checked the code, just thinking out loud:
import os

from prefect import flow, task
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import UniversalFlowRunner
from prefect.task_runners import ConcurrentTaskRunner, DaskTaskRunner


@flow(task_runner=ConcurrentTaskRunner())  # default task runner
def my_flow():
    ...


# Proposal 1 (hypothetical, not supported): override via the flow runner
prod = DeploymentSpec(
    flow=my_flow,
    flow_runner=UniversalFlowRunner(env={"MY_ENV": "prod"}, task_runner=DaskTaskRunner()),
)

# Proposal 2 (hypothetical, not supported): override on the DeploymentSpec itself
other_prod = DeploymentSpec(
    flow=my_flow,
    flow_runner=UniversalFlowRunner(env={"MY_ENV": "prod"}),
    task_runner=DaskTaskRunner(),
)

dev = DeploymentSpec(
    flow=my_flow, flow_runner=UniversalFlowRunner(env={"MY_ENV": "dev"})
)
a
@davzucky as Michael said, we will likely support that directly in the future, but until then you can use the workaround with env variables. As for why: probably because no user has asked about that until now 🙂 and because of this hard problem of what should be set on the flow vs. on the deployment, and all the implications of this in the backend.
z
I probably won’t nest the task runner setting inside the flow runner, they’re intended to be separate concerns.
Overrides at the deployment level are feasible though and quite likely.
d
I was also wondering during the day (sorry, it's late here): how can you test your flow if you cannot override the task runner? This is something we use heavily with Prefect at the moment for all our flow unit tests. I like the idea of having a deployment-level capability.
z
We’ll also certainly provide a way to create a copy of a flow changing options the same way you can with tasks
e.g. `my_flow.with_options(task_runner=…)(call_args, …)`
FYI, `with_options` is now added to flows
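The copy-with-overrides idea behind `with_options` can be illustrated without Prefect. A minimal sketch, assuming a hypothetical `FlowSketch` class that holds a function and a task runner name as a plain string; it is not Prefect's implementation, only the pattern of returning a modified copy while leaving the original untouched.

```python
from dataclasses import dataclass, replace
from typing import Any, Callable


@dataclass(frozen=True)
class FlowSketch:
    """Hypothetical stand-in for a flow object: a function plus its options."""

    fn: Callable[..., Any]
    task_runner: str = "concurrent"

    def with_options(self, **overrides: Any) -> "FlowSketch":
        # Return a modified copy; the original object is not changed.
        return replace(self, **overrides)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return self.fn(*args, **kwargs)


def add(x: int, y: int) -> int:
    return x + y


my_flow = FlowSketch(fn=add, task_runner="dask")

# e.g. in a unit test: same flow logic, lighter task runner.
test_flow = my_flow.with_options(task_runner="sequential")

print(my_flow.task_runner, test_flow.task_runner, test_flow(1, 2))
```

This is the shape that addresses the unit-testing question raised earlier in the thread: the production definition keeps its task runner, and the test works on a copy with a different one.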
d
That's great. Good to know, thank you @Zanie, I will look at it.