Dexter Antonio
02/19/2022, 5:58 PM
@task
def extract():
    """Return the fixed sample input list ``[1, 2, 3]``."""
    return [1, 2, 3]
@task
def transform1(x):
    """Return the three-element list ``[x, x * 2, x * 3]``."""
    return [x, x * 2, x * 3]
@task
def load(x):
    """Print ``x`` prefixed with ``output: ``."""
    print(f"output: {x}")
if __name__ == '__main__':
    # Declare the flow: extract -> transform1 mapped over the extracted
    # value -> load.
    with Flow("mapping test") as flow:
        e = extract()
        t1 = transform1.map(e)
        l = load(t1)
    # Run only after the flow context has closed, i.e. once the flow is
    # fully declared.
    flow.run()
operation on multiple rows:
@task
def extract():
    """Return three sample rows as a list of lists."""
    return [[1, 2, 3], [3, 8, 3], [35, 5, 1]]
@task
def transform1(x):
    """Return the three-element list ``[x, x * 2, x * 3]``."""
    return [x, x * 2, x * 3]
@task
def load(x):
    """Print ``x`` prefixed with ``output: ``."""
    print(f"output: {x}")
if __name__ == '__main__':
    i = 0  # could I cycle through all rows and perform this operation?
    # Declare the flow: take row ``i`` of the extracted data, map
    # transform1 over it, then load the mapped result.
    with Flow("mapping test") as flow:
        e = extract()[i]
        t1 = transform1.map(e)
        l = load(t1)
    # Run only after the flow context has closed, i.e. once the flow is
    # fully declared.
    flow.run()
Kevin Kho
df.values.tolist()
or something like that, which returns a list of rows. You can get it as a list of lists or a list of dicts, but this is not a good use of Prefect most of the time because we charge per task. Do you need observability and retries for every row? If you do not, you can just use the Pandas apply directly.
Dexter Antonio
02/19/2022, 6:57 PM
Kevin Kho
create_flow_run
task in Prefect, which allows you to pass in Parameters to the subflow. You can do create_flow_run.map()
and pass in each row like that.
But it’s not as straightforward to pull the results back into the main Flow. It is easier in Prefect 2.0, but it’s a bit harder in Prefect 1.0.
from prefect import Flow, task
import pandas as pd
@task
def load_dataframe():
    """Return a small sample DataFrame with columns ``a`` and ``b``."""
    return pd.DataFrame({"a": [1,2,3], "b":[1,2,3]})
@task
def df_to_rows(df: pd.DataFrame):
    """Convert ``df`` to its ``'records'`` form so each row can be mapped over."""
    return df.to_dict('records')
@task
def transform(row):
    """Add key ``"c"`` (``row["b"] + row["a"]``) to ``row`` and return it.

    The input ``row`` is modified in place; the same object is returned.
    """
    row["c"] = row["b"] + row["a"]
    return row
@task
def load(rows):
    """Rebuild a DataFrame from ``rows``, print its head, and return it."""
    df = pd.DataFrame.from_records(rows)
    print(df.head())
    return df
# Declare the flow: load the DataFrame, split it into rows, map transform
# over the rows, then load the transformed rows.
with Flow("test") as flow:
    df = load_dataframe()
    rows = df_to_rows(df)
    transformed_rows = transform.map(rows)
    load(transformed_rows)
# Run only after the flow context has closed, i.e. once the flow is fully
# declared.
flow.run()
Dexter Antonio
02/19/2022, 7:11 PM
Kevin Kho
df = pd.DataFrame({"a": [1,2,3], "b":[1,2,3]})


def transform(row):
    """Return ``row["b"] + row["a"]`` for a single row."""
    return row["b"] + row["a"]


# axis=1 applies transform to each row rather than to each column.
df['c'] = df.apply(transform, axis=1)
print(df.head())
Dexter Antonio
02/19/2022, 7:54 PM
AuthorizationError: Malformed response received from Cloud - please ensure that you are authenticated. See prefect auth login --help
when I try to run the following code. I just want to run the code locally for now.
import prefect
from prefect import task, Flow, Parameter
from prefect.tasks.prefect.flow_run import (
create_flow_run,
get_task_run_result,
)
@task
def create_some_data(length: int):
    """Return ``list(range(length))``."""
    return list(range(length))
# Child flow: takes a "data_size" parameter (default 5) and passes it to
# create_some_data.
with Flow("child") as child_flow:
    data_size = Parameter("data_size", default=5)
    data = create_some_data(data_size)
# Parent flow: starts a run of the child flow with data_size=10, then
# requests the result of the child-run task identified by
# "create_some_data-1".
with Flow("parent") as parent_flow:
    child_run_id = create_flow_run(
        flow_name=child_flow.name, parameters=dict(data_size=10)
    )
    child_data = get_task_run_result(child_run_id, "create_some_data-1")
# Register the child (sub) flow, passing "default" as register's first
# argument.
# NOTE(review): presumably "default" is the project name and registration
# contacts the Prefect backend, which would need authentication and may be
# the source of the AuthorizationError described above; confirm.
child_flow.register("default")
# Run the parent flow locally (currently commented out)
#flow_run = parent_flow.run()
# Print the data retrieved from the child flow run (currently commented out)
#print(flow_run.result[child_data].result)
Kevin Kho