# prefect-community
b
@Jim Crist-Harif quick q about https://github.com/PrefectHQ/prefect/pull/3333: where would a callback like https://docs.prefect.io/orchestration/execution/overview.html#environment-callbacks go once environments are ✂️ ?
j
I'm planning on porting those over to the executors
Since they'll be new on the executors, we might rethink what callbacks we support. What do you use these for?
b
I actually don't use them exactly, but the same kind of thing lives on my custom environment class right now: basically `distributed.Client(executor.address).profile()` and save to HTML
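The pattern described above might be sketched like so (a sketch, not Prefect API: `save_profile` is a hypothetical helper, and the scheduler address is assumed to come from something like `executor.address` on a custom environment):

```python
from distributed import Client


def save_profile(scheduler_address, path="profile.html"):
    # Client.profile(filename=...) renders the aggregated task profile
    # to a standalone HTML file (requires bokeh to be installed).
    with Client(scheduler_address) as client:
        client.profile(filename=path)
```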
j
Would a callback on start and on end that takes the executor itself be sufficient? You could do that then with the `DaskExecutor` in an `on_exit` callback.
b
yep definitely
j
Or perhaps we'd make the callbacks executor specific. The dask one might take in the client object, but the local one might take no args.
👍 1
j
@Jim Crist-Harif FWIW, we currently use the `on_execute` callback in `DaskCloudProviderEnvironment` to size the number of Dask workers based on Flow Parameters. Effectively something like "oh, you have a list of 12 models to train, let's get you 12 workers for that."
j
Hmmm, interesting. I think that's pointing towards executor-specific callbacks. I'll push something up, thanks for the use-cases y'all.
👍 2
So I don't think the `on_execute` callback is necessary with the new scheme. We may want to add something to make things more composable, but for now I'd like to avoid it. To get the behavior you'd want, you can write a function to create your cluster object and pass it to `cluster_class`. When that function gets called, the parameters (and anything else provided as `context`) will already be in `context`, so you'd have full access to configure the cluster however you wanted.
This should already work, with either an environment (`LocalEnvironment`/`FargateTaskEnvironment`/`KubernetesJobEnvironment` only) or the new `KubernetesRun` run config.
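For the worker-sizing case, that might look roughly like this (a sketch under assumptions: `LocalCluster` stands in for whatever cluster class your deployment actually uses, and the parameter lookup is faked with a plain dict, since in a real flow run `prefect.context.parameters` would already be populated when the function is called):

```python
from distributed import LocalCluster


def make_cluster():
    # In a real flow run you'd read prefect.context.parameters here;
    # a plain dict stands in for it in this sketch.
    parameters = {"models": ["m1", "m2", "m3"]}
    n_workers = len(parameters["models"])  # one worker per model
    # Create the cluster already sized for this run, then return it.
    return LocalCluster(n_workers=n_workers, processes=False)


# The function itself would then be passed to the executor, e.g.
# DaskExecutor(cluster_class=make_cluster)
```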
Use cases I see for callbacks:
• Do something completely unrelated to the executor before the job starts (maybe ping an external service 🤷).
• Dynamically configure the dask cluster. This can already be done with `cluster_class` as a function, but maybe we'd want a clearer way?
• Dynamically configure the cluster scale/adapt. Currently either you'd have to do that in `cluster_class` (create the cluster and call `scale`/`adapt` before returning), or use static kwargs in `adapt_kwargs`.
• Do something before the cluster shuts down (@Brett Naul's case of saving the job profile).
• Do something completely unrelated to the executor before the job stops.
All possible callbacks for `DaskExecutor` (names subject to change):
• `on_start(executor) -> None`, first thing called in `executor.start()`.
• `on_cluster_start(executor) -> None`, called after the cluster starts but before the flow is run. Could do scaling here if needed.
• `on_flow_run_stop(executor) -> None`, called after the flow run has completed, but before shutting the cluster down. Could save the profile here.
• `on_stop(executor) -> None`, last thing called before exiting `executor.start()`.

We could merge the last two together into a single `on_stop` if we don't care about having a callback after the cluster has stopped (it would happen before cluster shutdown but after flow execution). Likewise, we could merge the first two if we don't care about having a callback before the cluster starts. I'd like to minimize the number of configurables if possible, so minimizing possible callbacks would be nice unless they're all needed. Y'all have more experience actually running prefect than I do, so your thoughts would be useful here :).
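To make the proposed ordering concrete, a library-free sketch (hook names follow the proposal above; `SketchExecutor` is a stand-in for illustration, not Prefect's `DaskExecutor`):

```python
class SketchExecutor:
    """Stand-in illustrating when each proposed callback would fire."""

    def __init__(self, on_start=None, on_cluster_start=None,
                 on_flow_run_stop=None, on_stop=None):
        self._hooks = {
            "on_start": on_start,
            "on_cluster_start": on_cluster_start,
            "on_flow_run_stop": on_flow_run_stop,
            "on_stop": on_stop,
        }

    def _fire(self, name):
        hook = self._hooks[name]
        if hook is not None:
            hook(self)  # each callback receives the executor itself

    def start(self, run_flow):
        self._fire("on_start")          # first thing in start()
        # ... cluster would be created here ...
        self._fire("on_cluster_start")  # cluster up, flow not yet running
        run_flow()
        self._fire("on_flow_run_stop")  # flow done, cluster still up
        # ... cluster would be shut down here ...
        self._fire("on_stop")           # last thing before start() returns
```

Merging the last two callbacks (or the first two) would just mean dropping one of the adjacent `_fire` calls.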