Jason
05/09/2022, 7:49 PM
Jason
05/09/2022, 7:50 PM
Updated: 9 May 2022 2:31pm
Result Type: Result
Result Location: None
Kevin Kho
Jason
05/09/2022, 8:02 PM
@task(name="Extract Owners from Streamline")
def extract_owners() -> DataFrame:
    """
    Grab Owners from Streamline and serialize as a Pandas DataFrame.
    """
    logger = prefect.context.get("logger")
    streamline = Streamline(token_key=TOKEN_KEY, token_secret=TOKEN_SECRET)
    owners_df: DataFrame = streamline.get_owners()
    if owners_df.empty:
        raise FAIL(
            "Owners DataFrame is empty. Something is wrong with the API pull from Streamline."
        )
    logger.info(f"Pulled {owners_df.shape[0]} owners for {FLOW_NAME}")
    return owners_df
Jason
05/09/2022, 8:03 PM
owners_df = extract_owners()
load_owners_s3(owners_df, database=glue_database)
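Since load_owners_s3 receives whatever extract_owners hands downstream, one option is a small check at the top of the loader so that a missing input fails with a clear message. This is only a sketch in plain Python: `require_input` and `load_rows` are hypothetical names, `ValueError` stands in for whatever failure signal the flow uses, and the actual write is left out.

```python
from typing import Optional, Sequence, TypeVar

T = TypeVar("T")


def require_input(value: Optional[T], name: str) -> T:
    """Return value unchanged, or raise a descriptive error if it is None."""
    if value is None:
        raise ValueError(f"{name} is None; the upstream task returned no data")
    return value


def load_rows(rows: Optional[Sequence[dict]], database: str) -> str:
    """Hypothetical loader: validate the input first, then describe the write."""
    rows = require_input(rows, "owners_df")
    # Placeholder for the real write step, which is not shown here.
    return f"{len(rows)} rows -> {database}"
```

With this in place, `load_rows(None, "some_db")` raises a `ValueError` that names the missing input instead of failing deeper inside the write call.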
Jason
05/09/2022, 8:03 PM
load_owners_s3, it's a NoneType, which raises the appropriate error with aws-data-wrangler: wr.s3.to_parquet.
Jason
05/09/2022, 8:07 PM
Jason
05/09/2022, 8:29 PM
By default, when you use e.g. S3 storage, Prefect will also use S3 for results automatically, unless you explicitly turn off checkpointing.
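To make the checkpointing point concrete, here is a toy model in plain Python. It is not Prefect's implementation: `run_task`, `result_dir`, and the JSON file layout are made up for illustration. The only idea it tries to show is that a result location exists only when the return value is actually persisted.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Callable, Optional, Tuple


def run_task(
    fn: Callable[[], Any], result_dir: Path, checkpoint: bool = True
) -> Tuple[Any, Optional[Path]]:
    """Run fn and, if checkpointing is on, persist its return value as JSON."""
    value = fn()
    if not checkpoint:
        # Nothing is written, so there is no location to report.
        return value, None
    location = result_dir / f"{fn.__name__}.json"
    location.write_text(json.dumps(value))
    return value, location


def extract_owners() -> list:
    """Stand-in task returning a small, JSON-serializable payload."""
    return [{"owner_id": 1}, {"owner_id": 2}]


with tempfile.TemporaryDirectory() as tmp:
    _, saved_at = run_task(extract_owners, Path(tmp), checkpoint=True)
    _, skipped = run_task(extract_owners, Path(tmp), checkpoint=False)
    saved_name = saved_at.name
    # Read the persisted value back to confirm it round-trips.
    reloaded = json.loads(saved_at.read_text())
```

In this model, a run with checkpointing off reports no location at all, so anything that later tries to reload the value from storage has nothing to read.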
Kevin Kho
Kevin Kho
Jason
05/09/2022, 8:54 PM
Jason
05/09/2022, 8:54 PM
Jason
05/09/2022, 8:55 PM
Kevin Kho
Jason
05/09/2022, 8:57 PM
Kevin Kho
glue_database?
Jason
05/09/2022, 8:57 PM
Jason
05/09/2022, 8:58 PM
Kevin Kho
Kevin Kho