Richard Hughes
02/14/2022, 7:15 PM
I am using croniter to calculate cron schedules inside of the Prefect flow and registering on the cloud instance. When I kick off my flow, I want to know the last cron schedule time that was run. I have placed these cron values into a dictionary that holds the schedules for a handful of different jobs. Then I want to use this as a parameter to some code I am running. For example:
croniter(SchemaNames[key], datetime.now()).get_prev(datetime).strftime("%m/%d/%Y %H:%M:%S")
For whatever reason, datetime.now() seems to return the datetime from when the flow was registered, not the datetime of the actual flow run. Any thoughts on how to achieve the result I am looking for, or guidance from this point? Much appreciated.
P.S. I assume I could use the mquery API and extract the last scheduled start times for these parameters, but I thought this approach was easier.
Kevin Kho
If you call datetime.datetime.now() inside the Flow, it will persist the registration time. To defer it to the Flow run, you need to wrap it in a task.
Richard Hughes
02/14/2022, 10:34 PM
result=PrefectResult()
I am returning a dictionary. I want to use the dictionary in the tasks that follow. Is there a limitation in doing this type of setup?
What am I missing?
Kevin Kho
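from prefect import Flow, task

@task
def get_dict():
    return {"a": 1, "b": 2}

with Flow(..) as flow:
    x = get_dict()   # x is a Task here, not a dict
    print(x.keys())  # runs at registration time, so this fails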
this will break because x is of type Task during registration, and the Task type does not have a .keys() method. The dictionary only materializes at runtime, but the content of the Flow block is not deferred, so everything inside it should be a task.
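The registration-time versus run-time distinction above can be illustrated without Prefect. This is a toy model only: ToyTask and its run method are hypothetical stand-ins, not Prefect classes, and they only mimic the idea that wrapping work in a task yields a placeholder whose value appears when the flow runs.

```python
from datetime import datetime
import time


class ToyTask:
    """Hypothetical stand-in for a task: wraps a function and defers the call."""

    def __init__(self, fn):
        self.fn = fn

    def run(self):
        # Only here, at "run time", is the wrapped function executed.
        return self.fn()


# "Registration": this part executes once, when the flow is built.
frozen_now = datetime.now()            # plain call: evaluated immediately
deferred_now = ToyTask(datetime.now)   # wrapped: evaluated later
deferred_dict = ToyTask(lambda: {"a": 1, "b": 2})

# The placeholder is not a dict yet, so dict methods are unavailable.
has_keys_at_build = hasattr(deferred_dict, "keys")

time.sleep(0.05)  # stands in for the gap between registration and the flow run

# "Flow run": the deferred values materialize now.
run_now = deferred_now.run()
run_dict = deferred_dict.run()

print(has_keys_at_build)        # False
print(sorted(run_dict.keys()))  # ['a', 'b']
print(run_now > frozen_now)     # True
```

In Prefect terms, this is why datetime.now() belongs inside a task body, and why code that needs the dictionary's keys should live in a downstream task that receives x as an argument.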
But the following:
from prefect import Flow, task

@task
def get_dict():
    return {"a": 1, "b": 2}

with Flow(..) as flow:
    x = get_dict()
    x["a"]  # indexing a Task; may be deferred to run time
might work because I think the Task class overrides the __getitem__ method to defer execution.
Kevin Kho