# prefect-server
r
When I inspect the agent logs (run with "prefect agent local start" inside a container), it looks like it is only picking up flow-runs with LocalExecutor. I see there is also LocalDaskExecutor but I don't want the agent to run its own local dask cluster, I want it to send work to the scheduler, which is in a separate container.
k
Hey @Ross Rochford, it sounds like Prefect can’t send work to the scheduler of the cluster. Could you share how to define the executor in the Flow?
r
with Flow("list-sum-dask") as flow:
    flow.executor = DaskExecutor(address=SCHEDULER_ADDRESS)
k
Could you try it outside the Flow block?
with Flow("list-sum-dask") as flow:
    ...

flow.executor = DaskExecutor(address=SCHEDULER_ADDRESS)
But if that doesn’t work, would there be any reason the Dask cluster can’t accept incoming requests? If it’s in a container, are the needed ports open?
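One quick way to answer the ports question is to try a plain TCP connection from the agent container to the scheduler. The sketch below uses only the standard library; `port_is_open` is a hypothetical helper name, not part of Prefect or Dask, and the host/port in the usage note are the scheduler address that appears elsewhere in this thread (`dask-scheduler:8890`).

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the hostname and tries each address it returns.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts (socket.timeout is an OSError
        # subclass) and DNS failures (socket.gaierror is also an OSError).
        return False
```

From inside the agent container, `port_is_open("dask-scheduler", 8890)` returning False would point at networking or port exposure. A True result only rules out basic connectivity; it says nothing about whether work submitted through Prefect will actually run.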
r
That didn't work.
No I don't think so, I'm able to send work to the cluster from the dask client inside the agent's container.
Sorry, I confused two flows, the run seems to have been scheduled this time
k
No worries. Just let me know if it’s still not working.
r
ok, the tasks are still pending
maybe that didn't make a difference after all
This time the agent stdout showed:
[2021-08-09 13:42:09+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'list-sum-dask2'
which I don't think it did previously (though I'm not 100% sure)
Is it strange that it says "CloudFlowRunner"? I'm using prefect server, not Prefect Cloud
The dashboard says it's "running" but isn't making progress, all the tasks are pending.
k
Do you have access to the Dask UI to see if anything was scheduled there?
The CloudFlowRunner is not out of the ordinary. It’s just the name of the class that emits the logs. Maybe you can run the Flow with debug-level logs?
r
Yes, I have access to the Dask UI, no tasks are shown
trying now with debug logs, you mean on the agent?
[2021-08-09 13:58:54+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'list-sum-dask2'
[2021-08-09 13:58:55,282] DEBUG - agent | Querying for ready flow runs...
DEBUG:agent:Querying for ready flow runs...
/usr/local/lib/python3.8/site-packages/distributed/client.py:1190: VersionMismatchWarning: Mismatched versions found

+---------+----------------+---------------+---------------+
| Package | client         | scheduler     | workers       |
+---------+----------------+---------------+---------------+
| blosc   | None           | 1.9.2         | 1.9.2         |
| lz4     | None           | 3.1.3         | 3.1.3         |
| msgpack | 1.0.2          | 1.0.0         | 1.0.0         |
| numpy   | None           | 1.21.1        | 1.21.1        |
| pandas  | None           | 1.3.0         | 1.3.0         |
| python  | 3.8.11.final.0 | 3.8.0.final.0 | 3.8.0.final.0 |
+---------+----------------+---------------+---------------+
Notes: 
-  msgpack: Variation is ok, as long as everything is above 0.6
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
[2021-08-09 13:58:55,390] DEBUG - agent | No ready flow runs found.
DEBUG:agent:No ready flow runs found.
[2021-08-09 13:58:55,394] DEBUG - agent | Sleeping flow run poller for 2.0 seconds...
DEBUG:agent:Sleeping flow run poller for 2.0 seconds...
[2021-08-09 13:58:57,396] DEBUG - agent | Querying for ready flow runs...
DEBUG:agent:Querying for ready flow runs...
[2021-08-09 13:58:57,456] DEBUG - agent | No ready flow runs found.
DEBUG:agent:No ready flow runs found.
k
This seems fine. I meant both the agent and the flow. For the flow, you can click “Advanced Configuration” under “Run” in the UI and then either add an environment variable
PREFECT__LOGGING__LEVEL="DEBUG"
or you might be able to set the logging level there directly.
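Prefect 1.x picks up configuration overrides from environment variables named after the dotted config key: a `PREFECT__` prefix, then each section separated by double underscores. For example, `logging.level` becomes `PREFECT__LOGGING__LEVEL`, and the `PREFECT__ENGINE__EXECUTOR__DASK__ADDRESS` variable used later in this thread follows the same pattern. A name with triple underscores would most likely not be matched to `logging.level`. The helper below is a hypothetical illustration of that naming rule, not a Prefect API.

```python
def prefect_env_var(config_key: str) -> str:
    """Map a dotted Prefect 1.x config key to its environment variable name.

    Assumed convention: prefix "PREFECT__", upper-case each section, and join
    sections with exactly two underscores ("__"), never three.
    """
    sections = config_key.split(".")
    if not all(sections):
        # An empty section (e.g. "logging..level") cannot map to a real key.
        raise ValueError(f"malformed config key: {config_key!r}")
    return "PREFECT__" + "__".join(section.upper() for section in sections)


# The two variables that appear in this thread:
print(prefect_env_var("logging.level"))                 # → PREFECT__LOGGING__LEVEL
print(prefect_env_var("engine.executor.dask.address"))  # → PREFECT__ENGINE__EXECUTOR__DASK__ADDRESS
```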
r
ok, I tried that, I don't see anything different in the logs
[2021-08-09 14:13:11+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'list-sum-dask2'
[2021-08-09 14:13:12,755] DEBUG - agent | Querying for ready flow runs...
DEBUG:agent:Querying for ready flow runs...
/usr/local/lib/python3.8/site-packages/distributed/client.py:1190: VersionMismatchWarning: Mismatched versions found

+---------+----------------+---------------+---------------+
| Package | client         | scheduler     | workers       |
+---------+----------------+---------------+---------------+
| blosc   | None           | 1.9.2         | 1.9.2         |
| lz4     | None           | 3.1.3         | 3.1.3         |
| msgpack | 1.0.2          | 1.0.0         | 1.0.0         |
| numpy   | None           | 1.21.1        | 1.21.1        |
| pandas  | None           | 1.3.0         | 1.3.0         |
| python  | 3.8.11.final.0 | 3.8.0.final.0 | 3.8.0.final.0 |
+---------+----------------+---------------+---------------+
Notes: 
-  msgpack: Variation is ok, as long as everything is above 0.6
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
[2021-08-09 14:13:12,877] DEBUG - agent | No ready flow runs found.
k
Did you check the Flow logs in the UI? The debug-level logs of the flow won’t show up in the agent output unless you start the agent with the --show-flow-logs flag.
r
I notice when I run "prefect config" on the agent, there are a lot of default values that look wrong. But I'm assuming these are values that the server would use, not the agent.
ok, one sec
9 August 2021,04:09:20 	agent	INFO	Submitted for execution: PID: 158
9 August 2021,04:09:21 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'list-sum-dask2'
9 August 2021,04:09:21 	prefect.CloudFlowRunner	DEBUG	Using executor type DaskExecutor
9 August 2021,04:09:21 	prefect.CloudFlowRunner	DEBUG	Flow 'list-sum-dask2': Handling state change from Scheduled to Running
and nothing else...
k
When you said you were able to send work to the client on the same container as the agent, that’s just using Dask commands without Prefect right? Could you show me what code you used?
r
actually, a few more lines just arrived
9 August 2021,04:09:20 	agent	INFO	Submitted for execution: PID: 158
9 August 2021,04:09:21 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'list-sum-dask2'
9 August 2021,04:09:21 	prefect.CloudFlowRunner	DEBUG	Using executor type DaskExecutor
9 August 2021,04:09:21 	prefect.CloudFlowRunner	DEBUG	Flow 'list-sum-dask2': Handling state change from Scheduled to Running
9 August 2021,04:23:01 	prefect-server.Lazarus.FlowRun	INFO	Rescheduled by a Lazarus process. This is attempt 1.
9 August 2021,04:23:10 	agent	INFO	Submitted for execution: PID: 197
9 August 2021,04:23:11 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'list-sum-dask2'
9 August 2021,04:23:11 	prefect.CloudFlowRunner	DEBUG	Using executor type DaskExecutor
9 August 2021,04:23:11 	prefect.CloudFlowRunner	DEBUG	Flow 'list-sum-dask2': Handling state change from Scheduled to Running
9 August 2021,04:23:12 	prefect.DaskExecutor	INFO	Connecting to an existing Dask cluster at dask-scheduler:8890
9 August 2021,04:23:15 	prefect.CloudFlowRunner	DEBUG	Checking flow run state...
9 August 2021,04:23:15 	prefect.CloudFlowRunner	INFO	Flow run RUNNING: terminal tasks are incomplete.
yes, just with dask
import os

import dask
from dask.distributed import Client
import distributed


SCHEDULER_ADDRESS = os.environ['PREFECT__ENGINE__EXECUTOR__DASK__ADDRESS']


def inc(x):
    dask.distributed.get_worker().log_event('inc-log', {'x': x})
    return x + 1

def square(x):
    return x*x


client = Client(SCHEDULER_ADDRESS)
x = client.submit(inc, 10)
print(x.result())

y = client.submit(inc, x)
print(y.result())
L = client.map(inc, range(1000))
list_incremented = client.gather(L)
print(list_incremented)


total = client.submit(sum, L)
print(total.result())

events = client.get_events('inc-log')
print(events)
I don't want to take any more of your time, if you like I can post a repo with my docker-compose setup
k
That’s weird. This is basically what the DaskExecutor does as well. I’ll ask the team if there are other ideas.
r
or submit it as a GitHub issue
k
I’ll have someone else on the team look at this in a bit, so no need to submit a GitHub issue quite yet. I don’t think the repo would help, since the Dask cluster stuff is not in it?
r
it is all together in one repo
I can document the scenario
z
Hey @Ross Rochford -- if you put a reproducible example up I'll give it a go for you
r
thanks!
@Zanie friendly poke in case you forgot :)
z
Ah yes; I'll try to get to this in the morning. Sorry about that.
r
Thanks! No major rush.
z
Giving this a look now
Hi! Your Dask workers were not receiving your Prefect configuration, so when tasks were submitted to them for execution, they attempted to contact Cloud instead of Server.
❯ git diff                                      
diff --git a/docker-compose.yml b/docker-compose.yml
index 793dbde..aaddc0a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -169,11 +169,13 @@ services:
     environment:
       - DASK_SCHEDULER_ADDRESS=dask-scheduler:8890
       - EXTRA_PIP_PACKAGES=prefect
     hostname: dask-worker
     networks:
       - prefect-network
     volumes:
       - './dask/config:/etc/dask'
+      - './agent/prefect_backend_config.toml:/root/.prefect/backend.toml'
     restart: "always"
     depends_on:
       - dask-scheduler
prefect run --name list-sum-dask --watch --log-level DEBUG
succeeds now
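To confirm that every worker sees the same backend setting as the agent, one option is to run a small diagnostic on all workers at once with the Dask `Client.run` method. The helper below is a hypothetical sketch, not part of Prefect. It assumes Prefect 1.x keeps the backend choice in `~/.prefect/backend.toml` as a line like `backend = "server"` (the file the compose fix mounts) and that a `PREFECT__BACKEND` environment variable is an equivalent override; it reports both rather than guessing which one wins.

```python
import os
from pathlib import Path
from typing import Dict, Optional


def report_prefect_backend(home: Optional[str] = None) -> Dict[str, Optional[str]]:
    """Report the Prefect backend settings visible to this process.

    Hypothetical diagnostic helper (not a Prefect API). Returns the value of
    the PREFECT__BACKEND environment variable (assumed override) and the
    `backend` entry from <home>/.prefect/backend.toml; either may be None.
    """
    base = Path(home) if home is not None else Path.home()
    toml_path = base / ".prefect" / "backend.toml"

    file_backend = None
    if toml_path.is_file():
        # Naive line-based parse of `backend = "server"`; not a full TOML parser.
        for line in toml_path.read_text().splitlines():
            key, sep, value = line.partition("=")
            if sep and key.strip() == "backend":
                file_backend = value.strip().strip("\"'")
                break

    return {
        "env": os.environ.get("PREFECT__BACKEND"),
        "backend_toml": file_backend,
    }
```

Usage, assuming the scheduler address from this setup: `Client("dask-scheduler:8890").run(report_prefect_backend)` returns a dict keyed by worker address. A worker reporting None for both fields has no backend override and falls back to Prefect's default, which matches the "contacting Cloud instead of Server" symptom described above.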
r
awesome, I have it working now, thank you