Ruslan Aliev
09/21/2021, 12:25 PM
@task(name="Download video", log_stdout=True)
def download_video(task_params, destination_dir='./data'):
    remote_file_path = f"{task_params['storage']['source']}{task_params['video']['filepath']}"
    destination_dir = Path(destination_dir)
    destination_dir.mkdir(exist_ok=True)
    destination_path = f"{destination_dir / Path(remote_file_path).name}"
    file_download(remote_file_path, destination_path)

@task(name="Fetch previous result", log_stdout=True)
def fetch_prev_result(task):
    prev_result = task['prev_result']
    db_name = prev_result['db_name']
    collection_name = prev_result['collection_name']
    out_path = f"{db_name}_{collection_name}.csv"
    mongo_fetch(mongo_client, db_name, collection_name, out_path=out_path)

is_detection = check_equal(task_type, 1)
ifelse(is_detection, download_video(task_params, destination_dir='./videos'), fetch_prev_result(task))
command = '''python -c "from prefect import Flow; f = Flow('empty'); f.run()"'''
container_name = StringFormatter(name="Container name", template="scenario_{scenario_id}_task_{task_id}")
container_id = create_container(image_name=image_name,
                                container_name=container_name(scenario_id=scenario_id, task_id=task_id),
                                command=command).set_upstream([download_video, fetch_prev_result])
started = start_container(container_id=container_id)
Kevin Kho
Is the code from is_detection onwards inside a flow block?
I think what happens is you have both download_video and fetch_prev_result, but only one will fire. The one that doesn’t raises a SKIP signal, which propagates a SKIP to create_container. To avoid the SKIP, do
create_container(…, skip_on_upstream_skip=False).set_upstream(...)
and I think that will work.
Ruslan Aliev
09/21/2021, 2:31 PM
Kevin Kho
case task might serve your use case better?
with case(is_detection, True):
    a = task_a()
    task_b().set_upstream([a])
with case(is_detection, False):
    a = other_task_a()
    task_b().set_upstream([a])
What do you think of this?
Kevin Kho
Ruslan Aliev
09/21/2021, 3:51 PM
case statement works fine with merge:
is_detection = check_equal(task_type, 1)
with case(is_detection, True):
    file_path1 = download_video(task_params, destination_dir='./videos')
with case(is_detection, False):
    file_path2 = fetch_prev_result(task)
file_path = merge(file_path1, file_path2)
command = '''python -c "from prefect import Flow; f = Flow('empty'); f.run()"'''
container_name = StringFormatter(name="Container name", template="scenario_{scenario_id}_task_{task_id}")
container_id = create_container(image_name=image_name,
                                container_name=container_name(scenario_id=scenario_id, task_id=task_id),
                                command=command).set_upstream(file_path)
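
To make the behaviour discussed in this thread concrete, here is a minimal, library-free sketch of how a SKIP can propagate past a conditional branch, and why either a merge-style task or skip_on_upstream_skip=False stops it. It is a toy model based only on the description in the thread, not Prefect's actual implementation. ToyTask, first_real_result, run_flow and the placeholder return values are hypothetical names invented for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional

SKIPPED = "Skipped"
SUCCESS = "Success"


@dataclass
class ToyTask:
    """Toy stand-in for a task; assumption-only model, not Prefect's Task."""
    name: str
    run: Callable[..., Any]
    upstream: List["ToyTask"] = field(default_factory=list)
    skip_on_upstream_skip: bool = True
    state: Optional[str] = None
    result: Any = None

    def execute(self) -> None:
        # A task already skipped by the conditional stays skipped.
        if self.state == SKIPPED:
            return
        any_skipped = any(t.state == SKIPPED for t in self.upstream)
        # Default behaviour in this model: one skipped upstream skips this task.
        if any_skipped and self.skip_on_upstream_skip:
            self.state = SKIPPED
            return
        self.result = self.run(*[t.result for t in self.upstream])
        self.state = SUCCESS


def first_real_result(*results: Any) -> Any:
    # Merge-like behaviour: pass on the first result that is not None.
    return next((r for r in results if r is not None), None)


def run_flow(is_detection: bool, use_merge: bool,
             container_skips_on_skip: bool = True) -> str:
    """Run the toy flow and return the final state of create_container."""
    # Placeholder return values; the real tasks in the thread return nothing.
    download = ToyTask("download_video", lambda: "./videos/clip.mp4")
    fetch = ToyTask("fetch_prev_result", lambda: "db_collection.csv")
    # The branch that the condition does not select is skipped.
    (fetch if is_detection else download).state = SKIPPED

    tasks = [download, fetch]
    upstream = [download, fetch]
    if use_merge:
        # The merge task itself ignores upstream skips.
        merged = ToyTask("merge", first_real_result, upstream,
                         skip_on_upstream_skip=False)
        tasks.append(merged)
        upstream = [merged]

    create = ToyTask("create_container", lambda *_: "container-id", upstream,
                     skip_on_upstream_skip=container_skips_on_skip)
    tasks.append(create)

    # Tasks are listed in dependency order, so a simple loop is enough.
    for t in tasks:
        t.execute()
    return create.state


if __name__ == "__main__":
    print(run_flow(True, use_merge=False))
    print(run_flow(True, use_merge=True))
    print(run_flow(True, use_merge=False, container_skips_on_skip=False))
```

In this model create_container ends up skipped when it depends directly on both branches, and succeeds with either fix, whichever branch the condition selects.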
Kevin Kho