# ask-marvin
t
@Marvin Is there a way to catch the return value of a deployment run? My flow returns a list that is needed for the next step:
flowrun: FlowRun = run_deployment(
    name="name",
    parameters={
        ...
    },
    as_subflow=True,
)
m
thought for 177.8 seconds
Yes. run_deployment returns a FlowRun object; to get the actual return value of the flow, wait for the run to finish and then resolve the state’s result. Here’s a pattern that works reliably in Prefect 3.x:
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

# Trigger the deployment as a subflow
flow_run = run_deployment(
    name="name",
    parameters={...},
    as_subflow=True,
)

# Ensure it reaches a terminal state
flow_run = wait_for_flow_run(flow_run.id)

# Retrieve the returned value from the child flow (e.g., your list)
result = get_state_result(flow_run.state)  # raises if the run failed
Notes:
- get_state_result will raise on failure by default; you can pass raise_on_failure=False if you prefer to handle it yourself.
- Make sure the returned object is serializable with your configured result serializer.
If you control the target flow and don’t need deployment isolation, an even simpler option is to call the flow directly as a subflow and get its return value immediately:
# Assuming you can import the flow function
returned_list = child_flow(**params)  # runs as a subflow and returns the value
Refs:
- Run deployments (Prefect 3)
- Source for state result utilities: prefect.states
t
@Marvin I got an AttributeError: 'coroutine' object has no attribute 'id'. Should I do await run_deployment? But run_deployment is sync
m
thought for 45.4 seconds
You’re seeing that because you’re in an async context. run_deployment (and wait_for_flow_run) are sync-compatible:
- In sync code, call them normally.
- In async code (e.g., inside an async def flow), you must await them. Otherwise you get a coroutine and accessing .id fails.
Examples:
- Async parent flow:
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
async def parent():
    fr = await run_deployment(name="name", parameters={...}, as_subflow=True)
    fr = await wait_for_flow_run(fr.id)
    value = get_state_result(fr.state)  # raises if failed
    return value
- Sync parent flow:
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
def parent():
    fr = run_deployment(name="name", parameters={...}, as_subflow=True)
    fr = wait_for_flow_run(fr.id)
    value = get_state_result(fr.state)  # raises if failed
    return value
Notes:
- get_state_result will raise on failure by default; set raise_on_failure=False to handle errors yourself.
- Ensure the child flow’s return value is serializable with your result serializer.
Refs:
- Run deployments (Prefect 3)
- State result helpers
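To make the missing-await failure concrete, here is a minimal sketch that uses only the standard library. FakeFlowRun and fake_run_deployment are hypothetical stand-ins invented for this illustration; they are not Prefect APIs and only mimic the shape of an awaitable call that returns an object with an id.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeFlowRun:
    """Hypothetical stand-in for a flow run object (not Prefect's FlowRun)."""
    id: str


async def fake_run_deployment(name: str) -> FakeFlowRun:
    """Hypothetical async helper that mimics an awaitable API call."""
    await asyncio.sleep(0)
    return FakeFlowRun(id=f"run-of-{name}")


def call_without_await() -> str:
    """Call the async helper without awaiting it and report what goes wrong."""
    pending = fake_run_deployment("name")  # this is a coroutine, not a FakeFlowRun
    try:
        return pending.id
    except AttributeError as exc:
        return str(exc)
    finally:
        pending.close()  # avoid the "coroutine was never awaited" warning


async def call_with_await() -> str:
    """Await the helper so the real return value is available."""
    run = await fake_run_deployment("name")
    return run.id


missing_await_error = call_without_await()
awaited_id = asyncio.run(call_with_await())
print(missing_await_error)
print(awaited_id)
```

The first print shows the same kind of AttributeError as in the question; the second shows the id once the call is awaited.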
t
@Marvin when should I use an async flow and when to use sync flows?
m
thought for 174.3 seconds
Great question. Short answer: pick the style that matches your workload and ecosystem, then layer in concurrency appropriately.
Use an async flow when:
- You’re using async-first libraries (aiohttp, asyncpg, aioboto3, etc.) and want to await them without blocking.
- You want to orchestrate many I/O-bound actions concurrently in a single process (e.g., launching several deployments and waiting on them with asyncio.gather).
- You’re already in an async environment (e.g., a FastAPI endpoint).
Use a sync flow when:
- Your code and libraries are synchronous (most common).
- You have CPU-bound work; pair it with a process-based task runner to bypass the GIL.
- You want the simplest mental model; you can still get concurrency via task runners and task.submit.
Patterns
- Async flow orchestrating multiple deployments concurrently
import asyncio
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
async def orchestrate():
    # Launch multiple deployments concurrently
    runs = await asyncio.gather(
        run_deployment(name="proj/dep-a", parameters={...}, as_subflow=True),
        run_deployment(name="proj/dep-b", parameters={...}, as_subflow=True),
    )

    # Wait for completion concurrently
    finished = await asyncio.gather(*(wait_for_flow_run(r.id) for r in runs))

    # Collect return values
    results = [get_state_result(fr.state) for fr in finished]
    return results
- Sync flow with CPU-bound concurrency (use processes)
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def heavy_compute(x: int) -> int:
    # CPU-bound work
    return sum(i * i for i in range(10_000_000))

@flow(task_runner=ProcessPoolTaskRunner())
def cpu_flow():
    futs = [heavy_compute.submit(i) for i in range(4)]
    return [f.result() for f in futs]
- Sync flow with I/O-bound concurrency (use threads)
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def fetch(url: str) -> str:
    # blocking I/O (requests, boto3, etc.)
    ...

@flow(task_runner=ThreadPoolTaskRunner())
def io_flow(urls: list[str]):
    futs = [fetch.submit(u) for u in urls]
    return [f.result() for f in futs]
Gotchas
- In async flows, you must await async-capable helpers like run_deployment(...) and wait_for_flow_run(...). Forgetting to await yields a coroutine object (the error you saw).
- Avoid long blocking CPU or blocking I/O directly in async flows; offload to tasks and pick a thread/process runner.
- If you only need to call other deployments and handle results, async flows are great for high fan-out/fan-in orchestration.
Docs and references
- Run deployments
- Concurrency patterns and task runners: Run work concurrently
- Flow concepts: Flows concept
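As a rough illustration of the "don't block the event loop" gotcha, the sketch below uses plain asyncio rather than Prefect tasks. blocking_fetch is a hypothetical placeholder for a blocking call; asyncio.to_thread (Python 3.9+) moves each call to a worker thread so the calls can overlap. In a Prefect flow you would more likely use tasks with a task runner, as described above.

```python
import asyncio
import time


def blocking_fetch(item: int) -> int:
    """Hypothetical blocking call (stands in for requests, boto3, etc.)."""
    time.sleep(0.05)
    return item * 2


async def fan_out(items: list[int]) -> list[int]:
    """Run the blocking calls in worker threads and gather results in input order."""
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_fetch, item) for item in items)
    )


results = asyncio.run(fan_out([1, 2, 3]))
print(results)
```

asyncio.gather returns results in the order the awaitables were passed, regardless of which thread finishes first.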
t
So when do I just do result = task() and when do I do fut = task.submit(args), then result = task.result when I need it? @Marvin
m
thought for 87.5 seconds
Great question: it boils down to whether you need concurrency or not.
Use result = task(...) when:
- You need the task’s output immediately (sequential logic).
- You’re not trying to overlap this task with other work.
- You want simpler control flow; exceptions raise right away.
Use fut = task.submit(...) (then later result = fut.result()) when:
- You want concurrency/fan-out: fire off many tasks at once, then fan-in later.
- You want to overlap this task with other work before blocking for its result.
- You want to pass futures to other tasks as upstream dependencies (Prefect will wait on them automatically).
Key points:
- .submit(...) returns a PrefectFuture. Get the value with fut.result() (it’s a method, not a property).
- If you .submit(...) and immediately call .result(), that’s effectively the same as calling task(...), so prefer task(...) for clarity in that case.
- In async flows, you can still use .submit(...) and call fut.result() when you need the value. Use a thread/process task runner to avoid blocking the event loop with heavy work.
Patterns
- Sequential (simple and immediate)
@flow
def pipeline():
    a = task_a(...)
    b = task_b(a)  # runs after task_a completes
    return b
- Concurrent fan-out/fan-in
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner())
def pipeline(urls):
    futs = [download.submit(u) for u in urls]  # launch all
    # do other work here if needed...
    results = [f.result() for f in futs]       # fan-in
    return results
- Passing futures as upstream dependencies (Prefect handles waiting)
@flow
def pipeline():
    f1 = task_a.submit(...)
    f2 = task_b.submit(f1)  # will wait for f1 automatically
    return f2.result()
Docs:
- Run work concurrently: Concurrency patterns in Prefect
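For intuition, the same call-versus-submit distinction exists in the standard library. The sketch below is an analogy built on concurrent.futures, not Prefect code: square is a hypothetical worker function, pool.submit plays the role of task.submit, and future.result() is a method call, just like fut.result() above.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x: int) -> int:
    """Hypothetical unit of work."""
    return x * x


def sequential(values: list[int]) -> list[int]:
    """Direct calls: each result is available immediately, one after another."""
    return [square(v) for v in values]


def fan_out_fan_in(values: list[int]) -> list[int]:
    """Submit everything first (fan-out), then collect the results (fan-in)."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(square, v) for v in values]
        return [f.result() for f in futures]  # result() is a method, not a property


sequential_results = sequential([1, 2, 3])
concurrent_results = fan_out_fan_in([1, 2, 3])
print(sequential_results, concurrent_results)
```

Both paths produce the same values; the difference is only whether the work can overlap before you block for results.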
t
@Marvin So I did
logger.info(f"Subflow triggered successfully. Flow run ID: {flowrun.id}")

flow_run = wait_for_flow_run(flowrun.id)

result = get_state_result(flow_run.state)

logger.info(f"Robocopy flow completed with state: {flow_run.state}")
logger.info(f"Result: {result}")
I assumed flow_run would be a finished FlowRun. I changed my flow to be sync (removing async from the function signature), but I got:
AttributeError: 'coroutine' object has no attribute 'state'
m
thought for 64.6 seconds
That error means you’re still calling an async function without awaiting it. In Prefect 3.x, both run_deployment and wait_for_flow_run are “sync-compatible”:
- In sync code, call them normally.
- In async code, you must await them. If you don’t, you’ll get a coroutine and accessing .state or .id will fail.
Use one of these patterns:
Sync parent flow
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
def parent():
    logger = get_run_logger()
    fr = run_deployment(name="proj/dep", parameters={...}, as_subflow=True)
    logger.info(f"Subflow triggered successfully. Flow run ID: {fr.id}")

    fr = wait_for_flow_run(fr.id)  # blocking in sync code
    result = get_state_result(fr.state)

    logger.info(f"Robocopy flow completed with state: {fr.state}")
    logger.info(f"Result: {result}")
    return result
Async parent flow
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
async def parent():
    logger = get_run_logger()
    fr = await run_deployment(name="proj/dep", parameters={...}, as_subflow=True)
    logger.info(f"Subflow triggered successfully. Flow run ID: {fr.id}")

    fr = await wait_for_flow_run(fr.id)  # must await in async code
    result = get_state_result(fr.state)

    logger.info(f"Robocopy flow completed with state: {fr.state}")
    logger.info(f"Result: {result}")
    return result
If you’ve removed async from your flow but still see a coroutine, something in the call stack is likely still async (e.g., running inside an async task or an async entrypoint like FastAPI). If you can share your flow signature and where this code runs from, I can confirm which variant you need.
Refs:
- Run deployments
t
@Marvin
18:42:33.957 | INFO    | Flow run 'upsilon691-sirrah' - Subflow triggered successfully. Flow run ID: 1a1b81ce-5e6a-45d7-baf7-6f57cffbfd0b
18:42:33.958 | ERROR   | Flow run 'upsilon691-sirrah' - Encountered exception during execution: AttributeError("'coroutine' object has no attribute 'state'")
Traceback (most recent call last):
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 782, in run_context
    yield self
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1397, in run_flow_sync
    engine.call_flow_fn()
    ~~~~~~~~~~~~~~~~~~~^^
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 802, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
    return fn(*args, **kwargs)
  File "/private/var/folders/lv/kt38h52n3xs70337hx5s_xlm0000gp/T/tmp61xmt52qprefect/github-repository-hcm-core-repo/hcm_core/flows/orchestration/sync_videos.py", line 62, in main_sync_orchestrator_flow
    result = get_state_result(flow_run.state)
                              ^^^^^^^^^^^^^^
AttributeError: 'coroutine' object has no attribute 'state'
18:42:33.984 | INFO    | Flow run 'upsilon691-sirrah' - Finished in state Failed("Flow run encountered an exception: AttributeError: 'coroutine' object has no attribute 'state'")
18:42:33.984 | ERROR   | Flow run 'upsilon691-sirrah' - Engine execution exited with unexpected exception
Traceback (most recent call last):
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1554, in run_flow
    ret_val = run_flow_sync(**kwargs)
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1399, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ~~~~~~~~~~~~~^^
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 361, in result
    raise self._raised
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 782, in run_context
    yield self
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1397, in run_flow_sync
    engine.call_flow_fn()
    ~~~~~~~~~~~~~~~~~~~^^
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 802, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
    return fn(*args, **kwargs)
  File "/private/var/folders/lv/kt38h52n3xs70337hx5s_xlm0000gp/T/tmp61xmt52qprefect/github-repository-hcm-core-repo/hcm_core/flows/orchestration/sync_videos.py", line 62, in main_sync_orchestrator_flow
    result = get_state_result(flow_run.state)
                              ^^^^^^^^^^^^^^
AttributeError: 'coroutine' object has no attribute 'state'
I started this deployment run from the web UI. I am using a self-hosted Prefect server.
m
thought for 118.8 seconds
Thanks for the stack trace. It confirms that wait_for_flow_run(...) is being treated as async in your execution context, so calling it without await returns a coroutine (hence the 'coroutine' object has no attribute 'state' error when you try to use it). Even though your flow function is now sync, Prefect may still be running it inside an event loop. For “sync-compatible” Prefect APIs, that means:
- In a pure sync context: call them normally
- If an event loop is active: you must either await them or explicitly run the coroutine as sync
You have two reliable options:
Option A: Make the parent flow async and await
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
async def main_orchestrator():
    logger = get_run_logger()
    fr = await run_deployment(name="proj/dep", parameters={...}, as_subflow=True)
    logger.info(f"Subflow triggered successfully. Flow run ID: {fr.id}")

    fr = await wait_for_flow_run(fr.id, log_states=True)
    result = get_state_result(fr.state)

    logger.info(f"Robocopy flow completed with state: {fr.state}")
    logger.info(f"Result: {result}")
    return result
Option B: Keep the flow sync and force-sync the coroutine
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
from prefect.utilities.asyncutils import run_coro_as_sync

@flow
def main_sync_orchestrator_flow():
    logger = get_run_logger()
    fr = run_deployment(name="proj/dep", parameters={...}, as_subflow=True)
    logger.info(f"Subflow triggered successfully. Flow run ID: {fr.id}")

    # Force the async call to run synchronously
    fr = run_coro_as_sync(wait_for_flow_run(fr.id, log_states=True))
    result = get_state_result(fr.state)

    logger.info(f"Robocopy flow completed with state: {fr.state}")
    logger.info(f"Result: {result}")
    return result
Notes:
- You already confirmed run_deployment(...) returned a FlowRun (you logged fr.id), so the coroutine is coming from wait_for_flow_run(...).
- If you flip to an async flow, remember you must await both run_deployment(...) and wait_for_flow_run(...).
- You can pass log_states=True to wait_for_flow_run to see the state transitions while waiting.
Refs:
- Run deployments
- Source for wait_for_flow_run (sync-compatible): prefect.flow_runs
- Utilities for bridging async/sync: prefect.utilities.asyncutils
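If it is unclear whether a helper hands back a value or a coroutine, a quick check with the standard inspect module can help. The sketch below is a stdlib-only illustration: fake_wait_for_flow_run is a hypothetical stand-in, and resolve uses asyncio.run as a rough analogue of the "force-sync" idea, not as a replacement for Prefect's own utility.

```python
import asyncio
import inspect


async def fake_wait_for_flow_run(flow_run_id: str) -> dict:
    """Hypothetical async helper (not Prefect's wait_for_flow_run)."""
    await asyncio.sleep(0)
    return {"id": flow_run_id, "state": "Completed"}


def resolve(value):
    """Return a plain value as-is; run a coroutine to completion first.

    asyncio.run raises RuntimeError if an event loop is already running in
    this thread, so this only suits plain synchronous scripts.
    """
    if inspect.iscoroutine(value):
        return asyncio.run(value)
    return value


is_async_helper = inspect.iscoroutinefunction(fake_wait_for_flow_run)
finished = resolve(fake_wait_for_flow_run("abc"))
print(is_async_helper, finished)
```

Checking inspect.iscoroutine on a suspicious return value is a cheap way to confirm a missing await before digging further.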
t
@Marvin I just did that, and it crashed:
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/_internal/compatibility/blocks.py", line 19, in call_explicitly_async_block_method
    return await getattr(block, f"a{method}")(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/filesystems.py", line 306, in aread_path
    raise ValueError(f"Path {path} does not exist.")
ValueError: Path /Users/talmolab/.prefect/storage/C:\Users\TalmoLab\.prefect\storage\86f613f2c8a74a0799150c6ff98ee65a does not exist.
18:56:49.613 | ERROR   | prefect.flow_runs.runner - Process for flow run 'rho76-hubble-void' exited with status code: 1
What is happening is that I am triggering a deployment subflow that runs on a Windows machine from my main device (a Mac), and I am trying to pull the returned result into the main flow. It seems like Prefect is trying to find the storage location of the remote machine? Is there a way to tell that flow to send its return value to my Prefect Server (not on Cloud)?
m
thought for 246.0 seconds
You nailed the diagnosis: your child flow (running on Windows) persisted its return value to its local default result store (LocalFileSystem under C:\Users\TalmoLab\.prefect\storage). When your parent (running on macOS) calls get_state_result, Prefect tries to read that same file path locally, which doesn’t exist, hence the mixed-path error.
Important: Prefect Server does not store your flow results. The server stores only metadata and a pointer to the result’s location. To fetch results across machines, both machines must read from a shared result store.
How to fix it
Pick a shared result store and make the child flow persist to it. Common options:
- Cloud object storage: S3, GCS, Azure Blob (recommended)
- Network share: SMB/NFS reachable from both Windows and macOS
- Any RemoteFileSystem URL that both sides can access
Option A: Set result storage on the child flow
from prefect import flow
from prefect.states import get_state_result

# Example: S3 (requires `pip install 'prefect[aws]'`)
from prefect_aws.s3 import S3Bucket

# Load a configured S3Bucket block (create it in code or via UI beforehand)
s3 = S3Bucket.load("default-result-store")

@flow(result_storage=s3, persist_result=True)
def child_flow():  # add your flow's parameters here
    # return your list
    return ["a", "b"]
Or with an SMB/NFS share via RemoteFileSystem:
from prefect import flow
from prefect.filesystems import RemoteFileSystem

fs = RemoteFileSystem(basepath="smb://user:pass@server/share/prefect-results")

@flow(result_storage=fs, persist_result=True)
def child_flow():  # add your flow's parameters here
    return ["a", "b"]
Redeploy the child so it uses this storage. Then your parent can keep using:
fr = await wait_for_flow_run(...)  # or without await, depending on your context
result = get_state_result(fr.state)
and it will download from the shared store.
Option B: Configure a global default result store
Set these on BOTH the child’s execution environment and any environment that calls get_state_result (your Mac):
- PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
- PREFECT_DEFAULT_RESULT_STORAGE_BLOCK=<block slug shown in the UI, e.g. s3-bucket/default-result-store>
This makes all flows/tasks persist to that shared store by default.
Notes
- Make sure credentials for the shared store are available on both machines.
- If your payloads are small and you need a quick workaround, you could emit an artifact in the child flow (e.g., a markdown or table) and read it back via API; for programmatic, reliable data passing, though, shared result storage is the right approach.
- Your current error shows Prefect is joining your Mac’s local base path with the Windows path in the storage key; that’s a telltale sign of using LocalFileSystem on different machines.
Docs
- Results and result storage in Prefect 3
If you share what storage you prefer (S3/GCS/Azure/SMB), I can give you a drop-in config snippet for that backend.
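The odd path in the error can be reproduced with pathlib alone. This is only an illustration of why the string looks the way it does (an assumption about the mechanism, not a trace of Prefect's internals): on a POSIX system a Windows absolute path is not recognized as absolute, so joining it onto a local base directory simply appends it.

```python
from pathlib import PurePosixPath, PureWindowsPath

# Values copied from the error message above.
mac_base = PurePosixPath("/Users/talmolab/.prefect/storage")
windows_key = r"C:\Users\TalmoLab\.prefect\storage\86f613f2c8a74a0799150c6ff98ee65a"

# The same string is absolute under Windows rules but relative under POSIX rules.
absolute_on_windows = PureWindowsPath(windows_key).is_absolute()
absolute_on_posix = PurePosixPath(windows_key).is_absolute()

# Joining on POSIX treats the whole Windows path as one relative component.
joined = str(mac_base / windows_key)

print(absolute_on_windows, absolute_on_posix)
print(joined)
```

The joined string matches the path reported in the ValueError, which is consistent with a result written to local storage on one OS and looked up on another.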
t
I created a remote storage block in the UI with the basepath set to smb://username:password@addr/talmo/than/prefect-results but I got:
19:13:31.633 | ERROR   | Task run 'run_robocopy_and_create_artifact-ade' - An error was encountered while committing transaction 'e66a2777f7ffa9f8bfbf9b2548af8a4b'
Traceback (most recent call last):
  File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\transactions.py", line 360, in commit
    self.store.persist_result_record(
  File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\results.py", line 823, in persist_result_record
    return self._persist_result_record(
  File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\utilities\asyncutils.py", line 347, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
  File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\utilities\asyncutils.py", line 207, in run_coro_as_sync
    return call.result()
  File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 365, in result
    return self.future.result(timeout=timeout)
  File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 192, in result
    return self.__get_result()
  File "C:\Users\TalmoLab\miniconda3\lib\concurrent\futures\_base.py", line 403, in __get_result
    raise self._exception
  File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 441, in _run_async
    result = await coro
  File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\utilities\asyncutils.py", line 188, in coroutine_wrapper
    return await task
  File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\utilities\asyncutils.py", line 341, in ctx_call
    result = await async_fn(*args, **kwargs)
  File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\results.py", line 803, in _persist_result_record
    await call_explicitly_async_block_method(
  File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\_internal\compatibility\blocks.py", line 21, in call_explicitly_async_block_method
    return await getattr(block, method).aio(block, *args, **kwargs)
  File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\filesystems.py", line 518, in write_path
    self.filesystem.makedirs(dirpath, exist_ok=True)
  File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\filesystems.py", line 530, in filesystem
    self._filesystem = fsspec.filesystem(scheme, **self.settings)
  File "D:\tom\hcm-windows\.venv\lib\site-packages\fsspec\registry.py", line 322, in filesystem
    return cls(**storage_options)
  File "D:\tom\hcm-windows\.venv\lib\site-packages\fsspec\spec.py", line 81, in __call__
    obj = super().__call__(*args, **kwargs)
TypeError: SMBFileSystem.__init__() missing 1 required positional argument: 'host'
Should I put stuff in the Settings json when I create that block? @Marvin
m
thought for 42.6 seconds
You’re hitting an fsspec init error: the SMB filesystem driver needs its connection parameters at construction time. With a RemoteFileSystem block, Prefect calls fsspec.filesystem("smb", **settings) first, then uses your basepath for paths. Since your Settings were empty, fsspec didn’t get a host (or creds), so it failed with SMBFileSystem.__init__() missing 'host'.
You’ve got two good options:
Option 1: Use the dedicated SMB block (recommended)
- In the Prefect UI, create a block of type “SMB” (not RemoteFileSystem).
- Fill:
  - Host: addr
  - Username: username
  - Password: your password (Secret)
  - Share path: talmo/than/prefect-results (the share and path on the server; don’t include smb:// or host)
  - Port: 445 (optional)
- In your flow code, load and use it for results:
from prefect import flow
from prefect.filesystems import SMB

smb_store = SMB.load("my-smb-results")

@flow(result_storage=smb_store, persist_result=True)
def child_flow():  # add your flow's parameters here
    return ["a", "b", "c"]
- Redeploy the child flow.
- Ensure your parent environment can also access SMB (see “Dependencies” below).
Now get_state_result will fetch from the SMB share on both machines.
Option 2: Keep RemoteFileSystem, but supply Settings
- In the Prefect UI for your RemoteFileSystem block:
  - Basepath: smb://addr/talmo/than/prefect-results
  - Settings (JSON), use fsspec’s SMBFileSystem args:
{
  "host": "addr",
  "username": "username",
  "password": "your-password",
  "port": 445
}
- Remove creds from the URL; keep them in Settings for security.
- Redeploy and retry.
Dependencies (both machines)
- Install the SMB fsspec driver in any environment that will write/read results (your Windows worker and your macOS parent):
pip install smbprotocol smbfs
- Verify connectivity/permissions from both sides.
Optional: make it the default result store
- Set on both environments where results are produced/consumed:
  - PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
  - PREFECT_DEFAULT_RESULT_STORAGE_BLOCK=smb/<your-block-slug>
- Or keep using per-flow result_storage as shown above.
Why this fixes your issue
- Prefect Server only stores a pointer to the result. If the child writes to a local path on Windows, your Mac cannot read it. By using a shared SMB result store (and providing host/creds at FS creation), both machines can read the same persisted result, so get_state_result works.
Docs
- Results and result storage
- Blocks concept
If you share your exact SMB share naming (server, share name, path), I can sanity-check the “Share path” field for the SMB block.
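To see why empty Settings produce that TypeError, here is a small sketch with a stand-in class. FakeSMBFileSystem and build_filesystem are hypothetical names invented for this example; they only mimic the pattern of unpacking a settings dict into a constructor whose host argument is required, which is what the traceback suggests is happening.

```python
class FakeSMBFileSystem:
    """Hypothetical stand-in with a required 'host', like the class in the traceback."""

    def __init__(self, host, port=None, username=None, password=None):
        self.host = host
        self.port = port
        self.username = username
        self.password = password


def build_filesystem(settings: dict) -> FakeSMBFileSystem:
    """Unpack the settings dict into the constructor, as a block's Settings would be."""
    return FakeSMBFileSystem(**settings)


# Empty settings: the required 'host' argument is never supplied.
try:
    build_filesystem({})
    empty_settings_error = ""
except TypeError as exc:
    empty_settings_error = str(exc)

# Settings that include the host construct successfully.
fs = build_filesystem({"host": "addr", "username": "username", "port": 445})

print(empty_settings_error)
print(fs.host, fs.port)
```

The point is only that the host must arrive through the settings dict (or a dedicated field); the basepath is not consulted at construction time in this pattern.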
t
@Marvin I have this weird issue: I configured my storage with either SMB or RemoteStorage, but I get this every time. I double-checked and I am sure my hostname ends with .edu, and somewhere in this process "smb:" got appended:
File "D:\tom\hcm-windows\.venv\lib\site-packages\smbclient\_os.py", line 277, in mkdir
    raw = SMBDirectoryIO(path, mode="x", **kwargs)
  File "D:\tom\hcm-windows\.venv\lib\site-packages\smbclient\_io.py", line 362, in __init__
    tree, fd_path = get_smb_tree(path, **kwargs)
  File "D:\tom\hcm-windows\.venv\lib\site-packages\smbclient\_pool.py", line 305, in get_smb_tree
    session = register_session(
  File "D:\tom\hcm-windows\.venv\lib\site-packages\smbclient\_pool.py", line 409, in register_session
    connection.connect(timeout=connection_timeout)
  File "D:\tom\hcm-windows\.venv\lib\site-packages\smbprotocol\connection.py", line 869, in connect
    self.transport.connect()
  File "D:\tom\hcm-windows\.venv\lib\site-packages\smbprotocol\transport.py", line 68, in connect
    raise ValueError(f"Failed to connect to '{self.server}:{self.port}': {err}") from err
ValueError: Failed to connect to 'pool1.vast.salk.edusmb::445': [Errno 11001] getaddrinfo failed
m
thought for 64.6 seconds
You’re right to suspect a host parsing issue. The error shows Prefect/fsspec handed smbclient a host value of pool1.vast.salk.edusmb::445, which looks like your hostname ended with “.edu” and then “smb:” and “:445” got appended, yielding “edusmb::445”. This typically happens when:
- The host was entered with a scheme (e.g., “smb://pool1.vast.salk.edu”) in a field that expects just a hostname, or
- Both the basepath and the settings specify host/scheme, causing fsspec to compose a malformed target, or
- There’s a stray suffix or whitespace in the host field.
To make this robust, pick one approach and stick to it. I recommend the dedicated SMB block.
Recommended: SMB block (no schemes anywhere)
- In the Prefect UI, create a block of type “SMB” (not RemoteFileSystem).
- Fill the fields exactly like this:
  - SMB host: pool1.vast.salk.edu (no “smb://”, no slashes, no port, no extra text)
  - SMB port: 445
  - SMB username: your user (if needed, DOMAIN\\username)
  - SMB password: your password (stored as a Secret)
  - Share path: than/prefect-results. This should be the UNC path part after the host. For UNC \\pool1.vast.salk.edu\than\prefect-results, use “than/prefect-results”.
- In your child flow, use:
from prefect import flow
from prefect.filesystems import SMB

smb_store = SMB.load("my-smb-results")

@flow(result_storage=smb_store, persist_result=True)
def child_flow():  # add your flow's parameters here
    return ["a", "b"]
- Redeploy the child and re-run. Your parent can then call get_state_result(fr.state) from macOS and Prefect will read from SMB.
Alternative: RemoteFileSystem block
If you prefer RemoteFileSystem, ensure the fields are consistent:
- Basepath: smb://pool1.vast.salk.edu/than/prefect-results (include scheme and host here)
- Settings JSON: include only credentials and port, not the host or scheme.
{
  "username": "DOMAIN\\username",
  "password": "your-password",
  "port": 445
}
If you include “host” in Settings and also put the host in Basepath, the underlying driver may compose an invalid target.
Quick sanity check from the worker machine
Try a direct fsspec test to validate the parameters outside of Prefect:
- If you want to pass host in constructor:
import fsspec
fs = fsspec.filesystem(
    "smb",
    host="pool1.vast.salk.edu",
    username="DOMAIN\\username",
    password="your-password",
    port=445,
)
fs.makedirs("than/prefect-results/prefect-test", exist_ok=True)
print(fs.ls("than/prefect-results"))
- If you prefer using the full URL without host in constructor:
import fsspec
fs = fsspec.filesystem("smb", username="DOMAIN\\username", password="your-password", port=445)
fs.ls("smb://pool1.vast.salk.edu/than/prefect-results")
Tips
- Do not include “smb://” in host fields. Only use it in a URL-style basepath.
- Make sure the first segment of “share_path” really is the SMB share name (“than” in my examples). If your share is actually “talmo”, use “talmo/than/prefect-results” only if that’s truly the share + subfolder.
- On Windows, domain creds often need “DOMAIN\\username”.
- Ensure both machines have the SMB dependencies installed. A common set is:
pip install smbprotocol smbfs
If you can paste your current block configuration (redact username/password), I’ll validate the exact values.
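Before pasting values into the block form, it may help to check them with a few lines of standard-library code. The helpers below, split_smb_url and check_host_field, are hypothetical names written for this illustration; they are not part of Prefect or fsspec and only encode the rules from the tips above (hostname only in host fields, scheme only in a URL-style basepath).

```python
from urllib.parse import urlsplit


def split_smb_url(url: str) -> tuple[str, str]:
    """Split an smb:// URL into (host, share path without the leading slash)."""
    parts = urlsplit(url)
    if parts.scheme != "smb" or not parts.hostname:
        raise ValueError(f"Not an smb:// URL with a host: {url!r}")
    return parts.hostname, parts.path.lstrip("/")


def check_host_field(host: str) -> str:
    """Return a trimmed hostname, rejecting schemes, slashes, and ports."""
    cleaned = host.strip()
    if not cleaned or any(token in cleaned for token in ("://", "/", "\\", ":")):
        raise ValueError(f"Host field should be a bare hostname, got {host!r}")
    return cleaned


host, share_path = split_smb_url("smb://pool1.vast.salk.edu/than/prefect-results")
clean_host = check_host_field("  pool1.vast.salk.edu  ")

try:
    check_host_field("smb://pool1.vast.salk.edu")
    scheme_rejected = False
except ValueError:
    scheme_rejected = True

print(host, share_path, clean_host, scheme_rejected)
```

The colon check also rejects IPv6 literals, which is acceptable for this hostname-based sketch but would need loosening for other setups.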