<@ULVA73B9P> ```15:11:10.914 | ERROR | Flow run ...
# ask-marvin
t
@Marvin
Copy code
15:11:10.914 | ERROR   | Flow run 'carrot-ibex' - Encountered exception during execution: AttributeError("'Flow' object has no attribute 'submit'")
Traceback (most recent call last):
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1357, in run_context
    yield self
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1419, in run_flow_async
    await engine.call_flow_fn()
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1371, in call_flow_fn
    result = await call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/var/folders/lv/kt38h52n3xs70337hx5s_xlm0000gp/T/tmpe6b0loczprefect/github-repository-hcm-core-repo/hcm_core/flows/orchestration/sync_videos.py", line 93, in main_sync_orchestrator_flow
    added_to_database_future = ingest_new_videos.submit(vids=result, concurrency_lim=12)
                               ^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'Flow' object has no attribute 'submit'
15:11:10.950 | ERROR   | Flow run 'carrot-ibex' - Finished in state Failed("Flow run encountered an exception: AttributeError: 'Flow' object has no attribute 'submit'")
15:11:10.950 | ERROR   | prefect.engine - Execution of flow run '722f87ff-49cc-4270-a4a2-73e027122b4a' exited with unexpected exception
Traceback (most recent call last):
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/engine.py", line 57, in handle_engine_signals
    yield
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/engine.py", line 122, in <module>
    run_coro_as_sync(run_flow(flow, flow_run=flow_run, error_logger=run_logger))
    ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 207, in run_coro_as_sync
    return call.result()
           ~~~~~~~~~~~^^
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/_internal/concurrency/calls.py", line 365, in result
    return self.future.result(timeout=timeout)
           ~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/_internal/concurrency/calls.py", line 192, in result
    return self.__get_result()
           ~~~~~~~~~~~~~~~~~^^
  File "/opt/homebrew/Cellar/python@3.13/3.13.7/Frameworks/Python.framework/Versions/3.13/lib/python3.13/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/_internal/concurrency/calls.py", line 441, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 188, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1421, in run_flow_async
    return engine.state if return_type == "state" else await engine.result()
                                                       ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 934, in result
    raise self._raised
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1357, in run_context
    yield self
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1419, in run_flow_async
    await engine.call_flow_fn()
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1371, in call_flow_fn
    result = await call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/var/folders/lv/kt38h52n3xs70337hx5s_xlm0000gp/T/tmpe6b0loczprefect/github-repository-hcm-core-repo/hcm_core/flows/orchestration/sync_videos.py", line 93, in main_sync_orchestrator_flow
    added_to_database_future = ingest_new_videos.submit(vids=result, concurrency_lim=12)
                               ^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'Flow' object has no attribute 'submit'
15:11:11.297 | ERROR   | prefect.flow_runs.runner - Process for flow run 'carrot-ibex' exited with status code: 1
Shouldn't flow object have .submit?
m
thought for 288.2 seconds
t
okay i realized task have submit and flows doesn't...
m
Short answer: No — in Prefect 3.x a Flow does not have a
.submit()
method. Only Tasks have
.submit()
. What to do instead depends on what
ingest_new_videos
is: - If
ingest_new_videos
is a flow (@flow): - Call it like a normal function for a synchronous subflow:
Copy code
ingest_new_videos(vids=result, concurrency_lim=12)
- If you need to run multiple subflows concurrently, trigger them as separate flow runs via a deployment and wait on them:
Copy code
from prefect.deployments import run_deployment
    from prefect.flow_runs import wait_for_flow_run

    flow_runs = [
        run_deployment(
            name="ingest-new-videos/<your-deployment-name>",
            parameters={"vids": batch, "concurrency_lim": 12},
            as_subflow=True,  # keeps the run nested under the parent
        )
        for batch in batches
    ]

    # Optionally wait for completion
    for fr in flow_runs:
        wait_for_flow_run(fr.id)
- If you want in-process concurrency, wrap the subflow call in a task and submit the task:
Copy code
from prefect import task

    @task
    def run_ingest_subflow(params):
        return ingest_new_videos(**params)

    futures = [run_ingest_subflow.submit({"vids": b, "concurrency_lim": 12}) for b in batches]
    results = [f.result() for f in futures]
- If
ingest_new_videos
is a task (@task): - Then
.submit()
is correct; ensure it’s decorated with
@task
, not
@flow
. Relevant docs: - Task runners (how concurrency works in 3.x): Task runners - Running deployments programmatically (for concurrent subflows across workers): Run deployments - Global concurrency limits (if you need orchestration-level limits): Global concurrency limits If you can share whether
ingest_new_videos
is decorated with
@flow
or
@task
, I can suggest the exact snippet to fix your line.
t
Can I just change my flows into tasks?
@Marvin is there any downside on wrapping a flow in a task?
m
thought for 31.1 seconds
Yes — there are a few trade‑offs to wrapping a flow call inside a task in Prefect 3.x. Potential downsides: - Double orchestration and noisy UI: You’ll see both a task run and a nested subflow run for the same unit of work, which can add clutter and make lineage/logs harder to follow. - Retry/caching semantics can surprise you: Retries or caching on the wrapper task apply to the entire subflow call. If the subflow already has its own retries, you can end up retrying the whole subflow multiple times. - Concurrency slot usage: The wrapper task occupies a worker thread/process while it waits for the subflow to complete. If you launch many of these, you need enough task runner capacity or you can bottleneck parallelism. - Cancellation/propagation nuances: Cancelling the parent flow cancels the wrapper task; the subflow will usually receive cancellation too, but you’re adding another layer where behavior depends on your exception handling. - Result persistence and size: If the wrapper task returns a large subflow result and you persist task results, you may store more data than intended at the task level. When wrapping can make sense: - You want simple in‑process concurrency and
ingest_new_videos
must stay a flow. A thin wrapper task lets you use
.submit()
with your task runner to fan out calls concurrently. Tips if you do wrap: - Keep the wrapper thin and avoid extra behavior:
Copy code
from prefect import task

  @task(retries=0, persist_result=False)  # avoid double retries/persistence
  def run_ingest(params):
      return ingest_new_videos(**params)  # ingest_new_videos is a @flow
- Size your task runner appropriately for your intended parallelism. - Let the subflow manage its own retries and caching; don’t duplicate them on the wrapper. Alternatives (often cleaner): - If it doesn’t need to be a flow, make
ingest_new_videos
a
@task
and use
.submit()
directly. - If you want concurrent subflows across infrastructure, trigger deployments instead of wrapping, and optionally wait:
Copy code
from prefect.deployments import run_deployment
  from prefect.flow_runs import wait_for_flow_run

  runs = [
      run_deployment(
          name="ingest-new-videos/<deployment-name>",
          parameters={"vids": b, "concurrency_lim": 12},
          as_subflow=True,  # keep lineage under the parent
      )
      for b in batches
  ]
  for fr in runs:
      wait_for_flow_run(fr.id)
Refs: - Task runners and concurrency: Task runners - Running deployments programmatically: Run deployments - Global concurrency limits (if relevant): Global concurrency limits If you can share whether you want in‑process parallelism or distributed parallelism across workers, I can suggest the most direct pattern for your case.