Joël Luijmes
12/03/2020, 12:31 PM
Dimitris Stafylarakis
12/03/2020, 12:33 PM
Joël Luijmes
12/03/2020, 12:33 PM
~> kgs
NAME                          TYPE           CLUSTER-IP     EXTERNAL-IP     PORT(S)           AGE
dask-scheduler                ClusterIP      10.32.4.76     <none>          8786/TCP,80/TCP   43s
prefect-postgresql            ClusterIP      10.32.0.123    <none>          5432/TCP          20d
prefect-postgresql-headless   ClusterIP      None           <none>          5432/TCP          20d
prefect-server-apollo         LoadBalancer   10.32.15.162   34.91.229.28    4200:32755/TCP    20d
prefect-server-graphql        ClusterIP      10.32.6.217    <none>          4201/TCP          20d
prefect-server-hasura         ClusterIP      10.32.13.168   <none>          3000/TCP          20d
prefect-server-ui             LoadBalancer   10.32.7.203    34.91.165.136   8080:32471/TCP    20d
~> kgp
NAME                                      READY   STATUS      RESTARTS   AGE
dask-scheduler-9b4bdffc-vrq9g             1/1     Running     0          14m
dask-worker-6c9755b48-jg26c               1/1     Running     0          14m
dask-worker-6c9755b48-lxptc               1/1     Running     0          14m
dask-worker-6c9755b48-tk2bz               1/1     Running     0          14m
hello-7t8qs                               0/2     Completed   0          9d
prefect-agent-594d644fd-cpzxt             1/1     Running     0          52m
prefect-postgresql-0                      1/1     Running     0          20d
prefect-server-apollo-77db95d99-st9l4     1/1     Running     0          3h22m
prefect-server-graphql-846796fdcd-9xkf6   1/1     Running     0          3h22m
prefect-server-hasura-59b994c54-nd55h     1/1     Running     3          20d
prefect-server-towel-67d57669b9-nrszp     1/1     Running     0          20d
prefect-server-ui-78ff94b998-lfmj6        1/1     Running     0          3h22m
Dimitris Stafylarakis
12/03/2020, 12:33 PM
Joël Luijmes
12/03/2020, 12:33 PM
if __name__ == "__main__":
    # Store the flow in the following GCR
    storage_gcr = Docker(
        registry_url="XX",
    )
    # Use DaskExecutor to run tasks on the in-cluster Dask scheduler
    executor_dask = DaskExecutor(address="tcp://dask-scheduler:8786")
    # Generate Prefect tasks based on the generated job specs
    tasks_syncs = [create_job_task(job_spec) for job_spec in generate_job_specs()]
    # Sends notification to Slack on completion
    task_slack_completed = SlackTask(
        name="slack",
        message=f"Completed syncing {len(tasks_syncs)} jobs"
    )
    with Flow("CloudSQL to BigQuery", storage=storage_gcr, executor=executor_dask) as flow:
        task_slack_completed.set_dependencies(upstream_tasks=tasks_syncs)
    flow.register(project_name="root")
Dimitris Stafylarakis
12/03/2020, 12:34 PM
Joël Luijmes
12/03/2020, 12:36 PM
min_workers
but that was with the legacy environment thingy.
Dimitris Stafylarakis
12/03/2020, 12:36 PM
Joël Luijmes
12/03/2020, 12:41 PM
Dimitris Stafylarakis
12/03/2020, 12:47 PM
Joël Luijmes
12/03/2020, 12:49 PM
Dimitris Stafylarakis
12/03/2020, 12:49 PM
Joël Luijmes
12/03/2020, 12:55 PM
- name: PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS
  value: prefect.engine.executors.DaskExecutor
- name: PREFECT__ENGINE__EXECUTOR__DASK__ADDRESS
  value: tcp://dask-scheduler:8786
On the Kubernetes agent, but still no luck
Dimitris Stafylarakis
12/03/2020, 12:56 PM
Joël Luijmes
12/03/2020, 12:58 PM
Dimitris Stafylarakis
12/03/2020, 1:09 PM
from prefect import task, Flow
from prefect.engine.executors import DaskExecutor
from prefect.engine.result_handlers import LocalResultHandler
from prefect.environments.storage import Docker
from prefect.run_configs import KubernetesRun
import datetime
import random
from time import sleep

@task
def inc(x):
    sleep(random.random() / 10)
    return x + 1

@task
def dec(x):
    sleep(random.random() / 10)
    return x - 1

@task
def add(x, y):
    sleep(random.random() / 10)
    return x + y

@task(name="sum")
def list_sum(arr):
    return sum(arr)

with Flow("dask-example", result_handler=LocalResultHandler()) as flow:
    incs = inc.map(x=range(100))
    decs = dec.map(x=range(100))
    adds = add.map(x=incs, y=decs)
    total = list_sum(adds)

flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")
flow.storage = Docker()
flow.run_config = KubernetesRun(labels=["dev"])
flow.register(project_name="Hello World")
Joël Luijmes
12/03/2020, 1:17 PM
Dimitris Stafylarakis
12/03/2020, 1:25 PM
Joël Luijmes
12/03/2020, 1:32 PM
run_config
after adding that, it works like a charm 👍
Thanks @Dimitris Stafylarakis for thinking with me! 🙂
Dimitris Stafylarakis
12/03/2020, 1:33 PM
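
For reference, the `PREFECT__...` variables tried on the Kubernetes agent earlier in the thread follow Prefect's environment-variable naming convention: the `PREFECT__` prefix is dropped and each double underscore marks one level of nesting in the config. The sketch below only illustrates that naming rule; `env_to_config_key` is a hypothetical helper written for this note, not Prefect's own loader. In the thread itself these variables did not change the outcome; what resolved it was setting `run_config` on the flow.

```python
def env_to_config_key(name: str, prefix: str = "PREFECT__") -> str:
    """Translate an environment variable name into a dotted config key.

    Hypothetical helper for illustration only; it mirrors the naming
    convention and is not how Prefect itself parses its environment.
    """
    if not name.startswith(prefix):
        raise ValueError(f"{name!r} does not start with {prefix!r}")
    # Double underscores separate nesting levels; keys are lowercased.
    # A single underscore (as in DEFAULT_CLASS) stays inside one key.
    return ".".join(part.lower() for part in name[len(prefix):].split("__"))


# The two variables quoted in the thread, with their values.
agent_env = {
    "PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS": "prefect.engine.executors.DaskExecutor",
    "PREFECT__ENGINE__EXECUTOR__DASK__ADDRESS": "tcp://dask-scheduler:8786",
}

config_keys = {env_to_config_key(name): value for name, value in agent_env.items()}
for key, value in config_keys.items():
    print(f"{key} = {value}")
```

Under this convention the two variables correspond to the `engine.executor.default_class` and `engine.executor.dask.address` config keys.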