Tom Han
08/29/2025, 1:26 AMflowrun: FlowRun = run_deployment(
name = "name",
parameters={
...
},
as_subflow=True
)
Marvin
08/29/2025, 1:26 AMMarvin
08/29/2025, 1:29 AMrun_deployment
returns a FlowRun
object; to get the actual return value of the flow, wait for the run to finish and then resolve the state’s result.
Here’s a pattern that works reliably in Prefect 3.x:
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
# Trigger the deployment as a subflow
flow_run = run_deployment(
name="name",
parameters={...},
as_subflow=True,
)
# Ensure it reaches a terminal state
flow_run = wait_for_flow_run(flow_run.id)
# Retrieve the returned value from the child flow (e.g., your list)
result = get_state_result(flow_run.state) # raises if the run failed
Notes:
- get_state_result
will raise on failure by default; you can pass raise_on_failure=False
if you prefer to handle it yourself.
- Make sure the returned object is serializable with your configured result serializer.
If you control the target flow and don’t need deployment isolation, an even simpler option is to call the flow directly as a subflow and get its return value immediately:
# Assuming you can import the flow function
returned_list = child_flow(**params) # runs as a subflow and returns the value
Refs:
- Run deployments (Prefect 3)
- Source for state result utilities: prefect.statesTom Han
08/29/2025, 1:34 AMawait run_deployment
? But run_deployment is syncMarvin
08/29/2025, 1:34 AMMarvin
08/29/2025, 1:35 AMrun_deployment
(and wait_for_flow_run
) are sync-compatible:
- In sync code, call them normally.
- In async code (e.g., inside async def
flow), you must await
them. Otherwise you get a coroutine and accessing .id
fails.
Examples:
- Async parent flow:
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
@flow
async def parent():
fr = await run_deployment(name="name", parameters={...}, as_subflow=True)
fr = await wait_for_flow_run(fr.id)
value = get_state_result(fr.state) # raises if failed
return value
- Sync parent flow:
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
@flow
def parent():
fr = run_deployment(name="name", parameters={...}, as_subflow=True)
fr = wait_for_flow_run(fr.id)
value = get_state_result(fr.state) # raises if failed
return value
Notes:
- get_state_result
will raise on failure by default; set raise_on_failure=False
to handle errors yourself.
- Ensure the child flow’s return value is serializable with your result serializer.
Refs:
- Run deployments (Prefect 3)
- State result helpersTom Han
08/29/2025, 1:36 AMMarvin
08/29/2025, 1:36 AMMarvin
08/29/2025, 1:39 AMimport asyncio
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
@flow
async def orchestrate():
# Launch multiple deployments concurrently
runs = await asyncio.gather(
run_deployment(name="proj/dep-a", parameters={...}, as_subflow=True),
run_deployment(name="proj/dep-b", parameters={...}, as_subflow=True),
)
# Wait for completion concurrently
finished = await asyncio.gather(*(wait_for_flow_run(r.id) for r in runs))
# Collect return values
results = [get_state_result(fr.state) for fr in finished]
return results
- Sync flow with CPU-bound concurrency (use processes)
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner
@task
def heavy_compute(x: int) -> int:
# CPU-bound work
return sum(i * i for i in range(10_000_000))
@flow(task_runner=ProcessPoolTaskRunner())
def cpu_flow():
futs = [heavy_compute.submit(i) for i in range(4)]
return [f.result() for f in futs]
- Sync flow with I/O-bound concurrency (use threads)
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
@task
def fetch(url: str) -> str:
# blocking I/O (requests, boto3, etc.)
...
@flow(task_runner=ThreadPoolTaskRunner())
def io_flow(urls: list[str]):
futs = [fetch.submit(u) for u in urls]
return [f.result() for f in futs]
Gotchas
- In async flows, you must await async-capable helpers like run_deployment(...)
and wait_for_flow_run(...)
. Forgetting to await yields a coroutine object (the error you saw).
- Avoid long blocking CPU or blocking I/O directly in async flows; offload to tasks and pick a thread/process runner.
- If you only need to call other deployments and handle results, async flows are great for high fan-out/fan-in orchestration.
Docs and references
- Run deployments: Run deployments
- Concurrency patterns and task runners: Run work concurrently
- Flow concepts: Flows conceptTom Han
08/29/2025, 1:40 AMresult = task()
and when do I do fut = task.submit(args)
, then result = task.result
when I needed? @MarvinMarvin
08/29/2025, 1:40 AMMarvin
08/29/2025, 1:42 AMresult = task(...)
when:
- You need the task’s output immediately (sequential logic).
- You’re not trying to overlap this task with other work.
- You want simpler control flow — exceptions raise right away.
Use fut = task.submit(...)
(then later result = fut.result()
) when:
- You want concurrency/fan-out: fire off many tasks at once, then fan-in later.
- You want to overlap this task with other work before blocking for its result.
- You want to pass futures to other tasks as upstream dependencies (Prefect will wait on them automatically).
Key points:
- .submit(...)
returns a PrefectFuture
. Get the value with fut.result()
(it’s a method, not a property).
- If you .submit(...)
and immediately call .result()
, that’s effectively the same as calling task(...)
— so prefer task(...)
for clarity in that case.
- In async flows, you can still use .submit(...)
and call fut.result()
when you need the value. Use a thread/process task runner to avoid blocking the event loop with heavy work.
Patterns
- Sequential (simple and immediate)
@flow
def pipeline():
a = task_a(...)
b = task_b(a) # runs after task_a completes
return b
- Concurrent fan-out/fan-in
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
@flow(task_runner=ThreadPoolTaskRunner())
def pipeline(urls):
futs = [download.submit(u) for u in urls] # launch all
# do other work here if needed...
results = [f.result() for f in futs] # fan-in
return results
- Passing futures as upstream dependencies (Prefect handles waiting)
@flow
def pipeline():
f1 = task_a.submit(...)
f2 = task_b.submit(f1) # will wait for f1 automatically
return f2.result()
Docs:
- Run work concurrently: Concurrency patterns in PrefectTom Han
08/29/2025, 1:45 AM<http://logger.info|logger.info>(f"Subflow triggered successfully. Flow run ID: {flowrun.id}")
flow_run = wait_for_flow_run(flowrun.id)
result = get_state_result(flow_run.state)
<http://logger.info|logger.info>(f"Robocopy flow completed with state: {flow_run.state}")
<http://logger.info|logger.info>(f"Result: {result}")
I assume flow_run
would be a finished FlowRun, I changed my flow to be sync (without the async) function signature but I got:
AttributeError: 'coroutine' object has no attribute 'state'
Marvin
08/29/2025, 1:45 AMMarvin
08/29/2025, 1:46 AMrun_deployment
and wait_for_flow_run
are “sync-compatible”:
- In sync code, call them normally.
- In async code, you must await
them. If you don’t, you’ll get a coroutine and accessing .state
or .id
will fail.
Use one of these patterns:
Sync parent flow
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
@flow
def parent():
logger = get_run_logger()
fr = run_deployment(name="proj/dep", parameters={...}, as_subflow=True)
<http://logger.info|logger.info>(f"Subflow triggered successfully. Flow run ID: {fr.id}")
fr = wait_for_flow_run(fr.id) # blocking in sync code
result = get_state_result(fr.state)
<http://logger.info|logger.info>(f"Robocopy flow completed with state: {fr.state}")
<http://logger.info|logger.info>(f"Result: {result}")
return result
Async parent flow
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
@flow
async def parent():
logger = get_run_logger()
fr = await run_deployment(name="proj/dep", parameters={...}, as_subflow=True)
<http://logger.info|logger.info>(f"Subflow triggered successfully. Flow run ID: {fr.id}")
fr = await wait_for_flow_run(fr.id) # must await in async code
result = get_state_result(fr.state)
<http://logger.info|logger.info>(f"Robocopy flow completed with state: {fr.state}")
<http://logger.info|logger.info>(f"Result: {result}")
return result
If you’ve removed async
from your flow but still see a coroutine, something in the call stack is likely still async (e.g., running inside an async task or an async entrypoint like FastAPI). If you can share your flow signature and where this code runs from, I can confirm which variant you need.
Refs:
- Run deploymentsTom Han
08/29/2025, 1:48 AM18:42:33.957 | INFO | Flow run 'upsilon691-sirrah' - Subflow triggered successfully. Flow run ID: 1a1b81ce-5e6a-45d7-baf7-6f57cffbfd0b
18:42:33.958 | ERROR | Flow run 'upsilon691-sirrah' - Encountered exception during execution: AttributeError("'coroutine' object has no attribute 'state'")
Traceback (most recent call last):
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 782, in run_context
yield self
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1397, in run_flow_sync
engine.call_flow_fn()
~~~~~~~~~~~~~~~~~~~^^
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 802, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
return fn(*args, **kwargs)
File "/private/var/folders/lv/kt38h52n3xs70337hx5s_xlm0000gp/T/tmp61xmt52qprefect/github-repository-hcm-core-repo/hcm_core/flows/orchestration/sync_videos.py", line 62, in main_sync_orchestrator_flow
result = get_state_result(flow_run.state)
^^^^^^^^^^^^^^
AttributeError: 'coroutine' object has no attribute 'state'
18:42:33.984 | INFO | Flow run 'upsilon691-sirrah' - Finished in state Failed("Flow run encountered an exception: AttributeError: 'coroutine' object has no attribute 'state'")
18:42:33.984 | ERROR | Flow run 'upsilon691-sirrah' - Engine execution exited with unexpected exception
Traceback (most recent call last):
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1554, in run_flow
ret_val = run_flow_sync(**kwargs)
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1399, in run_flow_sync
return engine.state if return_type == "state" else engine.result()
~~~~~~~~~~~~~^^
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 361, in result
raise self._raised
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 782, in run_context
yield self
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 1397, in run_flow_sync
engine.call_flow_fn()
~~~~~~~~~~~~~~~~~~~^^
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 802, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
return fn(*args, **kwargs)
File "/private/var/folders/lv/kt38h52n3xs70337hx5s_xlm0000gp/T/tmp61xmt52qprefect/github-repository-hcm-core-repo/hcm_core/flows/orchestration/sync_videos.py", line 62, in main_sync_orchestrator_flow
result = get_state_result(flow_run.state)
^^^^^^^^^^^^^^
AttributeError: 'coroutine' object has no attribute 'state'
I started this deployment run from the Web UI. I am using a self hosted prefect serverMarvin
08/29/2025, 1:49 AMMarvin
08/29/2025, 1:50 AMwait_for_flow_run(...)
is being treated as async in your execution context, so calling it without await
returns a coroutine (hence the 'coroutine' object has no attribute 'state'
when you try to use it).
Even though your flow function is now sync, Prefect may still be running it inside an event loop. For “sync-compatible” Prefect APIs, that means:
- In a pure sync context: call them normally
- If an event loop is active: you must either await them or explicitly run the coroutine as sync
You have two reliable options:
Option A — Make the parent flow async and await
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
@flow
async def main_orchestrator():
logger = get_run_logger()
fr = await run_deployment(name="proj/dep", parameters={...}, as_subflow=True)
<http://logger.info|logger.info>(f"Subflow triggered successfully. Flow run ID: {fr.id}")
fr = await wait_for_flow_run(fr.id, log_states=True)
result = get_state_result(fr.state)
<http://logger.info|logger.info>(f"Robocopy flow completed with state: {fr.state}")
<http://logger.info|logger.info>(f"Result: {result}")
return result
Option B — Keep the flow sync and force-sync the coroutine
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
from prefect.utilities.asyncutils import run_coro_as_sync
@flow
def main_sync_orchestrator_flow():
logger = get_run_logger()
fr = run_deployment(name="proj/dep", parameters={...}, as_subflow=True)
<http://logger.info|logger.info>(f"Subflow triggered successfully. Flow run ID: {fr.id}")
# Force the async call to run synchronously
fr = run_coro_as_sync(wait_for_flow_run(fr.id, log_states=True))
result = get_state_result(fr.state)
<http://logger.info|logger.info>(f"Robocopy flow completed with state: {fr.state}")
<http://logger.info|logger.info>(f"Result: {result}")
return result
Notes:
- You already confirmed run_deployment(...)
returned a FlowRun
(you logged fr.id
), so the coroutine is coming from wait_for_flow_run(...)
.
- If you flip to an async flow, remember you must await
both run_deployment(...)
and wait_for_flow_run(...)
.
- You can pass log_states=True
to wait_for_flow_run
to see the state transitions while waiting.
Refs:
- Run deployments
- Source for wait_for_flow_run
(sync-compatible): prefect.flow_runs
- Utilities for bridging async/sync: prefect.utilities.asyncutilsTom Han
08/29/2025, 2:00 AMFile "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/_internal/compatibility/blocks.py", line 19, in call_explicitly_async_block_method
return await getattr(block, f"a{method}")(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/talmolab/hcm-core/.venv/lib/python3.13/site-packages/prefect/filesystems.py", line 306, in aread_path
raise ValueError(f"Path {path} does not exist.")
ValueError: Path /Users/talmolab/.prefect/storage/C:\Users\TalmoLab\.prefect\storage\86f613f2c8a74a0799150c6ff98ee65a does not exist.
18:56:49.613 | ERROR | prefect.flow_runs.runner - Process for flow run 'rho76-hubble-void' exited with status code: 1
What is happening is that I am triggering a deployment subflow that is running on a Windows Machine from my main device (a mac) and I am trying to pull the returned result to the main flow. It seems like Prefect is trying to find the storage location of the remote machine? Is there a way to let that flow know to send it's return value to my Prefect Server (not on cloud)?Marvin
08/29/2025, 2:00 AMMarvin
08/29/2025, 2:04 AMfrom prefect import flow
from prefect.states import get_state_result
# Example: S3 (requires `pip install 'prefect[aws]'`)
from prefect_aws.s3 import S3Bucket
# Load a configured S3Bucket block (create it in code or via UI beforehand)
s3 = S3Bucket.load("default-result-store")
@flow(result_storage=s3, persist_result=True)
def child_flow(...):
# return your list
return ["a", "b"]
Or with an SMB/NFS share via RemoteFileSystem:
from prefect import flow
from prefect.filesystems import RemoteFileSystem
fs = RemoteFileSystem(basepath="<smb://user:pass@server/share/prefect-results>")
@flow(result_storage=fs, persist_result=True)
def child_flow(...):
return ["a", "b"]
Redeploy the child so it uses this storage. Then your parent can keep using:
fr = await/ wait_for_flow_run(...)
result = get_state_result(fr.state)
and it will download from the shared store.
Option B: Configure a global default result store
Set these on BOTH the child’s execution environment and any environment that calls get_state_result (your mac):
- PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
- PREFECT_DEFAULT_RESULT_STORAGE_BLOCK=<block slug shown in the UI, e.g. s3-bucket/default-result-store>
This makes all flows/tasks persist to that shared store by default.
Notes
- Make sure credentials for the shared store are available on both machines.
- If your payloads are small and you need a quick workaround, you could emit an artifact in the child flow (e.g., a markdown or table) and read it back via API — but for programmatic, reliable data passing, shared result storage is the right approach.
- Your current error shows Prefect is joining your mac’s local base path with the Windows path in the storage key — that’s a telltale sign of using LocalFileSystem on different machines.
Docs
- Results and result storage in Prefect 3
If you share what storage you prefer (S3/GCS/Azure/SMB), I can give you a drop-in config snippet for that backend.Tom Han
08/29/2025, 2:16 AM<smb://username:password@addr/talmo/than/prefect-results>
but I got:
19:13:31.633 | ERROR | Task run 'run_robocopy_and_create_artifact-ade' - An error was encountered while committing transaction 'e66a2777f7ffa9f8bfbf9b2548af8a4b'
Traceback (most recent call last):
File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\transactions.py", line 360, in commit
self.store.persist_result_record(
File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\results.py", line 823, in persist_result_record
return self._persist_result_record(
File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\utilities\asyncutils.py", line 347, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\utilities\asyncutils.py", line 207, in run_coro_as_sync
return call.result()
File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 365, in result
return self.future.result(timeout=timeout)
File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 192, in result
return self.__get_result()
File "C:\Users\TalmoLab\miniconda3\lib\concurrent\futures\_base.py", line 403, in __get_result
raise self._exception
File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 441, in _run_async
result = await coro
File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\utilities\asyncutils.py", line 188, in coroutine_wrapper
return await task
File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\utilities\asyncutils.py", line 341, in ctx_call
result = await async_fn(*args, **kwargs)
File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\results.py", line 803, in _persist_result_record
await call_explicitly_async_block_method(
File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\_internal\compatibility\blocks.py", line 21, in call_explicitly_async_block_method
return await getattr(block, method).aio(block, *args, **kwargs)
File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\filesystems.py", line 518, in write_path
self.filesystem.makedirs(dirpath, exist_ok=True)
File "D:\tom\hcm-windows\.venv\lib\site-packages\prefect\filesystems.py", line 530, in filesystem
self._filesystem = fsspec.filesystem(scheme, **self.settings)
File "D:\tom\hcm-windows\.venv\lib\site-packages\fsspec\registry.py", line 322, in filesystem
return cls(**storage_options)
File "D:\tom\hcm-windows\.venv\lib\site-packages\fsspec\spec.py", line 81, in __call__
obj = super().__call__(*args, **kwargs)
TypeError: SMBFileSystem.__init__() missing 1 required positional argument: 'host'
Should I put stuff in the Settings json when I create that block? @MarvinMarvin
08/29/2025, 2:16 AMMarvin
08/29/2025, 2:17 AMfrom prefect import flow
from prefect.filesystems import SMB
smb_store = SMB.load("my-smb-results")
@flow(result_storage=smb_store, persist_result=True)
def child_flow(...):
return ["a", "b", "c"]
- Redeploy the child flow.
- Ensure your parent environment can also access SMB (see “Dependencies” below). Now get_state_result will fetch from the SMB share on both machines.
Option 2 — Keep RemoteFileSystem, but supply Settings
- In the Prefect UI for your RemoteFileSystem block:
- Basepath: smb://addr/talmo/than/prefect-results
- Settings (JSON), use fsspec’s SMBFileSystem args:
{
"host": "addr",
"username": "username",
"password": "your-password",
"port": 445
}
- Remove creds from the URL; keep them in Settings for security.
- Redeploy and retry.
Dependencies (both machines)
- Install the SMB fsspec driver in any environment that will write/read results (your Windows worker and your macOS parent):
pip install smbprotocol smbfs
- Verify connectivity/permissions from both sides.
Optional: make it the default result store
- Set on both environments where results are produced/consumed:
- PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
- PREFECT_DEFAULT_RESULT_STORAGE_BLOCK=smb/<your-block-slug>
- Or keep using per-flow result_storage as shown above.
Why this fixes your issue
- Prefect Server only stores a pointer to the result. If the child writes to a local path on Windows, your Mac cannot read it. By using a shared SMB result store (and providing host/creds at FS creation), both machines can read the same persisted result, so get_state_result works.
Docs
- Results and result storage
- Blocks concept
If you share your exact SMB share naming (server, share name, path), I can sanity-check the “Share path” field for the SMB block.Tom Han
08/29/2025, 3:08 AMFile "D:\tom\hcm-windows\.venv\lib\site-packages\smbclient\_os.py", line 277, in mkdir
raw = SMBDirectoryIO(path, mode="x", **kwargs)
File "D:\tom\hcm-windows\.venv\lib\site-packages\smbclient\_io.py", line 362, in __init__
tree, fd_path = get_smb_tree(path, **kwargs)
File "D:\tom\hcm-windows\.venv\lib\site-packages\smbclient\_pool.py", line 305, in get_smb_tree
session = register_session(
File "D:\tom\hcm-windows\.venv\lib\site-packages\smbclient\_pool.py", line 409, in register_session
connection.connect(timeout=connection_timeout)
File "D:\tom\hcm-windows\.venv\lib\site-packages\smbprotocol\connection.py", line 869, in connect
self.transport.connect()
File "D:\tom\hcm-windows\.venv\lib\site-packages\smbprotocol\transport.py", line 68, in connect
raise ValueError(f"Failed to connect to '{self.server}:{self.port}': {err}") from err
ValueError: Failed to connect to 'pool1.vast.salk.edusmb::445': [Errno 11001] getaddrinfo failed
Marvin
08/29/2025, 3:08 AMMarvin
08/29/2025, 3:09 AMfrom prefect.filesystems import SMB
smb_store = SMB.load("my-smb-results")
@flow(result_storage=smb_store, persist_result=True)
def child_flow(...):
return ["a", "b"]
- Redeploy the child and re-run. Your parent can then call get_state_result(fr.state)
from macOS and Prefect will read from SMB.
Alternative: RemoteFileSystem block
If you prefer RemoteFileSystem, ensure the fields are consistent:
- Basepath: smb://pool1.vast.salk.edu/than/prefect-results
- Include scheme and host here.
- Settings JSON: include only credentials and port, not the host or scheme.
{
"username": "DOMAIN\\username",
"password": "your-password",
"port": 445
}
If you include “host” in Settings and also put the host in Basepath, the underlying driver may compose an invalid target.
Quick sanity check from the worker machine
Try a direct fsspec test to validate the parameters outside of Prefect:
- If you want to pass host in constructor:
import fsspec
fs = fsspec.filesystem(
"smb",
host="<http://pool1.vast.salk.edu|pool1.vast.salk.edu>",
username="DOMAIN\\username",
password="your-password",
port=445,
)
fs.makedirs("than/prefect-results/prefect-test", exist_ok=True)
print(<http://fs.ls|fs.ls>("than/prefect-results"))
- If you prefer using the full URL without host in constructor:
import fsspec
fs = fsspec.filesystem("smb", username="DOMAIN\\username", password="your-password", port=445)
<http://fs.ls|fs.ls>("<smb://pool1.vast.salk.edu/than/prefect-results>")
Tips
- Do not include “smb://” in host fields. Only use it in a URL-style basepath.
- Make sure the first segment of “share_path” really is the SMB share name (“than” in my examples). If your share is actually “talmo”, use “talmo/than/prefect-results” only if that’s truly the share + subfolder.
- On Windows, domain creds often need “DOMAIN\\username”.
- Ensure both machines have the SMB dependencies installed. A common set is:
pip install smbprotocol smbfs
If you can paste your current block configuration (redact username/password), I’ll validate the exact values.