SULEMAN KHAN
05/26/2022, 12:10 PMScheduled Flow
fails because task is unable to return value when Prefect Server
runs the flow with CloudFlowRunner
. Same Flow runs successfully when I use flow.run()
. I have attached the code, Prefect UI screenshot, and terminal output of flow.run().
if-else block at lineno 920 in task_runner.py
is causing issue. If I return a value from the function/task
it runs the if-block otherwise else block. line self.result.write(value, **formatting_kwargs)
is calling the function with format self.
and dictionary formatting_kwargs
also contains the self
variable, this self variable contains the object of Demo
class.from prefect import task, Flow
from datetime import timedelta, datetime
from prefect.schedules import IntervalSchedule
class Demo:
def demo1(self, a: int, b: int) -> int:
value = a+ b
def demo2(self, a: int, b: int) -> int:
return a+b
obj = Demo()
demo1_function = task(Demo.demo1)
demo2_function = task(Demo.demo2)
with Flow("demo-flow") as flow:
params = {"a": 1, "b": 2}
demo1_function(obj, **params)
demo2_function(obj, **params)
start_time = datetime.now() - timedelta(hours=5)
interval_time = timedelta(hours=0, minutes=1, seconds=0)
end_time = datetime.strptime("2022-12-31 23:59:00", "%Y-%m-%d %H:%M:%S")
schedule = IntervalSchedule(
start_date = start_time,
interval=interval_time,
end_date=end_time
)
flow.run()
flow.schedule = schedule
flow_id = flow.register(project_name="test",
labels=['Suleman'],
idempotency_key=flow.serialized_hash())
this issue resolves when I rename variable self
.Kevin Kho
SULEMAN KHAN
05/26/2022, 4:10 PMcheckpoint=False
is helpful. I have also removed the code from the main thread.Kevin Kho