Salohy
04/28/2021, 1:24 PMimport prefect
from prefect import task, Flow
from prefect.storage import Docker
from prefect.run_configs import KubernetesRun
from prefect.executors import DaskExecutor
STORAGE = Docker(registry_url="<http://prefect.azurecr.io|prefect.azurecr.io>", image_name="prefect/test")
RUN_CONFIG = KubernetesRun(image="<http://prefect.azurecr.io/prefect/test|prefect.azurecr.io/prefect/test>")
EXECUTOR = DaskExecutor(address="<tcp://my-ip:8786>")
@task
def hello():
logger = prefect.context.get('logger')
<http://logger.info|logger.info>("Hello!")
with Flow("changeme", storage=STORAGE, run_config = RUN_CONFIG, executor = EXECUTOR) as flow:
hello()
When I run this flow, the flow is successfully submitted but never got executed and the status is forever pending. Do I miss something? Do I need to specify flow.run()
in my code? Many thanks already for helping me 🙂🙏emre
04/28/2021, 1:27 PMKubernetesAgent
to pick up your submitted flow runs and deploy them over at your cluster.
https://docs.prefect.io/orchestration/agents/overview.html#kubernetes-agentSalohy
04/28/2021, 1:28 PMINFO:agent:Found 1 flow run(s) to submit for execution.
INFO:agent:Deploying flow run aaf55884-717a-422e-95cb-e8393c879978
[2021-04-28 13:20:11,482] INFO - agent | Deploying flow run aaf55884-717a-422e-95cb-e8393c879978
flavienbwk
04/28/2021, 1:32 PMemre
04/28/2021, 1:34 PM--log-level DEBUG
maybe?Salohy
04/28/2021, 1:35 PMflavienbwk
04/28/2021, 1:37 PMSalohy
04/28/2021, 1:40 PM--log-level DEBUG
emre
04/28/2021, 1:41 PMprefect agent kubernetes start --log-level DEBUG
No idea how that translates to a manifestSalohy
04/28/2021, 1:53 PMSalohy
04/28/2021, 1:55 PMhttp://:8080
I do no really understand this env variable so I let is as it is generated. But may be it should be something elseSalohy
04/28/2021, 1:58 PMKevin Kho
Salohy
04/28/2021, 2:03 PM15:54:01
ERROR
prefect-server.Lazarus.FlowRun
A Lazarus process attempted to reschedule this run 3 times without success. Marking as failed.
Kevin Kho
Salohy
04/28/2021, 2:17 PMKevin Kho
Salohy
04/28/2021, 2:18 PMZanie
executor
?Zanie
--disable-job-deletion
to your agent instantiation then running kubectl describe jobs
to inspect information at your cluster level.Salohy
04/28/2021, 2:23 PMZanie
Docker
storage, you do not need to usually need to supply an image name to the KubernetesRun
-- it'll infer that.Salohy
04/28/2021, 2:27 PMexecutor
I have the same situation. But where is the flow executed without executor then?Salohy
04/28/2021, 2:29 PMdistributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 501, in handle_comm
result = await result
File "/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py", line 4718, in add_client
await self.handle_stream(comm=comm, extra={"client": client})
File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 573, in handle_stream
handler(**merge(extra, msg))
File "/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py", line 3850, in update_graph_hlg
unpacked_graph = HighLevelGraph.__dask_distributed_unpack__(hlg, annotations)
File "/opt/conda/lib/python3.8/site-packages/dask/highlevelgraph.py", line 996, in __dask_distributed_unpack__
hlg = loads_msgpack(*packed_hlg)
TypeError: loads_msgpack() missing 1 required positional argument: 'payload
Salohy
04/28/2021, 2:30 PMpass
and the dask scheduler logs was
distributed.scheduler - INFO - Receive client connection: Client-e3d07cee-a82c-11eb-9f01-acde48001122
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-e3d07cee-a82c-11eb-9f01-acde48001122
distributed.scheduler - INFO - Remove client Client-e3d07cee-a82c-11eb-9f01-acde48001122
distributed.scheduler - INFO - Close client connection: Client-e3d07cee-a82c-11eb-9f01-acde48001122
Zanie
DaskExecutor
without arguments and it will spin up a temporary dask cluster within the pod that your flow executes in.Zanie
Salohy
04/28/2021, 4:37 PMSalohy
04/29/2021, 2:03 PMZanie
Salohy
04/29/2021, 2:42 PMError creating: pods "prefect-job-65829e40-" is forbidden: error looking up service account default/test-sa: serviceaccount "test-sa" not found
this can be configured in the RoleBinding manifest, right?Salohy
04/29/2021, 3:12 PMZanie
KubernetesRun
the agent creates a Job
which will schedule a Pod
which executes your flow in your image. During execution of your flow, the Executor
you have chosen will be setup and tasks will be submitted to it. Here, your tasks would be submitted to your Dask cluster.Zanie
Zanie
DockerRun
to run the flow (ie walk the dag) and a DaskExecutor
to submit your task execution to your dask cluster.Salohy
04/29/2021, 3:38 PM