# prefect-community
r
Hi Prefect team, I have implemented a task that loads a file from disk and performs some action on it serially, because mapping did not work due to memory constraints. Is there a way to map this task while putting memory constraints on the task or on the mapping? For example, telling Prefect to map only if there is sufficient memory?
a
You could attach a LocalExecutor to your flow (it's the default executor). Your mapped tasks will then run sequentially, one after the other, rather than in parallel.
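To see what switching executors changes, here is a stdlib-only sketch (not Prefect code). It maps the same function in two ways: with at most one call in flight, as sequential mapping does, and with several at once, as parallel mapping does. `run_mapped` and `ConcurrencyTracker` are hypothetical names for illustration. Peak memory roughly follows the peak number of items being processed at the same time.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_mapped(func, items, max_workers):
    """Apply func to every item with at most max_workers calls in flight.

    max_workers=1 mimics sequential mapping (one item after another);
    a larger value mimics parallel mapping.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in input order.
        return list(pool.map(func, items))


class ConcurrencyTracker:
    """Callable that records how many calls were running at the same time."""

    def __init__(self):
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def __call__(self, item):
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        time.sleep(0.05)  # stand-in for loading and processing one file
        with self._lock:
            self.active -= 1
        return f"processed {item}"


datasets = ["raw_customers", "raw_orders", "raw_payments"]

sequential = ConcurrencyTracker()
print(run_mapped(sequential, datasets, max_workers=1), "peak:", sequential.peak)

parallel = ConcurrencyTracker()
print(run_mapped(parallel, datasets, max_workers=3), "peak:", parallel.peak)
```

With `max_workers=1` the reported peak is always 1. With `max_workers=3` it can reach 3, depending on thread scheduling. If fully sequential runs are too slow, a middle ground may be to cap the LocalDaskExecutor's worker count. That assumes your Prefect 1 version forwards a `num_workers` setting to Dask, which is worth checking in the executor docs.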
What you're describing, allocating resources to parallel tasks, does seem possible in Prefect 2 with Dask annotations: https://orion-docs.prefect.io/concepts/task-runners/#dask-annotations
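"Map only if there is sufficient memory" amounts to admission control. Each item declares how much memory it expects to need, and an item starts only when that amount fits in a fixed budget. Dask's worker resources follow a similar idea: workers advertise capacities and tasks request amounts of them. The sketch below is plain Python and is not Prefect or Dask API. `MemoryBudget`, `map_with_budget` and the per-file estimates are hypothetical. Because the sketch never measures real memory, it is only as good as your estimate of each file's in-memory size.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class MemoryBudget:
    """Hypothetical helper: lets work start only while reservations fit in capacity_mb."""

    def __init__(self, capacity_mb):
        self.capacity_mb = capacity_mb
        self.reserved_mb = 0
        self.peak_reserved_mb = 0
        self._cond = threading.Condition()

    def acquire(self, mb):
        if mb > self.capacity_mb:
            # Waiting could never succeed, so fail fast instead of blocking forever.
            raise ValueError(f"needs {mb} MB but the budget is {self.capacity_mb} MB")
        with self._cond:
            # Block until finished items have released enough of the budget.
            self._cond.wait_for(lambda: self.reserved_mb + mb <= self.capacity_mb)
            self.reserved_mb += mb
            self.peak_reserved_mb = max(self.peak_reserved_mb, self.reserved_mb)

    def release(self, mb):
        with self._cond:
            self.reserved_mb -= mb
            self._cond.notify_all()  # wake waiters so they re-check the budget


def map_with_budget(func, items, estimate_mb, budget, max_workers=4):
    """Run func over items in parallel, starting an item only once its estimate fits."""

    def guarded(item):
        mb = estimate_mb(item)
        budget.acquire(mb)
        try:
            return func(item)
        finally:
            budget.release(mb)  # free the reservation even if func raises

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(guarded, items))


# Hypothetical per-file estimates, e.g. derived from the file size on disk.
ESTIMATES_MB = {"raw_customers": 600, "raw_orders": 500, "raw_payments": 300}


def process(name):
    time.sleep(0.02)  # stand-in for loading and processing the file
    return name


budget = MemoryBudget(capacity_mb=1000)
results = map_with_budget(process, list(ESTIMATES_MB), ESTIMATES_MB.get, budget)
print(results, "peak reserved MB:", budget.peak_reserved_mb)
```

Here `raw_customers` and `raw_orders` never run together because together they would exceed the 1000 MB budget. `raw_payments` can still overlap with either of them.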
Just to clarify the memory question: it depends a lot on how you structure your flow. If there is no reduce step at the end and each mapped task loads and processes its data directly, you shouldn't have any memory issues, because no step has to hold all the data at once:
```python
import pandas as pd
import prefect
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor, LocalExecutor
from prefect.run_configs import LocalRun


FLOW_NAME = "01_extract_load"


@task
def extract_and_load(dataset: str) -> None:
    logger = prefect.context.get("logger")
    file = f"https://raw.githubusercontent.com/anna-geller/jaffle_shop/main/data/{dataset}.csv"
    df = pd.read_csv(file)
    # Placeholder for your own load step:
    # load_df_to_snowflake(df, dataset)
    logger.info("Dataset %s with %d rows loaded to DB", dataset, len(df))
    del df  # drop the local reference so the DataFrame can be freed
    # Returning None (no reduce step) means no DataFrame is kept as a task result.


with Flow(
    FLOW_NAME, executor=LocalDaskExecutor(), run_config=LocalRun(labels=["dev"]),
) as flow:
    datasets = ["raw_customers", "raw_orders", "raw_payments"]
    # One mapped child run per dataset; each child returns None.
    dataframes = extract_and_load.map(datasets)

if __name__ == "__main__":
    flow.run()  # LocalDaskExecutor: mapped children run in parallel
    print("=== Now running sequentially: ===")
    flow.executor = LocalExecutor()  # the default executor: one child after another
    flow.run()
```
You should see the following logs:
```text
[2022-03-25 14:51:54+0100] INFO - prefect.FlowRunner | Beginning Flow run for '01_extract_load'
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load': Starting task run...
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load': Finished task run for task with final state: 'Mapped'
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[2]': Starting task run...
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[1]': Starting task run...
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[0]': Starting task run...
[2022-03-25 14:51:54+0100] INFO - prefect.extract_and_load[2] | Dataset raw_payments with 113 rows loaded to DB
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[2]': Finished task run for task with final state: 'Success'
[2022-03-25 14:51:54+0100] INFO - prefect.extract_and_load[0] | Dataset raw_customers with 100 rows loaded to DB
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[0]': Finished task run for task with final state: 'Success'
[2022-03-25 14:51:54+0100] INFO - prefect.extract_and_load[1] | Dataset raw_orders with 99 rows loaded to DB
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[1]': Finished task run for task with final state: 'Success'
[2022-03-25 14:51:54+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
=== Now running sequentially: ===
[2022-03-25 14:51:54+0100] INFO - prefect.FlowRunner | Beginning Flow run for '01_extract_load'
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load': Starting task run...
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load': Finished task run for task with final state: 'Mapped'
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[0]': Starting task run...
[2022-03-25 14:51:54+0100] INFO - prefect.extract_and_load[0] | Dataset raw_customers with 100 rows loaded to DB
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[0]': Finished task run for task with final state: 'Success'
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[1]': Starting task run...
[2022-03-25 14:51:54+0100] INFO - prefect.extract_and_load[1] | Dataset raw_orders with 99 rows loaded to DB
[2022-03-25 14:51:55+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[1]': Finished task run for task with final state: 'Success'
[2022-03-25 14:51:55+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[2]': Starting task run...
[2022-03-25 14:51:55+0100] INFO - prefect.extract_and_load[2] | Dataset raw_payments with 113 rows loaded to DB
[2022-03-25 14:51:55+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[2]': Finished task run for task with final state: 'Success'
[2022-03-25 14:51:55+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```
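The point about the reduce step can be illustrated without Prefect. The stdlib sketch below uses `tracemalloc` to compare peak traced memory in two cases. In the first, every loaded item is kept until a final reduce step. In the second, each item is processed and dropped right away. `load`, the 2 MB item size and the item count are hypothetical stand-ins for reading files. The sketch shows the pattern; it is not a measurement of Prefect itself.

```python
import tracemalloc

ITEM_BYTES = 2_000_000  # hypothetical in-memory size of one loaded file
N_ITEMS = 5


def load(i):
    """Stand-in for reading one file into memory."""
    return bytearray(ITEM_BYTES)


def peak_bytes(fn):
    """Return the peak traced allocation while fn runs."""
    tracemalloc.start()
    try:
        fn()
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak


def map_then_reduce():
    # Every mapped result stays alive until the reduce step runs.
    loaded = [load(i) for i in range(N_ITEMS)]
    return sum(len(data) for data in loaded)


def map_only():
    # Each item is loaded, handled and dropped before the next one;
    # only a small summary (its length) survives.
    total = 0
    for i in range(N_ITEMS):
        data = load(i)
        total += len(data)
        del data  # free this item before the next load allocates another
    return total


reduce_peak = peak_bytes(map_then_reduce)
stream_peak = peak_bytes(map_only)
print(f"map + reduce peak: {reduce_peak / 1e6:.1f} MB")
print(f"map only peak:     {stream_peak / 1e6:.1f} MB")
```

The map-and-reduce version peaks at roughly the size of all items combined. The map-only version peaks at roughly one item. This sketch runs sequentially, so with N parallel workers expect roughly N items in memory at once.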
r
Thanks @Anna Geller, what I am trying to do is very similar to your snippet above. I am using the LocalDaskExecutor, but not Prefect 2.0. I will try deleting my dataframe(s) at the end of the task call and see if that saves me from the memory crashes.
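Before relying on `del`, it helps to know what it does in Python. It removes one name, and the object is freed only once nothing else references it. So deleting `df` helps only if the DataFrame isn't also returned, appended to a list or cached somewhere else. Local variables are also released when the task function returns, so an explicit `del` at the very end mainly matters when more work follows in the same function. Below is a minimal stdlib sketch; `FakeFrame` is a hypothetical stand-in for a DataFrame, and the immediate freeing shown relies on CPython's reference counting.

```python
import gc
import weakref


class FakeFrame:
    """Hypothetical stand-in for a large DataFrame (weakref needs a class instance)."""

    def __init__(self, n_bytes):
        self.buffer = bytearray(n_bytes)


def task_that_deletes():
    df = FakeFrame(1_000_000)
    probe = weakref.ref(df)  # lets us check later whether the object still exists
    del df  # drops the only strong reference
    gc.collect()
    return probe() is None  # True: the object has been freed


def task_that_keeps_a_reference(results):
    df = FakeFrame(1_000_000)
    probe = weakref.ref(df)
    results.append(df)  # e.g. returned as a task result or cached elsewhere
    del df  # only removes the local name
    gc.collect()
    return probe() is None  # False: `results` still holds the object


print("freed after del:", task_that_deletes())
kept = []
print("freed after del with another reference:", task_that_keeps_a_reference(kept))
```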