Kamilly Victória Ruseler
06/25/2024, 4:47 PM

Marvin
06/25/2024, 4:47 PM
```python
import dask
import dask.dataframe as dd
import pandas as pd
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client
from prefect.logging import get_run_logger


@task
def read_data(start: str, end: str) -> dd.DataFrame:
    # Build a lazy Dask DataFrame of synthetic timeseries data.
    return dask.datasets.timeseries(start, end, partition_freq="4w")


@task
def process_data(df: dd.DataFrame) -> pd.DataFrame:
    # get_dask_client() routes compute() to the flow's Dask cluster
    # instead of starting an implicit local scheduler in the worker.
    with get_dask_client():
        df_yearly_avg = df.groupby(df.index.year).mean()
        return df_yearly_avg.compute()


@flow(task_runner=DaskTaskRunner())
def dask_pipeline():
    logger = get_run_logger()
    # .submit() hands tasks to the DaskTaskRunner; calling them directly
    # would run them sequentially in the flow process.
    df = read_data.submit("1988", "2022")
    df_yearly_average = process_data.submit(df)
    logger.info("Dask pipeline executed successfully.", extra={"custom": "event"})
    return df_yearly_average.result()


if __name__ == "__main__":
    dask_pipeline()
```
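Since the goal here is logging, one related setting worth knowing (assuming you are on Prefect 2.x, where it exists) is `PREFECT_LOGGING_EXTRA_LOGGERS`, which attaches Prefect's log handlers to named third-party loggers so their records, for example Dask's, show up alongside the flow run's logs. A minimal sketch:

```python
import logging

# Assumes Prefect 2.x. Set the env var before the flow run starts, e.g.:
#   export PREFECT_LOGGING_EXTRA_LOGGERS="dask,distributed"
# Prefect then attaches its handlers to those loggers, so stdlib logging
# calls on them inside tasks are captured with the flow-run logs.
logging.getLogger("distributed").info("This record is forwarded to Prefect too.")
```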
### Key Points
1. Let the task runner manage the cluster: DaskTaskRunner() spins up a temporary local Dask cluster for the flow run, so no module-level Client() is needed.
2. Define Prefect tasks and flow: Use @task to define tasks and @flow to define the workflow.
3. Use Prefect's DaskTaskRunner: Pass DaskTaskRunner to the flow decorator and call tasks with .submit() so they are distributed across the cluster (configuration options are sketched after this list).
4. Log events: Use Prefect's logger (get_run_logger) to log messages and events.
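If the default temporary cluster is not what you want, DaskTaskRunner can also size that cluster or attach to one you already run. A minimal sketch, assuming prefect-dask is installed; the worker counts and scheduler address below are placeholder values:

```python
from prefect import flow
from prefect_dask import DaskTaskRunner

# Size the temporary cluster DaskTaskRunner creates for the flow run
# (worker counts here are placeholder values).
sized_runner = DaskTaskRunner(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 2},
)

# Or connect to an existing Dask scheduler instead of creating one
# (the address is a placeholder).
external_runner = DaskTaskRunner(address="tcp://127.0.0.1:8786")


@flow(task_runner=sized_runner)
def sized_pipeline():
    ...
```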
### Documentation Links
For further reading and examples, you can check the following documentation and GitHub issue:
- Prefect Dask Integration
- Prefect Task Runners
- GitHub Issue: KubernetesFlowRunner ends asyncio in error even on success
These resources should get you started with integrating Dask and Prefect for logging purposes.