# prefect-community
a
Hey there, I’ve been working on my prefect workflows for a while now and I want to take them to production grade. To that end, I’d like to have my flow turn on a dask cluster, run for a while, and then turn it off at the end (even if it fails). I was thinking about having tasks for that, but since I’m configuring my flow to run on the cluster, these tasks would need the cluster running in order to operate. I guess there’s something I’m missing regarding best practices for deployment - can someone here advise based on their own setup, or point me to the right place in the documentation? Thanks!
j
Hi @Avi A - thanks for the question. Hopefully others here will have some ideas for you, but you might also want to open a Github discussion as they are much more publicly visible/discoverable than Slack (where answers can get hidden as more get added!)
j
Hi Avi, the proper way to do this is to configure a DaskExecutor with the cluster_class and cluster_kwargs for starting up your temporary dask cluster (by default cluster_class is a LocalCluster, but depending on where you want dask to run you may want to change this). You'd then specify this executor on your environment class, and prefect should handle the rest for you. We have some examples of doing this here: https://docs.prefect.io/orchestration/execution/local_environment.html#examples
🚀 1
upvote 2
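For illustration, a minimal sketch of the setup described above (not from the thread): the executor owns the temporary cluster's lifecycle, so the cluster is created when the flow run starts and torn down when it finishes. The default LocalCluster is kept here and only its kwargs are overridden; the flow body and worker counts are placeholders.

```python
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment

@task
def say_hello():
    print("hello from a dask worker")

# cluster_class defaults to distributed.LocalCluster; cluster_kwargs are passed
# straight through to it when the executor starts the temporary cluster.
environment = LocalEnvironment(
    executor=DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 2})
)

with Flow("temporary-dask-cluster", environment=environment) as flow:
    say_hello()
```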
a
Looks cool. Is there a way to provide my own cluster startup class? If yes, what should this class implement?
j
cluster_class needs to be a callable that returns an instance of a distributed.deploy.Cluster object. There are existing implementations of these for most deployment backends (e.g. dask-kubernetes for k8s, dask-yarn for yarn, ...). These can be non-trivial to write - is there a reason you'd want to write your own?
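As a hedged illustration of that contract (not from the thread): an existing backend class can be passed directly as cluster_class, since it is itself a callable returning a Cluster. The dask-kubernetes class and its arguments below are assumptions, not a prescription.

```python
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.engine.executors import DaskExecutor

# Any callable returning a distributed.deploy.Cluster satisfies cluster_class;
# here an existing backend (dask-kubernetes) is used as-is.
executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs={
        "pod_template": make_pod_spec(image="daskdev/dask:latest"),  # illustrative worker image
        "n_workers": 4,
    },
)
```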
a
The reason is that I’m not proficient with k8s, I don’t need the scaling, and I have preemptible machines standing by, so turning on the cluster simply means starting them up. I guess I could inherit from Cluster and inject my startup commands into the init function, but it’s probably better in the long run to make the effort and move to k8s now, so I might do just that
j
Not at all! How would you deploy normally if you could? There may be an existing class that would work for you.
K8s can be nice, but if you don't need it you shouldn't have to use it.
a
I was thinking I could do something like this:
import distributed.deploy

class NoneK8sCluster(distributed.deploy.Cluster):
    def __init__(self, **kwargs):
        # my_custom_startup() would start the machines and return the scheduler address
        scheduler_address = my_custom_startup()
        super().__init__(address=scheduler_address, **kwargs)
WDTYT?
my normal deployment is simply a bunch of gcloud start <machine_name> commands
j
Sorry, my question was more about what your my_custom_startup() method would do. Many people have already written dask cluster classes, I'd be surprised if yours was different.
Ah. You might be interested in dask-cloudprovider then, which handles a similar use case. I don't think they have gce support yet, but that's on the roadmap. One sec, let me grab ya a link.
j
That's the docs, yeah, I thought there was an open issue about google cloud support.
a
that’s actually pretty cool b/c we might be migrating to Azure soon
j
Oh, well in that case you might just be able to use cloudprovider as is.
a
well, if we use AzureML then yes, but I don’t know what it is yet
I see they support ECS but I don’t understand why anyone would use this when there’s dask-kubernetes
j
Some regulated industries haven't approved k8s for security yet apparently.
a
ohh ok, cool. I was certain ECS is basically managed k8s
j
ECS also doesn't incur any costs if you have no nodes running, where EKS will always have a control plane node running.
a
but I got confused with EKS
👍 1
Thanks for your patience, Jim!
j
If existing options don't work for you, you can pass in any callable to cluster_class - this is usually a class, but it doesn't have to be. So you could write a custom function that spins up your google nodes and returns a Cluster object after you've done the setup. You might also make use of https://docs.dask.org/en/latest/setup/ssh.html which uses ssh to set up a cluster, given already provisioned nodes.
🚀 1
💯 1
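A hedged sketch of that idea (not from the thread), combining the "any callable" point with the SSH deployment docs linked above: start the already-provisioned GCE machines, then build a dask cluster over SSH on them. The zone, machine names, and hostnames are placeholders for the poster's own setup.

```python
import subprocess

from dask.distributed import SSHCluster
from prefect.engine.executors import DaskExecutor

ZONE = "europe-west1-b"                                          # placeholder zone
MACHINES = ["dask-scheduler", "dask-worker-1", "dask-worker-2"]  # placeholder instance names / hostnames

def gce_ssh_cluster(**kwargs):
    # Equivalent of the "bunch of gcloud start <machine_name> commands" mentioned above.
    subprocess.run(
        ["gcloud", "compute", "instances", "start", *MACHINES, "--zone", ZONE],
        check=True,
    )
    # SSHCluster treats the first host as the scheduler and the rest as workers.
    return SSHCluster(MACHINES, **kwargs)

executor = DaskExecutor(cluster_class=gce_ssh_cluster)
```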
a
Jim, I just realized I’m not sure how to decommission the cluster once the flow run is complete! What do I need to implement to do that? Do I need to implement a close method?
Or is tearing down the cluster done by calling scale(0)?
j
Shutting down a cluster is done by the Cluster interface's context manager, so yeah, the close method would need to tear down your resources. In this case you might be interested in implementing your own cluster using the SpecCluster interface: https://distributed.dask.org/en/latest/api.html?highlight=SpecCluster#distributed.SpecCluster
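A tiny, runnable illustration of that lifecycle (not from the thread), using LocalCluster as a stand-in for whatever Cluster implementation you end up with: the cluster is driven as a context manager, and leaving the block is what calls close(). A custom cluster for preemptible GCE machines would put its gcloud stop calls in that close() path, so they run even if the flow fails.

```python
from distributed import Client, LocalCluster

# Entering the block starts the cluster; leaving it calls cluster.close(),
# which is the same hook the context manager relies on when the flow run finishes.
with LocalCluster(n_workers=2, threads_per_worker=1) as cluster:
    with Client(cluster) as client:
        print(client.submit(sum, [1, 2, 3]).result())  # 6, computed on the cluster
# At this point close() has run and the scheduler and workers are gone.
```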
a
yeah that’s the class I was looking at, thanks again!
j
There's also nothing wrong with provisioning a dask cluster outside of a flow run if that's easier for you. The reason I recommend letting prefect handle it for you is that long-running dask clusters can lead to weird behavior over time if multiple flows are running on them (since they're shared python processes, previous flow runs may leave around imports/configuration that subtly affect future flow runs - there's no reliable way to clean up all state in a python process without restarting it).
a
yes, I want the cluster provisioning/decommissioning managed by the prefect agent
j
Another option: you might make use of on_start / on_exit hooks in the environments to spin up your google containers. This won't give you all the benefits of implementing your own SpecCluster for integration with prefect, but it might be easier for you.
a
you guys really cover a whole lot of options!! :)
👍 1
j
One of the things on the roadmap right now is to consolidate these options - right now we have lots of hooks that overlap, so we're trying to simplify things to make it clearer to users what the best way to do things is :)
👍 1
P 1
upvote 1
👏🏼 1
a
Hey @Jim Crist-Harif, I’m trying to use the LocalEnvironment with the on_start / on_exit functions set as you suggested. From what I can see, they are not being called by the agent before running the flow on the executor. I added some logging and printing in the function and it’s not being printed by the agent. I also tried adding lines before and after the call to on_start in the _RunMixin.run method (which is the function the LocalEnvironment seems to be using to run a flow), and they are not printed either. Can you help me figure out what’s going on?
this is my on_start function:
from pprint import pformat, pprint

# MachineState, DASK_CONFIG, get_logger and stop_dask_cluster come from the poster's own project modules (not shown).


def toggle_dask_machine_state(change_to: MachineState, machine_name: str = None):
    from googleapiclient import discovery
    from oauth2client.client import GoogleCredentials

    credentials = GoogleCredentials.get_application_default()
    service = discovery.build('compute', 'v1', credentials=credentials)
    instances = service.instances()

    instance, project, zone = DASK_CONFIG.gcp_machine_info
    if machine_name:
        instance = machine_name
    # change_to.value is expected to be the compute API method name, e.g. "start" or "stop"
    toggle_fn = getattr(instances, change_to.value)
    request = toggle_fn(
        project=project,
        zone=zone,
        instance=instance
    )
    response = request.execute()
    pprint(response)
    get_logger().info("%s instance %s.\nResponse from server:\n%s", change_to.value, instance, pformat(response))


def start_dask_cluster():
    return toggle_dask_machine_state(MachineState.START)

def get_dask_environment(address: str = DASK_CONFIG.scheduler_addr):
    return LocalEnvironment(
        executor=DaskExecutor(address=address),
        on_start=start_dask_cluster,
        on_exit=stop_dask_cluster,
    )
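For completeness, a hypothetical sketch (not from the thread) of how the snippet above would be attached to a flow, so the on_start / on_exit hooks run as part of each flow run; the flow and project names are placeholders.

```python
from prefect import Flow

with Flow("production-flow") as flow:
    ...  # tasks go here

# Attach the environment (with its on_start/on_exit hooks) before registering.
flow.environment = get_dask_environment()
flow.register(project_name="my-project")  # placeholder project name
```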
j
The agent doesn't run the callbacks (agents never run user-defined code); those will run in the subprocess executing the flow. If you're using a local agent, you might need to add --show-flow-logs to prefect agent start to see the output of your print statements.
a
I’m running it with -vf
ohh wait it’s working now and I have no idea why
worst kind of error 😞