Pedro Martins  01/06/2021, 6:17 PM
custom_confs = {
    "run_config": KubernetesRun(
        image="drtools/prefect:dask-test",
        image_pull_secrets=["regcred"],
    ),
    "storage": S3(bucket="dr-prefect"),
    "executor": DaskExecutor(address="dask-scheduler.dask.svc.cluster.local:8786"),
}
with Flow("dask-example", **custom_confs) as flow:
    incs = inc.map(x=range(100))
    decs = dec.map(x=range(100))
    adds = add.map(x=incs, y=decs)
    total = list_sum(adds)
Ideally I want to have incs and decs running in parallel on different workers, but they are running one after the other rather than at the same time, as you can see in the image below. I tried with 3, 5 and 10 dask workers, but the tasks don't run in parallel. What do you suggest?
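(The inc, dec, add, and list_sum tasks aren't posted in the thread; a minimal sketch of what they might look like, loosely based on the Prefect Dask example and the sleep(random.random() / 10) Pedro mentions later:)

# Hypothetical task definitions for the flow above (not from the original thread).
import random
import time

from prefect import task

@task
def inc(x):
    time.sleep(random.random() / 10)  # small jitter so scheduling is visible on the Dask dashboard
    return x + 1

@task
def dec(x):
    time.sleep(random.random() / 10)
    return x - 1

@task
def add(x, y):
    return x + y

@task
def list_sum(values):
    return sum(values)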
josh  01/06/2021, 6:25 PM

Pedro Martins  01/06/2021, 6:30 PM
$ kubectl exec -ti prefect-server-agent-654b7857f5-g2qt8 -- python -c "import prefect; print(prefect.__version__)"
0.14.1
josh  01/06/2021, 6:40 PM
env = {
    "PREFECT__LOG__LEVEL": "DEBUG"
}
and then look for a line in your run’s logs that mentions something along the lines of “Using executor type …”
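(A sketch of how that env block could be attached to the KubernetesRun config from the first message; Pedro posts essentially this configuration further down, using PREFECT__LOGGING__LEVEL as the variable name:)

custom_confs = {
    "run_config": KubernetesRun(
        image="drtools/prefect:dask-test",
        image_pull_secrets=["regcred"],
        env={"PREFECT__LOGGING__LEVEL": "DEBUG"},  # surfaces the "Using executor type ..." debug line
    ),
    "storage": S3(bucket="dr-prefect"),
    "executor": DaskExecutor(address="dask-scheduler.dask.svc.cluster.local:8786"),
}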
01/06/2021, 7:25 PMPREFECT__LOGGING__LEVEL
🙂[2021-01-06 19:23:48+0000] INFO - prefect.S3 | Downloading dask-example/2021-01-06t19-23-40-187375-00-00 from dr-prefect
[2021-01-06 19:23:49+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'dask-example'
[2021-01-06 19:23:49+0000] DEBUG - prefect.CloudFlowRunner | Using executor type DaskExecutor
[2021-01-06 19:23:49+0000] DEBUG - prefect.CloudFlowRunner | Flow 'dask-example': Handling state change from Scheduled to Running
[2021-01-06 19:24:05+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-01-06 19:24:20+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-01-06 19:24:26+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-01-06 19:24:26+0000] INFO - prefect.CloudFlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-01-06 19:24:26+0000] DEBUG - prefect.CloudFlowRunner | Flow 'dask-example': Handling state change from Running to Success
/usr/local/lib/python3.6/site-packages/distributed/client.py:1129: VersionMismatchWarning: Mismatched versions found
+-------------+----------------+---------------+---------------+
| Package | client | scheduler | workers |
+-------------+----------------+---------------+---------------+
| blosc | None | 1.9.2 | 1.9.2 |
| distributed | 2.30.0 | 2.30.0 | 2.30.1 |
| lz4 | None | 3.1.0 | 3.1.1 |
| msgpack | 1.0.2 | 1.0.0 | 1.0.0 |
| numpy | None | 1.18.1 | 1.18.1 |
| python | 3.6.12.final.0 | 3.8.0.final.0 | 3.7.0.final.0 |
| tornado | 6.1 | 6.0.4 | 6.1 |
+-------------+----------------+---------------+---------------+
Notes:
- msgpack: Variation is ok, as long as everything is above 0.6
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
josh  01/06/2021, 7:26 PM
Could you add a time.sleep(1) or something inside of your inc/dec tasks to slow them down?
Arnulfo Soriano  01/06/2021, 7:31 PM
import prefect
from prefect import task, Flow, Parameter
from prefect.environments.storage import Docker
from prefect.run_configs import KubernetesRun
import time
import requests
from prefect.executors import DaskExecutor
from dask_kubernetes import KubeCluster, make_pod_spec
@task
def hello_task(delay):
    time.sleep(delay)
    logger = prefect.context.get("logger")
    logger.info("Hello, Kubernetes!")

@task
def get_value():
    time.sleep(10)
    return 5

@task
def output_value(value):
    print(value)
def register_flow():
    with Flow("kubernetesRun2") as flow:
        delay = Parameter('delay', default=30)
        value = get_value()
        output_value(value)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
    flow.storage = Docker(image_name="prefect-sample-worker",
                          image_tag="demo_0.0.34",
                          local_image=True,
                          base_image='test/base:0.0.1',
                          registry_url="test")
    flow.storage.add_flow(flow)
    flow.storage.build(push=False)
    print("Registering containerdemo")
    flow.run_config = KubernetesRun(labels=["TEST"], env={"PREFECT__LOGGING__LEVEL": "DEBUG"})
    flow.executor = DaskExecutor(cluster_class=lambda: KubeCluster(make_pod_spec(image="test/prefect-sample-worker:demo_0.0.34")), adapt_kwargs={"minimum": 2, "maximum": 5},)
    flow.register(project_name="test", build=False)

if __name__ == "__main__":
    register_flow()
[2021-01-06 19:29:15+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'kubernetesRun2'
[2021-01-06 19:29:15+0000] DEBUG - prefect.CloudFlowRunner | Using executor type LocalExecutor
[2021-01-06 19:29:15+0000] DEBUG - prefect.CloudFlowRunner | Flow 'kubernetesRun2': Handling state change from Scheduled to Running
[2021-01-06 19:29:15+0000] INFO - prefect.CloudTaskRunner | Task 'get_value': Starting task run...
[2021-01-06 19:29:15+0000] DEBUG - prefect.CloudTaskRunner | Task 'get_value': Handling state change from Pending to Running
[2021-01-06 19:29:15+0000] DEBUG - prefect.CloudTaskRunner | Task 'get_value': Calling task.run() method...
[2021-01-06 19:29:25+0000] DEBUG - prefect.CloudTaskRunner | Task 'get_value': Handling state change from Running to Success
[2021-01-06 19:29:25+0000] INFO - prefect.CloudTaskRunner | Task 'get_value': Finished task run for task with final state: 'Success'
[2021-01-06 19:29:26+0000] INFO - prefect.CloudTaskRunner | Task 'delay': Starting task run...
[2021-01-06 19:29:26+0000] DEBUG - prefect.CloudTaskRunner | Task 'delay': Handling state change from Pending to Running
[2021-01-06 19:29:26+0000] DEBUG - prefect.CloudTaskRunner | Task 'delay': Calling task.run() method...
[2021-01-06 19:29:26+0000] DEBUG - prefect.CloudTaskRunner | Task 'delay': Handling state change from Running to Success
[2021-01-06 19:29:26+0000] INFO - prefect.CloudTaskRunner | Task 'delay': Finished task run for task with final state: 'Success'
[2021-01-06 19:29:26+0000] INFO - prefect.CloudTaskRunner | Task 'hello_task': Starting task run...
[2021-01-06 19:29:26+0000] DEBUG - prefect.CloudTaskRunner | Task 'hello_task': Handling state change from Pending to Running
[2021-01-06 19:29:26+0000] DEBUG - prefect.CloudTaskRunner | Task 'hello_task': Calling task.run() method...
[2021-01-06 19:29:30+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-01-06 19:29:36+0000] INFO - prefect.hello_task | Hello, Kubernetes!
[2021-01-06 19:29:36+0000] DEBUG - prefect.CloudTaskRunner | Task 'hello_task': Handling state change from Running to Success
[2021-01-06 19:29:36+0000] INFO - prefect.CloudTaskRunner | Task 'hello_task': Finished task run for task with final state: 'Success'
[2021-01-06 19:29:36+0000] INFO - prefect.CloudTaskRunner | Task 'hello_task': Starting task run...
[2021-01-06 19:29:36+0000] DEBUG - prefect.CloudTaskRunner | Task 'hello_task': Handling state change from Pending to Running
[2021-01-06 19:29:36+0000] DEBUG - prefect.CloudTaskRunner | Task 'hello_task': Calling task.run() method...
josh  01/06/2021, 7:32 PM
Your logs show "Using executor type LocalExecutor", which means it isn’t using the DaskExecutor you set on your flow. Are both your flow version and your agent version 0.14+?
Pedro Martins  01/06/2021, 7:37 PM
The tasks already have a sleep(random.random() / 10) in them.
Arnulfo Soriano  01/06/2021, 7:40 PM

josh  01/06/2021, 7:41 PM

Pedro Martins  01/06/2021, 7:41 PM

josh  01/06/2021, 7:43 PM

Arnulfo Soriano  01/06/2021, 7:43 PM

Pedro Martins  01/06/2021, 7:47 PM
dec is computed at the same time. However, I was expecting dec and inc to run in parallel - something like the green in the middle of the purple.
Arnulfo Soriano  01/06/2021, 7:49 PM
I tried putting
flow.run_config = KubernetesRun(labels=["TEST"], env={"PREFECT__LOGGING__LEVEL": "DEBUG"})
flow.executor = DaskExecutor(cluster_class=lambda: KubeCluster(make_pod_spec(image="test/prefect-sample-worker:demo_0.0.34")), adapt_kwargs={"minimum": 2, "maximum": 5},)
on top of flow.storage...
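(A sketch of what that reordering could look like in register_flow() from the script above, assuming the point is that the flow object should carry the DaskExecutor before it gets baked into Docker storage; this is a reading of the thread, not a confirmed fix:)

# Sketch only: set run_config and executor on the flow *before* it is added to storage
# and built, so the serialized flow carries the DaskExecutor instead of falling back
# to LocalExecutor at run time. Reuses hello_task and the image names from the script above.
from prefect import Flow, Parameter
from prefect.environments.storage import Docker
from prefect.run_configs import KubernetesRun
from prefect.executors import DaskExecutor
from dask_kubernetes import KubeCluster, make_pod_spec

def register_flow():
    with Flow("kubernetesRun2") as flow:
        delay = Parameter('delay', default=30)
        hello_task(delay)

    flow.run_config = KubernetesRun(labels=["TEST"], env={"PREFECT__LOGGING__LEVEL": "DEBUG"})
    flow.executor = DaskExecutor(
        cluster_class=lambda: KubeCluster(make_pod_spec(image="test/prefect-sample-worker:demo_0.0.34")),
        adapt_kwargs={"minimum": 2, "maximum": 5},
    )

    flow.storage = Docker(image_name="prefect-sample-worker",
                          image_tag="demo_0.0.34",
                          local_image=True,
                          base_image='test/base:0.0.1',
                          registry_url="test")
    flow.storage.add_flow(flow)
    flow.storage.build(push=False)  # the flow object (including its executor) is baked into the image here
    flow.register(project_name="test", build=False)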
josh  01/06/2021, 7:51 PM

Arnulfo Soriano  01/06/2021, 7:51 PM
Pedro Martins  01/06/2021, 7:57 PM
with Flow("dask-example") as flow:
    incs = inc.map(x=range(100))
    decs = dec.map(x=range(100))
    adds = add.map(x=incs, y=decs)
    total = list_sum(adds)

custom_confs = {
    "run_config": KubernetesRun(
        image="drtools/prefect:dask-test",
        image_pull_secrets=["regcred"],
        env={"PREFECT__LOGGING__LEVEL": "DEBUG"},
    ),
    "executor": DaskExecutor(address="dask-scheduler.dask.svc.cluster.local:8786"),
    "storage": S3(bucket="dr-prefect"),
}

flow.run_config = custom_confs["run_config"]
flow.executor = custom_confs["executor"]
flow.storage = custom_confs["storage"]
Arnulfo Soriano  01/06/2021, 8:10 PM

wiretrack  01/07/2021, 2:34 AM
from prefect import task
from prefect import Flow
from prefect.environments import LocalEnvironment
from prefect.executors import DaskExecutor
import time
@task
def inc(x):
    time.sleep(10)
    return x + 1

@task
def dec(x):
    time.sleep(10)
    return x - 1

@task
def add(x, y):
    time.sleep(10)
    return x + y

executor = DaskExecutor(address='tcp://192.168.68.103:8786')
environment = LocalEnvironment(executor)

with Flow("dask-example", environment=environment) as flow:
    incs = inc.map(x=range(3))
    decs = dec.map(x=range(3))
    adds = add.map(x=incs, y=decs)

flow.register(project_name='ETL')
I have a dask-scheduler and 3 dask-workers running locally. I don't know if this is a proper way to test your use case, but anyway…
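(One aside on the snippet above, not from the thread itself: on Prefect 0.14+, which the rest of the discussion assumes, environments such as LocalEnvironment are deprecated in favor of setting flow.executor directly, so a rough 0.14-style equivalent might be:)

# Rough 0.14-style equivalent of the LocalEnvironment snippet above
# (assumes the same inc/dec/add task definitions).
from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("dask-example") as flow:
    incs = inc.map(x=range(3))
    decs = dec.map(x=range(3))
    adds = add.map(x=incs, y=decs)

flow.executor = DaskExecutor(address="tcp://192.168.68.103:8786")
flow.register(project_name="ETL")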