# prefect-community
v
Hi. Is it possible to access current flow object in custom task implementation?
k
Not quite. What are you trying to do?
v
Set terminal_state_handler inside the task
k
That can’t be done directly because it’s fixed at registration time. I need to ask you again what the goal is haha.
v
Actually I can do it at run time, but I have to pass the flow as an argument to the task’s run 😉
I am creating a task that creates clusters, and I want to shut them down in terminal_state_handler
k
Oh it works? Can I see a sample?
v
class CreateClusterTask(Task):
    # pylint: disable=arguments-differ
    def run(
        self,
        context: Flow,
        name: str,
        dependencies: list[str] | None = None,
        **cluster_kwargs: Any,
    ) -> str:
        manager = ClusterManager(dependencies)
        cluster = manager(name, **cluster_kwargs)

        if context.terminal_state_handler is not None:
            current_terminal_state_handler = context.terminal_state_handler

            def terminal_state_handler(*args: Any) -> None:
                current_terminal_state_handler(*args)
                manager.teardown(*args)

            context.terminal_state_handler = terminal_state_handler
        else:
            context.terminal_state_handler = manager.teardown

        self.logger.info('Dashboard link: %s', cluster.dashboard_link)

        return cluster.scheduler_address  # type: ignore
I believe it works because it is LocalDaskExecutor
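The handler-chaining trick above can be sketched standalone; here is a minimal plain-Python version, with stub classes standing in for prefect’s Flow and the ClusterManager (the stub names are mine, not Prefect’s API):

```python
from typing import Any, Callable, Optional


class StubManager:
    """Stand-in for ClusterManager above; records whether teardown ran."""

    def __init__(self) -> None:
        self.torn_down = False

    def teardown(self, *args: Any) -> None:
        self.torn_down = True


class StubFlow:
    """Stand-in for a prefect Flow with a terminal_state_handler slot."""

    terminal_state_handler: Optional[Callable[..., Any]] = None


def install_teardown(flow: StubFlow, manager: StubManager) -> None:
    # Chain onto an existing handler if one is set, otherwise install directly,
    # mirroring the if/else in the task above.
    if flow.terminal_state_handler is not None:
        previous = flow.terminal_state_handler

        def chained(*args: Any) -> None:
            previous(*args)
            manager.teardown(*args)

        flow.terminal_state_handler = chained
    else:
        flow.terminal_state_handler = manager.teardown


flow = StubFlow()
calls: list = []
flow.terminal_state_handler = lambda *a: calls.append("original")
manager = StubManager()
install_teardown(flow, manager)
flow.terminal_state_handler("flow", "state")
print(calls, manager.torn_down)  # the original handler runs, then teardown
```

The key property is that the previously registered handler is still invoked before the cluster teardown, so multiple tasks can stack their cleanup this way.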
k
Ahh nice! That’s news to me. Curious why you couldn’t use the resource manager instead?
v
Hm, I wasn't aware of this feature. Does the object returned from setup need to be serialisable?
k
I don’t think so cuz the example is a Dask Client
v
Let me try to implement via ResourceManager
k
If what you have works, no need to change it, I guess
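For reference, the resource-manager idea discussed here boils down to a setup/cleanup pair that brackets the tasks using the resource, with cleanup guaranteed to run. A plain-Python sketch of that lifecycle (the names and the FakeCluster stub are mine, not Prefect’s API):

```python
import contextlib
from typing import Iterator


class FakeCluster:
    """Stand-in for a real Dask cluster."""

    def __init__(self) -> None:
        self.closed = False
        self.scheduler_address = "tcp://stub:8786"

    def close(self) -> None:
        self.closed = True


@contextlib.contextmanager
def cluster_resource() -> Iterator[FakeCluster]:
    # setup: create the cluster before the dependent tasks run
    cluster = FakeCluster()
    try:
        yield cluster
    finally:
        # cleanup: always runs, even if a task inside the block fails
        cluster.close()


with cluster_resource() as cluster:
    address = cluster.scheduler_address
print(cluster.closed)  # True — cleanup ran on exit
```

Compared with patching terminal_state_handler at run time, this keeps the create/teardown pairing explicit and exception-safe in one place.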
v
I see a few possible issues with that approach
Resource manager is more native
Do you maybe know how to use the Dask Client as a context manager to make the same task work with different clusters?
because currently, if I leave set_as_default=True, they submit to the same cluster
and if I set set_as_default=False, it runs locally
k
I don’t…but don’t you just have two clients and then use other_client.submit() with the other one?
v
I don't want users to submit to the client directly; I thought a context manager existed so that any DataFrame manipulation would be submitted to the needed cluster.
k
Ohh for the resource manager? Just make the client and then
with DaskCluster(n_workers=n_workers) as client:
    # These tasks rely on a dask cluster to run, so we create them inside
    # the `DaskCluster` resource manager
    df = load_data()
    summary = summarize(df, client)
pass it down, and the tasks inside can use the client. I believe load_data should also use the cluster