SULEMAN KHAN
05/26/2022, 12:10 PMScheduled Flow fails because task is unable to return value when Prefect Server runs the flow with CloudFlowRunner . Same Flow runs successfully when I use flow.run() . I have attached the code, Prefect UI screenshot, and terminal output of flow.run().
if-else block at lineno 920 in task_runner.py is causing issue. If I return a value from the function/task it runs the if-block otherwise else block. line self.result.write(value, **formatting_kwargs) is calling the function with format self. and dictionary formatting_kwargs also contains the self variable, this self variable contains the object of Demo class.SULEMAN KHAN
05/26/2022, 12:23 PMfrom prefect import task, Flow
from datetime import timedelta, datetime
from prefect.schedules import IntervalSchedule
class Demo:
def demo1(self, a: int, b: int) -> int:
value = a+ b
def demo2(self, a: int, b: int) -> int:
return a+b
obj = Demo()
demo1_function = task(Demo.demo1)
demo2_function = task(Demo.demo2)
with Flow("demo-flow") as flow:
params = {"a": 1, "b": 2}
demo1_function(obj, **params)
demo2_function(obj, **params)
start_time = datetime.now() - timedelta(hours=5)
interval_time = timedelta(hours=0, minutes=1, seconds=0)
end_time = datetime.strptime("2022-12-31 23:59:00", "%Y-%m-%d %H:%M:%S")
schedule = IntervalSchedule(
start_date = start_time,
interval=interval_time,
end_date=end_time
)
flow.run()
flow.schedule = schedule
flow_id = flow.register(project_name="test",
labels=['Suleman'],
idempotency_key=flow.serialized_hash())
this issue resolves when I rename variable self .Kevin Kho
SULEMAN KHAN
05/26/2022, 4:10 PMcheckpoint=False is helpful. I have also removed the code from the main thread.Kevin Kho