Demid
07/28/2025, 4:39 PM
Marvin
07/28/2025, 4:40 PM
1. Direct Call Syntax
python
from prefect import flow

@flow
def subflow(x: int):
    return x * 2

@flow
def main_flow():
    # Direct call syntax
    result = subflow(5)
    return result
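You can run the parent flow like any other Python function, for example:
python
if __name__ == "__main__":
    main_flow()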
2. Using with Parameters
You can pass parameters to subflows just like regular functions:
python
from prefect import flow

@flow
def subflow(x: int, y: str):
    return f"{y}: {x}"

@flow
def main_flow(value: int):
    # Call with multiple parameters
    result = subflow(value, "Result")
    return result
The subflow will be automatically tracked as part of the parent flow's execution, and you'll see it in the Prefect UI as a nested flow run.
Key Points to Remember:
- Subflows run in the same process as the parent flow by default
- They appear as nested flows in the UI, making it easier to track execution
- They inherit the parent flow's context
- They can have their own task runners, retries, and other flow-specific configurations
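For example, here's a minimal sketch of a subflow that declares its own retry behaviour independently of its parent (the retry values are illustrative):
python
from prefect import flow

@flow(retries=2, retry_delay_seconds=10)
def subflow(x: int) -> int:
    # These retry settings apply only to this subflow,
    # not to the parent flow that calls it
    return x * 2

@flow
def main_flow():
    return subflow(5)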
Best Practices:
1. Use subflows to organize logical units of work
2. Use them to create reusable components that can be called from different parent flows
3. Consider using subflows when you want separate monitoring or retry logic for a specific part of your workflow
Note: While there are more advanced patterns possible with runners and deployments, for most use cases, the direct call syntax is the recommended approach as it's simple and maintainable.
If you have a specific use case in mind or need to explore more advanced patterns (like running subflows on different infrastructure), let me know and I can provide more targeted guidance.
Demid
07/29/2025, 7:21 AM
I have a @flow called extract_data that's deployed to a specific work pool. How can I call it from another flow so that it runs via its deployment and goes to the correct work pool, instead of being executed directly like a normal function?
Marvin
07/29/2025, 7:23 AM
bash
prefect deployment ls
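With a deployment ID from that listing, a minimal sketch of triggering it from another flow could look like this (the deployment ID and parameters are placeholders):
python
from prefect import flow, get_client
from prefect.flow_runs import wait_for_flow_run

@flow
async def main_flow():
    async with get_client() as client:
        # Create a flow run from the extract_data deployment by its ID
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id="your-deployment-id",  # placeholder: use the ID from the listing above
            parameters={"your": "parameters"},
        )
        # Block until the child flow run reaches a terminal state
        finished_run = await wait_for_flow_run(flow_run.id)
        return finished_run.state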
Alternative approach: If you don't want to store the deployment ID in your code, you can also look up the deployment by name:
python
from prefect import flow, get_client
from prefect.client.schemas.filters import FlowFilter

@flow
async def main_flow():
    async with get_client() as client:
        # Find the deployment by name
        deployments = await client.read_deployments(
            flow_filter=FlowFilter(name={"any_": ["extract_data"]}),  # Replace with your flow name
        )
        if not deployments:
            raise ValueError("Deployment not found")
        deployment = deployments[0]  # Use the first matching deployment

        # Create and wait for the flow run as before
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment.id,
            parameters={"your": "parameters"},
        )
        # ... rest of the code as above
This approach is more flexible as it doesn't hardcode the deployment ID, but make sure your flow name is unique enough to find the correct deployment.
Would you like me to explain any part of this in more detail or help you with a specific implementation?
Demid
07/29/2025, 7:29 AM
• Our parent flow (main_flow) is deployed to Work Pool A.
• It needs to call another flow (extract_data) which is deployed to Work Pool B.
So we have:
• 2 separate flows
• 2 separate deployments
• 2 different work pools
Currently, in main_flow, we do something like:
is_table = await tools.extract_data(file_name=file)
But this executes extract_data as a regular function, not as a deployment, so it runs inside the same work pool as main_flow.
We’d like this to trigger the extract_data deployment, so that it runs via its own deployment and is picked up by Work Pool B, not executed locally or within Work Pool A.
Can you confirm the best way to do this? We'd like to keep the flows independent and still orchestrate them via deployments and correct work pools.
Marvin
07/29/2025, 7:30 AM
The issue is that your current call (await tools.extract_data(...)) is executing the flow directly rather than triggering its deployment in Work Pool B.
Let me provide a solution specifically for your distributed work pool setup:
python
from prefect import flow, get_client
from prefect.flow_runs import wait_for_flow_run

@flow
async def main_flow(file: str):
    # This flow is deployed to Work Pool A
    async with get_client() as client:
        # Create a flow run that will be picked up by Work Pool B
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id="extract-data-deployment-id",  # The deployment ID for extract_data
            parameters={
                "file_name": file  # Pass through the same parameters
            }
        )

        # Wait for the extract_data flow run to finish and inspect its terminal state
        flow_run_state = (await wait_for_flow_run(flow_run.id)).state

        if flow_run_state.is_completed():
            # Get the result from the completed flow run
            # (requires result persistence on extract_data)
            is_table = flow_run_state.result()
            return is_table
        else:
            raise Exception(f"extract_data flow failed with state: {flow_run_state.type}")
This approach:
1. Properly triggers the extract_data deployment in Work Pool B
2. Maintains separation between the flows and their respective work pools
3. Still allows for orchestration and data passing between flows
4. Preserves the async nature of your flows
A few important points:
1. Error Handling: You might want to add more robust error handling:
python
@flow
async def main_flow(file: str):
    async with get_client() as client:
        try:
            flow_run = await client.create_flow_run_from_deployment(
                deployment_id="extract-data-deployment-id",
                parameters={"file_name": file}
            )
            # wait_for_flow_run returns the finished FlowRun; inspect its state
            flow_run_state = (
                await wait_for_flow_run(
                    flow_run.id,
                    timeout=1800  # Optional: set custom timeout (in seconds)
                )
            ).state
            if flow_run_state.is_completed():
                return flow_run_state.result()
            else:
                raise Exception(
                    f"extract_data flow failed with state: {flow_run_state.type}"
                    f"\nFlow run ID: {flow_run.id}"
                )
        except Exception as e:
            # Handle any API or timeout errors
            raise Exception(f"Failed to execute extract_data: {str(e)}")
2. Finding the Deployment: If you don't want to hardcode the deployment ID, you can look it up by name:
python
from prefect.client.schemas.filters import FlowFilter, WorkPoolFilter

deployments = await client.read_deployments(
    flow_filter=FlowFilter(name={"any_": ["extract_data"]}),
    work_pool_filter=WorkPoolFilter(name={"any_": ["Work Pool B"]})  # Ensure correct work pool
)
if not deployments:
    raise ValueError("extract_data deployment not found in Work Pool B")
deployment = deployments[0]
3. Monitoring: You can get the flow run URL for monitoring:
python
flow_run = await client.create_flow_run_from_deployment(...)
print(f"Started extract_data flow run: {flow_run.id}")
# If you have PREFECT_UI_URL set in your environment:
print(f"Monitor at: {flow_run.ui_url}")
This setup ensures that:
- main_flow runs in Work Pool A
- When it needs to run extract_data, it creates a new flow run via the deployment
- That flow run is picked up by a worker in Work Pool B
- main_flow waits for the result before continuing
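As a prerequisite, extract_data needs its own deployment registered against Work Pool B. One way to do that from Python is sketched below; the repository URL, entrypoint, and work pool name are placeholders, and you may already manage this through prefect.yaml or CI instead:
python
from prefect import flow

if __name__ == "__main__":
    # Register a deployment of extract_data that targets Work Pool B
    flow.from_source(
        source="https://github.com/your-org/your-repo",  # placeholder source repo
        entrypoint="flows/extract.py:extract_data",       # placeholder path to the flow
    ).deploy(
        name="extract-data",
        work_pool_name="work-pool-b",  # placeholder work pool name
    )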
Would you like me to explain any part of this in more detail or help with implementation specifics?
Demid
07/29/2025, 8:09 AM
We're using create_flow_run_from_deployment() inside a flow to call another flow deployed on a different work pool. It works fine, but in the Prefect UI, this "subflow" doesn't show up as a child run; it's completely separate.
Is there a way to link it back to the parent flow run so it appears as a subflow in the UI (like it would when using a @flow call directly)?
Any advice on how to preserve subflow visibility while using deployment-based cross-pool execution?
Marvin
07/29/2025, 8:11 AM
python
from prefect import flow, get_client
from prefect.flow_runs import wait_for_flow_run
from prefect.runtime import flow_run as current_flow_run  # exposes the current run's ID

@flow
async def main_flow(file: str):
    async with get_client() as client:
        # Create flow run with parent relationship
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id="extract-data-deployment-id",
            parameters={"file_name": file},
            parent_flow_run_id=current_flow_run.id  # This creates the parent-child link
        )

        # wait_for_flow_run returns the finished FlowRun; inspect its state
        flow_run_state = (await wait_for_flow_run(flow_run.id)).state
        if flow_run_state.is_completed():
            return flow_run_state.result()
        else:
            raise Exception(f"extract_data flow failed with state: {flow_run_state.type}")
The key addition is the parent_flow_run_id parameter. By passing the current flow run's ID, you establish the parent-child relationship that will be reflected in the UI.
For better error handling and clarity, here's a more complete version:
python
from prefect import flow, get_run_logger, get_client
from prefect.flow_runs import wait_for_flow_run
from prefect.runtime import flow_run as current_flow_run  # exposes the current run's ID

@flow
async def main_flow(file: str):
    logger = get_run_logger()
    async with get_client() as client:
        try:
            logger.info("Creating extract_data flow run...")

            # Create the child flow run
            flow_run = await client.create_flow_run_from_deployment(
                deployment_id="extract-data-deployment-id",
                parameters={"file_name": file},
                parent_flow_run_id=current_flow_run.id,
                name="extract-data-subflow"  # Optional: give it a descriptive name
            )
            logger.info(
                f"Started extract_data as subflow. "
                f"Flow run ID: {flow_run.id}"
                f"\nMonitor at: {flow_run.ui_url}"
            )

            # Wait for completion; wait_for_flow_run returns the finished FlowRun
            flow_run_state = (
                await wait_for_flow_run(
                    flow_run.id,
                    timeout=1800  # Optional: adjust timeout as needed
                )
            ).state

            if flow_run_state.is_completed():
                logger.info("Subflow completed successfully")
                return flow_run_state.result()
            else:
                raise Exception(
                    f"Subflow failed with state: {flow_run_state.type}"
                )
        except Exception as e:
            logger.error(f"Error executing subflow: {str(e)}")
            raise
This approach gives you:
1. Proper parent-child relationship visible in the UI
2. Execution in the correct work pool via deployment
3. Better observability with logging
4. Proper error handling and status tracking
Alternative: Using run_deployment
There's also another approach using the higher-level run_deployment function:
python
from prefect import flow
from prefect.deployments import run_deployment

@flow
async def main_flow(file: str):
    # This automatically maintains the parent-child relationship
    result = await run_deployment(
        name="extract-data/deployment-name",  # "flow-name/deployment-name"
        parameters={"file_name": file},
        timeout=1800,
        as_subflow=True  # This ensures it shows up as a subflow
    )
    return result
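One caveat: run_deployment returns a FlowRun object describing the finished (or timed-out) run, not the child flow's return value. If you need extract_data's actual result, you generally read it from the run's terminal state, which requires result persistence on the child flow; a rough sketch (exact result-retrieval details vary by Prefect version):
python
from prefect import flow
from prefect.deployments import run_deployment

@flow
async def main_flow(file: str):
    child_run = await run_deployment(
        name="extract-data/deployment-name",  # "flow-name/deployment-name"
        parameters={"file_name": file},
        as_subflow=True,
    )
    # Reading the return value assumes extract_data persists its results
    # (e.g. persist_result=True on the flow decorator)
    if child_run.state.is_completed():
        return child_run.state.result()
    raise Exception(f"extract_data finished in state {child_run.state.type}")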
Both approaches will work, but:
- run_deployment is slightly more concise and handles some details for you
- create_flow_run_from_deployment gives you more fine-grained control
Choose based on your needs:
- Use run_deployment if you want a simpler, more straightforward approach
- Use create_flow_run_from_deployment if you need more control over the flow run creation and waiting process
Would you like me to explain any of these approaches in more detail?
Demid
07/29/2025, 8:45 AM
Marvin
07/29/2025, 8:46 AM
python
from prefect import flow, get_run_logger, get_client
from prefect.flow_runs import wait_for_flow_run
import uuid

@flow
async def main_flow(file: str):
    logger = get_run_logger()
    async with get_client() as client:
        try:
            # Generate a unique name for this run
            run_name = f"extract-data-{uuid.uuid4().hex[:8]}"

            # Create the flow run with unique identifiers
            flow_run = await client.create_flow_run_from_deployment(
                deployment_id="extract-data-deployment-id",
                parameters={"file_name": file},
                name=run_name,  # Ensure unique name
                tags=["triggered-from-main-flow"]  # Optional: helpful for tracking
            )
            logger.info(f"Created flow run with ID: {flow_run.id}")

            # wait_for_flow_run returns the finished FlowRun; inspect its state
            flow_run_state = (await wait_for_flow_run(flow_run.id)).state

            if flow_run_state.is_completed():
                return flow_run_state.result()
            else:
                raise Exception(f"Flow run failed with state: {flow_run_state.type}")
        except Exception as e:
            logger.error(f"Error creating or executing flow run: {str(e)}")
            raise
If you're still getting the error, let's try these troubleshooting steps:
1. First, verify your deployment ID is correct:
python
async with get_client() as client:
    # List all deployments to find the correct ID
    deployments = await client.read_deployments()
    for dep in deployments:
        print(f"Name: {dep.name}, ID: {dep.id}")
2. Alternatively, try using run_deployment which might handle some edge cases better:
python
from prefect import flow
from prefect.deployments import run_deployment

@flow
async def main_flow(file: str):
    try:
        result = await run_deployment(
            name="extract-data/your-deployment-name",  # Use "flow-name/deployment-name"
            parameters={"file_name": file},
            timeout=1800
        )
        return result
    except Exception as e:
        print(f"Error running deployment: {str(e)}")
        raise
3. If you're still having issues, let's check the versions:
python
from prefect import __version__
print(f"Prefect version: {__version__}")
Could you please:
1. Confirm which approach you're using (create_flow_run_from_deployment or run_deployment)?
2. Share your Prefect version?
3. Let me know if you see any specific error messages in the server logs?
This will help me provide a more targeted solution for your specific setup.
Demid
07/29/2025, 11:10 AM
Marvin
07/29/2025, 11:11 AM
1. Use an Environment Variable for the Base Path
python
import os
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment

# Define a common base path via environment variable
# Set this in both work pool environments
STORAGE_PATH = os.getenv("STORAGE_PATH", "/path/to/shared/storage")

@flow
async def extract_data(file_name: str):
    # Use the environment-aware path
    result_path = os.path.join(STORAGE_PATH, file_name)
    # ... your extraction logic ...
    return result_path

@flow
async def main_flow(file: str):
    logger = get_run_logger()
    try:
        result = await run_deployment(
            name="extract-data/your-deployment-name",
            parameters={
                "file_name": file,
            },
            timeout=1800
        )
        # Translate the path if needed
        result_path = os.path.join(STORAGE_PATH, os.path.basename(result))
        return result_path
    except Exception as e:
        logger.error(f"Error: {str(e)}")
        raise
2. Use Prefect Artifacts for Result Sharing
python
from prefect import flow, get_run_logger
from prefect.artifacts import create_markdown_artifact
from prefect.deployments import run_deployment
import json

@flow
async def extract_data(file_name: str):
    # ... your extraction logic ...
    # Store the result path as an artifact
    result_info = {
        "file_path": result_path,
        "status": "completed"
    }
    await create_markdown_artifact(
        key="extract-data-result",
        markdown=json.dumps(result_info),
        description="Extract data result path"
    )
    return result_info

@flow
async def main_flow(file: str):
    logger = get_run_logger()
    try:
        result = await run_deployment(
            name="extract-data/your-deployment-name",
            parameters={"file_name": file},
            timeout=1800
        )
        # Result will contain the info we need
        if isinstance(result, dict) and "file_path" in result:
            # Handle the path appropriately for your environment
            return result["file_path"]
    except Exception as e:
        logger.error(f"Error: {str(e)}")
        raise
3. Use a Shared Storage Solution
python
from prefect import flow
from prefect.deployments import run_deployment

# Configure this in both environments to point to the same storage
SHARED_STORAGE_BASE = "/mnt/shared"  # or "s3://your-bucket" etc.

@flow
async def extract_data(file_name: str):
    # Use shared storage path
    result_path = f"{SHARED_STORAGE_BASE}/results/{file_name}"
    # ... your extraction logic ...
    return result_path

@flow
async def main_flow(file: str):
    result_path = await run_deployment(
        name="extract-data/your-deployment-name",
        parameters={"file_name": file},
        timeout=1800
    )
    # The path will be valid in both environments
    return result_path
Best Practices for Path Handling:
1. Environment Configuration:
bash
# In both work pool environments:
export STORAGE_PATH=/path/to/shared/storage
2. Use Path Utilities:
python
import os
from pathlib import Path
from prefect import flow

@flow
async def handle_paths(file_path: str):
    # Convert to Path object for better handling
    path = Path(file_path)
    if not path.is_absolute():
        path = Path(os.getenv("STORAGE_PATH")) / path
    return str(path)
3. Consider Using Remote Storage:
- S3, GCS, or Azure Blob Storage can be accessed consistently across different environments
- Network file systems (NFS) if both work pools are in the same network
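For the remote storage option, here is a minimal sketch of handing results between the two flows via S3 using boto3; the bucket name and helper functions are illustrative placeholders, and any object store reachable from both work pools would work the same way:
python
import boto3

BUCKET = "your-shared-bucket"  # placeholder: must be reachable from both work pools

def upload_result(local_path: str, key: str) -> str:
    # Called from extract_data (Work Pool B): push the result file to shared storage
    boto3.client("s3").upload_file(local_path, BUCKET, key)
    return f"s3://{BUCKET}/{key}"

def download_result(key: str, local_path: str) -> str:
    # Called from main_flow (Work Pool A): pull the result file back down
    boto3.client("s3").download_file(BUCKET, key, local_path)
    return local_path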
Choose the approach that best fits your infrastructure:
- If you have shared storage accessible from both work pools, use approach #3
Marvin
07/29/2025, 11:11 AM