Jason Bertman

10/06/2022, 6:40 PM
Does anyone know if it's possible to figure out which context a flow is executing in, local or remote, and modify the task runner accordingly? I have a Ray cluster in k8s that our remote deployments will use, but we also have a requirement to allow local execution. Users won't have access to our Ray cluster, which wouldn't be a problem on its own, but the flow definition has the address hardcoded. I'm looking to do something like this (local exec uses a temporary Ray cluster, k8s exec uses the remote cluster):
@flow(
    task_runner=RayTaskRunner(
        address="ray://ray-cluster-kuberay-head-svc:10001"
    ),
)
def main(...):
    ...

if __name__ == "__main__": # If this flow is called directly, don't use the address
    main(..., task_runner=RayTaskRunner)

Zanie

10/06/2022, 6:44 PM
You can check for a flow run ID in the environment (PREFECT__FLOW_RUN_ID) to see if it’s being run via a deployment.
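A minimal sketch of that check, using only the standard library. The helper name `running_via_deployment` and the optional `env` parameter are hypothetical, added here so the check can be exercised without a real deployment. Only the variable name comes from the message above.

```python
import os
from typing import Mapping, Optional

# Variable name as given in the thread.
FLOW_RUN_ID_VAR = "PREFECT__FLOW_RUN_ID"


def running_via_deployment(env: Optional[Mapping[str, str]] = None) -> bool:
    """Hypothetical helper: True when a flow run ID is present in the environment."""
    env = os.environ if env is None else env
    # Assumption of this sketch: an empty value counts as "not set".
    return bool(env.get(FLOW_RUN_ID_VAR))
```

Passing a plain dict as `env` makes it easy to try both cases locally, for example `running_via_deployment({"PREFECT__FLOW_RUN_ID": "abc"})`.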

Jason Bertman

10/06/2022, 6:45 PM
ha, that'll do it. Any idea about modding the flow task runner accordingly?

Ryan Peden

10/06/2022, 6:45 PM
Would something like this work for your use case when running the flow directly?
if __name__ == "__main__":
    # with_options returns a copy of the flow with the given options overridden
    local_flow = main.with_options(task_runner=RayTaskRunner())
    local_flow()

Jason Bertman

10/06/2022, 6:45 PM
oo I hadn't seen with_options, will give that a try

Zanie

10/06/2022, 6:47 PM
Oh yeah that’d do it too.
You can also do something like:
if __name__ == "__prefect_loader__":
to detect a deployment (I’d double-check the name, but I think that’s it).

Jason Bertman

10/06/2022, 6:47 PM
working 🙂 Thank you both 👍

Zanie

10/06/2022, 6:48 PM
We execute your script to load the flow, so you can do:
import os

def get_task_runner():
    # A flow run ID in the environment means the flow is running via a deployment
    if "PREFECT__FLOW_RUN_ID" in os.environ:
        return RayTaskRunner(
            address="ray://ray-cluster-kuberay-head-svc:10001"
        )
    else:
        ...

@flow(
    task_runner=get_task_runner()
)
def main(...):
    ...
A couple valid patterns here!
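One way to keep the environment check testable is to separate "which Ray address to use" from constructing the runner. The sketch below is an illustration under assumptions, not code from the thread: `ray_runner_kwargs` is a hypothetical helper, and it relies on `RayTaskRunner` starting a temporary local Ray instance when no `address` is given, which matches the local behavior described in the original question.

```python
import os
from typing import Dict, Mapping, Optional

# Values taken from the thread above.
FLOW_RUN_ID_VAR = "PREFECT__FLOW_RUN_ID"
REMOTE_RAY_ADDRESS = "ray://ray-cluster-kuberay-head-svc:10001"


def ray_runner_kwargs(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Hypothetical helper: build keyword arguments for RayTaskRunner."""
    env = os.environ if env is None else env
    if FLOW_RUN_ID_VAR in env:
        # Deployment run: connect to the shared cluster.
        return {"address": REMOTE_RAY_ADDRESS}
    # Direct/local run: pass no address so the runner can start its own Ray instance.
    return {}


# Intended use (requires prefect and prefect-ray, which are not imported here):
#   @flow(task_runner=RayTaskRunner(**ray_runner_kwargs()))
#   def main(...): ...
```

Because the helper accepts any mapping, both branches can be checked by passing a plain dict, without touching the real environment.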

Jason Bertman

10/06/2022, 6:49 PM
Ha, good point 😄