Javier SG
02/04/2025, 10:30 AM@task
def deploy_subflows() -> None:
flow.from_source(
source=GitRepository(
url="<https://github.com/myorg/prefect.git>",
branch="main",
credentials={"access_token": Secret.load("github-access-token")},
),
entrypoint="entrypoint/main.py:my_async_flow",
).deploy(
name="my-deployment-name",
work_pool_name="workpool_name",
image="url_to_my_image:latest",
build=False,
)
I get this error:
TypeError: object RunnerDeployment can't be used in 'await' expression
Marvin
02/04/2025, 10:30 AMTypeError: object RunnerDeployment can't be used in 'await' expression
, seems to be related to changes in how Prefect handles deployments. The RunnerDeployment
object is not designed to be used with an 'await' expression, indicating a possible mismatch in the expected asynchronous behavior.
Here are a few things to consider:
1. Asynchronous vs Synchronous: Ensure that your flow and task definitions align with the expected asynchronous or synchronous behavior. The error suggests that the code might be trying to use an asynchronous feature incorrectly.
2. Prefect Version: Verify that your Prefect version is compatible with the syntax and methods you're using. Recent updates might have changed how certain features are implemented or expected to be used.
3. Deployment Method: The method flow.from_source()
and its usage with GitRepository
might have undergone changes. Ensure that the method is still valid and correctly implemented in the latest Prefect version.
4. Documentation: Review the Prefect documentation for any changes in deployment methods, especially related to RunnerDeployment
and flow.from_source()
. Here is a link to the Prefect documentation that discusses Git-based storage and deployment.
5. Code Example: Typically, deployments are prepared and served using functions like to_deployment()
and serve()
. Ensure that your code aligns with these practices.
If these steps don't resolve the issue, consider reviewing the Prefect changelogs or release notes for any breaking changes that might have affected your code.Javier SG
02/04/2025, 11:06 AMMarvin
02/04/2025, 11:06 AMflow.deploy()
method. This method allows you to create and manage deployments from within your flow code, providing a more integrated approach to handling subflows or deployments.
Here's a concise guide on how you can achieve this:
1. Create a Deployment: Use the flow.deploy()
method to create a deployment. This method replaces older methods like Deployment.build_from_flow
and integrates with Prefect's new worker model.
2. Trigger the Deployment: Once the deployment is created, you can trigger it from within another flow or task. This could be done by calling the deployment's API endpoint or using Prefect's orchestration features.
3. Example Usage:
python
from prefect import flow, task
@flow
def parent_flow():
# Deploy a subflow
my_deployment = my_subflow.deploy(name="my-deployment")
# You can trigger the deployment here if needed
@flow
def my_subflow():
# Define your subflow tasks
pass
4. Documentation Reference: For more detailed steps and options, refer to the Prefect deployment documentation.
This approach allows you to manage subflows and their deployments directly within your flow code, aligning with Prefect's infrastructure and orchestration capabilities. If you encounter specific issues, ensure that your Prefect version supports these features and check the documentation for any additional configurations required.Javier SG
02/04/2025, 11:08 AMMarvin
02/04/2025, 11:09 AMGitRepository
in Prefect 3.x for fetching code from a repository, especially when running parallel subflows in a push work pool, follow the example below:
### Example Setup
1. Create a GitRepository Block: This block will handle fetching your flow code from a Git repository.
2. Use the GitRepository in your Flow: Reference this block in your flow deployment to fetch the code.
Here is a basic example:
python
from prefect import flow, task
from prefect.filesystems import GitRepository
from prefect.blocks.system import Secret
# Define a GitRepository block
git_repo = GitRepository(
url="<https://github.com/myorg/prefect.git>",
branch="main",
credentials={"access_token": Secret.load("github-access-token")}
)
@flow
def parent_flow():
# Deploy a subflow using the GitRepository block
my_deployment = my_subflow.deploy(
name="my-deployment",
work_pool_name="my-workpool",
source=git_repo
)
# Optionally, trigger the deployment here
@flow
def my_subflow():
# Define your subflow tasks
pass
if __name__ == "__main__":
parent_flow()
### Key Points:
- GitRepository Block: This block is configured with your repository URL, branch, and credentials. It allows Prefect to fetch the flow code from the specified Git repository.
- Credentials: Use a Secret
block to securely manage your Git credentials, especially for private repositories.
- Deployment: Use the deploy()
method within your flow to handle the deployment process, specifying the GitRepository
as the source.
For more details, refer to the Prefect documentation on Git-based storage. This provides additional insights on setting up and using Git repositories within Prefect flows.Nate
02/04/2025, 3:52 PMJavier SG
02/05/2025, 9:03 AM09:00:47.336 | ERROR | Flow run 'dramatic-ant' - Finished in state Failed("Flow run encountered an exception: TypeError: object RunnerDeployment can't be used in 'await' expression")
Traceback (most recent call last):
File "/workspaces/prefect/tmp.py", line 28, in <module>
my_flow()
File "/usr/local/lib/python3.11/site-packages/prefect/flows.py", line 1635, in __call__
return run_flow(
^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 1520, in run_flow
ret_val = run_flow_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 1365, in run_flow_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 340, in result
raise self._raised
File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 755, in run_context
yield self
File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 1363, in run_flow_sync
engine.call_flow_fn()
File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 775, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/workspaces/prefect/clean-assistants/tmp.py", line 25, in my_flow
deploy_subflows()
File "/usr/local/lib/python3.11/site-packages/prefect/tasks.py", line 1037, in __call__
return run_task(
^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 1576, in run_task
return run_task_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 1389, in run_task_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 482, in result
raise self._raised
File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 805, in run_context
yield self
File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 1387, in run_task_sync
engine.call_task_fn(txn)
File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 828, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/workspaces/prefect/clean-assistants/tmp.py", line 16, in deploy_subflows
).deploy(
^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 353, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 209, in run_coro_as_sync
return call.result()
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 330, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 193, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 403, in _run_async
result = await coro
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 190, in coroutine_wrapper
return await task
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 343, in ctx_call
result = await async_fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/flows.py", line 1479, in deploy
deployment = await to_deployment_coro
^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: object RunnerDeployment can't be used in 'await' expression
Vitaly Saversky
02/16/2025, 6:28 PMNate
02/16/2025, 6:30 PMVitaly Saversky
02/16/2025, 7:56 PMdeploy
command.
flow.from_source(
source=GitRepository(
url=CODE_GITHUB_URL,
branch=CODE_BRANCH_NAME,
credentials={"access_token": Secret.load("my_secret")}),
entrypoint=f'{module_name()}:trigger_flow'
).deploy(
name=deployment_name(),
work_pool_name='k8',
tags=tags,
version=deployment_version,
parameters=parameters
)
Running code in K8. The image is from different releases.
Working setup
Version: 3.1.11
API version: 0.8.4
Python version: 3.11.11
Git commit: e448bd34
Built: Thu, Jan 2, 2025 1:11 PM
OS/Arch: linux/x86_64
Profile: ephemeral
Server type: server
Pydantic version: 2.10.6
Integrations:
prefect-github: 0.3.1
prefect-shell: 0.3.1
prefect-dbt: 0.6.3
prefect-redis: 0.2.2
Not working
Version: 3.1.15
API version: 0.8.4
Python version: 3.11.11
Git commit: 3ac3d548
Built: Thu, Jan 30, 2025 11:31 AM
OS/Arch: linux/x86_64
Profile: ephemeral
Server type: server
Pydantic version: 2.10.6
Integrations:
prefect-github: 0.3.1
prefect-dbt: 0.6.6
prefect-shell: 0.3.1
prefect-redis: 0.2.2
I tried to play with prefect-dbt==0.6.3
, but it didn't help.
I don't use any async functions or commands (await) in the code.
Thanks.Nate
02/16/2025, 7:57 PMVitaly Saversky
02/16/2025, 7:58 PMEngine execution exited with unexpected exception
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 1520, in run_flow
ret_val = run_flow_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 1365, in run_flow_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 340, in result
raise self._raised
File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 755, in run_context
yield self
File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 1363, in run_flow_sync
engine.call_flow_fn()
File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 775, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/opt/prefect/dts-prefect-flow-develop/flow_migration_tool.py", line 499, in trigger_flow
).deploy(
^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 353, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 209, in run_coro_as_sync
return call.result()
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 330, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 193, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 403, in _run_async
result = await coro
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 190, in coroutine_wrapper
return await task
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 343, in ctx_call
result = await async_fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/flows.py", line 1479, in deploy
deployment = await to_deployment_coro
^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: object RunnerDeployment can't be used in 'await' expression
Abhishek Singh
03/06/2025, 3:07 PMVitaly Saversky
03/07/2025, 6:26 PMflow.from_source(...).deploy(...)
works in Prefect 3.1.11 version.Nate
03/10/2025, 4:02 PMalex
03/10/2025, 4:18 PM.deploy
in a task or flow? Is it part of your larger deployment process?Abhishek Singh
03/10/2025, 4:20 PMAbhishek Singh
03/10/2025, 4:21 PMVitaly Saversky
03/10/2025, 4:39 PMalex
03/10/2025, 4:56 PMVitaly Saversky
03/10/2025, 5:01 PMVitaly Saversky
03/13/2025, 2:36 AM