Hi I am relatively new to Prefect and this might b...
# prefect-community
k
Hi I am relatively new to Prefect and this might be a very basic question but I need to understand it. How does a function call within a Flow is invoked, I am trying to test a cron schedule which calls a function but the function is never called except for the first run.
Copy code
from prefect import Flow, task
from random import randrange
from prefect.schedules import CronSchedule


def test():
  x = randrange(1000)
  y = randrange(2000)
  print(x, y)
  return x



def func():
  daily_schedule = CronSchedule("*/1 */1 * * *")
  with Flow("My test flow", daily_schedule) as test_flow:
    data = test()
    print(data)

  test_flow.run()

func()
The output I am getting is 
Copy code
367 1629
367
[2020-01-28 13:35:34,589] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:36:00+00:00
[2020-01-28 13:36:00,001] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
[2020-01-28 13:36:00,006] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-28 13:36:00,006] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-01-28 13:36:00,010] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:37:00+00:00
[2020-01-28 13:37:00,003] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
[2020-01-28 13:37:00,007] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-28 13:37:00,007] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-01-28 13:37:00,010] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:38:00+00:00
[2020-01-28 13:38:00,003] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
[2020-01-28 13:38:00,006] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-28 13:38:00,007] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-01-28 13:38:00,009] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:39:00+00:00
[2020-01-28 13:39:00,003] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
[2020-01-28 13:39:00,006] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-28 13:39:00,006] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-01-28 13:39:00,008] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:40:00+00:00
Is this the expected behaviour? Why isn’t the test function called in each run? How can I achieve it?
a
Hi @Kushagara! One of the fundamental concepts with Prefect is that Flows only know how to execute Tasks, which should wrap the functions you'd like to run. Take a look at the changes and annotations below:
Copy code
from prefect import Flow, task
from random import randrange
from prefect.schedules import CronSchedule


@task  # <-- this wraps your function to make it a prefect.Task
def test():
    x = randrange(1000)
    y = randrange(2000)
    print(x, y)
    return x


def func():
    daily_schedule = CronSchedule("*/1 */1 * * *")

    # All task calls within this Flow context are deferred until later (at Flow.run)
    with Flow("My test flow", daily_schedule) as test_flow:
        data = test()  # <-- Flows only know how to run prefect.Task objects
        print(data)  # <-- this is **not** the return value from your function, as your function has not been called yet
    
    # your function will be callded periodically here
    test_flow.run()


func()
For more reference, here's the section of our docs that goes over various ways to interact with Prefect Flows and Tasks: https://docs.prefect.io/core/tutorials/task-guide.html#the-anatomy-of-a-prefect-task
k
@Alex Goodman, Thank you, i get it now and I will go through the docs more thoroughly to understand it better.
a
no problem! Shout out if you have more questions, we'd be happy to answer 🙂
k
Sure 🙂