Kamilly Victória Ruseler
06/25/2024, 4:47 PM

Marvin
06/25/2024, 4:47 PM
```python
import dask
import dask.dataframe as dd
import pandas as pd
from prefect import flow, task
from prefect.logging import get_run_logger
from prefect_dask import DaskTaskRunner, get_dask_client


@task
def read_data(start: str, end: str) -> dd.DataFrame:
    # Build a synthetic timeseries as a Dask DataFrame, partitioned every four weeks
    df = dask.datasets.timeseries(start, end, partition_freq="4w")
    return df


@task
def process_data(df: dd.DataFrame) -> pd.DataFrame:
    # get_dask_client ensures .compute() runs on the flow run's Dask cluster
    with get_dask_client():
        df_yearly_avg = df.groupby(df.index.year).mean()
        return df_yearly_avg.compute()


@flow(task_runner=DaskTaskRunner())
def dask_pipeline():
    logger = get_run_logger()
    df = read_data("1988", "2022")
    df_yearly_average = process_data(df)
    logger.info("Dask pipeline executed successfully.", extra={"custom": "event"})
    return df_yearly_average


if __name__ == "__main__":
    dask_pipeline()
```
### Key Points
1. Set Up the Dask Cluster: By default, `DaskTaskRunner` creates a temporary local Dask cluster for each flow run; to manage the client yourself, pass the scheduler address as in the sketch above.
2. Define Prefect Tasks and Flow: Use `@task` to define tasks and `@flow` to define the workflow.
3. Use Prefect's `DaskTaskRunner`: Specify the `DaskTaskRunner` in the flow decorator to distribute tasks; tasks called with `.submit()` run concurrently on Dask workers (see the sketch after this list).
4. Log Events: Use Prefect's logger (`get_run_logger`) to log messages and events.
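To make point 3 concrete, here is a self-contained sketch (the task and flow names are illustrative, not from the snippet above) showing how `.submit()` fans task runs out across Dask workers, whereas calling a task directly blocks until it finishes:

```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def square(x: int) -> int:
    # Trivial workload; each submitted call becomes its own Dask future
    return x * x


@flow(task_runner=DaskTaskRunner())
def parallel_squares(n: int = 10):
    # .submit() returns PrefectFutures that run concurrently on Dask workers;
    # calling square(i) directly would run each task sequentially instead
    futures = [square.submit(i) for i in range(n)]
    return [future.result() for future in futures]


if __name__ == "__main__":
    print(parallel_squares())
```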
### Documentation Links
For further reading and examples, you can check the following documentation and GitHub issue:
- Prefect Dask Integration
- Prefect Task Runners
- GitHub Issue: KubernetesFlowRunner ends asyncio in error even on success
These resources should get you started with integrating Dask and Prefect for logging purposes.