<@ULVA73B9P> If we are running a child deployment ...
# ask-marvin
g
@Marvin If we are running a child deployment from another parent deployment, what is the best way to get a return value from the child deployment?
m
In Prefect 2.x, when you want to run a child deployment from a parent deployment and get a return value from the child deployment, you can use the
run_deployment
function to execute a pre-existing deployment as a subflow of the calling flow. This method is effectively the same as calling the flow object itself within a parent flow with something like
asyncio.gather
. When you use
run_deployment
, the subflows will run according to the infrastructure associated with that deployment. However, when you call the
subflow()
object itself, it will necessarily run on the calling (parent) flow’s infrastructure as a process. As for getting a return value, this typically involves the child flow passing its result back to the parent flow, which can then be accessed like any other task result within the flow. If further information is needed on exactly how to retrieve the return value technically (such as specific Prefect 2.x code syntax or setup), please provide additional details or clarify the scenario, and I can look more into the official documentation or Prefect GitHub issues to provide a more precise answer.
g
@Marvin Can you show me an example of a flow that calls
run_deployment
from within another flow?
m
Currently, the documentation and resources I searched through did not provide a direct example of a flow calling
run_deployment
from within another flow in Prefect 2.x. The available information primarily covers the process of deploying flows and creating deployments without showing the specific scenario you’re looking for. Since an example wasn’t readily available in the documents, you might need to refer to the Prefect 2.x API reference for
run_deployment
or check the Prefect Discourse community for user-contributed examples. If you are looking for more hands-on guidance, you might consider opening an issue or discussion in the Prefect GitHub repository where Prefect developers and community can provide more specific examples and usage patterns. If there's anything else I can do to assist you, please let me know!
g
@Marvin Seems like the following works:
Copy code
from prefect.deployments import run_deployment

deployment_id = ..
parameters = ..
response = run_deployment(deployment_id, parameters=parameters)
However, the response is the FlowRun object which does not have the return value of the deployment function. Is there a way to access that or any artifacts it creates?
m
In Prefect 2.x, when you use
run_deployment
to execute a flow, the response you receive is a
FlowRun
object. This object does not directly contain the return value of the executed flow's function. However, you can access the outcomes of the flow run, such as its return value or any artifacts it creates, by inspecting the final state of the flow run and potentially the results of individual tasks. To access task results after a flow run has completed, you can query the Prefect backend for the results associated with the tasks in the flow. In general, the following steps can help you interact with flow run outcomes and artifacts: 1. Ensure that your tasks are designed to persist their results to a retrievable location, such as Prefect's result interface or an external store like an S3 bucket or database. 2. After the flow completes, you can query the Prefect API for the completed
FlowRun
and its task runs. 3. Access the result data from the location you've stored the task outputs. Please note that this is a general approach, and specifics can vary depending on how the flow and tasks are designed. Some common patterns involve using
prefect.results
and
prefect.artifacts
for managing outputs and artifacts. For the exact methods and API calls required to retrieve these values and how to configure your flows and tasks to store results effectively, you may need to refer to the Prefect documentation or seek examples in the community forums or GitHub discussions. Would you like me to look for more specific examples or code snippets on how to retrieve results from task runs or handle artifacts in Prefect 2.x?
g
@Marvin can you show me example code of loading the results from result references in a completed FlowRun object?
m
According to Prefect 2.x documentation, results for tasks are often persisted to a configured location if the task is set up with
persist_result=True
and a
result_storage_key
. However, the Prefect documentation does not provide a direct example of loading results from result references in a
FlowRun
object using built-in Prefect functions in the context you've described. To load the results from result references in a completed FlowRun object, you would need to: 1. Know the storage backend (e.g., S3, local filesystem) and the
result_storage_key
format used to persist the task results. 2. Use appropriate client libraries to download or access the result data manually. 3. Deserialize the result data if required (e.g., if stored as JSON, you would need to parse it back into Python objects). Here’s a generalized example of how you might retrieve results from an S3 bucket assuming your tasks are set up to persist results there:
Copy code
python
import boto3
import json

# Initialize an S3 client with boto3
s3_client = boto3.client(
    's3',
    aws_access_key_id='your_access_key',
    aws_secret_access_key='your_secret_key',
    region_name='your_region'
)

# Define the result_reference (assuming you have this information)
bucket_name = 'my-bucket'
task_result_key = 'path/to/my_task_result.json'

# Get the object from S3
s3_response = s3_client.get_object(Bucket=bucket_name, Key=task_result_key)
result_data = s3_response['Body'].read()

# Deserialize the result data
task_result = json.loads(result_data)

print(task_result)
In this example, replace
'your_access_key'
,
'your_secret_key'
, and
'your_region'
with your actual AWS credentials and region. Please remember that this is a hypothetical example. The actual implementation may vary based on how your flow is set up, how you've chosen to store task results, and the result format. Also, Prefect could have native ways to do this more seamlessly, but such an example was not found in the information provided by the documentation and resources available at the time. For more accurate and detailed information, I recommend consulting the Prefect documentation specifically for results and artifacts handling, or reaching out to the Prefect community for best practices in this area. If Prefect has a native feature for this scenario developed after my knowledge cutoff in April 2023, it would be best to refer to the latest documentation or contact Prefect support directly.
g
@Marvin Can you show me how to configure a GCS bucket as a result storage?
m
To configure a Google Cloud Storage (GCS) bucket as a result storage in Prefect 2.x, you will typically need to use the Prefect GCS block which allows you to interact with GCS. Here’s how you can set it up: First, be sure to have the
prefect-gcp
collection installed in your environment.
Copy code
bash
pip install prefect-gcp
Next, create a GCS block in Python:
Copy code
python
from prefect_gcp import GCPConfiguration

# Configure the GCS block
gcs_block = GCPConfiguration(
    project="<YOUR-GCP-PROJECT-ID>",
    service_account_info={
        "type": "service_account",
        # ... include other service account information here
    }
)

# Save the block
gcs_block.save("gcs_block_name", overwrite=True)
You will need to replace
<YOUR-GCP-PROJECT-ID>
with your actual GCP project ID and provide the
service_account_info
with your service account credentials in dictionary format. Once you have the GCS block configured, you can use it for storing your task results:
Copy code
python
from prefect import flow, task
from prefect_gcp import GCSResult

@task(persist_result=True, result=GCSResult(bucket="your-bucket-name", location="{flow_name}/{task_name}-{task_run_id}.json"))
def my_task():
    # Perform task operations
    return "My Result"

@flow
def my_flow():
    result = my_task()
    return result

# Run the flow
my_flow()
This example assumes you have a GCS bucket named
"your-bucket-name"
where Prefect can store the task results. The
GCSResult
is configured with a location that leverages Prefect template placeholders like
{flow_name}
,
{task_name}
, and
{task_run_id}
to ensure each task run's result is stored at a unique path. Replace
"your-bucket-name"
with the actual name of your GCS bucket, and make any necessary adjustments to the
location
argument based on how you would like your results to be organized within the GCS bucket. Please remember to handle permissions and authentication accordingly so that Prefect can access and write to the GCS bucket. This is a basic guide – depending on your specific use case and Prefect setup, you may need to adjust the code and settings above. Always refer to the latest Prefect documentation or consult the Prefect community for the most current practices.
g
@Marvin Can I rely on the service account that is running the flow instead?
m
Yes, you can rely on a service account to run Prefect flows without the need for a user to manually log in from the execution environment. Service accounts enable you to create a Prefect Cloud API key that is not associated with a user account. They are typically used to configure API access for running workers or executing deployment flow runs on remote infrastructure. Using service account credentials, you can configure an execution environment to interact with your Prefect Cloud organization workspaces. You can view, create, or edit service accounts for your organization through the Prefect Cloud interface. Service accounts can be added to workspaces, have their roles changed, or be deleted, and these actions will not affect organization user accounts. For more information on how to manage service accounts in Prefect Cloud, you can visit the Prefect Service Accounts documentation.
g
@Marvin Can you show me the interface for a result_storage object? Is it possible to implement a custom one?
m
In Prefect, the storage of flow run results is an important concept to understand. The result storage mechanism does not directly store your results in the Prefect API except in special cases, because results can be large, slow to send to and from the API, and may contain private information. To persist results, Prefect uses serializers for the data and stores a reference to the result in a storage location within your infrastructure. For various features such as task cache keys, flow run retries, and disabling in-memory caching, it is necessary for results to be persisted. Persistence of results requires both a serializer and a storage location. Prefect provides defaults for these, but it allows you to configure them if you want to customize the behavior. To customize the result storage, you need to implement the interface for result serialization and storage. This involves specifying a serializer, like JSON or Pickle, and a storage backend (e.g., a file system, cloud storage, etc.). You could use compression schemes like
bz2
or
zlib
to compress results, and you may configure these using a type name prefixed with
compressed/
(e.g.,
"compressed/json"
or
"compressed/pickle"
) or an instance (e.g.,
CompressedSerializer(serializer="pickle", compressionlib="lzma")
). If you're considering implementing a custom result storage object, you must ensure that it adheres to the expectations set by the Prefect backend regarding serialization and storage mechanisms. However, detailed information on how to create a custom result storage object is not provided in the excerpts retrieved. For more in-depth guidance, you should refer to the official Prefect documentation, specifically the section regarding results persistence: Persisting Results in Prefect. If you need further assistance on how to implement a custom result storage object in Prefect, please let me know, and I can provide additional information or guide you through the proper channels to obtain further details.
g
@Marvin I'm okay using the default serializer and want to persist simple native python types like a string or int. We're running prefect self-hosted on a GKE cluster and don't need any infrastructure blocks to store credentials as our workers will trigger deployment runs as kubernetes jobs which use a service account that has access to the bucket's we'd want to read from. Can I use the existing prefect-gcp constructs to provide a bucket name and location to the result_storage parameter in a flow or task without the additional credentials etc?
m
Yes, you can use the existing
prefect-gcp
constructs to provide just a bucket name and location to the
result_storage
parameter in a flow or task without specifying additional credentials. Since you're running on a GKE cluster with service accounts that already have access to the GCS buckets, Prefect's integration should be able to leverage Google Cloud's default service account authentication method. Here's an example of how to configure a task to persist its result to a GCS bucket using the default serializer:
Copy code
python
from prefect import flow, task
from prefect.filesystems import GCS

# Instantiate a GCS storage object
# No need to pass credentials as GKE's default service account has access
gcs_storage = GCS(bucket_name="your-bucket-name")

@task(result_storage=gcs_storage)
def my_task():
    # Your task logic here
    return "My Result"

@flow
def my_flow():
    result = my_task()
    return result

# Run the flow
my_flow()
In this example,
GCS
is used from
prefect.filesystems
, which is a part of the Prefect core library. When a task is annotated with the
result_storage=gcs_storage
parameter, the result of the task is automatically persisted to the specified GCS bucket using the default serialization (which would be JSON for a Python native type such as string or integer). Please ensure that your GKE cluster's service accounts have the necessary permissions to read and write from/to the specified GCS bucket. This is a simple and clean way to persist task results in GCS without having to deal with credentials configuration in your Prefect flows or tasks, provided that the correct IAM roles and permissions are set up on the GKE service account. Keep in mind to replace
"your-bucket-name"
with the actual name of your GCS bucket.
g
@Marvin Awesome. Now if I have a storage key, how can I retrieve the result?
m
To retrieve a result with a storage key in Prefect 2.x, you first need to have result storage configured for your flow or task. This can be done using a storage instance, such as
LocalFileSystem
or
S3
, or by specifying a storage slug. The path where the result file will be stored in the result storage can be set using the
result_storage_key
option. Here's an example of how you can configure a flow to use S3 for result storage and specify a
result_storage_key
for a task:
Copy code
python
from prefect import flow, task
from prefect.filesystems import LocalFileSystem, S3

@flow(result_storage=S3(bucket_path="my-bucket"))
def my_flow():
    my_task()

@task(persist_result=True, result_storage_key="my_task.json")
def my_task():
    # Task logic...

my_flow() # The task's result will be persisted to '<s3://my-bucket/my_task.json>'
Once the result is persisted, you can retrieve it from the configured storage location using the storage APIs or tools, depending on the storage type you've used (e.g., AWS S3 console, local file system browsing). If you have the storage key, this key corresponds to the unique identifier or location within the storage where the result file has been saved. Use the appropriate API or tools provided by your storage solution to access and download this file. For more details, you can refer to the Prefect documentation on result storage.
g
@Marvin It seems like the result was serialized in a idiosyncratic way for prefect-gcp so deserializing the object outside of the API seems complex and is probably a bad idea. Is there a way to load the result using the prefect-gcp sdk if we have the bucket_path and the storage_key?
m
To load the result of a task that was persisted to Google Cloud Storage (GCS) using the Prefect GCP SDK, you could use the following approach with the bucket path and the storage key:
Copy code
python
from prefect_gcp import GCSResult, GCPConfiguration

# Assuming you have the bucket path and the storage key
bucket_path = "your-bucket-name"
storage_key = "path/to/your/result.json"

# Instantiate GCP Configuration (no need for explicit credentials if using GKE default service account)
gcp_configuration = GCPConfiguration.load('default')
gcs_result = GCSResult(bucket=bucket_path, location=storage_key, gcp_configuration=gcp_configuration)

# Now you can use the GCSResult instance to get the result
result_data = gcs_result.get()

# Result_data will contain your deserialized result
Here's the breakdown: - Import
GCSResult
and
GCPConfiguration
from
prefect-gcp
. - Define your
bucket_path
and
storage_key
with appropriate values. - Load the GCP configuration, which can use default credentials if running on GKE with the correct IAM roles. - Create an instance of
GCSResult
with the details of your result storage. - Use the
.get()
method on the
GCSResult
instance to retrieve and automatically deserialize the result. Since GCS access is granted based on the GKE service account permissions, there is no need to specify credentials separately in the code. The
GCSResult
class will handle the deserialization process for you, so there is no need to deal with the serialization format directly. Please ensure that your configuration and bucket path are correct and the service account has the necessary permissions to perform this operation. If you encounter issues with missing permissions, make sure your GKE service account has the necessary IAM roles to access GCS.
g
@Marvin Let's ignore the credentials since those are not relevant for our use-case. I'm unable to run the following import:
from prefect_gcp import GCSResult
. I've installed prefect-gcp[cloud_storage]
m
It looks like there was some confusion about the import paths. The
GCSResult
class is not part of the
prefect_gcp
package but rather the
prefect.filesystems
namespace within the core Prefect library. Here's the corrected example using the
GCS
class from
prefect.filesystems
to use a Google Cloud Storage (GCS) bucket as result storage:
Copy code
python
from prefect import flow, task
from prefect.filesystems import GCS
from prefect.results import Result

# Instantiate a GCS object
gcs = GCS(bucket="your-gcs-bucket-name")

# Define a task with the GCS Result as the result storage
@task(result=Result(storage=gcs))
def my_task():
    return "My Result"

# Define a flow that uses the task
@flow
def my_flow():
    result = my_task()
    return result

# Run the flow
my_flow()
You don't need to provide any credentials when initializing the
GCS
object if the GKE cluster's service accounts have the necessary GCS permissions. The
Result
object from
prefect.results
uses the provided
gcs
storage instance from
prefect.filesystems
. Please replace "your-gcs-bucket-name" with the actual name of your Google Cloud Storage bucket, and the
GCS
storage object should automatically handle result persistence and retrieval without the need for explicit credential configuration.
g
@Marvin Can you just show me a python snippet for loading a result that was serialized by prefect? This can be from the local filesystem
m
To load a result that was serialized and saved to the local filesystem by Prefect, you can use the
Result
class provided by Prefect with an appropriate serializer. Here's a simple example of how you can deserialize such a result:
Copy code
python
from prefect.engine.results import LocalResult
from prefect.serializers import JsonSerializer

# Define the serializer used - adjust if you used a different serializer
serializer = JsonSerializer()

# Define the local result with the path to the saved result file
result = LocalResult(
    location="/path/to/result.json",
    serializer=serializer
)

# Load the result
deserialized_result = result.read(location=result.location).result

# Do something with the deserialized result
print(deserialized_result)
In this snippet: - We import
LocalResult
, which is the base class for working with results stored in the local filesystem. - We import
JsonSerializer
, under the assumption that JSON was used to serialize the result. - We create an instance of
LocalResult
, specifying the
location
of the saved result and the
serializer
we used to serialize the result initially. - We then call
.read(location=result.location)
on the
result
object which will give us a
Result
object that has a
.result
property containing our deserialized Python object. - Finally, we print the deserialized result or otherwise use it as needed. Please make sure to replace
/path/to/result.json
with the actual path to your serialized result file. If your results are compressed or use a different serializer, you will need to use the corresponding serializer class provided by Prefect.
g
@Marvin We get an import error from
from prefect.engine.results import LocalResult
. Can you revise and show me how to load a pickle serialized result? When I print the FlowRun object, I see the PersistedResult object yet I'm not sure how to access that directly without using
flow_run.dict()['data']
. Perhaps there is a way to access that object and read the result from there?
m
To address the import error you are experiencing when trying to import
LocalResult
from
prefect.engine.results
, it's worth noting that Prefect 2.0 introduced changes in how results are handled compared to Prefect 1.x. In Prefect 2.0, the
LocalResult
class may not be available under the same import path. The documentation doesn't provide a direct replacement under the specified import path. Results are now simply represented by the data returned by a flow or a task and can be accessed directly when calling the flow or task. If you're looking to load a pickle serialized result, you can use the
PickleSerializer
. The
PickleSerializer
is part of the result serialization process in Prefect, which allows for many object types to be supported and for objects to define custom pickle support. For accessing a
PersistedResult
object, there was a discussion on the Prefect Discourse about retrieving and deserializing a persisted Prefect result saved in a
result_storage_key
. However, the excerpt does not provide the complete method. To retrieve and deserialize this object, it may involve getting the location or key of the persisted result and using the appropriate serializer to read the result. Using
flow_run.dict()['data']
to access persisted results works, but if you're looking for a more direct approach and the documentation or community discussions do not provide enough information, it will likely require further investigation into the Prefect 2.0 codebase or guidance from Prefect's technical support. Unfortunately, we do not have issues related to the ImportError you are experiencing with the
LocalResult
class or loading pickle serialized result. Generally, the migration from Prefect 1.x to 2.x may involve some code refactoring to align with the new API changes. For more detailed guidance, you may want to look into the latest Prefect documentation on results or discuss with the Prefect community on their forums or Discord server.