
Dexter Antonio

02/19/2022, 5:58 PM
I currently have a Prefect flow that operates on a single row of a pandas dataframe. Is there a straightforward way to map this flow to all of the rows in a pandas dataframe? In other words, can I create a flow and then map it? If I cannot map each row of a dataframe to a flow, is there a straightforward way of nesting different tasks into each other and then mapping that “super” task to a series of inputs?
Here is a code example that demonstrates the concept behind what I am trying to do.
Operation on one row:
from prefect import Flow, task

@task
def extract():
    return [1, 2, 3]

@task
def transform1(x):
    return [x, x * 2, x * 3]

@task
def load(x):
    print(f"output: {x}")


if __name__ == '__main__':
    with Flow("mapping test") as flow:
        e = extract()
        t1 = transform1.map(e)
        l = load(t1)
    flow.run()
Operation on multiple rows:
from prefect import Flow, task

@task
def extract():
    return [[1, 2, 3], [3, 8, 3], [35, 5, 1]]

@task
def transform1(x):
    return [x, x * 2, x * 3]

@task
def load(x):
    print(f"output: {x}")


if __name__ == '__main__':
    i = 0 # could I cycle through all rows and perform this operation? 
    with Flow("mapping test") as flow:
        e = extract()[i]
        t1 = transform1.map(e)
        l = load(t1)
    flow.run()

Kevin Kho

02/19/2022, 6:48 PM
You can do df.values.tolist() (or something similar), which returns a list of rows. You can get the rows as a list of lists or a list of dicts, but most of the time this is not a good use of Prefect, because we charge per task run. Do you need observability and retries for every row? If you don't, you can just use pandas apply directly.
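A minimal pandas-only sketch of the two row representations mentioned above. The DataFrame and its values are illustrative, not from the thread:

```python
import pandas as pd

# Small illustrative DataFrame (hypothetical data).
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

# List of lists: one inner list per row, in column order.
rows_as_lists = df.values.tolist()

# List of dicts: one {column: value} dict per row.
rows_as_dicts = df.to_dict("records")

print(rows_as_lists)  # [[1, 4], [2, 5], [3, 6]]
print(rows_as_dicts)
```

Either list can be handed to a mapped task; the list of dicts keeps the column names attached to each value.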

Dexter Antonio

02/19/2022, 6:57 PM
Ideally, I would like to have a different flow for each row because I am interested in ensuring that the processing of each row is consistent. I have been experimenting with mapping, but it is a little cumbersome to convert an entire flow. Is there a straightforward way to just define a flow and map it to different inputs?

Kevin Kho

02/19/2022, 6:59 PM
You can register the processing Flow, and then have an outer Flow that uses the create_flow_run task in Prefect, which lets you pass Parameters to the subflow. You can do create_flow_run.map() and pass in each row that way. But it's not as straightforward to pull the results back into the main Flow. This is easier in Prefect 2.0, but a bit harder in Prefect 1.0.
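Because each child run receives its row through flow Parameters, the row values generally need to be JSON-serializable. A minimal sketch of preparing one parameters dict per row, assuming the child flow declares a Parameter named `row` (a hypothetical name). The resulting list is what you would map over in a Prefect 1.0 parent flow, for example with `create_flow_run.map(flow_name=unmapped("child"), parameters=row_params)`; that call is only shown as a comment and is not exercised here, since it needs a backend.

```python
import json

import pandas as pd


def rows_to_parameters(df: pd.DataFrame, param_name: str = "row") -> list:
    """Build one JSON-safe parameters dict per DataFrame row.

    `param_name` is a hypothetical Parameter name the child flow is assumed
    to declare. Round-tripping through JSON turns NumPy scalars into plain
    Python types (and NaN into None), so each dict serializes cleanly.
    """
    records = json.loads(df.to_json(orient="records"))
    return [{param_name: record} for record in records]


df = pd.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
row_params = rows_to_parameters(df)

# Assumed usage inside a Prefect 1.0 parent flow (not run here):
#     create_flow_run.map(flow_name=unmapped("child"), parameters=row_params)
print(json.dumps(row_params))
```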
Here is an example that converts the DataFrame to a list of dicts:
from prefect import Flow, task
import pandas as pd

@task
def load_dataframe():
    return pd.DataFrame({"a": [1,2,3], "b":[1,2,3]})

@task
def df_to_rows(df: pd.DataFrame):
    return df.to_dict('records')

@task
def transform(row):
    row["c"] = row["b"] + row["a"]
    return row

@task
def load(rows):
    df = pd.DataFrame.from_records(rows)
    print(df.head())
    return df

with Flow("test") as flow:
    df = load_dataframe()
    rows = df_to_rows(df)
    transformed_rows = transform.map(rows)
    load(transformed_rows)

flow.run()
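To sanity-check what the mapped flow computes without running Prefect, the same records round trip can be exercised with a plain list comprehension. This is a sketch: `add_c` is a hypothetical stand-in for the `transform` task body, written to return a new dict instead of mutating the input row.

```python
import pandas as pd


def add_c(row: dict) -> dict:
    # Same arithmetic as the `transform` task, but builds a new dict
    # rather than modifying the row in place.
    return {**row, "c": row["a"] + row["b"]}


df = pd.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
rows = df.to_dict("records")                      # what df_to_rows returns
transformed = [add_c(r) for r in rows]            # one result per row, like transform.map
result = pd.DataFrame.from_records(transformed)   # what load rebuilds
print(result)
```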

Dexter Antonio

02/19/2022, 7:11 PM
Thanks! I am going to look into create_flow_run. I think that will be the most natural solution to my use case.

Kevin Kho

02/19/2022, 7:13 PM
Yeah, no problem. And of course, the example above doesn't need retries or observability, so there's no point in paying Prefect for each row when you can just do:
import pandas as pd

df = pd.DataFrame({"a": [1,2,3], "b":[1,2,3]})
def transform(row):
    return row["b"] + row["a"]

df['c'] = df.apply(transform, axis=1)
print(df.head())
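For a simple column-wise sum like this one, apply with axis=1 calls the Python function once per row. A vectorized column expression gives the same result and is generally much faster on large frames. A minimal sketch comparing the two on the same illustrative data:

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})

# Row-wise: the lambda is called once per row, receiving that row as a Series.
df["c_apply"] = df.apply(lambda row: row["b"] + row["a"], axis=1)

# Vectorized: the two columns are added in a single operation.
df["c_vec"] = df["b"] + df["a"]

print(df)
```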

Dexter Antonio

02/19/2022, 7:54 PM
Can I use create_flow_run with the Prefect Core backend, or does that command only work with Prefect Cloud?
When I try to run the following code, I get this error:
AuthorizationError: Malformed response received from Cloud - please ensure that you are authenticated. See prefect auth login --help
I just want to run the code locally for now.
import prefect
from prefect import task, Flow, Parameter
from prefect.tasks.prefect.flow_run import (
    create_flow_run,
    get_task_run_result,
)
@task
def create_some_data(length: int):
    return list(range(length))
with Flow("child") as child_flow:
    data_size = Parameter("data_size", default=5)
    data = create_some_data(data_size)
with Flow("parent") as parent_flow:
    child_run_id = create_flow_run(
        flow_name=child_flow.name, parameters=dict(data_size=10)
    )
    child_data = get_task_run_result(child_run_id, "create_some_data-1")
    
# Register the sub flow
child_flow.register("default")
# Run the parent flow locally
#flow_run = parent_flow.run()
# Print the retrieved data
#print(flow_run.result[child_data].result)

Kevin Kho

02/20/2022, 12:13 AM
Only Prefect Cloud, because under the hood the code makes an API call to Prefect Cloud.