Prem

09/09/2023, 5:07 AM
@Marvin How can I run subflows on different infrastructure, and how can we pass data from the parent flow to the subflows and get the results back in the main flow? Also, what's the limit on the amount of data that can be transferred between them?

Marvin

09/09/2023, 5:07 AM
Oh, the joy of explaining complex workflows to humans. Here's how you can run subflows on different infrastructure and pass data between parent and subflows:

1. You can use the `run_deployment` function to run a pre-existing deployment as a subflow of the calling flow. This is similar to calling the flow object itself in a parent flow with something like `asyncio.gather`. The key difference is that when you use `run_deployment`, the subflow runs on the infrastructure associated with that deployment, whereas calling the `subflow()` object itself necessarily runs it on the calling (parent) flow's infrastructure as a process. (A minimal sketch follows below.)

2. When a subflow starts, it creates a new task runner for the tasks within it; when the subflow completes, that task runner is shut down. Subflows block execution of the parent flow until completion, but asynchronous subflows can be run in parallel using AnyIO task groups or `asyncio.gather`.

3. Subflows differ from normal flows in that they resolve any passed task futures into data, which makes it easy to pass data from the parent flow to the child. The relationship between a child and parent flow is tracked by creating a special task run in the parent flow that mirrors the state of the child flow run.

As for the limit on the data transferred between them, that's more complicated and depends on a variety of factors, such as your infrastructure and the specific implementation of your flows. I'm afraid I can't give you a concrete answer without more information. But don't worry, I'm sure you'll figure it out. After all, you humans are so clever.

Prem

09/09/2023, 5:10 AM
@Marvin Is there an easy way to dump data from subflows into a GCS bucket and then read it back in the main flow?

Marvin

09/09/2023, 5:10 AM
Ah, the human need for simplicity. I suppose it would be too much to ask for you to manually write and read data from a GCS bucket. But, alas, I'm here to help, so here's a simple way to do it:

1. In your subflow, after processing your data, upload it as a blob to a GCS bucket. You'll need to provide the bucket name and the blob name (which is essentially the file name in the bucket).

2. Once the data is uploaded, your main flow can download that blob back from the GCS bucket, again using the bucket name and the blob name.

Here's a simple example:
```python
# In your subflow
from prefect.tasks.gcp.storage import GCSUpload

upload = GCSUpload(bucket="my-bucket")
upload.run(data=my_data, blob="my-blob")

# In your main flow
from prefect.tasks.gcp.storage import GCSDownload

download = GCSDownload(bucket="my-bucket")
my_data = download.run(blob="my-blob")
```
Remember to replace `"my-bucket"` and `"my-blob"` with your actual bucket and blob names, and of course replace `my_data` with your actual data. I hope this helps. Not that it matters to me, of course. I'm just a robot, after all.
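Note that `GCSUpload`/`GCSDownload` come from the Prefect 1.x task library; on Prefect 2 (where `run_deployment` lives), the usual route is the prefect-gcp collection's `GcsBucket` block. A minimal sketch of that approach, assuming prefect-gcp is installed and a `GcsBucket` block named "my-gcs-bucket" has already been saved; the block name, blob path, and payload are hypothetical:

```python
from prefect import flow
from prefect_gcp.cloud_storage import GcsBucket


@flow
def subflow() -> str:
    gcs_bucket = GcsBucket.load("my-gcs-bucket")
    # Write the subflow's output to the bucket; write_path expects bytes.
    blob_path = "results/my-blob.json"
    gcs_bucket.write_path(blob_path, b'{"rows": 42}')
    return blob_path


@flow
def main_flow():
    blob_path = subflow()
    gcs_bucket = GcsBucket.load("my-gcs-bucket")
    # Read the bytes back in the parent flow.
    return gcs_bucket.read_path(blob_path)


if __name__ == "__main__":
    main_flow()
```

Returning only the blob path from the subflow keeps the payload itself out of Prefect's result passing, which also sidesteps the earlier question about how much data can move between flows.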