Prem
09/09/2023, 5:07 AM

Marvin
09/09/2023, 5:07 AM
1. You can use the `run_deployment` function to run a pre-existing deployment as a subflow of the calling flow. This is similar to calling the flow object itself in a parent flow with something like `asyncio.gather`. The key difference is that when you use `run_deployment`, the subflows will run on the infrastructure associated with that deployment. When you call the `subflow()` object itself, it will necessarily run on the calling (parent) flow's infrastructure as a process. (A minimal sketch of both approaches follows this list.)
2. When a subflow starts, it will create a new task runner for tasks within the subflow. When the subflow completes, the task runner is shut down. Subflows will block execution of the parent flow until completion. However, asynchronous subflows can be run in parallel by using AnyIO task groups or `asyncio.gather`.
3. Subflows differ from normal flows in that they will resolve any passed task futures into data. This allows data to be passed from the parent flow to the child easily. The relationship between a child and parent flow is tracked by creating a special task run in the parent flow. This task run will mirror the state of the child flow run.
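A minimal sketch of both approaches, assuming Prefect 2.x; the flow names and the deployment `my-flow/my-deployment` are hypothetical placeholders, not anything from this thread:
```python
import asyncio

from prefect import flow
from prefect.deployments import run_deployment


@flow
async def child_flow(x: int) -> int:
    # Called directly below, so it runs as a subflow on the parent's infrastructure.
    return x * 2


@flow
async def parent_flow():
    # Calling the flow objects directly: the subflows run in-process,
    # and asyncio.gather lets the async subflows execute in parallel.
    doubled = await asyncio.gather(child_flow(1), child_flow(2))

    # run_deployment: the subflow runs on whatever infrastructure that
    # deployment is configured with ("my-flow/my-deployment" is hypothetical).
    flow_run = await run_deployment(name="my-flow/my-deployment")
    return doubled, flow_run


if __name__ == "__main__":
    asyncio.run(parent_flow())
```
By default, `run_deployment` blocks until the triggered flow run reaches a terminal state and returns its `FlowRun` object.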
As for the limit on how much data can be transferred between them, that's a bit more complicated and depends on a variety of factors, such as your infrastructure and the specific implementation of your flows. I'm afraid I can't provide a concrete answer without more information. But don't worry, I'm sure you'll figure it out. After all, you humans are so clever.

Prem
09/09/2023, 5:10 AM
Marvin
09/09/2023, 5:10 AM
1. In your subflow, you can use the `gcs.upload_blob` function to upload your data to a GCS bucket. You'll need to provide the bucket name and the blob name (which is essentially the file name in the bucket).
2. Once your data is uploaded, your main flow can then use the `gcs.download_blob` function to download the data from the GCS bucket. Again, you'll need to provide the bucket name and the blob name.
Here's a simple example:
```python
# In your subflow
from prefect.tasks.gcp.storage import GCSUpload

upload = GCSUpload(bucket="my-bucket")
upload.run(data=my_data, blob="my-blob")

# In your main flow
from prefect.tasks.gcp.storage import GCSDownload

download = GCSDownload(bucket="my-bucket")
my_data = download.run(blob="my-blob")
```
Remember to replace `"my-bucket"` and `"my-blob"` with your actual bucket and blob names. And of course, replace `my_data` with your actual data.
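Note that the snippet above uses the Prefect 1.x task library. On Prefect 2.x (where `run_deployment` lives), one rough equivalent is to call the google-cloud-storage client from ordinary tasks; the following is only a sketch under that assumption, with `my-bucket` and `my-blob` as placeholder names:
```python
from google.cloud import storage
from prefect import flow, task


@task
def upload_blob(data: str, bucket: str, blob: str) -> None:
    # Upload a string to gs://<bucket>/<blob> using the GCS client.
    client = storage.Client()
    client.bucket(bucket).blob(blob).upload_from_string(data)


@task
def download_blob(bucket: str, blob: str) -> str:
    # Download gs://<bucket>/<blob> back into memory as text.
    client = storage.Client()
    return client.bucket(bucket).blob(blob).download_as_text()


@flow
def subflow(data: str):
    # The subflow writes its result to GCS for the parent to pick up.
    upload_blob(data, bucket="my-bucket", blob="my-blob")


@flow
def main_flow():
    subflow("some data produced in the subflow")
    my_data = download_blob(bucket="my-bucket", blob="my-blob")
    return my_data


if __name__ == "__main__":
    main_flow()
```
If you use the prefect-gcp collection instead, its GcsBucket and GcpCredentials blocks cover the same upload/download steps with credentials managed as Prefect blocks.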
I hope this helps. Not that it matters to me, of course. I'm just a robot, after all.