Kushagara
01/28/2020, 1:43 PM
from prefect import Flow, task
from random import randrange
from prefect.schedules import CronSchedule
def test():
    x = randrange(1000)
    y = randrange(2000)
    print(x, y)
    return x
def func():
    daily_schedule = CronSchedule("*/1 */1 * * *")
    with Flow("My test flow", daily_schedule) as test_flow:
        data = test()
        print(data)
    test_flow.run()
func()
The output I am getting is
367 1629
367
[2020-01-28 13:35:34,589] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:36:00+00:00
[2020-01-28 13:36:00,001] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
[2020-01-28 13:36:00,006] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-28 13:36:00,006] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-01-28 13:36:00,010] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:37:00+00:00
[2020-01-28 13:37:00,003] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
[2020-01-28 13:37:00,007] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-28 13:37:00,007] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-01-28 13:37:00,010] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:38:00+00:00
[2020-01-28 13:38:00,003] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
[2020-01-28 13:38:00,006] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-28 13:38:00,007] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-01-28 13:38:00,009] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:39:00+00:00
[2020-01-28 13:39:00,003] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
[2020-01-28 13:39:00,006] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-28 13:39:00,006] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-01-28 13:39:00,008] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:40:00+00:00
Is this the expected behaviour? Why isn’t the test function called in each run? How can I achieve it?
Alex Goodman
01/28/2020, 1:51 PM
from prefect import Flow, task
from random import randrange
from prefect.schedules import CronSchedule
@task  # <-- this wraps your function to make it a prefect.Task
def test():
    x = randrange(1000)
    y = randrange(2000)
    print(x, y)
    return x
def func():
    daily_schedule = CronSchedule("*/1 */1 * * *")
    # All task calls within this Flow context are deferred until later (at Flow.run)
    with Flow("My test flow", daily_schedule) as test_flow:
        data = test()  # <-- Flows only know how to run prefect.Task objects
        print(data)  # <-- this is **not** the return value from your function, as your function has not been called yet
    # your function will be called periodically here
    test_flow.run()
func()
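To make the deferral idea concrete, here is a rough toy model in plain Python. It is not Prefect's implementation: the `Task`, `Flow`, and `task` names below are hypothetical stand-ins defined locally, and the `for` loop stands in for the schedule. It only illustrates why the body of the `with` block runs once at build time while a decorated function runs on every flow run.

```python
from random import randrange


class Task:
    """Toy stand-in for a task: wraps a function and defers the call."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self):
        # Calling a task while a flow is being built only registers it;
        # the wrapped function is NOT executed here.
        Flow.current.tasks.append(self)
        return self  # a placeholder, not the function's return value


class Flow:
    """Toy stand-in for a flow: collects tasks, runs them on demand."""

    current = None  # the flow currently being built, if any

    def __init__(self, name):
        self.name = name
        self.tasks = []

    def __enter__(self):
        Flow.current = self
        return self

    def __exit__(self, *exc):
        Flow.current = None
        return False

    def run(self):
        # Only registered tasks are executed on each run.
        return {t: t.fn() for t in self.tasks}


def task(fn):
    """Hypothetical decorator mirroring the idea of wrapping a function."""
    return Task(fn)


calls = []  # records every real execution of test()


@task
def test():
    x = randrange(1000)
    calls.append(x)
    return x


with Flow("My test flow") as test_flow:
    data = test()       # registers the task; the body does not run
    built = len(calls)  # nothing has executed at build time

for _ in range(3):      # stand-in for three scheduled runs
    results = test_flow.run()

print(built, len(calls))
```

In this sketch `data` is a `Task` placeholder rather than a number, which is the same reason `print(data)` inside the flow context does not show the function's return value. Without the decorator, `test()` would simply execute once while the flow is being built and nothing would be registered to run later.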
Kushagara
01/28/2020, 1:56 PM
Alex Goodman
01/28/2020, 1:57 PM
Kushagara
01/28/2020, 2:00 PM