James Brink
05/05/2021, 4:05 PM
import prefect
from prefect import task, Flow
from prefect.engine.signals import RETRY, FAIL, PAUSE
from datetime import datetime, timedelta
@task
def start_collection(timeout_interval, collection_name):
    """Start the outside collection job and work out when it times out.

    Args:
        timeout_interval: Number of seconds the collection is allowed to
            take, counted from just after the service has been invoked.
        collection_name: Name handed to ``invoke_outside_service``.

    Returns:
        The ``datetime`` (from ``datetime.now()``) after which the
        collection is considered timed out.
    """
    invoke_outside_service(collection_name)
    started_at = datetime.now()
    return started_at + timedelta(seconds=timeout_interval)
# Can max_retries be computed in code? Ideally `max_retries=timeout_interval // 300 + 2`, to retry every 5 minutes (the 300-second retry_delay) until the timeout period ends.
@task(max_retries=50, retry_delay=timedelta(seconds=300))
def check_collection_status(timeout_time, collection_name):
    """Poll the outside service once and act on the reported status.

    The task is declared with ``max_retries=50`` and a 300-second
    ``retry_delay``; each run performs a single poll.

    Args:
        timeout_time: ``datetime`` after which an unfinished collection is
            treated as timed out.
        collection_name: Name handed to ``poll_outside_service_status`` and
            used as the folder component of the returned path.

    Returns:
        The S3 URI of today's CSV for the collection, once the service
        reports ``'complete'``.

    Raises:
        FAIL: The service reported ``'failed'``.
        PAUSE: The collection is not finished and ``timeout_time`` has
            passed.
        RETRY: The collection is not finished and there is still time left.
    """
    status = poll_outside_service_status(collection_name)

    if status == 'complete':
        # The URI is written without surrounding angle brackets: "<...>" is
        # link markup, not part of an S3 path, and would break the download.
        today = datetime.now().strftime('%Y-%m-%d')
        return f's3://nitorum-data/{collection_name}/{today}.csv'

    if status == 'failed':
        raise FAIL(message='Collection failed.')

    # Any other status (including 'running') means "not finished yet".
    # Previously an unrecognised status fell through and returned None,
    # which would be passed downstream as if it were a path.
    if datetime.now() > timeout_time:
        raise PAUSE(message='Collection timed out.')
    raise RETRY(message='Collection still running.')
@task
def load_data(path):
    """Download the collected dataset.

    Args:
        path: Location handed to ``download_data_from_s3``.

    Returns:
        Whatever ``download_data_from_s3`` returns for ``path``.
    """
    return download_data_from_s3(path)
@task
def transform_task(data):
    """Run the downloaded data through ``transform``.

    Args:
        data: Input handed unchanged to ``transform``.

    Returns:
        The result of ``transform(data)``.
    """
    return transform(data)
@task
def save_data(data):
    """Persist the transformed data via ``save_to_database``.

    Args:
        data: Payload handed unchanged to ``save_to_database``.

    Returns:
        None; the helper's return value is discarded.
    """
    save_to_database(data)
# `Parameter` is used below but is not among the names imported from prefect
# in this snippet; import it here so building the flow does not raise
# NameError.
from prefect import Parameter

# Flow wiring: start the outside collection, wait for it to finish, then
# load, transform and save the resulting data.
with Flow("ETL with Outside Service Data Collection") as flow:
    # Run-time inputs: seconds allowed for the collection, and its name.
    timeout_interval = Parameter("timeout_interval", default=3600)
    # NOTE(review): the default looks like a typo of "collection_1" — left
    # as-is because it is a runtime value; confirm against the service.
    collection_name = Parameter("collection_name", default="colleciton_1")

    timeout_time = start_collection(timeout_interval, collection_name)
    path = check_collection_status(timeout_time, collection_name)
    data = load_data(path)
    data = transform_task(data)
    save_data(data)
Kevin Kho
`check_collection_status`. If the status != `complete`, recursively call the inner function until it is complete, then raise `SUCCESS`. What do you think of this?
Kevin Kho
James Brink
05/05/2021, 4:34 PM
James Brink
05/05/2021, 4:35 PM
Kevin Kho
@task()
def check_collection_status(timeout_time, collection_name):
    """Block inside the task until the collection finishes, fails or times out.

    Polls ``poll_outside_service_status`` in a loop, sleeping between polls.
    A loop replaces the earlier inner recursive function, whose result was
    never returned to the task and whose call depth grew with every poll.

    Args:
        timeout_time: ``datetime`` after which polling stops with PAUSE.
        collection_name: Name handed to ``poll_outside_service_status`` and
            used as the folder component of the returned path.

    Returns:
        The S3 URI of today's CSV for the collection, once the service
        reports ``'complete'``.

    Raises:
        PAUSE: ``timeout_time`` has passed before the collection completed.
        FAIL: The service reported ``'failed'``.
    """
    # Imported locally so this block is self-contained; ``time`` is not
    # among the imports shown with this snippet.
    import time

    while True:
        # The deadline is checked before each poll.
        if datetime.now() > timeout_time:
            raise PAUSE(message='Collection timed out.')

        status = poll_outside_service_status(collection_name)
        if status == 'complete':
            # URI written without surrounding angle brackets: "<...>" is
            # link markup, not part of an S3 path.
            today = datetime.now().strftime('%Y-%m-%d')
            return f's3://nitorum-data/{collection_name}/{today}.csv'
        if status == 'failed':
            raise FAIL(message='Collection failed.')

        # `some_interval_here` is a placeholder from the original sketch; it
        # must be defined (seconds between polls) before this task can run.
        time.sleep(some_interval_here)
Kevin Kho
James Brink
05/05/2021, 5:35 PM
James Brink
05/05/2021, 5:35 PM
import prefect
from prefect import task, Flow
from prefect.engine.signals import RETRY, FAIL, PAUSE
from datetime import datetime, timedelta
@task
def start_collection(timeout_interval, collection_name):
    """Invoke the outside service and return the collection's deadline.

    Args:
        timeout_interval: Seconds to allow, measured from the moment right
            after ``invoke_outside_service`` returns.
        collection_name: Name handed to ``invoke_outside_service``.

    Returns:
        ``datetime.now()`` plus ``timeout_interval`` seconds.
    """
    invoke_outside_service(collection_name)
    deadline = datetime.now() + timedelta(seconds=timeout_interval)
    return deadline
@task()
def check_collection_status(timeout_time, collection_name, poll_interval=3600):
    """Wait inside the task until the collection completes, fails or times out.

    Polls ``poll_outside_service_status`` in a loop. A loop replaces the
    earlier inner recursive function: its return value was dropped at both
    call sites, so the task always produced ``None`` instead of the path,
    and every poll added another stack frame.

    Args:
        timeout_time: ``datetime`` after which an unfinished collection
            raises PAUSE.
        collection_name: Name handed to ``poll_outside_service_status`` and
            used as the folder component of the returned path.
        poll_interval: Seconds to sleep between polls. Defaults to 3600,
            the value that was previously hard-coded.

    Returns:
        The S3 URI of today's CSV for the collection, once the service
        reports ``'complete'``.

    Raises:
        FAIL: The service reported ``'failed'``.
        PAUSE: The collection is unfinished and ``timeout_time`` has passed.
    """
    # Imported locally so this block is self-contained; ``time`` is not
    # among the imports shown with this snippet.
    import time

    while True:
        status = poll_outside_service_status(collection_name)
        if status == 'complete':
            today = datetime.now().strftime('%Y-%m-%d')
            return f's3://nitorum-data/{collection_name}/{today}.csv'
        if status == 'failed':
            raise FAIL(message='Collection failed.')

        # Not finished yet: give up if the deadline has passed, otherwise
        # wait and poll again.
        if datetime.now() > timeout_time:
            raise PAUSE(message='Collection timed out.')
        time.sleep(poll_interval)
@task
def load_data(path):
    """Fetch the dataset stored at ``path``.

    Args:
        path: Location passed straight to ``download_data_from_s3``.

    Returns:
        The value produced by ``download_data_from_s3(path)``.
    """
    return download_data_from_s3(path)
@task
def transform_task(data):
    """Apply ``transform`` to the loaded data.

    Args:
        data: Value passed straight to ``transform``.

    Returns:
        The value produced by ``transform(data)``.
    """
    return transform(data)
@task
def save_data(data):
    """Write the transformed data out through ``save_to_database``.

    Args:
        data: Value passed straight to ``save_to_database``.

    Returns:
        None; nothing is returned from the helper call.
    """
    save_to_database(data)
# `Parameter` is used below but is not among the names imported from prefect
# in this snippet; import it here so building the flow does not raise
# NameError.
from prefect import Parameter

# Flow wiring: start the outside collection, block until it finishes, then
# load, transform and save the resulting data.
with Flow("ETL with Outside Service Data Collection") as flow:
    # Run-time inputs: seconds allowed for the collection, and its name.
    timeout_interval = Parameter("timeout_interval", default=3600)
    # NOTE(review): the default looks like a typo of "collection_1" — left
    # as-is because it is a runtime value; confirm against the service.
    collection_name = Parameter("collection_name", default="colleciton_1")

    timeout_time = start_collection(timeout_interval, collection_name)
    path = check_collection_status(timeout_time, collection_name)
    data = load_data(path)
    data = transform_task(data)
    save_data(data)
Kevin Kho
James Brink
05/05/2021, 7:14 PM