Matt Alhonte
04/05/2022, 11:17 PM
Kevin Kho
04/05/2022, 11:18 PM
Matt Alhonte
04/05/2022, 11:19 PM
`create_flow_run` at the end! (think I'll experiment with that!)
Anna Geller
04/06/2022, 11:24 AM
import io
from prefect.executors import DaskExecutor
from prefect import Flow, task, Parameter
from prefect.engine.state import State
from prefect.utilities.aws import get_boto_client
import prefect
from typing import Set, Optional
def custom_terminal_state_handler(
    flow: Flow,
    state: State,
    reference_task_states: Set[State],
) -> Optional[State]:
    """Upload the flow executor's performance report to S3.

    Reads ``flow.executor.performance_report`` (the HTML report as a
    string), wraps its encoded bytes in an in-memory buffer and uploads
    that buffer to the ``my-bucket`` bucket. The object key is the value
    of the ``s3_file_dask_performance_report`` entry in
    ``prefect.context.parameters`` with ``.html`` appended.

    S3 is only one possible destination: the report string could equally
    be written to GCS, Azure Blob, or fed into any other custom logic.

    Args:
        flow: The flow whose executor holds the performance report.
        state: Not used by this handler.
        reference_task_states: Not used by this handler.

    Returns:
        None; this handler does not supply a replacement state.
    """
    # The report is exposed on the executor as an HTML string.
    html_report = flow.executor.performance_report

    # Ship the report to S3 from memory.
    client = get_boto_client("s3")
    payload = io.BytesIO(html_report.encode())
    key_stem = prefect.context.parameters.get("s3_file_dask_performance_report")
    client.upload_fileobj(
        payload,
        Bucket="my-bucket",
        Key=f"{key_stem}.html",
    )
    return None
@task
def hello() -> str:
    """Return the fixed greeting ``"hi"``."""
    greeting = "hi"
    return greeting
# Build the "performance_report" flow: it runs on a DaskExecutor configured
# with a performance report path and registers custom_terminal_state_handler
# as the flow's terminal state handler.
with Flow(
    name="performance_report",
    executor=DaskExecutor(performance_report_path="/tmp/performance_report.html"),
    terminal_state_handler=custom_terminal_state_handler,
) as flow:
    # Name of the S3 object (without the ".html" suffix) for the report.
    s3_path = Parameter(
        name="s3_file_dask_performance_report",
        default="performance_report",
    )
    x = hello()
    # No task consumes s3_path, so it is added to the flow explicitly --
    # presumably so the parameter is registered with the flow and shows up
    # in prefect.context.parameters at run time (confirm against Prefect docs).
    flow.add_task(s3_path)