Avi A
07/24/2020, 7:21 PM

Jenny
07/24/2020, 7:55 PM

Jim Crist-Harif
07/24/2020, 8:10 PM
…a `DaskExecutor` with the `cluster_class` and `cluster_kwargs` for starting up your temporary dask cluster (by default `cluster_class` is a `LocalCluster`, but depending on where you want dask to run you may want to change this). You'd then specify this executor to your environment class, and Prefect should handle the rest for you. We have some examples of doing this here: https://docs.prefect.io/orchestration/execution/local_environment.html#examples
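[Editor's note] Jim's suggestion of a `DaskExecutor` configured with `cluster_class` / `cluster_kwargs` and attached to an environment can be sketched roughly like this, using the Prefect 0.12-era import paths from the linked docs (the `n_workers` value is illustrative, not taken from this thread):

```python
from dask.distributed import LocalCluster
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment

# The executor spins up a temporary dask cluster per flow run:
# cluster_class says how to create it, cluster_kwargs configures it.
executor = DaskExecutor(
    cluster_class=LocalCluster,        # the default; swap for another backend
    cluster_kwargs={"n_workers": 4},   # illustrative
)

# The executor is specified on the flow's environment; Prefect does the rest.
environment = LocalEnvironment(executor=executor)
```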
Avi A
07/24/2020, 8:22 PM

Jim Crist-Harif
07/24/2020, 8:25 PM
`cluster_class` needs to be a callable that returns an instance of a `distributed.deploy.Cluster` object. There are existing implementations of these for most deployment backends (e.g. dask-kubernetes for k8s, dask-yarn for YARN, ...). These can be non-trivial to write; is there a reason you'd want to write your own?

Avi A
07/24/2020, 8:38 PM
…`Cluster` and inject my startup commands into the init function, but it's going to be better in the long run to make the effort and move to k8s now, so I might do just that.

Jim Crist-Harif
07/24/2020, 8:39 PM

Jim Crist-Harif
07/24/2020, 8:40 PM

Avi A
07/24/2020, 8:45 PM
```python
import distributed

class NoneK8sCluster(distributed.deploy.Cluster):
    def __init__(self, **kwargs):
        # my_custom_startup() would run the gcloud start commands and
        # return the address of the now-running scheduler
        scheduler_address = my_custom_startup()
        super().__init__(address=scheduler_address, **kwargs)
```
WDYT?

Avi A
07/24/2020, 8:46 PM
…`gcloud start <machine_name>` commands

Jim Crist-Harif
07/24/2020, 8:46 PM
…`my_custom_startup()` method would do. Many people have already written dask cluster classes, I'd be surprised if yours was different.

Jim Crist-Harif
07/24/2020, 8:47 PM

Avi A
07/24/2020, 8:47 PM

Jim Crist-Harif
07/24/2020, 8:48 PM

Avi A
07/24/2020, 8:48 PM

Jim Crist-Harif
07/24/2020, 8:50 PM

Avi A
07/24/2020, 8:50 PM

Avi A
07/24/2020, 8:50 PM

Jim Crist-Harif
07/24/2020, 8:51 PM

Avi A
07/24/2020, 8:51 PM

Jim Crist-Harif
07/24/2020, 8:52 PM

Avi A
07/24/2020, 8:52 PM

Avi A
07/24/2020, 8:52 PM

Jim Crist-Harif
07/24/2020, 8:54 PM
`cluster_class` - this is usually a class, but it doesn't have to be. So you could write a custom function that spins up your google nodes and returns a `Cluster` object after you've done the setup.
You might also make use of https://docs.dask.org/en/latest/setup/ssh.html, which uses ssh to set up a cluster given already provisioned nodes.
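[Editor's note] Jim's point that `cluster_class` only needs to be a callable can be sketched with stdlib-only stand-ins; everything here except the `cluster_class` idea itself is hypothetical (`provision_nodes`, `StubCluster`, and the addresses are made up):

```python
def provision_nodes():
    """Stand-in for provisioning, e.g. `gcloud start <machine_name>` calls."""
    return ["10.0.0.1", "10.0.0.2"]  # pretend these nodes now exist

class StubCluster:
    """Stand-in for a distributed.deploy.Cluster built on the nodes,
    e.g. an SSHCluster pointed at the provisioned hosts."""
    def __init__(self, hosts, **kwargs):
        self.hosts = hosts
        self.kwargs = kwargs

    def close(self):
        pass  # real code would release the nodes here

def make_cluster(**kwargs):
    """A plain function is a valid cluster_class: spin up the google
    nodes first, then return a Cluster object once setup is done."""
    hosts = provision_nodes()
    return StubCluster(hosts, **kwargs)

# Prefect would then be configured with something like:
#   DaskExecutor(cluster_class=make_cluster, cluster_kwargs={...})
```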
Avi A
07/24/2020, 9:26 PM
…`close` method?

Avi A
07/24/2020, 9:26 PM
…`scale(0)`?

Jim Crist-Harif
07/24/2020, 9:32 PM
…`Cluster` interface's context manager, so yeah, the `close` method would need to tear down your resources. In this case you might be interested in implementing your own cluster using the `SpecCluster` interface: https://distributed.dask.org/en/latest/api.html?highlight=SpecCluster#distributed.SpecCluster
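[Editor's note] The lifecycle Jim is describing (the cluster is used as a context manager, so teardown belongs in `close`) can be sketched without dask installed; `ToyCluster` and its fields are invented for illustration:

```python
class ToyCluster:
    """Stand-in for a distributed.deploy.Cluster subclass."""
    def __init__(self):
        self.running = True  # real code: start scheduler/workers here

    def close(self):
        # real code: stop the google machines, scale to 0, etc.
        self.running = False

    # The cluster is entered/exited as a context manager,
    # so close() runs when the flow finishes.
    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

with ToyCluster() as cluster:
    assert cluster.running      # resources live during the run
assert not cluster.running      # torn down on exit
```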
Avi A
07/24/2020, 9:34 PM

Jim Crist-Harif
07/24/2020, 9:35 PM

Avi A
07/24/2020, 9:36 PM

Jim Crist-Harif
07/24/2020, 9:43 PM
…`on_start` / `on_exit` hooks in the environments to spin up your google containers. This won't give you all the benefits of implementing your own `SpecCluster` for integration with Prefect, but it might be easier for you.

Jim Crist-Harif
07/24/2020, 9:44 PM

Avi A
07/24/2020, 9:45 PM

Jim Crist-Harif
07/24/2020, 9:47 PM

Avi A
08/04/2020, 4:16 PM
…`LocalEnvironment` with the `on_start` / `on_exit` functions set as you suggested. From what I can see, they are not being called by the agent before running the flow on the executor. I added some logging and printing in the function and it's not being printed by the agent. I also tried adding lines before and after the call to `on_start` in the `_RunMixin.run` method (which is the function `LocalEnvironment` seems to be using to run a flow), and they are not printed. Can you help me figure out what's going on?

Avi A
08/04/2020, 4:16 PM
…`on_start` function:
```python
from pprint import pformat, pprint

def toggle_dask_machine_state(change_to: MachineState, machine_name: str = None):
    # MachineState, DASK_CONFIG, and get_logger come from the surrounding project
    from googleapiclient import discovery
    from oauth2client.client import GoogleCredentials

    credentials = GoogleCredentials.get_application_default()
    service = discovery.build('compute', 'v1', credentials=credentials)
    instances = service.instances()
    instance, project, zone = DASK_CONFIG.gcp_machine_info
    if machine_name:
        instance = machine_name
    # change_to.value is the Compute Engine method name, e.g. "start" or "stop"
    toggle_fn = getattr(instances, change_to.value)
    request = toggle_fn(
        project=project,
        zone=zone,
        instance=instance,
    )
    response = request.execute()
    pprint(response)
    get_logger().info(
        "%s instance %s.\nResponse from server:\n%s",
        change_to.value, instance, pformat(response),
    )

def start_dask_cluster():
    return toggle_dask_machine_state(MachineState.START)

def stop_dask_cluster():
    return toggle_dask_machine_state(MachineState.STOP)

def get_dask_environment(address: str = DASK_CONFIG.scheduler_addr):
    return LocalEnvironment(
        executor=DaskExecutor(address=address),
        on_start=start_dask_cluster,
        on_exit=stop_dask_cluster,
    )
```

Jim Crist-Harif
08/04/2020, 4:24 PM
…`--show-flow-logs` to `prefect agent start` to see the output of your print statements.

Avi A
08/04/2020, 8:21 PM
…`-vf`

Avi A
08/04/2020, 8:23 PM

Avi A
08/04/2020, 8:23 PM