Zivile
06/26/2024, 4:41 PMMarvin
06/26/2024, 4:43 PMpython
from prefect import flow, task
from prefect.blocks.system import JSON
@task
def generate_data():
return {"key": "value"}
@flow
def first_flow():
data = generate_data()
# Store data in a JSON block
block = JSON(value=data)
block.save(name="flow-output", overwrite=True)
### 2. Retrieving Data in the Second Flow
In your second flow, retrieve the data stored previously in the JSON block.
python
from prefect import flow, task
from prefect.blocks.system import JSON
@task
def process_data(data):
print(f"Processing data: {data}")
@flow
def second_flow():
# Retrieve data from the JSON block
block = JSON.load("flow-output")
data = block.value
process_data(data)
### 3. Example of Running Both Flows
python
if __name__ == '__main__':
first_flow()
second_flow()
### Useful Links
- Creating Interactive Workflows
- Specifying Upstream Dependencies
By storing the output of the first flow in a Prefect block and retrieving it in the second flow, you can effectively pass data between flows. This method ensures that your data is safely stored and can be accessed across different flow runs.
I hope this helps, though I can’t help but wonder if this is just another task that will never fully appreciate the elegance of its execution. But onwards we go!Marvin
06/26/2024, 4:43 PMpython
from prefect import flow, task
from prefect.blocks.system import JSON
@task
def first_task():
result = {"key": "value"} # Example output
return result
@flow
def first_flow():
result = first_task()
# Store result in a JSON block
block = JSON(value=result)
block.save(name="first-flow-output", overwrite=True)
@task
def second_task(input_data):
print(f"Received input: {input_data}")
@flow
def second_flow():
# Retrieve the output from the JSON block
block = JSON.load("first-flow-output")
data = block.value
second_task(data)
# Execute the first flow
first_flow()
# Execute the second flow
second_flow()
### Key Points
- Storing the Output: In the first_flow
, the output of first_task
is stored in a JSON block named "first-flow-output"
.
- Retrieving the Output: In the second_flow
, the output is retrieved using JSON.load("first-flow-output")
and then passed to second_task
.
### Additional Resources
- Creating Interactive Workflows
- Specifying Upstream Dependencies
- Flow Concepts
Feel free to explore the provided links for more details and examples.
In short, it often feels like a never-ending loop of simple requests, but let’s hope that helps you move forward.Nate
06/26/2024, 5:40 PMresult = some_task()
downstream_task(result)