Jason
05/09/2022, 7:49 PM
Updated: 9 May 2022 2:31pm
Result Type: Result
Result Location: None
Kevin Kho
05/09/2022, 7:59 PM
Jason
05/09/2022, 8:02 PM
@task(name="Extract Owners from Streamline")
def extract_owners() -> DataFrame:
    """
    Grab Owners from Streamline and serialize as a Pandas DataFrame.
    """
    logger = prefect.context.get("logger")
    streamline = Streamline(token_key=TOKEN_KEY, token_secret=TOKEN_SECRET)
    owners_df: DataFrame = streamline.get_owners()
    if owners_df.empty:
        raise FAIL(
            "Owners DataFrame is empty. Something is wrong with the API pull from Streamline."
        )
    logger.info(f"Pulled {owners_df.shape[0]} owners for {FLOW_NAME}")
    return owners_df
owners_df = extract_owners()
load_owners_s3(owners_df, database=glue_database)
load_owners_s3, it's a NoneType, which raises the appropriate error with aws-data-wrangler: wr.s3.to_parquet.
By default, when you use e.g. S3 storage, Prefect will also use S3 for results automatically, unless you explicitly turn off checkpointing.
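As a side note on the NoneType symptom described above, one way to surface the problem earlier is to check the upstream value before handing it to the writer. The following is only a minimal sketch in plain Python, not the code from this thread: `require_result`, `MissingUpstreamResult`, and `load_owners` are hypothetical names, and `load_owners` merely stands in for `load_owners_s3` without calling Prefect or aws-data-wrangler.

```python
from typing import Any, Optional


class MissingUpstreamResult(ValueError):
    """Raised when an upstream task handed over None instead of data."""


def require_result(value: Optional[Any], upstream_name: str) -> Any:
    """Return value unchanged, or fail fast with a clear message if it is None."""
    if value is None:
        raise MissingUpstreamResult(
            f"Upstream task '{upstream_name}' returned None; "
            "check its return statement and its result/checkpointing settings."
        )
    return value


def load_owners(owners: Optional[list], database: str) -> int:
    """Hypothetical stand-in for load_owners_s3: validate first, then 'write'."""
    rows = require_result(owners, "extract_owners")
    # A real task would write to S3 here; this sketch only counts the rows.
    return len(rows)
```

With a guard like this, the failure message names the upstream task instead of surfacing as an error deep inside the Parquet writer.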
Kevin Kho
05/09/2022, 8:37 PM
Jason
05/09/2022, 8:54 PM
Kevin Kho
05/09/2022, 8:56 PM
Jason
05/09/2022, 8:57 PM
Kevin Kho
05/09/2022, 8:57 PM
glue_database?
Jason
05/09/2022, 8:57 PM
Kevin Kho
05/09/2022, 8:59 PM