<@ULVA73B9P> This task used to work fine until one...
# ask-marvin
j
@Marvin This task used to work fine until one of the last releases. WHat could go wrong?
Copy code
@task
def deploy_subflows() -> None:

    flow.from_source(
        source=GitRepository(
            url="<https://github.com/myorg/prefect.git>",
            branch="main",
            credentials={"access_token": Secret.load("github-access-token")},
        ),
        entrypoint="entrypoint/main.py:my_async_flow",
    ).deploy(
        name="my-deployment-name",
        work_pool_name="workpool_name",
        image="url_to_my_image:latest",
        build=False,
    )
I get this error:
Copy code
TypeError: object RunnerDeployment can't be used in 'await' expression
m
The error you're encountering,
TypeError: object RunnerDeployment can't be used in 'await' expression
, seems to be related to changes in how Prefect handles deployments. The
RunnerDeployment
object is not designed to be used with an 'await' expression, indicating a possible mismatch in the expected asynchronous behavior. Here are a few things to consider: 1. Asynchronous vs Synchronous: Ensure that your flow and task definitions align with the expected asynchronous or synchronous behavior. The error suggests that the code might be trying to use an asynchronous feature incorrectly. 2. Prefect Version: Verify that your Prefect version is compatible with the syntax and methods you're using. Recent updates might have changed how certain features are implemented or expected to be used. 3. Deployment Method: The method
flow.from_source()
and its usage with
GitRepository
might have undergone changes. Ensure that the method is still valid and correctly implemented in the latest Prefect version. 4. Documentation: Review the Prefect documentation for any changes in deployment methods, especially related to
RunnerDeployment
and
flow.from_source()
. Here is a link to the Prefect documentation that discusses Git-based storage and deployment. 5. Code Example: Typically, deployments are prepared and served using functions like
to_deployment()
and
serve()
. Ensure that your code aligns with these practices. If these steps don't resolve the issue, consider reviewing the Prefect changelogs or release notes for any breaking changes that might have affected your code.
j
@Marvin That doesn't help much. What I really want is to run the deployment within a flow.
m
To run a deployment within a flow in Prefect 3.x, you should utilize the
flow.deploy()
method. This method allows you to create and manage deployments from within your flow code, providing a more integrated approach to handling subflows or deployments. Here's a concise guide on how you can achieve this: 1. Create a Deployment: Use the
flow.deploy()
method to create a deployment. This method replaces older methods like
Deployment.build_from_flow
and integrates with Prefect's new worker model. 2. Trigger the Deployment: Once the deployment is created, you can trigger it from within another flow or task. This could be done by calling the deployment's API endpoint or using Prefect's orchestration features. 3. Example Usage:
Copy code
python
   from prefect import flow, task

   @flow
   def parent_flow():
       # Deploy a subflow
       my_deployment = my_subflow.deploy(name="my-deployment")
       # You can trigger the deployment here if needed

   @flow
   def my_subflow():
       # Define your subflow tasks
       pass
4. Documentation Reference: For more detailed steps and options, refer to the Prefect deployment documentation. This approach allows you to manage subflows and their deployments directly within your flow code, aligning with Prefect's infrastructure and orchestration capabilities. If you encounter specific issues, ensure that your Prefect version supports these features and check the documentation for any additional configurations required.
j
@Marvin Because I am running parallel subflow in a push workpool, show me an example using GitRepository to fetch the code
m
To use
GitRepository
in Prefect 3.x for fetching code from a repository, especially when running parallel subflows in a push work pool, follow the example below: ### Example Setup 1. Create a GitRepository Block: This block will handle fetching your flow code from a Git repository. 2. Use the GitRepository in your Flow: Reference this block in your flow deployment to fetch the code. Here is a basic example:
Copy code
python
from prefect import flow, task
from prefect.filesystems import GitRepository
from prefect.blocks.system import Secret

# Define a GitRepository block
git_repo = GitRepository(
    url="<https://github.com/myorg/prefect.git>",
    branch="main",
    credentials={"access_token": Secret.load("github-access-token")}
)

@flow
def parent_flow():
    # Deploy a subflow using the GitRepository block
    my_deployment = my_subflow.deploy(
        name="my-deployment",
        work_pool_name="my-workpool",
        source=git_repo
    )
    # Optionally, trigger the deployment here

@flow
def my_subflow():
    # Define your subflow tasks
    pass

if __name__ == "__main__":
    parent_flow()
### Key Points: - GitRepository Block: This block is configured with your repository URL, branch, and credentials. It allows Prefect to fetch the flow code from the specified Git repository. - Credentials: Use a
Secret
block to securely manage your Git credentials, especially for private repositories. - Deployment: Use the
deploy()
method within your flow to handle the deployment process, specifying the
GitRepository
as the source. For more details, refer to the Prefect documentation on Git-based storage. This provides additional insights on setting up and using Git repositories within Prefect flows.
n
hi @Javier SG can you share the whole stack trace here?
j
Thanks @Nate! I changed to the CLI to make the deployments, so it is not blocking right now. Let me know if this helps:
Copy code
09:00:47.336 | ERROR   | Flow run 'dramatic-ant' - Finished in state Failed("Flow run encountered an exception: TypeError: object RunnerDeployment can't be used in 'await' expression")
Traceback (most recent call last):
  File "/workspaces/prefect/tmp.py", line 28, in <module>
    my_flow()
  File "/usr/local/lib/python3.11/site-packages/prefect/flows.py", line 1635, in __call__
    return run_flow(
           ^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 1520, in run_flow
    ret_val = run_flow_sync(**kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 1365, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 340, in result
    raise self._raised
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 755, in run_context
    yield self
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 1363, in run_flow_sync
    engine.call_flow_fn()
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 775, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/workspaces/prefect/clean-assistants/tmp.py", line 25, in my_flow
    deploy_subflows()
  File "/usr/local/lib/python3.11/site-packages/prefect/tasks.py", line 1037, in __call__
    return run_task(
           ^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 1576, in run_task
    return run_task_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 1389, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 482, in result
    raise self._raised
  File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 805, in run_context
    yield self
  File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 1387, in run_task_sync
    engine.call_task_fn(txn)
  File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 828, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/workspaces/prefect/clean-assistants/tmp.py", line 16, in deploy_subflows
    ).deploy(
      ^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 353, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 209, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 330, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 193, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 403, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 190, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 343, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/flows.py", line 1479, in deploy
    deployment = await to_deployment_coro
                 ^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: object RunnerDeployment can't be used in 'await' expression
v
Hi @Nate I've encountered the same issue in the code which worked fine before. Do you have any insights on it? My Prefect version is 3.1.15.
n
hi @Vitaly Saversky it’s hard to say without seeing your code but in general the above error is some sync/async issue, ie something is being awaited that should not be
v
Hi @Nate Thank you for your quick response. The code which cause the issue. The error is pointing to
deploy
command.
Copy code
flow.from_source(
        source=GitRepository(
        url=CODE_GITHUB_URL,
        branch=CODE_BRANCH_NAME,
        credentials={"access_token": Secret.load("my_secret")}),
        entrypoint=f'{module_name()}:trigger_flow'
    ).deploy(
        name=deployment_name(),
        work_pool_name='k8',
        tags=tags,
        version=deployment_version,
        parameters=parameters
    )
Running code in K8. The image is from different releases. Working setup
Copy code
Version:             3.1.11
API version:         0.8.4
Python version:      3.11.11
Git commit:          e448bd34
Built:               Thu, Jan 2, 2025 1:11 PM
OS/Arch:             linux/x86_64
Profile:             ephemeral
Server type:         server
Pydantic version:    2.10.6
Integrations:
  prefect-github:    0.3.1
  prefect-shell:     0.3.1
  prefect-dbt:       0.6.3
  prefect-redis:     0.2.2
Not working
Copy code
Version:             3.1.15
API version:         0.8.4
Python version:      3.11.11
Git commit:          3ac3d548
Built:               Thu, Jan 30, 2025 11:31 AM
OS/Arch:             linux/x86_64
Profile:             ephemeral
Server type:         server
Pydantic version:    2.10.6
Integrations:
  prefect-github:    0.3.1
  prefect-dbt:       0.6.6
  prefect-shell:     0.3.1
  prefect-redis:     0.2.2
I tried to play with
prefect-dbt==0.6.3
, but it didn't help. I don't use any async functions or commands (await) in the code. Thanks.
n
hmm do you happen to have your actual stack trace? or is it effectively the same as the one above
v
Hi @Nate My stack
Copy code
Engine execution exited with unexpected exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 1520, in run_flow
    ret_val = run_flow_sync(**kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 1365, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 340, in result
    raise self._raised
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 755, in run_context
    yield self
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 1363, in run_flow_sync
    engine.call_flow_fn()
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 775, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/prefect/dts-prefect-flow-develop/flow_migration_tool.py", line 499, in trigger_flow
    ).deploy(
      ^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 353, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 209, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 330, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 193, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 403, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 190, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 343, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/flows.py", line 1479, in deploy
    deployment = await to_deployment_coro
                 ^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: object RunnerDeployment can't be used in 'await' expression
a
I'm facing the same issue now with the latest release of v3.2.11
v
The issue exists in 3.2.7 version as well.
flow.from_source(...).deploy(...)
works in Prefect 3.1.11 version.
1
n
sorry about the delay here, PR to fix here https://github.com/PrefectHQ/prefect/pull/17436
🙏 1
👍 1
a
Hey, question for you @Abhishek Singh and @Vitaly Saversky to satisfy my curiosity. What's your use case for calling
.deploy
in a task or flow? Is it part of your larger deployment process?
a
In my case, I’m creating a flow that functions similarly to Liquibase, where code is added within the flow to deploy it exclusively to the X environment. The “Project Deployment” flow runs on X Prefect instances and deploys the code only to the labels specified in the code.
thank you 1
So i dont have to deploy the project in dev, uat, demo and prods separately.
v
Hi @alex Our users don't have Python skills to create Prefect flows and we don't want them to do this job. They run a master flow which does this job for them. User flows are pretty similar to each other with variation of parameters which driving the logic. Also, it helps standardize all flows.
a
Gotcha! And what sort of work are the created flows doing?
v
Flows do data transformations on different platforms Trino, SQL Server, dbt. The "master" tool creates flow deployments based on scripts and dbt projects located in GitHub repository.
gratitude thank you 1
Thank you, @Nate and the Prefect team! Tested the issue and it's resolved in version 3.2.12.
catjam 1