alex
01/21/2021, 7:20 PM
demo('flow1') and demo('flow2').
I then changed the signature of DemoClass to take another param and updated the call to match, i.e. tc = DemoClass(source_name, "new_param"), and ran demo('flow1') again. This caused flow2 to start failing because it was missing the new param, even though that flow was not directly modified.
I'm wondering what exactly happened here. If I'm using a venv, is that venv's Python being used? If I pip install --upgrade a package with breaking changes, would all my flows immediately start failing? Or only if I deployed a flow after the upgrade?
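One way to check the venv part of this is to log the interpreter details from inside the task itself. The following is only a sketch using the standard library; describe_runtime and installed_version are hypothetical helper names, not part of Prefect.

```python
import sys
from importlib import metadata


def describe_runtime():
    """Hypothetical helper: report which interpreter is executing this code."""
    return {
        "executable": sys.executable,
        "prefix": sys.prefix,
        # In an environment created by the venv module,
        # sys.prefix differs from sys.base_prefix.
        "in_venv": sys.prefix != sys.base_prefix,
    }


def installed_version(package):
    """Return the installed version of a package, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


print(describe_runtime())
print(installed_version("prefect"))
```

Calling these inside initialize_and_run_class (which already has log_stdout=True) would show in the flow run logs which Python and which package versions the run actually picked up.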
class DemoClass:
    def __init__(self, source_name):
        self.source_name = source_name
        # self.new_param = new_param (added later in signature)
        pass

    def run(self):
        return self.source_name

import datetime

from myprefect.demos.demo_class import DemoClass
from prefect import Flow, task
from prefect.schedules import IntervalSchedule


@task(log_stdout=True)
def initialize_and_run_class(source_name):
    tc = DemoClass(source_name)
    r = tc.run()
    print(r)
    return r


def demo(data_source):
    with Flow(
        f"flow - {data_source}",
        schedule=IntervalSchedule(interval=datetime.timedelta(seconds=60)),
    ) as f:
        tsk = initialize_and_run_class(data_source)
    f.register(project_name="demo")
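The failure described above can be reproduced without Prefect. This is a minimal sketch of one possible explanation, assuming the registered flow stores DemoClass by its import path rather than as a copy of its source, which is how Python's pickle handles importable classes. The module name demo_class_sketch is hypothetical and is built in memory to stand in for the shared demo_class module.

```python
import pickle
import sys
import types

# Hypothetical stand-in for the shared demo_class module, built in memory.
module = types.ModuleType("demo_class_sketch")
sys.modules[module.__name__] = module

ORIGINAL = '''
class DemoClass:
    def __init__(self, source_name):
        self.source_name = source_name
'''
CHANGED = '''
class DemoClass:
    def __init__(self, source_name, new_param):
        self.source_name = source_name
        self.new_param = new_param
'''

exec(ORIGINAL, module.__dict__)

# "Register flow2": pickle records only the import path of the class.
stored = pickle.dumps(module.DemoClass)

# Edit the shared module, as happened before re-running demo('flow1').
exec(CHANGED, module.__dict__)

# "Run flow2": the class is looked up again by name,
# so the new signature applies to the old call.
loaded = pickle.loads(stored)
try:
    loaded("flow2")  # old-style call with a single argument
    error = None
except TypeError as exc:
    error = str(exc)

print(error)
```

If this is what happened, flow2 kept its old call DemoClass(source_name) but resolved DemoClass from the edited module at run time, which would explain why it broke without being re-registered.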
Zanie
alex
01/21/2021, 8:19 PM