# ask-community
s
Hello everyone, this is my example flow that I want to run with Docker as storage, KubernetesRun as the run_config, and a DaskExecutor pointing at an existing cluster as the executor.
Copy code
import prefect
from prefect import task, Flow
from prefect.storage import Docker
from prefect.run_configs import KubernetesRun
from prefect.executors import DaskExecutor

STORAGE = Docker(registry_url="prefect.azurecr.io", image_name="prefect/test")

RUN_CONFIG = KubernetesRun(image="prefect.azurecr.io/prefect/test")

EXECUTOR = DaskExecutor(address="tcp://my-ip:8786")


@task
def hello():
  logger = prefect.context.get('logger')
  logger.info("Hello!")


with Flow("changeme", storage=STORAGE, run_config = RUN_CONFIG, executor = EXECUTOR) as flow:
  hello()
When I run this flow, it is submitted successfully but never gets executed; the status stays Pending forever. Am I missing something? Do I need to call
flow.run()
in my code? Many thanks already for helping me 🙂🙏
e
Are you running an agent? You need a
KubernetesAgent
to pick up your submitted flow runs and deploy them over at your cluster. https://docs.prefect.io/orchestration/agents/overview.html#kubernetes-agent
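(For context, a minimal sketch of starting a Kubernetes agent in-process via the Python API, assuming Prefect 1.x; the `prefect agent kubernetes start` CLI or a generated manifest is the more usual route, and the namespace and label values here are just illustrative.)
Copy code
# Hypothetical sketch (Prefect 1.x): run a Kubernetes agent from Python.
# In practice most people use `prefect agent kubernetes start` or apply a manifest.
from prefect.agent.kubernetes import KubernetesAgent

agent = KubernetesAgent(
    namespace="default",   # namespace in which flow-run Jobs are created
    labels=["k8s"],        # must match the labels on your flows / run configs
)
agent.start()              # polls for flow runs and deploys them as Kubernetes Jobs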
s
Yes, I am running a KubernetesAgent and the agent logs look like this
Copy code
INFO:agent:Found 1 flow run(s) to submit for execution.
INFO:agent:Deploying flow run aaf55884-717a-422e-95cb-e8393c879978
[2021-04-28 13:20:11,482] INFO - agent | Deploying flow run aaf55884-717a-422e-95cb-e8393c879978
f
Check labels. Your agent labels MUST exactly match your Flow labels. I often have issues with labels that are created automatically.
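(A hedged sketch of where those labels typically live, assuming Prefect 1.x; the "k8s" label is just an illustrative value.)
Copy code
# Illustrative only: the run config's labels must match the agent's labels.
from prefect.run_configs import KubernetesRun

RUN_CONFIG = KubernetesRun(
    image="prefect.azurecr.io/prefect/test",
    labels=["k8s"],  # flow-run label
)
# The agent then needs the same label, e.g.:
#   prefect agent kubernetes start --label k8s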
e
I think the agent is fine; if the labels didn't match, the "Deploying flow run" log wouldn't have appeared. At least I think so. I'm just guessing here, but maybe the agent's debug logs have some more info. Try running your agent with
--log-level DEBUG
maybe?
s
Do you mean these labels? They are the same. Trying the debug mode now
f
Exactly, so it doesn't seem to be the problem.
s
hmm @emre I am using a manifest file for the Kubernetes agent and I am not sure where I can set
--log-level DEBUG
e
I have absolutely no idea 😅 I just checked the Prefect CLI, and the CLI way goes like this:
Copy code
prefect agent kubernetes start --log-level DEBUG
No idea how that translates to a manifest
s
do you know what this env variable in the Kubernetes agent, PREFECT__CLOUD__AGENT__AGENT_ADDRESS, is for? Maybe the value of this variable is my problem
In the manifest file generated by the prefect CLI, the value is
http://:8080
I do not really understand this env variable, so I left it as generated. But maybe it should be something else.
k
Hi @Salohy! Do you have any logs on the flow side?
s
hey @Kevin Kho, yes I do. I mean, now I do; during the run I saw nothing, but then the flow failed at the end and I have the following
Copy code
15:54:01
ERROR
prefect-server.Lazarus.FlowRun
A Lazarus process attempted to reschedule this run 3 times without success. Marking as failed.
k
It seems like you have a Dask cluster already up and running. Are you trying to use Kubernetes on top of that?
s
yes, I have a Dask cluster deployed in Kubernetes using Helm; it is already up and running. Is that a bad practice?
k
I’ll grab a hold of someone with more k8s experience
s
thanks a lot 🙏
z
Does this flow run correctly if you do not supply an
executor
?
You may also be able to get more information by adding
--disable-job-deletion
to your agent instantiation then running
kubectl describe jobs
to inspect information at your cluster level.
s
okay, trying without executor
z
Also note that since you're using
Docker
storage, you usually do not need to supply an image name to the
KubernetesRun
-- it'll infer that.
👍 1
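(So something along these lines should be enough when Docker storage is used; a sketch, not tested.)
Copy code
# With Docker storage, the image built by the storage is reused for the
# flow-run Job, so KubernetesRun can usually be left without an image.
from prefect.run_configs import KubernetesRun

RUN_CONFIG = KubernetesRun()  # no image argument needed here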
s
if I do not supply an
executor
I have the same situation. But where is the flow executed without an executor, then?
maybe the problem is the Dask cluster; I tried to run directly with flow.run() and got this error in my Dask scheduler
Copy code
distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 501, in handle_comm
    result = await result
  File "/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py", line 4718, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 573, in handle_stream
    handler(**merge(extra, msg))
  File "/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py", line 3850, in update_graph_hlg
    unpacked_graph = HighLevelGraph.__dask_distributed_unpack__(hlg, annotations)
  File "/opt/conda/lib/python3.8/site-packages/dask/highlevelgraph.py", line 996, in __dask_distributed_unpack__
    hlg = loads_msgpack(*packed_hlg)
TypeError: loads_msgpack() missing 1 required positional argument: 'payload'
then I removed the task in the flow and just wrote
pass
and the Dask scheduler logs were
Copy code
distributed.scheduler - INFO - Receive client connection: Client-e3d07cee-a82c-11eb-9f01-acde48001122
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-e3d07cee-a82c-11eb-9f01-acde48001122
distributed.scheduler - INFO - Remove client Client-e3d07cee-a82c-11eb-9f01-acde48001122
distributed.scheduler - INFO - Close client connection: Client-e3d07cee-a82c-11eb-9f01-acde48001122
z
If you don't supply an executor, a basic executor is used that runs things sequentially. You can also use a
DaskExecutor
without arguments and it will spin up a temporary dask cluster within the pod that your flow executes in.
The traceback you've posted looks like a version difference
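(Two quick sketches for the points above, assuming Prefect 1.x and dask.distributed; the scheduler address is the placeholder from earlier.)
Copy code
# 1) A DaskExecutor with no address spins up a temporary local Dask cluster
#    inside the pod the flow runs in, so no external scheduler is required.
from prefect.executors import DaskExecutor

EXECUTOR = DaskExecutor()

# 2) To look for a client/scheduler version mismatch like the one in the
#    traceback, dask.distributed can compare versions explicitly:
from distributed import Client

client = Client("tcp://my-ip:8786")
client.get_versions(check=True)  # complains if client, scheduler and workers disagree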
s
Many thanks for your help, will dig more into the Dask Executor
hey all, I fixed the problem with the DaskExecutor but I still have the same situation. It seems like Prefect does not find where to execute the flow. After submission the log says `Submitted for execution:` but nothing is coming to the Dask scheduler
z
Submitted for execution means that the agent created the K8s Job. Does this work when you run it locally instead of on K8s?
s
hmmm I found the error on k8s,
Copy code
Error creating: pods "prefect-job-65829e40-" is forbidden: error looking up service account default/test-sa: serviceaccount "test-sa" not found
this can be configured in the RoleBinding manifest, right?
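(For what it's worth, the service account is usually referenced on the run config or the agent rather than in a RoleBinding; a hedged sketch assuming Prefect 1.x and that the account already exists in the namespace.)
Copy code
# Hypothetical sketch: point the flow-run Job at an existing service account.
# The service account itself must exist in the target namespace (e.g. created
# with kubectl), otherwise Job creation fails exactly as in the error above.
from prefect.run_configs import KubernetesRun

RUN_CONFIG = KubernetesRun(service_account_name="test-sa")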
hey 🙂 I could fix the problem and my flow is running. Small victory 😄 Many thanks for the help…. So it turned out that the flow did not execute the task in the Dask cluster via the DaskExecutor, but ran it as a Kubernetes job; that's because I am using KubernetesRun as the run_config. My question is: what is the corresponding run_config for a DaskExecutor pointing at an existing cluster? Many thanks for helping me out, and sorry if the question seems stupid, I am a complete beginner with Prefect 😬
z
No problem, this is a bit tricky. So the run config determines where your flow runs and the executor determines where your tasks run. With a
KubernetesRun
the agent creates a
Job
which will schedule a
Pod
which executes your flow in your image. During execution of your flow, the
Executor
you have chosen will be set up and tasks will be submitted to it. Here, your tasks would be submitted to your Dask cluster.
It's generally a bit heavy to use a K8s run config if you're going to push work to a dask executor. Since your tasks are going to be executed elsewhere, you're probably best off using a simple
DockerRun
to run the flow (i.e. walk the DAG) and a
DaskExecutor
to submit your task execution to your dask cluster.
👍 1
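(Putting that suggestion together, a minimal sketch assuming Prefect 1.x; the registry and scheduler address are the placeholder values from earlier, and a Docker agent, rather than a Kubernetes agent, would pick up the DockerRun flow.)
Copy code
# Sketch: a lightweight DockerRun just walks the DAG, while the DaskExecutor
# submits the actual task execution to the existing Dask cluster.
import prefect
from prefect import task, Flow
from prefect.storage import Docker
from prefect.run_configs import DockerRun
from prefect.executors import DaskExecutor

STORAGE = Docker(registry_url="prefect.azurecr.io", image_name="prefect/test")
RUN_CONFIG = DockerRun()  # image is inferred from the Docker storage
EXECUTOR = DaskExecutor(address="tcp://my-ip:8786")  # existing cluster


@task
def hello():
    logger = prefect.context.get("logger")
    logger.info("Hello!")


with Flow("changeme", storage=STORAGE, run_config=RUN_CONFIG, executor=EXECUTOR) as flow:
    hello()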
s
Okay, I understand better now. Many Thanks 🙏