Ross Rochford

08/09/2021, 11:14 AM
When I inspect the agent logs (run with "prefect agent local start" inside a container), it looks like it is only picking up flow runs with LocalExecutor. I see there is also LocalDaskExecutor, but I don't want the agent to run its own local Dask cluster; I want it to send work to the scheduler, which is in a separate container.

Kevin Kho

08/09/2021, 1:21 PM
Hey @Ross Rochford, it sounds like Prefect can’t send work to the scheduler of the cluster. Could you share how you defined the executor in the Flow?

Ross Rochford

08/09/2021, 1:39 PM
with Flow("list-sum-dask") as flow:
    flow.executor = DaskExecutor(address=SCHEDULER_ADDRESS)

Kevin Kho

08/09/2021, 1:39 PM
Could you try it outside the Flow block?
with Flow("list-sum-dask") as flow:
    ...

flow.executor = DaskExecutor(address=SCHEDULER_ADDRESS)
But if that doesn’t work, would there be any reason the Dask cluster can’t accept incoming requests? If it’s in a container, are the needed ports open?
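For the port question, one quick way to test reachability from inside the agent container is a plain TCP connect. This is a minimal stdlib sketch, not a Prefect or Dask tool; the function name scheduler_port_open is hypothetical, and a True result only shows that something accepts connections on that host and port, not that it is a healthy Dask scheduler.

```python
import socket


def scheduler_port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the hostname (e.g. a docker-compose
        # service name such as "dask-scheduler") and attempts a TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused connections, timeouts and DNS failures all raise OSError subclasses.
        return False


# Hypothetical usage inside the agent container:
#   scheduler_port_open("dask-scheduler", 8890)
```

A plain socket check keeps Prefect and Dask out of the picture, which helps separate network problems from configuration problems.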

Ross Rochford

08/09/2021, 1:43 PM
That didn't work.
No, I don't think so. I'm able to send work to the cluster from the Dask client inside the agent's container.
Sorry, I confused two flows; the run seems to have been scheduled this time.

Kevin Kho

08/09/2021, 1:47 PM
No worries. Just let me know if it’s still not working.

Ross Rochford

08/09/2021, 1:47 PM
ok, the tasks are still pending
maybe that didn't make a difference after all
This time the agent stdout showed:
[2021-08-09 13:42:09+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'list-sum-dask2'
which I don't think it did previously (though I'm not 100% sure)
Is it strange that it says "CloudFlowRunner"? I'm using prefect server, not Prefect Cloud
The dashboard says it's "running" but isn't making progress, all the tasks are pending.

Kevin Kho

08/09/2021, 1:52 PM
Do you have access to the Dask UI to see if anything was scheduled there?
The CloudFlowRunner is not out of the ordinary; it’s just the name of the class that emits the logs. Maybe you can run the Flow with debug-level logs?

Ross Rochford

08/09/2021, 1:58 PM
Yes, I have access to the Dask UI, no tasks are shown
trying now with debug logs, you mean on the agent?
[2021-08-09 13:58:54+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'list-sum-dask2'
[2021-08-09 13:58:55,282] DEBUG - agent | Querying for ready flow runs...
DEBUG:agent:Querying for ready flow runs...
/usr/local/lib/python3.8/site-packages/distributed/client.py:1190: VersionMismatchWarning: Mismatched versions found

+---------+----------------+---------------+---------------+
| Package | client         | scheduler     | workers       |
+---------+----------------+---------------+---------------+
| blosc   | None           | 1.9.2         | 1.9.2         |
| lz4     | None           | 3.1.3         | 3.1.3         |
| msgpack | 1.0.2          | 1.0.0         | 1.0.0         |
| numpy   | None           | 1.21.1        | 1.21.1        |
| pandas  | None           | 1.3.0         | 1.3.0         |
| python  | 3.8.11.final.0 | 3.8.0.final.0 | 3.8.0.final.0 |
+---------+----------------+---------------+---------------+
Notes: 
-  msgpack: Variation is ok, as long as everything is above 0.6
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
[2021-08-09 13:58:55,390] DEBUG - agent | No ready flow runs found.
DEBUG:agent:No ready flow runs found.
[2021-08-09 13:58:55,394] DEBUG - agent | Sleeping flow run poller for 2.0 seconds...
DEBUG:agent:Sleeping flow run poller for 2.0 seconds...
[2021-08-09 13:58:57,396] DEBUG - agent | Querying for ready flow runs...
DEBUG:agent:Querying for ready flow runs...
[2021-08-09 13:58:57,456] DEBUG - agent | No ready flow runs found.
DEBUG:agent:No ready flow runs found.

Kevin Kho

08/09/2021, 2:01 PM
This seems fine, but note that the agent logs and the flow logs are separate. For the Flow, you can click “Advanced Configuration” in the UI under “Run” and then add an environment variable:
PREFECT__LOGGING__LEVEL="DEBUG"
or you might be able to set the logging level there directly.
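Prefect 1.x reads configuration overrides from environment variables named PREFECT__<SECTION>__<KEY>, using double underscores as separators, so the variable name has to match that pattern exactly. The sketch below is a simplified, hypothetical illustration of the naming convention only (env_overrides is not a Prefect function); as far as we know, Prefect also converts some values, such as "true" to a boolean, which this sketch skips.

```python
from typing import Dict, Mapping, Tuple

PREFIX = "PREFECT__"
SEPARATOR = "__"


def env_overrides(environ: Mapping[str, str]) -> Dict[Tuple[str, ...], str]:
    """Collect PREFECT__SECTION__KEY=value pairs as {("section", "key"): value}.

    Hypothetical illustration of the naming convention, not Prefect's own code;
    values are kept as plain strings.
    """
    overrides: Dict[Tuple[str, ...], str] = {}
    for name, value in environ.items():
        if not name.startswith(PREFIX):
            continue  # not a Prefect override
        # Drop the prefix, split on double underscores, lowercase each key.
        path = tuple(part.lower() for part in name[len(PREFIX):].split(SEPARATOR))
        overrides[path] = value
    return overrides


# Usage, e.g.: env_overrides(os.environ)
```

With this convention, a name like PREFECT___LOGGING___LEVEL (three underscores) splits into keys "_logging" and "_level", so it would not set logging.level.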

Ross Rochford

08/09/2021, 2:18 PM
ok, I tried that, I don't see anything different in the logs
[2021-08-09 14:13:11+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'list-sum-dask2'
[2021-08-09 14:13:12,755] DEBUG - agent | Querying for ready flow runs...
DEBUG:agent:Querying for ready flow runs...
/usr/local/lib/python3.8/site-packages/distributed/client.py:1190: VersionMismatchWarning: Mismatched versions found

+---------+----------------+---------------+---------------+
| Package | client         | scheduler     | workers       |
+---------+----------------+---------------+---------------+
| blosc   | None           | 1.9.2         | 1.9.2         |
| lz4     | None           | 3.1.3         | 3.1.3         |
| msgpack | 1.0.2          | 1.0.0         | 1.0.0         |
| numpy   | None           | 1.21.1        | 1.21.1        |
| pandas  | None           | 1.3.0         | 1.3.0         |
| python  | 3.8.11.final.0 | 3.8.0.final.0 | 3.8.0.final.0 |
+---------+----------------+---------------+---------------+
Notes: 
-  msgpack: Variation is ok, as long as everything is above 0.6
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
[2021-08-09 14:13:12,877] DEBUG - agent | No ready flow runs found.

Kevin Kho

08/09/2021, 2:19 PM
Did you check the Flow logs in the UI? The debug-level logs of the flow won’t show up on the agent unless you pass the --show-flow-logs flag when starting the agent.

Ross Rochford

08/09/2021, 2:19 PM
I notice when I run "prefect config" on the agent, there are a lot of default values that look wrong. But I'm assuming these are values that the server would use, not the agent.
ok, one sec
9 August 2021,04:09:20 	agent	INFO	Submitted for execution: PID: 158
9 August 2021,04:09:21 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'list-sum-dask2'
9 August 2021,04:09:21 	prefect.CloudFlowRunner	DEBUG	Using executor type DaskExecutor
9 August 2021,04:09:21 	prefect.CloudFlowRunner	DEBUG	Flow 'list-sum-dask2': Handling state change from Scheduled to Running
and nothing else...

Kevin Kho

08/09/2021, 2:23 PM
When you said you were able to send work to the cluster from a client in the same container as the agent, that was just using Dask commands without Prefect, right? Could you show me the code you used?

Ross Rochford

08/09/2021, 2:24 PM
actually, a few more lines just arrived
9 August 2021,04:09:20 	agent	INFO	Submitted for execution: PID: 158
9 August 2021,04:09:21 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'list-sum-dask2'
9 August 2021,04:09:21 	prefect.CloudFlowRunner	DEBUG	Using executor type DaskExecutor
9 August 2021,04:09:21 	prefect.CloudFlowRunner	DEBUG	Flow 'list-sum-dask2': Handling state change from Scheduled to Running
9 August 2021,04:23:01 	prefect-server.Lazarus.FlowRun	INFO	Rescheduled by a Lazarus process. This is attempt 1.
9 August 2021,04:23:10 	agent	INFO	Submitted for execution: PID: 197
9 August 2021,04:23:11 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'list-sum-dask2'
9 August 2021,04:23:11 	prefect.CloudFlowRunner	DEBUG	Using executor type DaskExecutor
9 August 2021,04:23:11 	prefect.CloudFlowRunner	DEBUG	Flow 'list-sum-dask2': Handling state change from Scheduled to Running
9 August 2021,04:23:12 	prefect.DaskExecutor	INFO	Connecting to an existing Dask cluster at dask-scheduler:8890
9 August 2021,04:23:15 	prefect.CloudFlowRunner	DEBUG	Checking flow run state...
9 August 2021,04:23:15 	prefect.CloudFlowRunner	INFO	Flow run RUNNING: terminal tasks are incomplete.
yes, just with dask
import os

import dask
from dask.distributed import Client
import distributed


SCHEDULER_ADDRESS = os.environ['PREFECT__ENGINE__EXECUTOR__DASK__ADDRESS']


def inc(x):
    dask.distributed.get_worker().log_event('inc-log', {'x': x})
    return x + 1

def square(x):
    return x*x


client = Client(SCHEDULER_ADDRESS)
x = client.submit(inc, 10)
print(x.result())

y = client.submit(inc, x)
print(y.result())
L = client.map(inc, range(1000))
list_incremented = client.gather(L)
print(list_incremented)


total = client.submit(sum, L)
print(total.result())

events = client.get_events('inc-log')
print(events)
I don't want to take any more of your time; if you like, I can post a repo with my docker-compose setup.

Kevin Kho

08/09/2021, 2:28 PM
That’s weird. This is basically what the DaskExecutor does as well. I’ll ask the team if they have other ideas.
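The Dask script above follows a submit/map/gather pattern: each call is sent to an executor, comes back as a future, and results are collected afterwards. As a local illustration only, the sketch below swaps in the standard library's concurrent.futures.ThreadPoolExecutor for a Dask Client (it is not how Prefect's DaskExecutor is implemented). One point it makes concrete: a submitted function runs wherever the executor's workers run, so it sees the workers' environment and configuration, not the caller's.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import List


def inc(x: int) -> int:
    return x + 1


def list_sum(executor: Executor, n: int) -> int:
    # Fan out one future per input, analogous to client.map(inc, range(n)).
    futures = [executor.submit(inc, i) for i in range(n)]
    # Block until each future resolves, analogous to client.gather(futures).
    results: List[int] = [f.result() for f in futures]
    # Reduce locally; the Dask script submits sum as one more task instead.
    return sum(results)


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=4) as pool:
        print(list_sum(pool, 1000))  # prints 500500
```

Because list_sum only depends on the Executor interface, the same logic runs on any pool that provides submit and futures; where that pool lives determines which environment the tasks see.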

Ross Rochford

08/09/2021, 2:28 PM
or submit it as a github issue

Kevin Kho

08/09/2021, 2:30 PM
I’ll have someone else on the team look at this in a bit, so no need to submit a GitHub issue quite yet. I don’t think the repo would help, since the Dask cluster stuff is not in it?

Ross Rochford

08/09/2021, 2:38 PM
it is all together in one repo
I can document the scenario

Zanie

08/09/2021, 2:56 PM
Hey @Ross Rochford, if you put a reproducible example up, I'll give it a go for you.

Ross Rochford

08/09/2021, 3:19 PM
thanks!
@Zanie friendly poke in case you forgot :)

Zanie

08/10/2021, 11:20 PM
Ah yes; I'll try to get to this in the morning. Sorry about that.

Ross Rochford

08/10/2021, 11:31 PM
Thanks! No major rush.

Zanie

08/11/2021, 4:40 PM
Giving this a look now
Hi! Your Dask workers were not receiving your Prefect configuration, so when tasks were submitted to them for execution, they were attempting to contact Cloud instead of Server.
❯ git diff                                      
diff --git a/docker-compose.yml b/docker-compose.yml
index 793dbde..aaddc0a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -169,11 +169,13 @@ services:
     environment:
       - DASK_SCHEDULER_ADDRESS=dask-scheduler:8890
       - EXTRA_PIP_PACKAGES=prefect
     hostname: dask-worker
     networks:
       - prefect-network
     volumes:
       - './dask/config:/etc/dask'
+      - './agent/prefect_backend_config.toml:/root/.prefect/backend.toml'
     restart: "always"
     depends_on:
       - dask-scheduler
prefect run --name list-sum-dask --watch --log-level DEBUG
succeeds now
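The thread never shows the contents of ./agent/prefect_backend_config.toml. As far as we know, in Prefect 1.x running "prefect backend server" writes ~/.prefect/backend.toml with a single backend = "server" line, so the sketch below assumes that one key is all the mounted file needs. The helper write_backend_toml is hypothetical; it just generates the file that the docker-compose change mounts into each Dask worker at /root/.prefect/backend.toml.

```python
from pathlib import Path

VALID_BACKENDS = ("server", "cloud")


def write_backend_toml(path: Path, backend: str = "server") -> Path:
    """Write a one-line Prefect backend file to mount into worker containers.

    Assumption: the file only needs a top-level `backend` key.
    """
    if backend not in VALID_BACKENDS:
        raise ValueError(f"unexpected backend: {backend!r}")
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(f'backend = "{backend}"\n')
    return path


# Hypothetical usage on the host, matching the path mounted in the diff:
#   write_backend_toml(Path("agent/prefect_backend_config.toml"))
```

Mounting the same file into the agent and every worker keeps all processes that execute tasks pointed at the same backend.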

Ross Rochford

08/11/2021, 5:47 PM
awesome, I have it working now, thank you