
Vadym Dytyniak

05/26/2022, 8:47 AM
Hi. Is it possible to access the current flow object in a custom task implementation?

Kevin Kho

05/26/2022, 2:09 PM
Not quite. What are you trying to do?

Vadym Dytyniak

05/26/2022, 2:13 PM
Set terminal_state_handler inside the task

Kevin Kho

05/26/2022, 2:22 PM
That can’t be done because it’s fixed at registration time. I need to ask you again what the goal is haha.
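(For context, in Prefect 1.x the terminal_state_handler is normally attached when the Flow is constructed, before registration; a minimal sketch, with the handler name and body as placeholders:)
from typing import Optional, Set

from prefect import Flow
from prefect.engine.state import State


def my_terminal_state_handler(
    flow: Flow, state: State, reference_task_states: Set[State]
) -> Optional[State]:
    # Called once, after the flow reaches a terminal state.
    print(f'Flow finished in state: {state}')
    return state


flow = Flow('example', terminal_state_handler=my_terminal_state_handler)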

Vadym Dytyniak

05/26/2022, 2:24 PM
Actually I can do it at runtime, but I have to pass the flow as an argument to the task run 😉
I am creating a task that creates clusters, and I want to shut them down in the terminal_state_handler

Kevin Kho

05/26/2022, 2:25 PM
Oh it works? Can I see a sample?

Vadym Dytyniak

05/26/2022, 2:28 PM
from typing import Any

from prefect import Flow, Task


class CreateClusterTask(Task):
    # pylint: disable=arguments-differ
    def run(
        self,
        context: Flow,
        name: str,
        dependencies: list[str] | None = None,
        **cluster_kwargs: Any,
    ) -> str:
        manager = ClusterManager(dependencies)
        cluster = manager(name, **cluster_kwargs)

        if context.terminal_state_handler is not None:
            # Chain the existing handler so both it and the teardown run.
            current_terminal_state_handler = context.terminal_state_handler

            def terminal_state_handler(*args: Any) -> None:
                current_terminal_state_handler(*args)
                manager.teardown(*args)

            context.terminal_state_handler = terminal_state_handler
        else:
            context.terminal_state_handler = manager.teardown

        self.logger.info('Dashboard link: %s', cluster.dashboard_link)

        return cluster.scheduler_address  # type: ignore
I believe it works because it is the LocalDaskExecutor
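(For reference, the call site would look roughly like this; the flow and cluster names are illustrative:)
from prefect import Flow

create_cluster = CreateClusterTask()

with Flow('clusters') as flow:
    # The flow object itself is passed in as an ordinary task argument.
    scheduler_address = create_cluster(context=flow, name='my-cluster')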

Kevin Kho

05/26/2022, 2:30 PM
Ahh nice! That’s news to me. Curious why you couldn’t use the resource manager instead?

Vadym Dytyniak

05/26/2022, 3:08 PM
Hm, I wasn't aware of this feature. Should the object returned from setup be serialisable?

Kevin Kho

05/26/2022, 3:11 PM
I don't think so, cuz the example is a Dask Client
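(The docs example is roughly this sketch; Client(n_workers=...) spins up a local cluster, so swap in your own cluster type as needed:)
from dask.distributed import Client

from prefect import resource_manager


@resource_manager
class DaskCluster:
    def __init__(self, n_workers: int) -> None:
        self.n_workers = n_workers

    def setup(self) -> Client:
        # Create the cluster and hand the client to tasks inside the block.
        return Client(n_workers=self.n_workers)

    def cleanup(self, client: Client) -> None:
        # Runs even if tasks inside the block fail.
        client.close()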

Vadym Dytyniak

05/26/2022, 3:13 PM
Let me try to implement via ResourceManager

Kevin Kho

05/26/2022, 3:14 PM
If what you have works, no need to change I guess

Vadym Dytyniak

05/26/2022, 3:18 PM
I see a few possible issues with that approach
Resource manager is more native
Do you happen to know how to use the Dask Client as a context manager to make the same task work with different clusters?
because currently if I leave set_as_default=True, they submit to the same cluster
and if I set set_as_default=False - it runs locally
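(Possibly what you want is Client.as_current(); inside the block get_client() resolves to that client, so distributed work started there should target that cluster — I haven't verified this end to end with DataFrames. A sketch; the scheduler addresses are placeholders:)
import dask.dataframe as dd
from dask.distributed import Client

client_a = Client('tcp://scheduler-a:8786', set_as_default=False)
client_b = Client('tcp://scheduler-b:8786', set_as_default=False)

with client_a.as_current():
    # Inside this block Client.current() / get_client() return client_a,
    # so work kicked off here should land on cluster A rather than B.
    df = dd.read_csv('data/*.csv')
    print(df.x.mean().compute())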

Kevin Kho

05/26/2022, 4:10 PM
I don’t…but don’t you just have two clients and then use other_client.submit() with the other one?

Vadym Dytyniak

05/26/2022, 4:13 PM
I don't want users to submit to the client directly; I thought a context manager exists so that any DataFrame manipulation would be submitted to the needed cluster.

Kevin Kho

05/26/2022, 4:15 PM
Ohh for the resource manager? Just make the client and then
with DaskCluster(n_workers=n_workers) as client:
    # These tasks rely on a dask cluster to run, so we create them inside
    # the `DaskCluster` resource manager
    df = load_data()
    summary = summarize(df, client)
pass it down and those guys inside can use the client. I believe the load_data should also use the cluster
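(The fuller docs pattern, as a sketch — load_data and summarize are the placeholder tasks from the docs example:)
from prefect import Flow, Parameter, task


@task
def load_data():
    ...


@task
def summarize(df, client):
    ...


with Flow('dask-example') as flow:
    n_workers = Parameter('n_workers', default=4)

    with DaskCluster(n_workers=n_workers) as client:
        # Tasks created in this block run after setup and before cleanup,
        # so the cluster is alive for the duration of their runs.
        df = load_data()
        summary = summarize(df, client)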