Tom Han
10/06/2025, 10:12 PM
15:11:10.914 | ERROR | Flow run 'carrot-ibex' - Encountered exception during execution: AttributeError("'Flow' object has no attribute 'submit'")
Traceback (most recent call last):
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1357, in run_context
yield self
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1419, in run_flow_async
await engine.call_flow_fn()
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1371, in call_flow_fn
result = await call_with_parameters(self.flow.fn, self.parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/private/var/folders/lv/kt38h52n3xs70337hx5s_xlm0000gp/T/tmpe6b0loczprefect/github-repository-hcm-core-repo/hcm_core/flows/orchestration/sync_videos.py", line 93, in main_sync_orchestrator_flow
added_to_database_future = ingest_new_videos.submit(vids=result, concurrency_lim=12)
^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'Flow' object has no attribute 'submit'
15:11:10.950 | ERROR | Flow run 'carrot-ibex' - Finished in state Failed("Flow run encountered an exception: AttributeError: 'Flow' object has no attribute 'submit'")
15:11:10.950 | ERROR | prefect.engine - Execution of flow run '722f87ff-49cc-4270-a4a2-73e027122b4a' exited with unexpected exception
Traceback (most recent call last):
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/engine.py", line 57, in handle_engine_signals
yield
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/engine.py", line 122, in <module>
run_coro_as_sync(run_flow(flow, flow_run=flow_run, error_logger=run_logger))
~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 207, in run_coro_as_sync
return call.result()
~~~~~~~~~~~^^
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/_internal/concurrency/calls.py", line 365, in result
return self.future.result(timeout=timeout)
~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/_internal/concurrency/calls.py", line 192, in result
return self.__get_result()
~~~~~~~~~~~~~~~~~^^
File "/opt/homebrew/Cellar/python@3.13/3.13.7/Frameworks/Python.framework/Versions/3.13/lib/python3.13/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/_internal/concurrency/calls.py", line 441, in _run_async
result = await coro
^^^^^^^^^^
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 188, in coroutine_wrapper
return await task
^^^^^^^^^^
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1421, in run_flow_async
return engine.state if return_type == "state" else await engine.result()
^^^^^^^^^^^^^^^^^^^^^
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 934, in result
raise self._raised
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1357, in run_context
yield self
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1419, in run_flow_async
await engine.call_flow_fn()
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1371, in call_flow_fn
result = await call_with_parameters(self.flow.fn, self.parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/private/var/folders/lv/kt38h52n3xs70337hx5s_xlm0000gp/T/tmpe6b0loczprefect/github-repository-hcm-core-repo/hcm_core/flows/orchestration/sync_videos.py", line 93, in main_sync_orchestrator_flow
added_to_database_future = ingest_new_videos.submit(vids=result, concurrency_lim=12)
^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'Flow' object has no attribute 'submit'
15:11:11.297 | ERROR | prefect.flow_runs.runner - Process for flow run 'carrot-ibex' exited with status code: 1
Shouldn't flow object have .submit?
Marvin
10/06/2025, 10:12 PM
Tom Han
10/06/2025, 10:13 PM
Marvin
10/06/2025, 10:17 PM
Flow objects do not have a .submit() method. Only tasks have .submit().
What to do instead depends on what ingest_new_videos is:
- If ingest_new_videos is a flow (@flow):
  - Call it like a normal function to run it as a blocking subflow (await the call if it is an async flow):
      ingest_new_videos(vids=result, concurrency_lim=12)
  - If you need to run multiple subflows concurrently, trigger them as separate flow runs via a deployment and wait on them:
      from prefect.deployments import run_deployment
      from prefect.flow_runs import wait_for_flow_run

      # Written for an async parent flow (the traceback above goes through run_flow_async).
      flow_runs = [
          await run_deployment(
              name="ingest-new-videos/<your-deployment-name>",
              parameters={"vids": batch, "concurrency_lim": 12},
              as_subflow=True,  # keeps the run nested under the parent
              timeout=0,  # return immediately; the default waits for each run to finish
          )
          for batch in batches
      ]
      # Optionally wait for completion
      for fr in flow_runs:
          await wait_for_flow_run(fr.id)
  - If you want in-process concurrency, wrap the subflow call in a task and submit the task:
      from prefect import task

      @task
      def run_ingest_subflow(params):
          return ingest_new_videos(**params)

      futures = [run_ingest_subflow.submit({"vids": b, "concurrency_lim": 12}) for b in batches]
      results = [f.result() for f in futures]
- If ingest_new_videos is a task (@task):
  - Then .submit() is correct; ensure it’s decorated with @task, not @flow.
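The snippets above iterate over `batches`, which is never defined in this thread. A minimal sketch of one way to build it, assuming `vids` is a plain list; the helper name `make_batches`, the batch size, and the file names are hypothetical and not part of the original code:

```python
def make_batches(vids, batch_size):
    """Split a list into consecutive batches of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [vids[i:i + batch_size] for i in range(0, len(vids), batch_size)]


# Hypothetical input: five video file names split into batches of two.
batches = make_batches(["a.mp4", "b.mp4", "c.mp4", "d.mp4", "e.mp4"], batch_size=2)
```

Each element of `batches` can then be passed as the `vids` parameter of one subflow run or one wrapper-task submission.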
Relevant docs:
- Task runners (how concurrency works in 3.x): Task runners
- Running deployments programmatically (for concurrent subflows across workers): Run deployments
- Global concurrency limits (if you need orchestration-level limits): Global concurrency limits
If you can share whether ingest_new_videos is decorated with @flow or @task, I can suggest the exact snippet to fix your line.
Tom Han
10/06/2025, 10:20 PM
Tom Han
10/06/2025, 10:22 PM
Marvin
10/06/2025, 10:22 PM
Marvin
10/06/2025, 10:23 PM
ingest_new_videos must stay a flow. A thin wrapper task lets you use .submit() with your task runner to fan out calls concurrently.
Tips if you do wrap:
- Keep the wrapper thin and avoid extra behavior:
    from prefect import task

    @task(retries=0, persist_result=False)  # avoid double retries/persistence
    def run_ingest(params):
        return ingest_new_videos(**params)  # ingest_new_videos is a @flow
- Size your task runner appropriately for your intended parallelism.
- Let the subflow manage its own retries and caching; don’t duplicate them on the wrapper.
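To illustrate the sizing tip without depending on Prefect, here is a standard-library analogy only: a bounded `ThreadPoolExecutor` with the same submit-then-result shape as the wrapper task. The function `run_with_pool`, the doubling "work", and the sleep are all hypothetical stand-ins; the point is that the pool size, not the number of submitted jobs, caps how many calls run at once.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_with_pool(jobs, max_workers):
    """Submit every job to a bounded thread pool and report peak concurrency."""
    lock = threading.Lock()
    active = 0
    peak = 0

    def tracked(job):
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        try:
            time.sleep(0.05)  # stand-in for real work such as a subflow call
            return job * 2
        finally:
            with lock:
                active -= 1

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(tracked, job) for job in jobs]
        results = [f.result() for f in futures]  # results keep submission order
    return results, peak


# Four jobs, but at most two run at the same time.
results, peak = run_with_pool([1, 2, 3, 4], max_workers=2)
```

If the intended parallelism is 12, as the `concurrency_lim=12` argument suggests, a pool or task runner with fewer workers than that would become the effective limit.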
Alternatives (often cleaner):
- If it doesn’t need to be a flow, make ingest_new_videos a @task and use .submit() directly.
- If you want concurrent subflows across infrastructure, trigger deployments instead of wrapping, and optionally wait:
    from prefect.deployments import run_deployment
    from prefect.flow_runs import wait_for_flow_run

    # Written for an async parent flow (the traceback above goes through run_flow_async).
    runs = [
        await run_deployment(
            name="ingest-new-videos/<deployment-name>",
            parameters={"vids": b, "concurrency_lim": 12},
            as_subflow=True,  # keep lineage under the parent
            timeout=0,  # return immediately; the default waits for each run to finish
        )
        for b in batches
    ]
    for fr in runs:
        await wait_for_flow_run(fr.id)
Refs:
- Task runners and concurrency: Task runners
- Running deployments programmatically: Run deployments
- Global concurrency limits (if relevant): Global concurrency limits
If you can share whether you want in‑process parallelism or distributed parallelism across workers, I can suggest the most direct pattern for your case.