Владислав Богучаров
02/22/2022, 4:43 PM
Anna Geller
02/22/2022, 5:05 PM
Владислав Богучаров
02/22/2022, 5:08 PM
Anna Geller
02/22/2022, 6:23 PM
wr.s3.to_parquet(df1, path1)
Владислав Богучаров
02/22/2022, 7:21 PM
Anna Geller
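02/22/2022, 7:28 PM
import prefect
from prefect import task

@task
def do_sth():
    # prefect.context holds the running task's name; use it to build the file name
    return f"{prefect.context.get('task_name')}.csv"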
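As far as I understand Prefect 1.x, templated locations like "{task_name}.csv" are rendered by formatting the string with values from the run context. A rough stdlib-only illustration (the context dict and render_location are hypothetical stand-ins, not Prefect's API):

```python
# Hypothetical stand-in for prefect.context during a task run; only the keys
# used below are included (an assumption for illustration).
context = {"flow_name": "etl", "task_name": "create_df"}


def render_location(template: str, ctx: dict) -> str:
    """Fill a location template such as "{task_name}.csv" from a context dict."""
    return template.format(**ctx)


print(render_location("{task_name}.csv", context))              # create_df.csv
print(render_location("{flow_name}/{task_name}.csv", context))  # etl/create_df.csv
```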
Kevin Kho
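02/22/2022, 7:35 PM
import pandas as pd
from prefect import task, Flow

@task
def create_df():
    df = ...  # build the DataFrame here
    location = ...  # where to write the CSV (path not given here)
    # to_csv returns None when given a path, so return the path explicitly
    df.to_csv(location, index=False)
    return location

@task
def load_df(location):
    df = pd.read_csv(location)
    # more stuff
    return

with Flow(...) as flow:
    loc = create_df()
    load_df(loc)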
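The same hand-off can be sketched with only the standard library, assuming csv in place of pandas and a temporary directory in place of real storage (create_rows and load_rows are hypothetical names). The first function persists the data and returns the path; the second receives only that path:

```python
import csv
import os
import tempfile


def create_rows(directory: str) -> str:
    """Write rows to a CSV file and return the file's path (not the data)."""
    rows = [["id", "value"], ["1", "a"], ["2", "b"]]  # hypothetical sample data
    location = os.path.join(directory, "create_rows.csv")
    with open(location, "w", newline="") as f:
        csv.writer(f).writerows(rows)
    return location


def load_rows(location: str) -> list:
    """Read the CSV back from the path produced upstream."""
    with open(location, newline="") as f:
        return list(csv.DictReader(f))


with tempfile.TemporaryDirectory() as tmp:
    loc = create_rows(tmp)
    records = load_rows(loc)  # one dict per data row, keyed by the header
    print(records)
```

Only the short path string moves between the two steps; the data itself lives in the file.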
Or you can have Prefect handle it for you with the Result interface; in that case the DataFrame is both persisted and passed on to the next task.
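from prefect import task, Flow
from prefect.engine.results import S3Result
from prefect.engine.serializers import PandasSerializer

# "my-bucket" is a placeholder: S3Result requires a bucket name
@task(result=S3Result(bucket="my-bucket", location="{task_name}.csv", serializer=PandasSerializer("csv")))
def create_df():
    df = ...  # build the DataFrame here
    return df

@task
def load_df(df):
    # more stuff
    return

with Flow(...) as flow:
    df = create_df()
    load_df(df)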
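If you don't want the file persisted, you can turn off checkpointing for the task too, e.g. @task(checkpoint=False); the DataFrame is still passed on to the next task in memory.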
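To make the Result/checkpoint trade-off concrete, here is a toy model in plain Python. LocalJsonResult, run_task and create_data are hypothetical; this is not how Prefect implements Results, and JSON stands in for PandasSerializer. With checkpointing on, the return value is written to the templated location and also handed on; with it off, nothing is written but the value still flows to the next step:

```python
import json
import os
import tempfile


class LocalJsonResult:
    """Hypothetical, simplified stand-in for a Prefect Result: a location
    template plus a way to serialize a value to that location."""

    def __init__(self, directory: str, location: str):
        self.directory = directory
        self.location = location  # e.g. "{task_name}.json"

    def write(self, value, **ctx) -> str:
        # Render the template from context values, then serialize the value there.
        path = os.path.join(self.directory, self.location.format(**ctx))
        with open(path, "w") as f:
            json.dump(value, f)
        return path


def run_task(fn, result, checkpoint=True):
    """Run fn; persist its return value only when checkpointing is on.
    The value is returned either way, so a downstream task still receives it."""
    value = fn()
    if checkpoint:
        result.write(value, task_name=fn.__name__)
    return value


def create_data():
    return {"rows": 3}


def files_after_run(checkpoint: bool):
    """Run create_data once in a fresh directory; report the value and files written."""
    with tempfile.TemporaryDirectory() as tmp:
        res = LocalJsonResult(tmp, "{task_name}.json")
        value = run_task(create_data, res, checkpoint=checkpoint)
        return value, sorted(os.listdir(tmp))


print(files_after_run(True))   # ({'rows': 3}, ['create_data.json'])
print(files_after_run(False))  # ({'rows': 3}, [])
```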