Robin
07/12/2020, 10:56 PM
DaskKubernetesEnvironment up and running, but it does not look very parallelized 🤔
Here is the code:
import prefect
from prefect import task, Flow
from prefect.environments.storage import Docker
from prefect.environments import DaskKubernetesEnvironment, LocalEnvironment
import time
import requests
# from prefect.tasks import kubernetes
from prefect.engine.executors import DaskExecutor

@task
def hello_task():
    time.sleep(10)
    logger = prefect.context.get("logger")
    logger.info("Hello, Kubernetes!")

@task
def get_value():
    time.sleep(10)
    return 5

@task
def output_value(value):
    print(value)

@task
def fetch_google():
    r = requests.get("http://www.google.com")
    print(r.content)

with Flow("hello-k8s", environment=DaskKubernetesEnvironment(min_workers=1, max_workers=5, labels=["yogasev"])) as flow:
    value = get_value()
    output_value(value)
    fetch_google()
    hello_task()
    hello_task()
    hello_task()
    hello_task()
    hello_task()

flow.storage = Docker(registry_url="<my-repo>")
flow.register(project_name="eks_test_01")
Did we miss anything?
On kubectl it looked parallelized (5 dask pods created in parallel), but Prefect's visualization suggests otherwise… 🤷‍♂️
Kingsley Blatter
07/13/2020, 3:07 PM
Robin
07/13/2020, 4:21 PM
Marvin
07/13/2020, 5:27 PM
josh
07/13/2020, 6:13 PM
flow.environment = LocalEnvironment(
    executor=DaskExecutor(
        adapt_kwargs={
            "minimum": 1,
            "maximum": 5
        },
    ),
    labels=["yogasev"],
)
Running with this environment gets me the parallelism I expect 🙂
Robin
07/14/2020, 10:42 AM
"maximum": 5, why is that?
2. Does this workaround also work for an EKS Kubernetes cluster? (I will try to check this on my own now)
josh
07/14/2020, 11:13 AM
flow.environment = LocalEnvironment(
    executor=DaskExecutor(
        cluster_kwargs={
            "threads_per_worker": 1
        },
        adapt_kwargs={
            "minimum": 1,
            "maximum": 5
        },
    ),
    labels=["yogasev"],
)
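To sanity-check what "parallelized" should look like for the five sleeping tasks in this thread, here is a minimal sketch that uses only the standard library, not Prefect or Dask. It assumes that a pool of N single-threaded workers behaves roughly like a thread pool of size N for sleep-bound tasks. The names `sleepy_task` and `run_batch` are hypothetical, and the 0.2 s sleep stands in for the 10 s sleep in `hello_task`.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def sleepy_task(seconds: float) -> float:
    """Hypothetical stand-in for hello_task: sleep, then return the duration."""
    time.sleep(seconds)
    return seconds


def run_batch(n_tasks: int, max_workers: int, seconds: float = 0.2) -> float:
    """Run n_tasks sleeping tasks on a pool and return the wall-clock time."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list() forces all results, so every task finishes before we stop the clock
        list(pool.map(sleepy_task, [seconds] * n_tasks))
    return time.perf_counter() - start


# One worker runs the five tasks back to back; five workers overlap them.
serial = run_batch(n_tasks=5, max_workers=1)
parallel = run_batch(n_tasks=5, max_workers=5)
print(f"1 worker: {serial:.2f}s, 5 workers: {parallel:.2f}s")
```

If the flow's total run time is close to the sum of the task durations rather than to the longest single task, the tasks are most likely running one after another, whatever the pod count suggests.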
Luis Muniz
07/14/2020, 2:54 PM
bruno.corucho
07/14/2020, 3:16 PM