Tom Shaffner
02/08/2022, 8:42 PM
Tom Shaffner
02/08/2022, 8:43 PM
Tom Shaffner
02/08/2022, 8:44 PM
@task(name='Pull Oracle Data', cache_for=timedelta(hours=10), timeout=(3*60*60), max_retries=3,
      # randint() runs once, when the decorator is evaluated, so the delay is not re-drawn for each retry
      retry_delay=timedelta(minutes=randint(5, 25)),
      cache_validator=prefect.engine.cache_validators.all_parameters)
def pull_oracle_data_via(oracle_query_sql, server='INTP', cache_flag=False) -> DataFrame:
    if cache_flag:
        logger.info('Task flagged to invalidate/ignore cache.')
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(server=server) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)
    data_amount = len(df.index)
    logger.info(f"Rows returned in oracle data query: {data_amount:,}...")
    assert data_amount != 0  # Cause task failure if 0 is returned
    logger.debug("Oracle pull completed; returning...")
    return df
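The `cache_flag` argument can act as a cache-buster only because changing it changes what the cache is checked against. As I understand the Prefect 1.x docs, `all_parameters` treats a cached result as valid only while it is unexpired (here `cache_for=timedelta(hours=10)`) and every flow parameter matches the cached run, so flipping the `cache_invalid` Parameter forces a re-pull. The toy sketch below mimics that rule in plain Python; `ParameterCache` and `fake_pull` are hypothetical names, and this is not Prefect's implementation.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, Optional


@dataclass
class CacheEntry:
    value: Any
    parameters: Dict[str, Any]
    expires: datetime


class ParameterCache:
    """Toy single-entry cache: reuse only if unexpired AND parameters match exactly."""

    def __init__(self, cache_for: timedelta) -> None:
        self.cache_for = cache_for
        self.entry: Optional[CacheEntry] = None

    def is_valid(self, parameters: Dict[str, Any], now: datetime) -> bool:
        return (
            self.entry is not None
            and now < self.entry.expires
            and self.entry.parameters == parameters
        )

    def run(self, fn: Callable[..., Any], parameters: Dict[str, Any],
            now: Optional[datetime] = None) -> Any:
        now = now if now is not None else datetime.now()
        if self.is_valid(parameters, now):
            return self.entry.value  # cache hit: fn is not called
        value = fn(**parameters)     # cache miss: run fn and store a fresh entry
        self.entry = CacheEntry(value, dict(parameters), now + self.cache_for)
        return value


calls = []


def fake_pull(query: str, cache_flag: bool = False) -> str:
    """Hypothetical stand-in for the Oracle pull; records each real execution."""
    calls.append((query, cache_flag))
    return f"rows for {query}"


if __name__ == "__main__":
    cache = ParameterCache(cache_for=timedelta(hours=10))
    t0 = datetime(2022, 2, 8, 20, 0)
    cache.run(fake_pull, {"query": "q.sql", "cache_flag": False}, now=t0)                       # miss
    cache.run(fake_pull, {"query": "q.sql", "cache_flag": False}, now=t0 + timedelta(hours=1))  # hit
    cache.run(fake_pull, {"query": "q.sql", "cache_flag": True}, now=t0 + timedelta(hours=2))   # miss: flag flipped
    print(len(calls))  # 2
```

A single entry keeps the sketch short; note that after a forced refresh the stored parameters include `cache_flag=True`, so switching the flag back to `False` is also a cache miss under this rule.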
Tom Shaffner
02/08/2022, 8:45 PM
with Flow(FLOW_NAME, result=LocalResult(), schedule=schedule) as flow:
    cache_invalid = Parameter('cache_invalid', default=False)
    # plain (non-task) calls like these logger lines run when the flow is built, not on each flow run
    logger.info(f"{FLOW_NAME} Flow initiated, running in {file_path}")
    df = pull_oracle_data_via(oracle_query_sql=ORACLE_QUERY_SQL_PATH, server=ORACLE_SERVER, cache_flag=cache_invalid)
    logger.debug('Oracle pull returned.')
    df = set_data_types(df)
    create_data_summary_artifacts(df)
    u = upload_to_table(df, destination_table=DATA_DESTINATION_TABLE_NAME, append=False)
    add_indices_to_table(upstream_tasks=[u], destination_table=DATA_DESTINATION_TABLE_NAME, column_list=INDEX_COLUMNS)

flow.run_config = LocalRun(
    env={
        'PREFECT__LOGGING__FORMAT': '[%(asctime)s-%(levelname)s - %(name)s]-[%(filename)s:%(funcName)s]-Line %(lineno)d: %(message)s',
        'PREFECT__LOGGING__LEVEL': 'DEBUG',
    },
    working_dir=file_path,
    labels=["normal-process"],
)
flow.executor = LocalDaskExecutor(scheduler='threads', num_workers=8)
flow.register(project_name=PROJECT_NAME, parameters={'cache_invalid': False},
              idempotency_key=(FLOW_NAME + datetime.now().strftime("%Y%m%d-%H%M%S")))
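The `PREFECT__LOGGING__FORMAT` value uses standard Python `logging` placeholders, so one way to preview what each log line will look like is to plug the same string into a plain `logging.Formatter`. A minimal sketch, assuming Prefect applies the string the way a stdlib formatter would; the logger name `format-demo` and the function `pull_step` are hypothetical.

```python
import io
import logging

# Same format string as the LocalRun env var above.
FORMAT = ("[%(asctime)s-%(levelname)s - %(name)s]-"
          "[%(filename)s:%(funcName)s]-Line %(lineno)d: %(message)s")


def make_demo_logger(stream: io.StringIO) -> logging.Logger:
    logger = logging.getLogger("format-demo")
    logger.setLevel(logging.DEBUG)          # mirrors PREFECT__LOGGING__LEVEL=DEBUG
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(FORMAT))
    logger.handlers = [handler]             # replace any handlers from earlier calls
    logger.propagate = False                # keep output out of the root logger
    return logger


def pull_step(logger: logging.Logger) -> None:
    logger.debug("Oracle pull returned.")


if __name__ == "__main__":
    buffer = io.StringIO()
    pull_step(make_demo_logger(buffer))
    print(buffer.getvalue().strip())
```

Each record comes out as one line with the timestamp, level, logger name, source file, calling function, line number and message, which is what makes the `Line %(lineno)d` part handy when tracing where a DEBUG message came from.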
Kevin Kho
Tom Shaffner
02/08/2022, 8:45 PM
Tom Shaffner
02/08/2022, 8:46 PM
Tom Shaffner
02/08/2022, 8:47 PM
`pip install -U prefect`
the above is what I get as the most current one
Tom Shaffner
02/08/2022, 8:48 PM
Kevin Kho
`pip install prefect==1.0rc1`. The `-U` just gets the latest release (not pre-release).
Tom Shaffner
02/08/2022, 8:48 PM
Tom Shaffner
02/08/2022, 8:49 PM
Kevin Kho
Kevin Kho
Tom Shaffner
02/08/2022, 8:52 PM
Kevin Kho
Tom Shaffner
02/08/2022, 8:59 PM
Tom Shaffner
02/08/2022, 8:59 PM
Kevin Kho
Zanie
Tom Shaffner
02/08/2022, 9:01 PM
Tom Shaffner
02/08/2022, 9:02 PM
Tom Shaffner
02/08/2022, 9:02 PM
Zanie
Zanie
Tom Shaffner
02/08/2022, 9:04 PM
Zanie
Tom Shaffner
02/08/2022, 9:06 PM
Tom Shaffner
02/08/2022, 9:06 PM
Tom Shaffner
02/08/2022, 9:08 PM
Zanie
Tom Shaffner
02/08/2022, 9:09 PM
Tom Shaffner
02/08/2022, 9:10 PM
Zanie
Zanie
Tom Shaffner
02/08/2022, 9:12 PM
Zanie
Tom Shaffner
02/08/2022, 9:12 PM
Tom Shaffner
02/08/2022, 9:12 PM
Zanie
Zanie
Tom Shaffner
02/08/2022, 9:14 PM
Tom Shaffner
02/08/2022, 9:15 PM
Tom Shaffner
02/08/2022, 9:15 PM
Tom Shaffner
02/08/2022, 9:16 PM
Tom Shaffner
02/08/2022, 9:16 PM
Zanie
8 February 2022,03:15:18 prefect.CloudTaskRunner DEBUG Task 'Pull Oracle Data': Attaching process based timeout handler...
8 February 2022,03:15:18 prefect.CloudTaskRunner DEBUG Task 'Pull Oracle Data': Sending execution to a new process...
8 February 2022,03:15:18 prefect.CloudTaskRunner DEBUG Task 'Pull Oracle Data': Waiting for process to return with 10800s timeout...
8 February 2022,03:15:18 prefect.CloudTaskRunner DEBUG Task 'Pull Oracle Data': Executing...
8 February 2022,03:31:10 prefect INFO Rows returned in oracle data query: 96,562...
8 February 2022,03:31:10 prefect DEBUG Oracle pull completed; returning...
8 February 2022,03:31:10 prefect.CloudTaskRunner DEBUG Task 'Pull Oracle Data': Execution successful.
Zanie
Tom Shaffner
02/08/2022, 9:18 PM
Zanie
Tom Shaffner
02/08/2022, 9:18 PM
Zanie
Zanie
Tom Shaffner
02/08/2022, 9:19 PM
Zanie
Tom Shaffner
02/08/2022, 9:20 PM
Tom Shaffner
02/08/2022, 9:20 PM
Tom Shaffner
02/08/2022, 9:20 PM
Zanie
Tom Shaffner
02/08/2022, 9:21 PM
Tom Shaffner
02/08/2022, 9:22 PM
Tom Shaffner
02/08/2022, 9:22 PM
Zanie
PREFECT__LOGGING__LEVEL=DEBUG prefect run --name "hello-world" --project "example" --execute
Zanie
Tom Shaffner
02/08/2022, 9:25 PM
Tom Shaffner
02/08/2022, 9:26 PM
Zanie
Tom Shaffner
02/08/2022, 9:27 PM
Zanie
`type(return_val)` call would have to fail, which seems very unlikely
Tom Shaffner
02/08/2022, 9:28 PM
Zanie
Tom Shaffner
02/08/2022, 9:28 PM
Zanie
Tom Shaffner
02/08/2022, 9:29 PM
Zanie
Tom Shaffner
02/08/2022, 9:29 PM
Zanie
Zanie
Tom Shaffner
02/08/2022, 9:30 PM
Tom Shaffner
02/08/2022, 9:30 PM
Zanie
Tom Shaffner
02/08/2022, 9:32 PM
Tom Shaffner
02/08/2022, 9:32 PM
Zanie
Tom Shaffner
02/08/2022, 9:33 PM
Tom Shaffner
02/08/2022, 9:33 PM
Zanie
Tom Shaffner
02/08/2022, 9:42 PM
Tom Shaffner
02/08/2022, 9:42 PM
Tom Shaffner
02/08/2022, 9:43 PM
Tom Shaffner
02/08/2022, 9:43 PM
Tom Shaffner
02/08/2022, 9:43 PM
Zanie
Tom Shaffner
02/08/2022, 9:44 PM
Zanie
Tom Shaffner
02/08/2022, 9:45 PM
Zanie
Zanie
Tom Shaffner
02/08/2022, 9:46 PM
Zanie
Tom Shaffner
02/08/2022, 9:51 PM
Tom Shaffner
02/08/2022, 9:51 PM
@task(name='Pull Oracle Data', cache_for=timedelta(hours=10), timeout=(3*60*60), max_retries=3,
      retry_delay=timedelta(minutes=randint(5, 25)),
      cache_validator=prefect.engine.cache_validators.all_parameters)
def pull_oracle_data_via_with_pickle(oracle_query_sql, server='INTP', cache_flag=False) -> DataFrame:
    if cache_flag:
        logger.info('Task flagged to invalidate/ignore cache.')
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(server=server) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)
    data_amount = len(df.index)
    logger.info(f"Rows returned in oracle data query: {data_amount:,}...")
    assert data_amount != 0  # Cause task failure if 0 is returned
    logger.debug("Oracle pull completed; returning...")
    # Diagnostic only (needs `import cloudpickle`): confirms df can be serialized; the result is unused
    pickled_val = cloudpickle.dumps(df)
    logger.debug("Pickling successful")
    return df
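The version above serializes the DataFrame inside the task purely as a check. If that check is worth keeping, a small helper can fail with a clearer message and report the payload size, which is useful to know when a result has to travel back from a timeout subprocess. A minimal sketch using the standard-library `pickle` module rather than `cloudpickle` (an assumption; `cloudpickle` can also handle things such as lambdas that plain `pickle` rejects); `check_picklable` is a hypothetical helper.

```python
import pickle
import threading
from typing import Any


def check_picklable(value: Any, label: str = "return value") -> int:
    """Return the pickled size of `value` in bytes; raise TypeError if it cannot be pickled."""
    try:
        payload = pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL)
    except Exception as exc:  # PicklingError, TypeError, AttributeError, ...
        raise TypeError(f"{label} cannot be pickled: {exc!r}") from exc
    return len(payload)


if __name__ == "__main__":
    rows = [{"id": i, "value": i * 2} for i in range(1_000)]
    print(f"pickled size: {check_picklable(rows):,} bytes")
    try:
        check_picklable(threading.Lock(), label="lock")
    except TypeError as err:
        print(err)
```

Inside the task this could replace the two pickling lines with something like `logger.debug(f"Pickled size: {check_picklable(df):,} bytes")`.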
Tom Shaffner
02/08/2022, 10:01 PM
Tom Shaffner
02/08/2022, 10:03 PM
Tom Shaffner
02/08/2022, 10:06 PM
Tom Shaffner
02/08/2022, 10:21 PM
Tom Shaffner
02/08/2022, 10:21 PM
Zanie
Zanie
Zanie
Tom Shaffner
02/09/2022, 2:43 PM
Tom Shaffner
02/09/2022, 2:44 PM
Tom Shaffner
02/09/2022, 2:44 PM
Kevin Kho
Zanie
Zanie
Zanie
`df.to_json()` from your task instead of the raw data.
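Returning a plain string instead of the DataFrame object means the downstream task has to rebuild it. A minimal sketch with pandas, assuming the downstream task (here a hypothetical `set_data_types_from_json`) is changed to accept the string. `orient="split"` is one choice that keeps index, columns and values separate, and `io.StringIO` is used because recent pandas versions deprecate passing a literal JSON string to `read_json`. JSON can lose dtype detail (datetimes and categoricals, for example), so column types may need re-applying after loading.

```python
import io

import pandas as pd


def pull_as_json(df: pd.DataFrame) -> str:
    """Upstream side: return a JSON string instead of the DataFrame itself."""
    return df.to_json(orient="split")


def set_data_types_from_json(payload: str) -> pd.DataFrame:
    """Hypothetical downstream side: rebuild the DataFrame from the JSON string."""
    return pd.read_json(io.StringIO(payload), orient="split")


if __name__ == "__main__":
    original = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})
    payload = pull_as_json(original)
    restored = set_data_types_from_json(payload)
    print(type(payload).__name__, restored.shape)
```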