Jason Motley
12/01/2021, 9:37 PM
flow = Flow("myflow")
with flow:
    df = extract(stuff)
    print(df.head(10)) # This does not work
Kevin Kho
df is not a DataFrame. It is a Task that eventually resolves to a DataFrame, so you need to do the df.head() inside a task so that it is deferred also. Otherwise, this will try to execute immediately, when df doesn’t exist yet.
Jason Motley
12/01/2021, 9:40 PM
Kevin Kho
@task
def the_head(df):
    prefect.context.logger.info(df.head(10))
    return
or you can add the logging statement inside your extract task
Kevin Kho
Jason Motley
12/01/2021, 9:45 PM
Kevin Kho
import prefect to your script
Jason Motley
12/01/2021, 9:48 PM
Kevin Kho
flow = Flow("myflow")
with flow:
    df = extract(stuff)
    the_head(df)
like this?
Jason Motley
12/01/2021, 9:50 PM
Kevin Kho
Jason Motley
12/01/2021, 9:52 PM
Kevin Kho
import pandas as pd
import prefect
from prefect import Flow, task

@task
def extract():
    return pd.DataFrame({"a": [1, 2, 3, 4], "b": [1, 2, 3, 4]})

@task
def the_head(df):
    # Runs at flow run time, when df is an actual DataFrame
    prefect.context.logger.info(df.head(10))
    return

with Flow("df_test") as flow:
    df = extract()
    the_head(df)

flow.run()
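To illustrate the point above that df is a Task rather than a DataFrame while the flow is being built, here is a minimal pure-Python sketch of deferred execution. The Deferred class and the toy task decorator are hypothetical stand-ins written for this illustration; they are not Prefect's API or internals, and plain lists stand in for DataFrames.

```python
class Deferred:
    """Toy placeholder for a task result: holds no data until resolved."""

    def __init__(self, fn, args):
        self.fn = fn
        self.args = args

    def resolve(self):
        # Resolve any upstream placeholders first, then call the wrapped function.
        real_args = [a.resolve() if isinstance(a, Deferred) else a for a in self.args]
        return self.fn(*real_args)


def task(fn):
    """Toy decorator: calling the decorated function records the call instead of running it."""
    def wrapper(*args):
        return Deferred(fn, args)
    return wrapper


@task
def extract():
    return [1, 2, 3, 4]


@task
def the_head(rows):
    return rows[:2]


# "Build time": nothing has run yet, so df is a placeholder, not data.
df = extract()
head = the_head(df)

# "Run time": extract runs first, then the_head receives the real list.
result = head.resolve()
```

Calling a data method on df at build time fails for the same reason print(df.head(10)) fails inside the with block: the data does not exist until the flow runs, so anything that touches it has to live inside another task.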
Jason Motley
12/01/2021, 9:54 PM
Kevin Kho
Jason Motley
12/01/2021, 11:12 PM
Jason Motley
12/01/2021, 11:12 PM
base_df = extract_base(connection_r)
Jason Motley
12/01/2021, 11:12 PM
Kevin Kho
base_df
Jason Motley
12/01/2021, 11:13 PM