Avi A
07/24/2020, 7:21 PM

Jenny
07/24/2020, 7:55 PM

Jim Crist-Harif
07/24/2020, 8:10 PM
You'd want to use a DaskExecutor with the cluster_class and cluster_kwargs arguments for starting up your temporary dask cluster (by default cluster_class is a LocalCluster, but depending on where you want dask to run you may want to change this). You'd then specify this executor to your environment class, and Prefect should handle the rest for you. We have some examples of doing this here: https://docs.prefect.io/orchestration/execution/local_environment.html#examples
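A minimal sketch of that setup, assuming Prefect 0.12-era import paths; the n_workers value is purely illustrative:

from distributed import LocalCluster
from prefect import Flow
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment

# LocalCluster is the default cluster_class; swap in another backend's
# cluster class (with matching cluster_kwargs) to run dask elsewhere.
executor = DaskExecutor(
    cluster_class=LocalCluster,
    cluster_kwargs={"n_workers": 4},
)

with Flow("my-flow", environment=LocalEnvironment(executor=executor)) as flow:
    ...  # tasks go here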
Avi A
07/24/2020, 8:22 PM

Jim Crist-Harif
07/24/2020, 8:25 PM
cluster_class needs to be a callable that returns an instance of a distributed.deploy.Cluster object. There are existing implementations of these for most deployment backends (e.g. dask-kubernetes for k8s, dask-yarn for YARN, ...). These can be non-trivial to write; is there a reason you'd want to write your own?
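For example, a hedged sketch of pointing the executor at one of those existing backends (dask-kubernetes here); the image and worker count are illustrative, and this assumes dask-kubernetes is installed and a k8s context is configured:

from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.engine.executors import DaskExecutor

# Build a simple worker pod spec; a real deployment would pin an image and resources.
pod_spec = make_pod_spec(image="daskdev/dask:latest")

executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs={"pod_template": pod_spec, "n_workers": 2},
)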
Avi A
07/24/2020, 8:38 PM
I was thinking I could subclass Cluster and inject my startup commands into the init function, but it's going to be better in the long run to make the effort and move to k8s now, so I might do just that.

Jim Crist-Harif
07/24/2020, 8:39 PM

Avi A
07/24/2020, 8:45 PM

class NoneK8sCluster(distributed.deploy.Cluster):
    def __init__(self, **kwargs):
        scheduler_address = my_custom_startup()
        super().__init__(address=scheduler_address, **kwargs)

WDTYT? my_custom_startup() basically runs gcloud start <machine_name> commands.

Jim Crist-Harif
07/24/2020, 8:46 PM
That depends on what your my_custom_startup() method would do. Many people have already written dask cluster classes, I'd be surprised if yours was different.

Avi A
07/24/2020, 8:47 PM

Jim Crist-Harif
07/24/2020, 8:48 PM

Avi A
07/24/2020, 8:48 PM

Jim Crist-Harif
07/24/2020, 8:50 PM

Avi A
07/24/2020, 8:50 PM

Jim Crist-Harif
07/24/2020, 8:51 PM

Avi A
07/24/2020, 8:51 PM

Jim Crist-Harif
07/24/2020, 8:52 PM

Avi A
07/24/2020, 8:52 PM

Jim Crist-Harif
07/24/2020, 8:54 PM
cluster_class - this is usually a class, but it doesn't have to be. So you could write a custom function that spins up your google nodes and returns a Cluster object after you've done the setup.
You might also make use of https://docs.dask.org/en/latest/setup/ssh.html which uses ssh to set up a cluster, given already provisioned nodes.
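A hedged sketch of that "custom function as cluster_class" idea, using dask's SSHCluster once the machines are up; start_google_machines_and_wait is a hypothetical helper you would have to write yourself:

from dask.distributed import SSHCluster
from prefect.engine.executors import DaskExecutor

def make_cluster():
    # Hypothetical: boot the GCE machines (e.g. via gcloud or the compute API)
    # and return their hostnames once they are reachable over ssh.
    hosts = start_google_machines_and_wait()
    # SSHCluster convention: the first host runs the scheduler, the rest run workers.
    return SSHCluster(hosts)

executor = DaskExecutor(cluster_class=make_cluster)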
Avi A
07/24/2020, 9:26 PM
And how would the teardown happen? Through the close method? Or scale(0)?

Jim Crist-Harif
07/24/2020, 9:32 PM
Prefect uses the Cluster interface's context manager, so yeah, the close method would need to tear down your resources. In this case you might be interested in implementing your own cluster using the SpecCluster interface: https://distributed.dask.org/en/latest/api.html?highlight=SpecCluster#distributed.SpecCluster
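For reference, the SpecCluster pattern from the distributed docs looks roughly like the sketch below; the local Worker class is only a placeholder, and for this use case it would be swapped for something that starts a dask-worker on a Google VM, with teardown happening when the cluster's close method runs:

from dask.distributed import Scheduler, Worker, SpecCluster

scheduler = {"cls": Scheduler, "options": {"dashboard_address": ":8787"}}
workers = {
    0: {"cls": Worker, "options": {"nthreads": 2}},
    1: {"cls": Worker, "options": {"nthreads": 2}},
}
cluster = SpecCluster(scheduler=scheduler, workers=workers)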
Avi A
07/24/2020, 9:34 PM

Jim Crist-Harif
07/24/2020, 9:35 PM

Avi A
07/24/2020, 9:36 PM

Jim Crist-Harif
07/24/2020, 9:43 PM
Alternatively, you could use the on_start / on_exit hooks in the environments to spin up your google containers. This won't give you all the benefits of implementing your own SpecCluster for integration with Prefect, but it might be easier for you.
Avi A
07/24/2020, 9:45 PM

Jim Crist-Harif
07/24/2020, 9:47 PM

Avi A
08/04/2020, 4:16 PM
I set up a LocalEnvironment with the on_start / on_exit functions set as you suggested. From what I can see, they are not being called by the agent before running the flow on the executor. I added some logging and printing in the function and it's not being printed by the agent. I also tried adding lines before and after the call to on_start in the _RunMixin.run method (which is the function the LocalEnvironment seems to be using to run a flow), and they are not printed. Can you help me figure out what's going on?

The on_start function:
def toggle_dask_machine_state(change_to: MachineState, machine_name: str = None):
    from googleapiclient import discovery
    from oauth2client.client import GoogleCredentials
    credentials = GoogleCredentials.get_application_default()
    service = discovery.build('compute', 'v1', credentials=credentials)
    instances = service.instances()
    instance, project, zone = DASK_CONFIG.gcp_machine_info
    if machine_name:
        instance = machine_name
    toggle_fn = getattr(instances, change_to.value)
    request = toggle_fn(
        project=project,
        zone=zone,
        instance=instance
    )
    response = request.execute()
    pprint(response)
    get_logger().info("%s instance %s.\nResponse from server:\n%s", change_to.value, instance, pformat(response))

def start_dask_cluster():
    return toggle_dask_machine_state(MachineState.START)

def get_dask_environment(address: str = DASK_CONFIG.scheduler_addr):
    return LocalEnvironment(
        executor=DaskExecutor(address=address),
        on_start=start_dask_cluster,
        on_exit=stop_dask_cluster,
    )

Jim Crist-Harif
08/04/2020, 4:24 PM
Try passing --show-flow-logs to prefect agent start to see the output of your print statements.

Avi A
08/04/2020, 8:21 PM
-vf