<@ULVA73B9P> @flow( flow_run_name="remove_env...
# ask-marvin
c
@Marvin @flow( flow_run_name="remove_environment_" + time.strftime("%Y-%m-%d"), log_prints=True, on_failure=[ccl_handling.hook], ) async def remove_environment( env: str, email_recipients=None, ): logger = get_run_logger() current_time = time.ctime() logger.info(f"Now running process: {flow_run.name}") logger.info(f"Current time: {current_time!s}") logger.info(f"User: {os.getlogin()}") ports_killed = await kill_ports.with_options(tags=["Environments"]).submit( source=env ) result = ( await remove_env.with_options(tags=["Environments"]) .submit( env=env, ) .wait_for([ports_killed_confirmation]) ) return result For this i keep getting this error: Encountered exception during execution: Traceback (most recent call last): File "C:\Users\qtask\AppData\Roaming\Python\Python39\site-packages\prefect\engine.py", line 877, in orchestrate_flow_run result = await flow_call.aresult() File "C:\Users\qtask\AppData\Roaming\Python\Python39\site-packages\prefect\_internal\concurrency\calls.py", line 327, in aresult return await asyncio.wrap_future(self.future) File "C:\Users\qtask\AppData\Roaming\Python\Python39\site-packages\prefect\_internal\concurrency\calls.py", line 389, in _run_async result = await coro File "Purestorage\remove_env.py", line 116, in remove_environment await remove_env.with_options(tags=["Environments"]) AttributeError: 'coroutine' object has no attribute 'wait_for'
m
Your message was too long by 119 tokens...
c
@Marvin @flow( flow_run_name="remove_environment_" + time.strftime("%Y-%m-%d"), log_prints=True, on_failure=[ccl_handling.hook], ) async def remove_environment( env: str, email_recipients=None, ): logger = get_run_logger() current_time = time.ctime() logger.info(f"Now running process: {flow_run.name}") logger.info(f"Current time: {current_time!s}") logger.info(f"User: {os.getlogin()}") ports_killed = await kill_ports.with_options(tags=["Environments"]).submit( source=env ) result = ( await remove_env.with_options(tags=["Environments"]) .submit( env=env, ) .wait_for([ports_killed_confirmation]) ) return result For this i keep getting this error: Encountered exception during execution: File "Purestorage\remove_env.py", line 116, in remove_environment await remove_env.with_options(tags=["Environments"]) AttributeError: 'coroutine' object has no attribute 'wait_for'
m
It appears you're encountering an issue because you're trying to use
wait_for
on a coroutine in Prefect 3.x. The
wait_for
method is part of Prefect's task dependency management but is not directly applicable to coroutines. Here's what you can do: 1. Await the Task Submission: Ensure that you await the
submit
method of the task, as it returns a future that you can work with. You should not use
wait_for
directly on a coroutine. 2. Use
wait_for
Correctly
: You should use
wait_for
when defining dependencies between tasks, not on the coroutine itself. Instead, you can pass the future from one task to another to enforce dependencies. Here's an example of how you might refactor your code:
Copy code
python
from prefect import flow, get_run_logger

@flow(
    flow_run_name="remove_environment_" + time.strftime("%Y-%m-%d"),
    log_prints=True,
    on_failure=[ccl_handling.hook],
)
async def remove_environment(env: str, email_recipients=None):
    logger = get_run_logger()
    current_time = time.ctime()

    <http://logger.info|logger.info>(f"Now running process: {flow_run.name}")
    <http://logger.info|logger.info>(f"Current time: {current_time!s}")
    <http://logger.info|logger.info>(f"User: {os.getlogin()}")

    ports_killed = await kill_ports.with_options(tags=["Environments"]).submit(source=env)

    # Ensure you await the task submission and pass the future to the next task
    result = await remove_env.with_options(tags=["Environments"]).submit(env=env, wait_for=[ports_killed])

    return result
For more information on task dependencies and the correct usage of Prefect's async features, you can refer to the Prefect 3.x documentation.
c
@Marvin I am on PRefect 2.18.3
n
wait_for
is a kwarg for submit, not a method on a future
m
In Prefect 2.18.3, the error you're encountering is likely due to the misuse of the
wait_for
parameter. In Prefect 2.x,
wait_for
is used to specify upstream dependencies between tasks, not directly on coroutine objects. Here's how you can refactor your code to correctly use `wait_for`: 1. Use
wait_for
in Task Definitions
: When defining tasks, you can specify other tasks that must complete before the current task runs by using the
wait_for
parameter. 2. Avoid Using
wait_for
on Coroutines
: Ensure you are not trying to call
wait_for
on coroutine objects directly. Instead, use it to set task dependencies. Here's a refactored version of your code:
Copy code
python
from prefect import flow, task, get_run_logger

@task
async def kill_ports(source):
    # Implementation of kill_ports
    pass

@task
async def remove_env(env):
    # Implementation of remove_env
    pass

@flow(
    flow_run_name="remove_environment_" + time.strftime("%Y-%m-%d"),
    log_prints=True,
    on_failure=[ccl_handling.hook],
)
async def remove_environment(env: str, email_recipients=None):
    logger = get_run_logger()
    current_time = time.ctime()

    <http://logger.info|logger.info>(f"Now running process: {remove_environment.name}")
    <http://logger.info|logger.info>(f"Current time: {current_time!s}")
    <http://logger.info|logger.info>(f"User: {os.getlogin()}")

    ports_killed = await kill_ports.submit(source=env)

    # Use wait_for in task definition, not on coroutine
    result = await remove_env.submit(env=env, wait_for=[ports_killed])

    return result
For more details on task dependencies and the
wait_for
parameter, you can check the Prefect 2.x documentation on specifying upstream dependencies.
n
@Marvin show an example of introducing a state dep with submit's wait_for
ope beat me to it
m
Here's an example of using the
wait_for
parameter in Prefect 2.x to introduce state dependencies between tasks. This allows you to control the execution order of tasks that do not share data dependencies:
Copy code
python
from prefect import flow, task

@task
def task_a():
    return "Task A completed"

@task
def task_b():
    return "Task B completed"

@task
def task_c():
    return "Task C completed"

@task
def task_d():
    return "Task D completed"

@flow
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    # Task C will wait for Task A and Task B to complete
    c = task_c.submit(wait_for=[a, b])
    # Task D will wait for Task C to complete
    d = task_d.submit(wait_for=[c])

my_flow()
In this example: -
task_c
will only start after both
task_a
and
task_b
have completed. -
task_d
will start only after
task_c
has completed. This pattern helps enforce execution order without explicitly passing data between the tasks. For more details, you can refer to the Prefect 2.x documentation on task runners.
c
@Nate oh yes omg my bad haha
youre right
Thanks a lot 😅
n
catjam