# ask-community
r
Hi, folks. I need to run “create_container” after whichever task in the ifelse statement finishes successfully. How do I set up an upstream dependency between create_container and the ifelse statement?
@task(name="Download video", log_stdout=True)
def download_video(task_params, destination_dir='./data'):
    remote_file_path = f"{task_params['storage']['source']}{task_params['video']['filepath']}"
    destination_dir = Path(destination_dir)
    destination_dir.mkdir(exist_ok=True)
    destination_path = f"{destination_dir / Path(remote_file_path).name}"
    file_download(remote_file_path, destination_path)


@task(name="Fetch previous result", log_stdout=True)
def fetch_prev_result(task):
    prev_result = task['prev_result']
    db_name = prev_result['db_name']
    collection_name = prev_result['collection_name']
    out_path = f"{db_name}_{collection_name}.csv"
    mongo_fetch(mongo_client, db_name, collection_name, out_path=out_path)


is_detection = check_equal(task_type, 1)
ifelse(is_detection, download_video(task_params, destination_dir='./videos'), fetch_prev_result(task))
command = '''python -c "from prefect import Flow; f = Flow('empty'); f.run()"'''
container_name = StringFormatter(name="Container name", template="scenario_{scenario_id}_task_{task_id}")
container_id = create_container(image_name=image_name,
                                container_name=container_name(scenario_id=scenario_id, task_id=task_id),
                                command=command).set_upstream([download_video, fetch_prev_result])
started = start_container(container_id=container_id)
k
Hey @Ruslan Aliev, just making sure. Is everything from `is_detection` onwards inside a flow block? I think what happens is you have both `download_video` and `fetch_prev_result` but only one will fire. The one that doesn’t raises a `SKIP` signal which propagates a `SKIP` to `create_container`. To avoid the `SKIP`, do `create_container(…, skip_on_upstream_skip=False).set_upstream(...)` and I think that will work
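The propagation rule described above can be illustrated with a small toy model. This is a pure-Python sketch, not Prefect code: `ToyTask` and `resolve_states` are hypothetical names, and the only thing borrowed from Prefect 1.x is the name of the `skip_on_upstream_skip` option.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class ToyTask:
    """Hypothetical stand-in for a task; not the Prefect Task class."""
    name: str
    upstream: List["ToyTask"] = field(default_factory=list)
    # Named after the Prefect 1.x option; True means "skip if any upstream skipped".
    skip_on_upstream_skip: bool = True


def resolve_states(tasks: List[ToyTask], skipped_by_branch: Set[str]) -> Dict[str, str]:
    """Walk tasks in topological order and return name -> 'Success' or 'Skipped'."""
    states: Dict[str, str] = {}
    for t in tasks:
        upstream_skipped = any(states[u.name] == "Skipped" for u in t.upstream)
        if t.name in skipped_by_branch:
            states[t.name] = "Skipped"   # the branch that ifelse did not take
        elif t.skip_on_upstream_skip and upstream_skipped:
            states[t.name] = "Skipped"   # the skip propagates downstream
        else:
            states[t.name] = "Success"
    return states


download = ToyTask("download_video")
fetch = ToyTask("fetch_prev_result")
skipped = {"fetch_prev_result"}  # assume is_detection is True

default = ToyTask("create_container", upstream=[download, fetch])
tolerant = ToyTask("create_container", upstream=[download, fetch],
                   skip_on_upstream_skip=False)

print(resolve_states([download, fetch, default], skipped)["create_container"])   # Skipped
print(resolve_states([download, fetch, tolerant], skipped)["create_container"])  # Success
```

With the default setting, one skipped branch is enough to skip the downstream task; with the option turned off, the downstream task still runs.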
r
@Kevin Kho Thank you for the reply. I tried your solution, but it doesn’t work for me. The schematic view shows 2 instances of each task:
• fetch_prev_result and download_video after the ifelse statement
• fetch_prev_result and download_video before create_container
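One possible explanation for the duplicated nodes, offered as an assumption rather than something confirmed in this thread: in Prefect 1.x's functional API, calling a task inside a flow adds a copy of it to the flow, so the bare `download_video` and `fetch_prev_result` objects passed to `set_upstream` would be different nodes from the ones called inside `ifelse`. The toy class below is not Prefect; `ToyTask` is a hypothetical name that only imitates that copy-on-call behavior.

```python
import copy


class ToyTask:
    """Hypothetical stand-in for a task object; not the Prefect class."""

    def __init__(self, name):
        self.name = name

    def __call__(self):
        # Each call yields a fresh node and leaves the original untouched.
        return copy.copy(self)


download_video = ToyTask("Download video")

called = download_video()      # the node that would sit inside ifelse(...)
flow_nodes = [called]

# Passing the uncalled task as an upstream registers a second, distinct node.
if not any(node is download_video for node in flow_nodes):
    flow_nodes.append(download_video)

print(len(flow_nodes))                  # 2
print([node.name for node in flow_nodes])
```

Under that assumption, the dependency has to be set on the objects returned by the calls, not on the undecorated task names.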
k
Gotcha, in that case, maybe the `case` task might serve your use case better?
with case(is_detection, True):
    a = task_a()
    task_b().set_upstream([a])

with case(is_detection, False):
    a = other_task_a()
    task_b().set_upstream([a])
What do you think of this?
Have you seen this? I think you have, but just making sure
r
@Kevin Kho Thank you! The `case` statement works fine with `merge`:
is_detection = check_equal(task_type, 1)
with case(is_detection, True):
    file_path1 = download_video(task_params, destination_dir='./videos')
with case(is_detection, False):
    file_path2 = fetch_prev_result(task)

file_path = merge(file_path1, file_path2)
command = '''python -c "from prefect import Flow; f = Flow('empty'); f.run()"'''
container_name = StringFormatter(name="Container name", template="scenario_{scenario_id}_task_{task_id}")
container_id = create_container(image_name=image_name,
                                container_name=container_name(scenario_id=scenario_id, task_id=task_id),
                                command=command).set_upstream(file_path)
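As far as I understand Prefect 1.x, `merge` hands back the first branch result that is not `None`, which is why a single downstream dependency on it is enough. The sketch below is a pure-Python imitation of that selection rule, not Prefect's implementation; `toy_merge` and the example path are hypothetical. Note that `download_video` and `fetch_prev_result` as posted earlier in this thread have no `return` statement, so the merged value would presumably be `None` there; the ordering would still hold, but returning the path from each task would be needed to pass it along.

```python
from typing import Optional


def toy_merge(*branch_results: Optional[str]) -> Optional[str]:
    """Return the first result that is not None, or None if every branch is empty."""
    return next((r for r in branch_results if r is not None), None)


# Assume is_detection is True: the download branch ran and returned a path,
# while the skipped fetch branch contributed nothing (None).
file_path = toy_merge("./videos/clip.mp4", None)
print(file_path)  # ./videos/clip.mp4

# If neither task returns a value, there is nothing to pass downstream.
print(toy_merge(None, None))  # None
```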
k
nice! glad you got it working