Blake List
08/22/2021, 3:26 AM
'0000-00-00 00:00:00' and t2 will be time.now(). The next time the schedule runs, t1 needs to be the previous value of t2, and t2 will once again be time.now(). I can see that I may need to use a task to compute the parameters within the script of the parent flow (similar to here) and use the StartFlowRun task (as seen here), but I'm not quite sure how to put it together.
Any help would be appreciated, thank you!!
Kevin Kho
Kevin Kho
Blake List
08/22/2021, 3:34 AM
Kevin Kho
Blake List
08/22/2021, 3:41 AM
Blake List
08/23/2021, 1:11 AM
import prefect
from prefect import task, Flow, Client
from prefect.utilities.logging import get_logger

@task
def flow_a_time_do_something(t1, t2):
    # Log the time window this run is responsible for.
    logger = prefect.context.get("logger")
    logger.info(f"Flow A - Time from: {t1}")
    logger.info(f"Flow A - Time until: {t2}")
    return

def main():
    with Flow("flow_a") as flow:
        # t1 and t2 are not defined in this script yet.
        flow_a_time_do_something(t1, t2)
    # Create the project if it does not exist, then register the flow.
    try:
        client = Client()
        client.create_project(project_name='test_pipeline')
    except prefect.utilities.exceptions.ClientError as e:
        get_logger().info("Project already exists")
    flow.register(project_name="test_pipeline", idempotency_key=flow.serialized_hash(), labels=["development"], add_default_labels=False)

if __name__ == "__main__":
    main()
flow_b.py
import prefect
from prefect import task, Flow, Client
from prefect.utilities.logging import get_logger

@task
def flow_b_time_do_something(t1, t2):
    # Log the time window this run is responsible for.
    logger = prefect.context.get("logger")
    logger.info(f"Flow B - Time from: {t1}")
    logger.info(f"Flow B - Time until: {t2}")
    return

def main():
    with Flow("flow_b") as flow:
        # t1 and t2 are not defined in this script yet.
        flow_b_time_do_something(t1, t2)
    # Create the project if it does not exist, then register the flow.
    try:
        client = Client()
        client.create_project(project_name='test_pipeline')
    except prefect.utilities.exceptions.ClientError as e:
        get_logger().info("Project already exists")
    flow.register(project_name="test_pipeline", idempotency_key=flow.serialized_hash(), labels=["development"], add_default_labels=False)

if __name__ == "__main__":
    main()
flow_p.py
import time
from datetime import datetime, timedelta
import prefect
from prefect import task, Flow, Client
from prefect.schedules import clocks, Schedule
from prefect.backend import set_key_value, get_key_value
from prefect.tasks.prefect import StartFlowRun
from prefect.utilities.logging import get_logger

@task(nout=2)
def get_time_params():
    # Read the current time window from the KV store.
    t1 = get_key_value(t1_key)
    t2 = get_key_value(t2_key)
    return t1, t2

@task
def set_time_params(t2):
    # Shift the window: the old t2 becomes t1, and t2 becomes the current time.
    set_key_value(key=t1_key, value=t2)
    set_key_value(key=t2_key, value=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
    return

def main():
    now = datetime.now()
    clock = clocks.IntervalClock(start_date=now + timedelta(seconds=30),
                                 interval=timedelta(minutes=1))
    schedule = Schedule(clocks=[clock])
    # These names are local to main(), so the tasks above cannot see them as written.
    t1_key = "0000-00-00 00:00:00"
    t2_key = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    flow_a = StartFlowRun(flow_name="flow_a", project_name="test_pipeline", wait=True)
    flow_b = StartFlowRun(flow_name="flow_b", project_name="test_pipeline", wait=True)
    with Flow("flow_p", schedule=schedule) as flow:
        t1, t2 = get_time_params()
        # Attempt to hand t1 and t2 to the child flows.
        flow_a.parameters = {'t1_key': t1, 't2_key': t2}
        flow_b.parameters = {'t1_key': t1, 't2_key': t2}
        set_time_params(t2=t2)
    # Create the project if it does not exist, then register the flow.
    try:
        client = Client()
        client.create_project(project_name='test_pipeline')
    except prefect.utilities.exceptions.ClientError as e:
        get_logger().info("Project already exists")
    flow.register(project_name="test_pipeline", idempotency_key=flow.serialized_hash(), labels=["development"], add_default_labels=False)

if __name__ == "__main__":
    main()
I am just wondering what the syntax is for passing the values t1 and t2 into the two flows a and b?
Kevin Kho
StartFlowRun takes a dictionary of parameters, and then you need to have them in your flow to accept them
Blake List
08/23/2021, 2:43 AM
Kevin Kho
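A minimal sketch of the pattern in the StartFlowRun answer above, not code from the thread. A plain dict stands in for the KV store, and the names next_window, child_parameters, INITIAL_T1 and last_t2 are hypothetical. It shows t1 taking the previous t2 on each run, and the dictionary a parent flow could hand to each child. The commented lines assume Prefect 1.x, where StartFlowRun accepts parameters when it is called inside the flow and the child flow declares matching Parameter inputs.

```python
from datetime import datetime

# Hypothetical names for illustration; a plain dict stands in for the KV store.
INITIAL_T1 = "0000-00-00 00:00:00"
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def next_window(store, now):
    """Return (t1, t2) for this run and remember t2 for the next run."""
    # t1 is the previous t2, or the sentinel value on the very first run.
    t1 = store.get("last_t2", INITIAL_T1)
    t2 = now.strftime(TIME_FORMAT)
    store["last_t2"] = t2
    return t1, t2


def child_parameters(t1, t2):
    """Build the dictionary handed to each child flow run."""
    return {"t1": t1, "t2": t2}


# Assuming Prefect 1.x, the parent flow would pass this dictionary at call time:
#     flow_a(parameters={"t1": t1, "t2": t2})
# and each child flow would declare matching inputs inside its Flow block:
#     t1 = Parameter("t1")
#     t2 = Parameter("t2")

store = {}
first = next_window(store, datetime(2021, 8, 23, 1, 0, 0))
second = next_window(store, datetime(2021, 8, 23, 1, 1, 0))
print(child_parameters(*first))
print(child_parameters(*second))
```

The keys of the dictionary have to match the parameter names declared in the child flow, which is why the sketch uses "t1" and "t2" rather than the store key names.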