Tom Shaffner
# 02/08/2022, 8:42 PM
@task(
    name='Pull Oracle Data',
    cache_for=timedelta(hours=10),
    timeout=(3 * 60 * 60),
    max_retries=3,
    # NOTE(review): randint() is evaluated once, when this decorator runs at import
    # time, so every retry in this process waits the same (random, 5-25 min) delay;
    # the delay is not re-drawn per retry.
    retry_delay=timedelta(minutes=randint(5, 25)),
    cache_validator=prefect.engine.cache_validators.all_parameters,
)
def pull_oracle_data_via(oracle_query_sql, server='INTP', cache_flag=False) -> DataFrame:
    """Run an Oracle query and return its result as a DataFrame.

    Args:
        oracle_query_sql: Handed straight to ``Oracle_DB.pull_data_from_oracle``.
            The flow passes ``ORACLE_QUERY_SQL_PATH``, so this is presumably a path
            to a SQL file rather than literal SQL -- confirm against ``Oracle_DB``.
        server: Server name given to ``Oracle_DB``.
        cache_flag: Only logged here. The flow feeds it from its ``cache_invalid``
            Parameter; presumably toggling that Parameter is what invalidates the
            cached result under the ``all_parameters`` validator -- confirm.

    Returns:
        The DataFrame produced by ``pull_data_from_oracle``.

    Raises:
        ValueError: If the query returns zero rows, so the task fails instead of
            handing an empty frame downstream.

    NOTE(review): with ``timeout`` set, Prefect's debug logs show this task being
    executed in a separate process ("process based timeout handler"), so the
    returned DataFrame presumably has to be pickled back to the parent process;
    very large frames make that hand-off slow.
    """
    if cache_flag:
        logger.info('Task flagged to invalidate/ignore cache.')
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(server=server) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)
    data_amount = len(df.index)
    logger.info(f"Rows returned in oracle data query: {data_amount:,}...")
    if data_amount == 0:
        # Explicit raise rather than ``assert``: asserts are stripped under ``python -O``,
        # which would silently let an empty result through.
        raise ValueError(f"Oracle query returned no rows: {oracle_query_sql}")
    logger.debug("Oracle pull completed; returning...")
    return df
# Build the Oracle -> typed DataFrame -> table -> indices pipeline.
# NOTE(review): in Prefect 1's functional API, plain (non-task) statements inside
# ``with Flow(...)`` -- such as the logger calls below -- execute once while the flow
# graph is being built/registered, not during each flow run, so they will not show up
# in flow-run logs. Confirm this is intended.
with Flow(FLOW_NAME, result=LocalResult(), schedule=schedule) as flow:
    # Flow-level switch passed to the Oracle task's ``cache_flag`` input.
    cache_invalid = Parameter('cache_invalid', default=False)
    logger.info(f"{FLOW_NAME} Flow initiated, running in {file_path}")
    df = pull_oracle_data_via(
        oracle_query_sql=ORACLE_QUERY_SQL_PATH,
        server=ORACLE_SERVER,
        cache_flag=cache_invalid,
    )
    logger.debug('Oracle pull returned.')
    df = set_data_types(df)
    create_data_summary_artifacts(df)
    u = upload_to_table(df, destination_table=DATA_DESTINATION_TABLE_NAME, append=False)
    # Index creation takes no data from the upload, so order it after the upload explicitly.
    add_indices_to_table(
        upstream_tasks=[u],
        destination_table=DATA_DESTINATION_TABLE_NAME,
        column_list=INDEX_COLUMNS,
    )

flow.run_config = LocalRun(
    env={
        'PREFECT__LOGGING__FORMAT': '[%(asctime)s-%(levelname)s - %(name)s]-[%(filename)s:%(funcName)s]-Line %(lineno)d: %(message)s',
        'PREFECT__LOGGING__LEVEL': 'DEBUG',
    },
    working_dir=file_path,
    labels=["normal-process"],
)
flow.executor = LocalDaskExecutor(scheduler='threads', num_workers=8)
# NOTE(review): a timestamp-based idempotency key differs on every call, so every run
# of this script registers a new flow version; presumably intentional -- confirm.
flow.register(
    project_name=PROJECT_NAME,
    parameters={'cache_invalid': False},
    idempotency_key=(FLOW_NAME + datetime.now().strftime("%Y%m%d-%H%M%S")),
)
Kevin Kho
Tom Shaffner
02/08/2022, 8:45 PM
`pip install -U prefect`
The above is what I get as the most current one.
Kevin Kho
Use `pip install prefect==1.0rc1`. The `-U` flag just gets the latest release (not a pre-release).
Tom Shaffner
02/08/2022, 8:48 PMKevin Kho
Tom Shaffner
02/08/2022, 8:52 PMKevin Kho
Tom Shaffner
02/08/2022, 8:59 PMKevin Kho
Zanie
Tom Shaffner
02/08/2022, 9:01 PMZanie
Tom Shaffner
02/08/2022, 9:04 PMZanie
Tom Shaffner
02/08/2022, 9:06 PMZanie
Tom Shaffner
02/08/2022, 9:09 PMZanie
Tom Shaffner
02/08/2022, 9:12 PMZanie
Tom Shaffner
02/08/2022, 9:12 PMZanie
Tom Shaffner
02/08/2022, 9:14 PMZanie
8 February 2022,03:15:18 prefect.CloudTaskRunner DEBUG Task 'Pull Oracle Data': Attaching process based timeout handler...
8 February 2022,03:15:18 prefect.CloudTaskRunner DEBUG Task 'Pull Oracle Data': Sending execution to a new process...
8 February 2022,03:15:18 prefect.CloudTaskRunner DEBUG Task 'Pull Oracle Data': Waiting for process to return with 10800s timeout...
8 February 2022,03:15:18 prefect.CloudTaskRunner DEBUG Task 'Pull Oracle Data': Executing...
8 February 2022,03:31:10 prefect INFO Rows returned in oracle data query: 96,562...
8 February 2022,03:31:10 prefect DEBUG Oracle pull completed; returning...
8 February 2022,03:31:10 prefect.CloudTaskRunner DEBUG Task 'Pull Oracle Data': Execution successful.
Tom Shaffner
02/08/2022, 9:18 PMZanie
Tom Shaffner
02/08/2022, 9:18 PMZanie
Tom Shaffner
02/08/2022, 9:19 PMZanie
Tom Shaffner
02/08/2022, 9:20 PMZanie
Tom Shaffner
02/08/2022, 9:21 PMZanie
PREFECT__LOGGING__LEVEL=DEBUG prefect run --name "hello-world" --project "example" --execute
Tom Shaffner
02/08/2022, 9:25 PMZanie
Tom Shaffner
02/08/2022, 9:27 PMZanie
The `type(return_val)` call would have to fail, which seems very unlikely.
Tom Shaffner
02/08/2022, 9:28 PMZanie
Tom Shaffner
02/08/2022, 9:28 PMZanie
Tom Shaffner
02/08/2022, 9:29 PMZanie
Tom Shaffner
02/08/2022, 9:29 PMZanie
Tom Shaffner
02/08/2022, 9:30 PMZanie
Tom Shaffner
02/08/2022, 9:32 PMZanie
Tom Shaffner
02/08/2022, 9:33 PMZanie
Tom Shaffner
02/08/2022, 9:42 PMZanie
Tom Shaffner
02/08/2022, 9:44 PMZanie
Tom Shaffner
02/08/2022, 9:45 PMZanie
Tom Shaffner
02/08/2022, 9:46 PMZanie
Tom Shaffner
# 02/08/2022, 9:51 PM
@task(
    name='Pull Oracle Data',
    cache_for=timedelta(hours=10),
    timeout=(3 * 60 * 60),
    max_retries=3,
    # NOTE(review): randint() is evaluated once at decoration time, not per retry.
    retry_delay=timedelta(minutes=randint(5, 25)),
    cache_validator=prefect.engine.cache_validators.all_parameters,
)
def pull_oracle_data_via_with_pickle(oracle_query_sql, server='INTP', cache_flag=False) -> DataFrame:
    """Diagnostic variant of the Oracle pull that also checks the result can be pickled.

    Same steps as the plain Oracle pull task, except that before returning it runs
    ``cloudpickle.dumps`` on the DataFrame. A serialisation problem therefore raises
    here, inside the task body, and "Pickling successful" is logged only when it
    does not.

    Args:
        oracle_query_sql: Handed straight to ``Oracle_DB.pull_data_from_oracle``.
        server: Server name given to ``Oracle_DB``.
        cache_flag: Only logged here; presumably used to vary the cache key -- confirm.

    Returns:
        The DataFrame produced by ``pull_data_from_oracle``.

    Raises:
        ValueError: If the query returns zero rows.
    """
    if cache_flag:
        logger.info('Task flagged to invalidate/ignore cache.')
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(server=server) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)
    data_amount = len(df.index)
    logger.info(f"Rows returned in oracle data query: {data_amount:,}...")
    if data_amount == 0:
        # Explicit raise rather than ``assert``: asserts are stripped under ``python -O``.
        raise ValueError(f"Oracle query returned no rows: {oracle_query_sql}")
    logger.debug("Oracle pull completed; returning...")
    # Probe only: the pickled bytes are discarded immediately (not bound to a name),
    # so a second full copy of the frame is not held in memory until return.
    cloudpickle.dumps(df)
    logger.debug("Pickling successful")
    return df
Zanie
Tom Shaffner
02/09/2022, 2:43 PM
Kevin Kho
Zanie
Return `df.to_json()` from your task instead of the raw data.