Steffen
11/20/2024, 3:09 PM

Marvin
11/20/2024, 3:10 PM (message mentions `run_deployment`)

Steffen
11/20/2024, 3:11 PM

Marvin
11/20/2024, 3:11 PM (Python example using `run_deployment`):
from prefect import flow, task
from prefect.deployments import run_deployment
# Define your main local flow
@flow
def parent_flow():
    """Parent flow that triggers a deployed subflow.

    NOTE: `run_deployment` returns a `FlowRun` object describing the
    triggered run — not the subflow's return value. To obtain the actual
    result, read the run's final state and call `state.result()`.
    By default `run_deployment` waits (timeout=None) for the run to
    reach a terminal state before returning.
    """
    print("Running the parent flow.")
    # Trigger the deployed subflow and capture its FlowRun metadata.
    flow_run = run_deployment(name="my-deployed-flow")  # Replace with your deployment name
    print(f"Subflow run: {flow_run}")
if __name__ == "__main__":
    parent_flow()

(Replace "my-deployed-flow" with the name of your deployment.)

Steffen
11/20/2024, 3:17 PM

import asyncio
from prefect import flow
from prefect.deployments import run_deployment
from prefect.client.orchestration import get_client
@flow
async def deployment_test():
    """Trigger the 'load-data/data_loading' deployment and poll it to completion.

    Starts the deployed flow via `run_deployment`, then polls the run's state
    history through the orchestration client every 5 seconds. On completion
    the persisted result is fetched and printed; on failure a message is
    printed instead.

    NOTE(review): fetching the result with `state.result(fetch=True)` requires
    the deployed flow to persist its result (`persist_result=True` or
    `PREFECT_RESULTS_PERSIST_BY_DEFAULT`) — confirm the deployment's
    configuration.
    """
    # Example payload kept from the original discussion (unused here):
    # data = pd.DataFrame(np.random.randint(0,100,size=(100,4)), columns=list('ABCD'))
    flow_run = await run_deployment(name="load-data/data_loading")
    async with get_client() as client:
        while True:
            # Retrieve the full state history and inspect the most recent state.
            states = await client.read_flow_run_states(flow_run.id)
            current_state = states[-1] if states else None
            if current_state and current_state.is_completed():
                print(f"{current_state}")
                # fetch=True resolves the persisted result data, not just a reference.
                subflow_result = await current_state.result(fetch=True) 
                print(f"Final processed result: {subflow_result}")
                break
            elif current_state and current_state.is_failed():
                print("Subflow failed.")
                break
            else:
                # No terminal state yet; wait before polling again.
                print("Waiting for subflow to complete...")
                await asyncio.sleep(5)  # Poll every 5 seconds
if __name__ == "__main__":
     asyncio.run(deployment_test())

Marvin
11/20/2024, 3:17 PM

Steffen
11/20/2024, 3:20 PM

Marvin
11/20/2024, 3:20 PM (message references the settings `PREFECT_RESULTS_PERSIST_BY_DEFAULT`,
`PREFECT_DEFAULT_RESULT_STORAGE_BLOCK`, `result_storage`, `PREFECT_RESULTS_DEFAULT_SERIALIZER`,
`PREFECT_TASKS_DEFAULT_PERSIST_RESULT`, the command `prefect worker start <work_pool_name>`,
`PREFECT_WORKER_HEARTBEAT_SECONDS`, and the `--limit` option of `prefect worker start`)

Steffen
11/20/2024, 3:24 PM

Marvin
11/20/2024, 3:25 PM (set `PREFECT_RESULTS_PERSIST_BY_DEFAULT` to `true`, or configure `result_storage`; Python example):
     from prefect.filesystems import LocalFileSystem
     storage = LocalFileSystem(basepath="/path/to/store")

Use `@task(persist_result=True)` (Python):
     from prefect import task
     @task(persist_result=True)
     def my_task(data):
         return data

Steffen
11/20/2024, 3:26 PM

Marvin
11/20/2024, 3:26 PM (message references `config.toml`, `PREFECT_RESULTS_PERSIST_BY_DEFAULT`,
`PREFECT_DATABASE_CONFIG_CONNECTION_URL`, and `get_client`; Python example):
     from prefect.client.orchestration import get_client
     import asyncio
     async def verify_results(flow_run_id):
         """Look up a flow run by id and print its `result` attribute."""
         async with get_client() as client:
             run = await client.read_flow_run(flow_run_id)
             # Check result persistence here
             print(run.result)
     
     asyncio.run(verify_results("your-flow-run-id"))

Check the relevant settings in your shell:

      echo $PREFECT_RESULTS_PERSIST_BY_DEFAULT
      echo $PREFECT_API_URL

Steffen
11/20/2024, 4:00 PM

Marvin
11/20/2024, 4:01 PM (if the result comes back `None`, set `persist_result=True`; Python example):
     from prefect import task
     @task(persist_result=True)
     def my_task():
         return "Expected Data"

Or with an explicit serializer (Python):
     from prefect import task
     from prefect.serializers import JSONSerializer
     @task(result_serializer=JSONSerializer())
     def my_task():
         return "Expected Data"

(If `state.result` is `None`, the result was likely not persisted — `None` is returned when no result is available.)

Steffen
11/20/2024, 4:01 PM

Marvin
11/20/2024, 4:01 PM (enable `persist_result`, i.e. `persist_result=True`; Python example):
from prefect import flow
@flow(persist_result=True)
def my_flow():
    """Flow configured to persist its return value (persist_result=True)."""
    return "Flow result with persistence"
# Running the flow
my_flow()

(Message continues, referencing `my_flow`.)