Matt Alhonte

04/05/2022, 11:17 PM
Upgrading to 1.0 (using the nightly version that has the Capacity Provider fix), and digging it a lot! Trying to test out the Dask Performance Reports - is there a way to dynamically generate a filename for the report based on the Run's arguments? (not totally sure what Event Handlers have access to 😅)

Kevin Kho

04/05/2022, 11:18 PM
I don't think so, because the report path is set at registration time. Only the cluster class can be dynamic, so the Prefect code would have to be changed explicitly so that it can handle callables for this.

Matt Alhonte

04/05/2022, 11:19 PM
right on!
Hrm... maybe another Flow that archives the Reports into a different folder, kicked off with create_flow_run at the end! (think I'll experiment with that!)
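
The archiving step of that idea could look roughly like the sketch below. It is plain Python and assumes the report sits at a local path; `archive_report`, its `label` argument, and the file naming scheme are hypothetical names made up for illustration, not part of Prefect. In practice this would be the body of a task in the archiving Flow.

```python
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional


def archive_report(
    report_path: str,
    archive_dir: str,
    label: str,
    now: Optional[datetime] = None,
) -> Path:
    """Copy a report into archive_dir under a name built from label and a UTC timestamp.

    Hypothetical helper: the naming scheme is an assumption, not a Prefect convention.
    """
    src = Path(report_path)
    now = now or datetime.now(timezone.utc)

    # create the archive folder on first use
    dest_dir = Path(archive_dir)
    dest_dir.mkdir(parents=True, exist_ok=True)

    # e.g. performance_report_<label>_<timestamp>.html
    dest = dest_dir / f"{src.stem}_{label}_{now:%Y%m%dT%H%M%S}{src.suffix}"

    # copy rather than move, so the original stays in place for the next run to overwrite
    shutil.copy2(src, dest)
    return dest
```

Copying instead of moving leaves the original file where the executor wrote it; whether that is the right choice depends on how the reports are consumed.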

Anna Geller

04/06/2022, 11:24 AM
You could leverage the Parameter task for it and use it in your terminal state handler.
import io
from prefect.executors import DaskExecutor
from prefect import Flow, task, Parameter
from prefect.engine.state import State
from prefect.utilities.aws import get_boto_client
import prefect
from typing import Set, Optional


def custom_terminal_state_handler(
    flow: Flow, state: State, reference_task_states: Set[State],
) -> Optional[State]:
    # get the html report as a string
    report = flow.executor.performance_report

    # now we can write to S3, GCS, Azure Blob
    # or perform any other custom logic with the report

    # for example, saving the report to s3
    s3_client = get_boto_client("s3")
    report_data = io.BytesIO(report.encode())
    s3_object_name = prefect.context.parameters.get("s3_file_dask_performance_report")
    s3_client.upload_fileobj(
        report_data, Bucket="my-bucket", Key=f"{s3_object_name}.html"
    )


@task
def hello():
    return "hi"


with Flow(
    "performance_report",
    executor=DaskExecutor(performance_report_path="/tmp/performance_report.html"),
    terminal_state_handler=custom_terminal_state_handler,
) as flow:
    s3_path = Parameter("s3_file_dask_performance_report", default="performance_report")
    x = hello()
    flow.add_task(s3_path)
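
To derive the file name from all of the run's arguments rather than from one dedicated Parameter, a small helper along these lines might work. This is only a sketch: `build_report_key`, its `prefix` argument, and the naming scheme are hypothetical and not part of Prefect.

```python
import re
from typing import Any, Mapping


def build_report_key(
    parameters: Mapping[str, Any], prefix: str = "performance_report"
) -> str:
    """Build a file-name-safe object key from a run's parameters (hypothetical helper)."""
    parts = [prefix]
    # sort by name so the same parameters always produce the same key
    for name in sorted(parameters):
        # replace anything that is not safe in a file name with a dash
        value = re.sub(r"[^A-Za-z0-9_.-]+", "-", str(parameters[name])).strip("-")
        parts.append(f"{name}-{value}")
    return "_".join(parts) + ".html"


if __name__ == "__main__":
    print(build_report_key({"region": "us east/1", "date": "2022-04-05"}))
    # prints: performance_report_date-2022-04-05_region-us-east-1.html
```

Inside the state handler above, the result of `build_report_key(dict(prefect.context.parameters))` could then be passed as `Key`, assuming the parameters are available in the context at that point, as the snippet above already relies on.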