Mahesh
11/22/2022, 3:56 AM
@task(task_run_name="{val}")
def shell_exec(val):
    """Run ``sql_run.py`` for ``val`` through a ``ShellTask``.

    The shell task is built and run inside a ``prefect.context`` whose
    ``task_run_name`` is set to ``val``.

    Args:
        val: Value appended to the ``sql_run.py`` command line and used as
            the ``task_run_name`` in the context.

    Returns:
        None. The result of ``ShellTask.run()`` is not returned.
    """
    shell_command = f"/venv/prefect-env/bin/python sql_run.py {val}"
    with prefect.context(task_run_name=val):
        runner = ShellTask(
            command=shell_command,
            helper_script="cd /opt/prefect/",
            shell="bash",
            return_all=True,
            log_stderr=True,
            log_stdout=True,
            stream_output=True,
        )
        runner.run()
with Flow("flowname") as flow:
    sql_list = get_values()
    tasks = [shell_exec(val) for val in sql_list]
    # Declare every task's predecessor in the list as its upstream dependency.
    for upstream, downstream in zip(tasks, tasks[1:]):
        downstream.set_upstream(upstream)
I am trying to build the same flow with Prefect 2:
1. I need to get the task name at run time.
2. I have to use a for loop with dependencies between the tasks.
# Prefect 2 attempt at the same flow: call shell_exec once per SQL file name,
# then try to chain the calls so that each one waits for the previous one.
@flow(name="exec")
def exec():
list1=["run1.sql","run2.sql","run3.sql"]
# Each element is whatever shell_exec(val) returns for that file name.
tasks = [
shell_exec(val)
for val in list1
]
for i in range(1, len(tasks)):
# NOTE(review): this is the line the traceback in this thread points at
# (AttributeError: 'coroutine' object has no attribute 'wait_for').
tasks[i].wait_for(tasks[i - 1])
3. I have used wait_for
instead of set_upstream
and got stuck with the error below.
08:57:59.052 | ERROR | Flow run 'vivacious-hawk' - Finished in state Failed("Flow run encountered an exception. AttributeError:
'coroutine' object has no attribute 'wait_for'\n")
Traceback (most recent call last):
File "flows/test.py", line 31, in <module>
exec()
File "/venv/prefect2/lib/python3.8/site-packages/prefect/flows.py", line 442, in __call__
return enter_flow_run_engine_from_flow_call(
File "/venv/prefect2/lib/python3.8/site-packages/prefect/engine.py", line 157, in enter_flow_run_engine_from_flow_call
return anyio.run(begin_run)
File "/venv/prefect2/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/venv/prefect2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/venv/prefect2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/venv/prefect2/lib/python3.8/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/venv/prefect2/lib/python3.8/site-packages/prefect/engine.py", line 237, in create_then_begin_flow_run
return await state.result(fetch=True)
File "/venv/prefect2/lib/python3.8/site-packages/prefect/states.py", line 74, in _get_state_result
raise await get_state_exception(state)
File "/venv/prefect2/lib/python3.8/site-packages/prefect/engine.py", line 610, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/venv/prefect2/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/venv/prefect2/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/venv/prefect2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/venv/prefect2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "flows/test.py", line 26, in exec
tasks[i].wait_for(tasks[i - 1])
AttributeError: 'coroutine' object has no attribute 'wait_for'
Please let me know what is missing here.
Zanie
11/22/2022, 5:14 AM
import prefect
from prefect import flow, task, get_run_logger
@task
def hi(msg):
    """Log ``msg`` at INFO level with the Prefect run logger and return 1."""
    run_logger = get_run_logger()
    run_logger.info(msg)
    return 1
@flow(name="exec")
def exec():
    """Call the ``hi`` task once for each SQL file name, in list order."""
    sql_files = ["run1.sql", "run2.sql", "run3.sql"]
    for sql_file in sql_files:
        hi(sql_file)


exec()
wait_for
from prefect import flow, task, get_run_logger
@task
def hi(msg):
    """Log ``msg`` at INFO level with the Prefect run logger and return 1."""
    run_logger = get_run_logger()
    run_logger.info(msg)
    return 1
@flow(name="exec")
def exec():
    """Submit ``hi`` for each SQL file name, chaining each to the previous one.

    Every submission passes the value returned by the previous ``hi.submit``
    call in ``wait_for``.
    """
    sql_files = ["run1.sql", "run2.sql", "run3.sql"]
    previous = None
    for sql_file in sql_files:
        # On the first pass there is no earlier submission, so this is [None].
        previous = hi.submit(sql_file, wait_for=[previous])


exec()
Mahesh
11/22/2022, 5:27 AM
Zanie
11/22/2022, 5:34 AM
hi(val) would become hi.with_options(name="my-dynamic-name")(val)
Mahesh
11/22/2022, 6:13 AM
import prefect
from prefect import client, flow, task, get_run_logger
from prefect_shell import shell_run_command
@task
async def run_sql(name):
    """Run ``run_sql.py`` for ``name`` in a shell and log its output.

    ``shell_run_command.fn`` is the plain coroutine function behind the
    prefect-shell task. Calling it only creates a coroutine; it must be
    awaited for the command to run and for its output to be available.
    Without the ``await``, ``show_output`` receives a coroutine object
    instead of the command output.

    Args:
        name: Value passed to ``run_sql.py`` after the ``-o`` option.

    Returns:
        Whatever ``show_output`` returns for the collected command output.
    """
    output_lines = await shell_run_command.fn(
        command="/venv/prefect2/bin/python run_sql.py -o {}".format(name),
        return_all=True,
        helper_command="cd /venv/prefect2/deployments",
    )
    return show_output(output_lines)
@flow(name="exec")
def exec():
    """Submit ``run_sql`` for each SQL file name, chaining each to the previous one.

    Every submission passes the value returned by the previous
    ``run_sql.submit`` call in ``wait_for``.
    """
    sql_files = ["run1.sql", "run2.sql", "run3.sql"]
    previous = None
    for sql_file in sql_files:
        # On the first pass there is no earlier submission, so this is [None].
        previous = run_sql.submit(sql_file, wait_for=[previous])


if __name__ == '__main__':
    exec()
val
so that each task will get its name from val
def show_output(lines):
    """Log ``lines`` at INFO level with the Prefect run logger.

    Args:
        lines: Value to log; it is passed to ``logger.info`` unchanged.

    Returns:
        The return value of ``logger.info(lines)``.
    """
    logger = get_run_logger()
    # The original text carried chat link markup around ``logger.info``,
    # which is not valid Python; this is the plain method call.
    return logger.info(lines)
I tried with show_output(run_sql) but it is not showing logs.
Khuyen Tran
11/22/2022, 5:39 PM
show_output
in your flow?