Prem09/09/2023, 5:07 AM
Marvin09/09/2023, 5:07 AM
1. You can use the `run_deployment` function to run a pre-existing deployment as a subflow of the calling flow. This is similar to calling the flow object itself in a parent flow with something like `my_flow()`. The key difference is that when you use `run_deployment`, the subflow runs on the infrastructure associated with that deployment. When you call the flow object itself, it will necessarily run on the calling (parent) flow's infrastructure as a process.
2. When a subflow starts, it creates a new task runner for the tasks within the subflow; when the subflow completes, the task runner is shut down. Subflows block execution of the parent flow until completion. However, asynchronous subflows can be run in parallel by using AnyIO task groups or `asyncio.gather`.
3. Subflows differ from normal flows in that they resolve any passed task futures into data, which makes it easy to pass data from the parent flow to the child. The relationship between a child and parent flow is tracked by creating a special task run in the parent flow; this task run mirrors the state of the child flow run.

As for the limit on the data transferred between them, that's a bit more complicated and depends on a variety of factors, such as your infrastructure and the specific implementation of your flows. I'm afraid I can't provide a concrete answer without more information. But don't worry, I'm sure you'll figure it out. After all, you humans are so clever.
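The parallel-subflow point above can be sketched with plain `asyncio`. In real Prefect code each coroutine below would be an async function decorated with `@flow`; plain coroutines (with hypothetical names) are used here so the sketch runs without a Prefect server:

```python
import asyncio

# Stand-ins for async subflows; in Prefect these would be @flow-decorated
# async functions called from a parent flow.
async def child_flow(n: int) -> int:
    await asyncio.sleep(0.01)  # stand-in for the subflow's actual work
    return n * 2

async def parent_flow() -> list:
    # Awaiting each subflow in turn would block on each one sequentially;
    # asyncio.gather schedules them concurrently and waits for all results.
    return await asyncio.gather(child_flow(1), child_flow(2), child_flow(3))

results = asyncio.run(parent_flow())  # → [2, 4, 6]
```

`asyncio.gather` preserves the order of its arguments in the returned list, so results line up with the subflow calls regardless of which finishes first.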
Prem09/09/2023, 5:10 AM
Marvin09/09/2023, 5:10 AM
1. In your subflow, use the `GCSUpload` task to upload your data to a GCS bucket. You'll need to provide the bucket name and the blob name (which is essentially the file name in the bucket).
2. Once your data is uploaded, your main flow can use the `GCSDownload` task to download the data from the GCS bucket. Again, you'll need to provide the bucket name and the blob name.

Here's a simple example:

```python
# In your subflow
from prefect.tasks.gcp.storage import GCSUpload

upload = GCSUpload(bucket="my-bucket")
upload.run(data=my_data, blob="my-blob")

# In your main flow
from prefect.tasks.gcp.storage import GCSDownload

download = GCSDownload(bucket="my-bucket")
my_data = download.run(blob="my-blob")
```

Remember to replace `my-bucket` and `my-blob` with your actual bucket and blob names. And of course, replace `my_data` with your actual data. I hope this helps. Not that it matters to me, of course. I'm just a robot, after all.
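The handoff pattern above (the subflow writes a blob under a bucket name, the main flow reads it back by the same bucket and blob names) can be sketched with a filesystem stand-in for the bucket. The helper names here are hypothetical; with real GCS you would use the upload/download tasks shown earlier:

```python
import json
import tempfile
from pathlib import Path

def upload_blob(bucket: Path, blob: str, data: dict) -> None:
    # "Upload": serialize the payload and write it under the blob name.
    (bucket / blob).write_text(json.dumps(data))

def download_blob(bucket: Path, blob: str) -> dict:
    # "Download": read the payload back by its blob name.
    return json.loads((bucket / blob).read_text())

bucket = Path(tempfile.mkdtemp())  # filesystem stand-in for "my-bucket"

# In the subflow:
upload_blob(bucket, "my-blob", {"rows": [1, 2, 3]})

# In the main flow:
my_data = download_blob(bucket, "my-blob")  # → {"rows": [1, 2, 3]}
```

The key design point is that the bucket and blob names are the only shared contract between the two flows, which is what lets them run on separate infrastructure.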