
Mahesh

11/22/2022, 3:56 AM
Hello team, I am new to Prefect 2 and am working on migrating a flow from Prefect 1 to Prefect 2.
In Prefect 1, I created dynamic tasks in a for loop, like below:
@task(task_run_name="{val}")
def shell_exec(val):
    with prefect.context(task_run_name=val):
        ShellTask(
            helper_script="cd /opt/prefect/",
            shell="bash",
            return_all=True,
            log_stderr=True,
            log_stdout=True,
            stream_output=True,
            command="/venv/prefect-env/bin/python sql_run.py {}".format(val)
        ).run()

with Flow("flowname") as flow:

    sql_list=get_values()
    tasks = [
        shell_exec(val)
        for val in sql_list
    ]

    for i in range(1, len(tasks)):
        tasks[i].set_upstream(tasks[i - 1])
I am trying to build the same flow in Prefect 2:
1. I need to set the task name at run time.
2. I need to run the tasks in a for loop, with each one depending on the previous one.
@flow(name="exec")
def exec():
    list1=["run1.sql","run2.sql","run3.sql"]
    tasks = [
        shell_exec(val)
        for val in list1
    ]
    for i in range(1, len(tasks)):
        tasks[i].wait_for(tasks[i - 1])
3. I used wait_for instead of set_upstream and got stuck on the error below.
08:57:59.052 | ERROR   | Flow run 'vivacious-hawk' - Finished in state Failed("Flow run encountered an exception. AttributeError: 'coroutine' object has no attribute 'wait_for'\n")
Traceback (most recent call last):
  File "flows/test.py", line 31, in <module>
    exec()
  File "/venv/prefect2/lib/python3.8/site-packages/prefect/flows.py", line 442, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/venv/prefect2/lib/python3.8/site-packages/prefect/engine.py", line 157, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/venv/prefect2/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/venv/prefect2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/venv/prefect2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/venv/prefect2/lib/python3.8/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/venv/prefect2/lib/python3.8/site-packages/prefect/engine.py", line 237, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/venv/prefect2/lib/python3.8/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/venv/prefect2/lib/python3.8/site-packages/prefect/engine.py", line 610, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/venv/prefect2/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/venv/prefect2/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/venv/prefect2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/venv/prefect2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "flows/test.py", line 26, in exec
    tasks[i].wait_for(tasks[i - 1])
AttributeError: 'coroutine' object has no attribute 'wait_for'
Please let me know what I am missing here.
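A note on the AttributeError above: in plain Python, calling an `async def` function without awaiting it returns a coroutine object, and a coroutine object has no `wait_for` attribute. The sketch below reproduces that using only the standard library. `shell_exec` here is a hypothetical async stand-in, not the task from this thread; whether the real task is async is an assumption, since its Prefect 2 definition is not shown.

```python
import asyncio


async def shell_exec(val):
    # Hypothetical async stand-in for the task body; this is not the Prefect API.
    await asyncio.sleep(0)
    return f"ran {val}"


def call_without_await(val):
    # Calling an async function only creates a coroutine object; nothing runs yet.
    coro = shell_exec(val)
    has_wait_for = hasattr(coro, "wait_for")
    coro.close()  # close it explicitly to avoid a "never awaited" RuntimeWarning
    return type(coro).__name__, has_wait_for


def call_with_await(val):
    # Driving the coroutine to completion yields the actual return value.
    return asyncio.run(shell_exec(val))


if __name__ == "__main__":
    print(call_without_await("run1.sql"))
    print(call_with_await("run1.sql"))
```

If the object returned by a call is a coroutine, any attribute lookup meant for a task future will fail in the same way as in the traceback.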

Zanie

11/22/2022, 5:14 AM
Hi! With
import prefect

from prefect import flow, task, get_run_logger

@task
def hi(msg):
    get_run_logger().info(msg)
    return 1

@flow(name="exec")
def exec():
    list1=["run1.sql","run2.sql","run3.sql"]
    for val in list1:
        hi(val)

exec()
Your tasks will run sequentially.
Here’s how you’d use wait_for:
from prefect import flow, task, get_run_logger

@task
def hi(msg):
    get_run_logger().info(msg)
    return 1

@flow(name="exec")
def exec():
    list1=["run1.sql","run2.sql","run3.sql"]
    last_future = None
    for val in list1:
        last_future = hi.submit(val, wait_for=[last_future])

exec()
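For intuition, the chaining pattern above (each submission waits on the previous future) can be mimicked in plain Python with `concurrent.futures`. This is only an analogy under stated assumptions, not how Prefect implements `wait_for`; `run_after` and `run_chain` are hypothetical names introduced for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def run_after(previous, name, order):
    # Block until the upstream future finishes; .result() re-raises its
    # exception, so a failed upstream also fails everything downstream.
    if previous is not None:
        previous.result()
    order.append(name)
    return name


def run_chain(names):
    # Submit every job up front, passing each one the future of its predecessor.
    order = []
    last_future = None
    with ThreadPoolExecutor(max_workers=len(names) or 1) as pool:
        for name in names:
            last_future = pool.submit(run_after, last_future, name, order)
    if last_future is not None:
        last_future.result()  # surface any failure from the end of the chain
    return order


if __name__ == "__main__":
    print(run_chain(["run1.sql", "run2.sql", "run3.sql"]))
```

All jobs are submitted immediately, but each one starts its real work only after its predecessor has finished, which is the ordering the `wait_for` loop asks for.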

Mahesh

11/22/2022, 5:27 AM
@Zanie Thanks for the quick reply. I tried this solution, but I am not getting my shell task's logs. Also, with this approach, how can I pass the task name at task run time?

Zanie

11/22/2022, 5:34 AM
How are you defining your task?
hi(val) would become hi.with_options(name="my-dynamic-name")(val)

Mahesh

11/22/2022, 6:13 AM
Below is my code:
import prefect
from prefect import client, flow, task, get_run_logger
from prefect_shell import shell_run_command

@task
def run_sql(name):
    return show_output(
        shell_run_command.fn(
            command="/venv/prefect2/bin/python run_sql.py -o {}".format(name),
            return_all=True,
            helper_command="cd /venv/prefect2/deployments",
        )
    )


@flow(name="exec")
def exec():
    list1 = ["run1.sql", "run2.sql", "run3.sql"]
    last_future = None
    for val in list1:
        last_future = run_sql.submit(val, wait_for=[last_future])


if __name__ == '__main__':
    exec()
The order should be run1.sql --> run2.sql --> run3.sql, each depending on the previous one. If run2.sql fails, the flow should fail and run3.sql should never run.
I want to use val as the task name, so that each task gets its name from val.
I should also get logs from each task:
def show_output(lines):
    logger = get_run_logger()
    return logger.info(lines)
I tried show_output(run_sql), but it does not show any logs.
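The behavior asked for here (strict order, stop at the first failure, keep each step's output lines for logging) can be sketched without Prefect, using only `subprocess`. This is a plain-Python illustration of the requirement, not a Prefect solution; `StepFailed` and `run_in_order` are hypothetical names, and the commands in the demo are placeholders rather than the real `run_sql.py` invocation.

```python
import subprocess
import sys


class StepFailed(RuntimeError):
    """Raised when a step exits non-zero; records which steps had completed."""

    def __init__(self, name, returncode, completed):
        super().__init__(f"{name} exited with code {returncode}")
        self.name = name
        self.returncode = returncode
        self.completed = completed


def run_in_order(steps):
    """Run (name, argv) steps one at a time, stopping at the first failure."""
    results = []
    for name, argv in steps:
        proc = subprocess.run(argv, capture_output=True, text=True)
        if proc.returncode != 0:
            # Later steps are never started once one step fails.
            raise StepFailed(name, proc.returncode, [n for n, _ in results])
        # Keep the captured stdout lines so the caller can log them per step.
        results.append((name, proc.stdout.splitlines()))
    return results


if __name__ == "__main__":
    # Placeholder command standing in for the real script call.
    demo = [sys.executable, "-c", "print('done')"]
    for step_name, lines in run_in_order([("run1.sql", demo), ("run2.sql", demo)]):
        print(step_name, lines)
```

Returning the captured lines per step, rather than logging inside the runner, leaves it to the caller to decide which logger to send them to.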

Khuyen Tran

11/22/2022, 5:39 PM
Where do you put show_output in your flow?