Hi there, I am trying to build a "flow of flows",...
# ask-community
b
Hi there, I am trying to build a "flow of flows", where two flows (a and b) are run by a parent flow (p). I have a cron clock which will run 5 times a day at specific times. I need to pass two varying parameters (t1 and t2) to flows a and b at the beginning of each schedule. These parameters represent timestamps where, the first time everything is scheduled, t1 will be '0000-00-00 00:00:00' and t2 will be time.now(). The next time the schedule runs, t1 needs to be the previous value of t2, and t2 will once again be time.now(). I can see that I may need to use a task to compute the parameters within the script of the parent flow (similar to here) and use the StartFlowRun task (as seen here), but I'm not quite sure how to put it together. Any help would be appreciated, thank you!!
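The rolling window described above can be sketched in plain Python, independent of Prefect. This is only an illustration: the function name `next_window` and the constants are made up for the example, and the sentinel is kept as a string because '0000-00-00 00:00:00' is not a valid datetime.

```python
from datetime import datetime
from typing import Optional, Tuple

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
# Sentinel from the question; it cannot be parsed as a datetime, so it stays a string.
FIRST_RUN_T1 = "0000-00-00 00:00:00"


def next_window(previous_t2: Optional[str], now: datetime) -> Tuple[str, str]:
    """Return (t1, t2) for this run, given the t2 used by the previous run."""
    # On the very first run there is no previous t2, so fall back to the sentinel.
    t1 = previous_t2 if previous_t2 is not None else FIRST_RUN_T1
    t2 = now.strftime(TIME_FORMAT)
    return t1, t2


# First scheduled run: nothing has been processed yet.
first = next_window(None, datetime(2021, 6, 1, 8, 0, 0))
# Second scheduled run: t1 is the t2 of the first run.
second = next_window(first[1], datetime(2021, 6, 1, 12, 0, 0))
print(first)
print(second)
```

The only state that has to survive between runs is the last t2, which is why the replies focus on where to persist it.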
k
@Blake List I think you are on the right track, but this is hard to do without a third-party store where those timestamps can be saved and then loaded for the next run. I am honestly a bit lost with the flows, but I am pretty sure the KV Store is a step in the right direction, since it lets you persist those times. This sounds like you are trying to do some kind of ETL on deltas (new records not processed yet?). The KV Store will let you keep track of the times already processed. The KV Store calls also do not need to run inside tasks: they go through the client, so they work anywhere you are authenticated
They may not even need to be parameters? They can just be loaded in and saved during the flow run
b
Hi @Kevin Kho, thanks for your message. Yes, essentially I have two flows that pull data from different places and transform it (and soon combine it in a third flow). As new data is generated for those two locations throughout each day, I want to schedule both flows to pull only the data they haven't loaded yet (based on the timestamps). I hope that makes sense. I will check out the key-value store
k
I think you have two options. First is you use the KV store in the parent flow and it handles interacting with the KV store and you pass it as parameters. The second is you don't pass any parameters and then each of those subflows interacts with the KV store directly. From experience though, I think you want those subflows to be parameterized so that you can run them with different timestamps if needed. I would personally lean towards the first setup
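The first setup can be sketched without Prefect to show the division of responsibilities: only the parent touches the store, and the children stay plain functions of (t1, t2) that can be re-run with any timestamps. Everything here is hypothetical scaffolding for illustration; `run_parent`, the `last_t2` key, and the dict standing in for the store are assumptions, not Prefect APIs.

```python
from typing import Callable, List, MutableMapping, Tuple

# A child is anything that processes the window [t1, t2); it never sees the store.
Child = Callable[[str, str], None]

FIRST_RUN_T1 = "0000-00-00 00:00:00"


def run_parent(store: MutableMapping[str, str], children: List[Child], now: str) -> Tuple[str, str]:
    """Read the last processed time, run every child on the window, then advance."""
    t1 = store.get("last_t2", FIRST_RUN_T1)
    t2 = now
    for child in children:
        child(t1, t2)
    # Advance the marker only after every child has finished, so a failed run
    # is retried over the same window instead of silently skipping data.
    store["last_t2"] = t2
    return t1, t2


calls = []
store = {}  # stand-in for a persistent key-value store
children = [
    lambda t1, t2: calls.append(("a", t1, t2)),
    lambda t1, t2: calls.append(("b", t1, t2)),
]
run_parent(store, children, "2021-06-01 08:00:00")
run_parent(store, children, "2021-06-01 12:00:00")
print(calls)
```

Because the children take the timestamps as arguments, backfilling a specific window is just a direct call with different values, which is the advantage of keeping the subflows parameterized.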
b
Thanks very much. Yes I think the first approach is better. I'll get right on it! 🙂
Hi @Kevin Kho, just to follow up: I have three flows (a, b and parent p) which look like this: flow_a.py
import prefect
from prefect import task, Flow, Client

@task
def flow_a_time_do_something(t1, t2):
    logger = prefect.context.get("logger")
    logger.info(f"Flow A - Time from: {t1}")
    logger.info(f"Flow A - Time until: {t2}")
    return

def main():
    logger = prefect.context.get("logger")

    with Flow("flow_a") as flow:
        flow_a_time_do_something(t1, t2)

    # Create the project; a ClientError is raised if it already exists.
    try:
        client = Client()
        client.create_project(project_name='test_pipeline')
    except prefect.utilities.exceptions.ClientError as e:
        logger.info("Project already exists")

    flow.register(project_name="test_pipeline", idempotency_key=flow.serialized_hash(), labels=["development"], add_default_labels=False)

if __name__ == "__main__":
    main()
flow_b.py
import prefect
from prefect import task, Flow, Client

@task
def flow_b_time_do_something(t1, t2):
    logger = prefect.context.get("logger")
    logger.info(f"Flow B - Time from: {t1}")
    logger.info(f"Flow B - Time until: {t2}")
    return

def main():
    logger = prefect.context.get("logger")

    with Flow("flow_b") as flow:
        flow_b_time_do_something(t1, t2)

    # Create the project; a ClientError is raised if it already exists.
    try:
        client = Client()
        client.create_project(project_name='test_pipeline')
    except prefect.utilities.exceptions.ClientError as e:
        logger.info("Project already exists")

    flow.register(project_name="test_pipeline", idempotency_key=flow.serialized_hash(), labels=["development"], add_default_labels=False)

if __name__ == "__main__":
    main()
flow_p.py
import time
from datetime import datetime, timedelta
import prefect
from prefect import task, Flow, Client
from prefect.schedules import clocks, Schedule
from prefect.backend import set_key_value, get_key_value
from prefect.tasks.prefect import StartFlowRun

# Defined at module level so the tasks below can reference them.
t1_key = "0000-00-00 00:00:00"
t2_key = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())


@task(nout=2)
def get_time_params():
    t1 = get_key_value(t1_key)
    t2 = get_key_value(t2_key)
    return t1, t2

@task
def set_time_params(t2):
    set_key_value(key=t1_key, value=t2)
    set_key_value(key=t2_key, value=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
    return

def main():
    logger = prefect.context.get("logger")

    now = datetime.now()

    # Start 30 seconds from now and repeat every minute.
    clock = clocks.IntervalClock(start_date=now + timedelta(seconds=30),
                                 interval=timedelta(minutes=1))

    schedule = Schedule(clocks=[clock])

    flow_a = StartFlowRun(flow_name="flow_a", project_name="test_pipeline", wait=True)
    flow_b = StartFlowRun(flow_name="flow_b", project_name="test_pipeline", wait=True)


    with Flow("flow_p", schedule=schedule) as flow:
        t1, t2 = get_time_params()
        flow_a.parameters = {'t1_key': t1, 't2_key': t2}
        flow_b.parameters = {'t1_key': t1, 't2_key': t2}
        set_time_params(t2=t2)

    # Create the project; a ClientError is raised if it already exists.
    try:
        client = Client()
        client.create_project(project_name='test_pipeline')
    except prefect.utilities.exceptions.ClientError as e:
        logger.info("Project already exists")

    flow.register(project_name="test_pipeline", idempotency_key=flow.serialized_hash(), labels=["development"], add_default_labels=False)

if __name__ == "__main__":
    main()
I am just wondering what the syntax is for passing the values t1 and t2 into the two flows a and b?
k
StartFlowRun takes a dictionary of parameters and then you need to have them in your flow to accept them
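A minimal sketch of what that could look like with the Prefect 1.x style API used in this thread. The builder functions, the `child_parameters` helper and the placeholder timestamps are assumptions made for illustration; Prefect is imported inside the builders only so the helper can be used without Prefect installed. The key point is that the dictionary keys must match the names of the `Parameter` objects declared in the child flow.

```python
from typing import Dict

# Names the child flows declare as Parameters; the parent's keys must match them.
CHILD_PARAMETER_NAMES = ("t1", "t2")


def child_parameters(t1: str, t2: str) -> Dict[str, str]:
    """Build the dictionary that StartFlowRun forwards to a child flow run."""
    return dict(zip(CHILD_PARAMETER_NAMES, (t1, t2)))


def build_flow_a():
    """Child flow: declares t1 and t2 so that a flow run can receive them."""
    import prefect
    from prefect import Flow, Parameter, task

    @task
    def flow_a_time_do_something(t1, t2):
        logger = prefect.context.get("logger")
        logger.info(f"Flow A - Time from: {t1}")
        logger.info(f"Flow A - Time until: {t2}")

    with Flow("flow_a") as flow:
        t1 = Parameter("t1")
        t2 = Parameter("t2")
        flow_a_time_do_something(t1, t2)
    return flow


def build_flow_p():
    """Parent flow: computes the parameters in a task and passes them at call time."""
    from prefect import Flow, task
    from prefect.tasks.prefect import StartFlowRun

    @task
    def get_time_params():
        # Placeholder values; in the thread these would come from a persistent store.
        return child_parameters("0000-00-00 00:00:00", "2021-06-01 08:00:00")

    start_flow_a = StartFlowRun(flow_name="flow_a", project_name="test_pipeline", wait=True)
    start_flow_b = StartFlowRun(flow_name="flow_b", project_name="test_pipeline", wait=True)

    with Flow("flow_p") as flow:
        params = get_time_params()
        # Calling the task inside the flow lets a task result be used as `parameters`.
        start_flow_a(parameters=params)
        start_flow_b(parameters=params)
    return flow
```

Note that the dictionary in flow_p.py above uses the keys 't1_key' and 't2_key', which would not match child parameters named t1 and t2.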
b
Thanks! One big question: what is the equivalent of the kv store for server backend? I have just encountered the client error. I am thinking this is what I need?
k
Oh it's a Cloud only feature. If you are on server, any store will do. Redis might work if you have one, but I think any database or even a file in S3 will work
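As one possible stand-in on Server, here is a sketch of a tiny file-backed key-value store using only the standard library. The class name `JsonFileStore` and its `get`/`set` methods are invented for this example; it assumes a single writer and a local or shared filesystem, and the same interface could be backed by Redis, a database table, or an object in S3 instead.

```python
import json
import os
import tempfile
from typing import Dict, Optional


class JsonFileStore:
    """Persist string key-value pairs in a single JSON file."""

    def __init__(self, path: str) -> None:
        self.path = path

    def _load(self) -> Dict[str, str]:
        # A missing file simply means nothing has been stored yet.
        try:
            with open(self.path, "r", encoding="utf-8") as fh:
                return json.load(fh)
        except FileNotFoundError:
            return {}

    def get(self, key: str, default: Optional[str] = None) -> Optional[str]:
        return self._load().get(key, default)

    def set(self, key: str, value: str) -> None:
        data = self._load()
        data[key] = value
        # Write to a temporary file and rename it, so a crash mid-write
        # does not leave a truncated state file behind.
        directory = os.path.dirname(os.path.abspath(self.path))
        fd, tmp_path = tempfile.mkstemp(dir=directory)
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(data, fh)
        os.replace(tmp_path, self.path)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as workdir:
        store = JsonFileStore(os.path.join(workdir, "state.json"))
        print(store.get("last_t2"))
        store.set("last_t2", "2021-06-01 08:00:00")
        print(store.get("last_t2"))
```

In the parent flow, one task could call `get` at the start and another could call `set` once both child flows have finished.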