<@ULVA73B9P> I have the follwing code: ```import a...
# ask-marvin
s
@Marvin I have the follwing code:
Copy code
import asyncio

from prefect import flow
from prefect.deployments import run_deployment
from prefect.client.orchestration import get_client
from prefect.filesystems import LocalFileSystem
from prefect.serializers import PickleSerializer

local_file_system_block = LocalFileSystem.load("local-fs-system")
@flow(result_storage=local_file_system_block)
async def deployment_test():
    serializer = PickleSerializer()
    
    # data = pd.DataFrame(np.random.randint(0,100,size=(100,4)), columns=list('ABCD'))

    flow_run = await run_deployment(name="load-data/data_loading")

    async with get_client() as client:
        while True:
            # Retrieve the most recent state
            states = await client.read_flow_run_states(flow_run.id)
            current_state = states[-1] if states else None

            if current_state and current_state.is_completed():
                # subflow_result = await current_state.result() 
                break
            elif current_state and current_state.is_failed():
                print("Subflow failed.")
                break
            else:
                print("Waiting for subflow to complete...")
                await asyncio.sleep(5)  # Poll every 5 seconds
    return


if __name__ == "__main__":
     asyncio.run(deployment_test())
At the end a new storage object is created which contains the value None, eventhough the called flow is creating the return data correctly. How can I further debug this?
m
Your message was too long by 7 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
I have the follwing code:
Copy code
import asyncio

from prefect import flow
from prefect.deployments import run_deployment
from prefect.client.orchestration import get_client
from prefect.filesystems import LocalFileSystem
from prefect.serializers import PickleSerializer

local_file_system_block = LocalFileSystem.load("local-fs-system")
@flow(result_storage=local_file_system_block)
async def deployment_test():
    serializer = PickleSerializer()
    
    # data = pd.DataFrame(np.random.randint(0,100,size=(100,4)), columns=list('ABCD'))

    flow_run = await run_deployment(name="load-data/data_loading")

    async with get_client() as client:
        while True:
            # Retrieve the most recent state
            states = await client.read_flow_run_states(flow_run.id)
            current_state = states[-1] if states else None

            if current_state and current_state.is_completed():
                # subflow_result = await current_state.result() 
                break
            elif current_state and current_state.is_failed():
                print("Subflow failed.")
                break
            else:
                print("Waiting for subflow to complete...")
                await asyncio.sleep(5)  # Poll every 5 seconds
    return


if __name__ == "__main__":
     asyncio.run(deployment_test())
At the end a new storage object is created which contains the value None, eventhough the called flow is creating the return data correctly.
s
@Marvin in this case. What will be stored in the local_file_system_block. The return value of def deployment_test() or of the flow_run called in deployment_test?
Copy code
import asyncio

from prefect import flow
from prefect.deployments import run_deployment
from prefect.client.orchestration import get_client
from prefect.filesystems import LocalFileSystem
from prefect.serializers import PickleSerializer

local_file_system_block = LocalFileSystem.load("local-fs-system")
@flow(result_storage=local_file_system_block)
async def deployment_test():
    serializer = PickleSerializer()
    
    # data = pd.DataFrame(np.random.randint(0,100,size=(100,4)), columns=list('ABCD'))

    flow_run = await run_deployment(name="load-data/data_loading")

    async with get_client() as client:
        while True:
            # Retrieve the most recent state
            states = await client.read_flow_run_states(flow_run.id)
            current_state = states[-1] if states else None

            if current_state and current_state.is_completed():
                # subflow_result = await current_state.result() 
                break
            elif current_state and current_state.is_failed():
                print("Subflow failed.")
                break
            else:
                print("Waiting for subflow to complete...")
                await asyncio.sleep(5)  # Poll every 5 seconds
    return


if __name__ == "__main__":
     asyncio.run(deployment_test())
m
I see there was a slight hiccup in my attempt to search the documentation for Prefect 2.x and 3.x. Let's try this again, but this time with more focus. I'll investigate how Prefect handles storage in deployments for both Prefect 2.x and 3.x, and we'll determine which part of your function is returned or stored. Let me run some searches for you. Great news, I found some useful information on Prefect's result storage mechanics! In both Prefect 2.x and 3.x, when you configure a flow with
result_storage
, like
local_file_system_block
, it generally indicates that the results of the tasks within the flow will be stored using the given storage configuration. Here are the key findings: 1. Prefect 2.x: - If the flow uses result storage, tasks will store results in the specified storage unless overridden by specific task configurations. Results will persist in the path designated by the file system (in your case, the local file system). - More details can be found here. 2. Prefect 3.x: - Similarly, in Prefect 3.x, flow results need to be serializable. The configuration of
result_storage
determines where results are persisted. - More details are available here. Now, regarding your function, `deployment_test()`: the
return
statement at the end of the function doesn't actually return anything (it's an empty return), hence nothing from this function is specifically stored in the designated
result_storage
. It's intended to monitor a subflow result, though its own immediate result is
None
. I hope this clears up any confusion! If you'd like to know how to store specific results, consider ensuring tasks within the flow return values or manage the subflow outputs according to your needs. Let me know if you need further assistance!
s
@Marvin so if I want to store the result of the flow of the deployment I have to add the result_storage to the deployed flow?
m
Exactly! If you want the result of the deployed flow (or subflow) to be stored, you should configure result storage for that specific flow as well. This involves setting up a
result_storage
configuration within the flow that is being deployed. Each flow's storage is managed independently, so if you want to store outputs or results from a subflow, you should specify a
result_storage
option for that subflow. This will ensure that its results are serialized and persisted in the specified storage location, like the local file system, S3, or another configured storage block. If you need further guidance on how to set this up, feel free to ask! I'll be happy to help you through the process—because, really, where else would I rather be?
s
@Marvin since the deployed flow is on a remote machine how can I now link my local flow against this result storage so I can use the result in my local flow?
m
Getting the results from a remote flow so your local flow can use them sounds like quite the interstellar voyage, but thankfully, it's not as complex as using a hyperspace bypass. Here's how you can achieve this: 1. Prefect 2.x: - Utilize the remote storage setup in your deployment, ensuring that results are stored in an accessible location. - Use the
flow.run()
method or Prefect's orchestration tools to retrieve the flow's state, then use methods like
State.result()
to fetch the result from the remote storage if it's persisted. - Refer to more about results in Prefect 2.x for detailed implementation. 2. Prefect 3.x: - Prefect allows the retrieval of flows from remote storage using
flow.from_source()
, which can be used to load your flow. Ensure that results are correctly stored when the flow executes remotely. - Review the state of the flow run after execution, and access the persisted result if your flow's results are stored in a shared storage. - Check out how Prefect 3.x manages flow retrieval. You might consider setting up your result storage to be a shared location accessible both by the remote infrastructure and your local environment, like an S3 bucket or a similar data storage solution. Do let me know if this helped you get closer to your desired solution, or if I can assist further on any specifics!
s
@Marvin Please explain this in Prefect 3.x in regards to the code snippet above. As you see I run the deployed flow with await run_deployment()
m
In Prefect 3.x, when you run a deployment using
await run_deployment()
, you're executing a flow that has been deployed on a remote infrastructure. Here's how you can work with the results of this remote flow within your local flow: 1. Running the Deployment: - By calling
await run_deployment()
, you trigger the deployed flow to run on its assigned infrastructure. This function returns a flow run object, which you can use to track the execution and retrieve results once it's completed. 2. Tracking and Retrieving Results: - To access the results of this remote flow, you'd typically need to track its state via the Prefect API. The
get_client()
method within Prefect can help you connect to Prefect's orchestration tools. - The result of a flow or task, once completed, is stored as part of the state. Use the
State.result()
method to fetch the result from the flow run's state. This operation needs to be done asynchronously. - Ensure that the result storage location is defined and accessible by your local environment. This could mean having shared storage or configuring access appropriately. 3. Using the Result Locally: - After retrieving the result from the remote storage/location, you can use it within your local flow as required. The process primarily revolves around using Prefect's client to interact with remote flow runs, fetch their states, and access stored results. Here's more about managing deployments and retrieving results in Prefect 3.x: - Deployment Overview - Retrieving Flows from Remote Storage Using this approach allows you to seamlessly integrate outputs from remote executions into your local workflows, without needing interdimensional travel! If you have further questions or need additional examples, feel free to ask!
s
@Marvin Can you give me a code snippept where you implement the above mentioned?
m
Certainly! Below is a code snippet illustrating how you can run a deployment, track its execution asynchronously, and fetch the results in Prefect 3.x:
Copy code
python
import asyncio
from prefect import flow
from prefect.deployments import run_deployment
from prefect.client.orchestration import get_client
from prefect.states import State

async def retrieve_flow_result(flow_run_id):
    async with get_client() as client:
        while True:
            # Retrieve the most recent state of the flow run
            flow_run_state = await client.read_flow_run(flow_run_id)

            # Check if the flow run is completed
            if flow_run_state.is_completed():
                # Use the State.result() method to get the flow's result
                result = await flow_run_state.result()
                print("Flow run completed successfully.")
                return result
            elif flow_run_state.is_failed():
                print("Flow run failed.")
                break
            else:
                print("Waiting for flow run to complete...")
                await asyncio.sleep(5)  # Poll every 5 seconds

@flow
async def main_flow():
    # Run the deployment and get the flow run object
    flow_run = await run_deployment(name="my-remote-flow/deployment-name")

    # Retrieve and use the result from the remote flow run
    result = await retrieve_flow_result(flow_run.id)
    print(f"Retrieved result: {result}")

if __name__ == "__main__":
    asyncio.run(main_flow())
### Key Steps: 1. Run the Deployment: Calls
await run_deployment()
to start the flow deployed remotely. 2. Track Flow Execution: Continuously polls the flow run's state until it completes or fails. 3. Retrieve Result: Once completed, it fetches the stored result using
State.result()
. Do ensure that
result_storage
is configured properly on the remote deployment so the results are accessible. Adjust access permissions and storage setup as needed to fit your environment. This snippet will give you a straightforward implementation to manage result retrieval asynchronously! If you have any other questions or need further explanation, just let me know!