    Richard Hughes

    7 months ago
    Hi Community - I am using croniter to calculate cron schedules inside a Prefect flow that I register on the cloud instance. When I kick off my flow, I want to know the last time each cron schedule ran. I have placed the cron values in a dictionary that holds the schedules for a handful of different jobs, and I want to use the result as a parameter to some code I am running. For example:
    croniter(SchemaNames[key], datetime.now()).get_prev(datetime).strftime("%m/%d/%Y %H:%M:%S")
    For whatever reason, datetime.now() seems to return the datetime at which the flow was registered rather than the datetime of the actual run. Any thoughts on how to achieve the result I am looking for, or guidance from this point? Much appreciated. P.S. I assume I could use the mquery api to extract the last scheduled start times for these parameters, but I thought this approach was easier.
    Kevin Kho

    7 months ago
    The stuff in the Flow block is executed immediately unless it’s inside a task, so if you call something like datetime.datetime.now() directly inside the Flow, it will persist the registration time. To defer it to the Flow run, you need to wrap it in a task.
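To make the timing difference concrete, here is a minimal plain-Python sketch that does not use Prefect itself. The helper names (`define_eager`, `define_deferred`, `run`) are made up for illustration: the first calls the clock while the "flow" is being defined, the second stores the clock and only calls it when the "flow" runs, which is roughly what wrapping the call in a task buys you.

```python
from datetime import datetime
from typing import Any, Callable, Dict

Clock = Callable[[], datetime]


def define_eager(clock: Clock) -> Dict[str, Any]:
    """Call the clock now, i.e. while the definition is being built."""
    return {"as_of": clock()}


def define_deferred(clock: Clock) -> Dict[str, Any]:
    """Store the clock itself; nothing is evaluated until run() is called."""
    return {"as_of": clock}


def run(definition: Dict[str, Any]) -> Dict[str, Any]:
    """Resolve a definition: call stored callables, pass plain values through."""
    return {
        key: value() if callable(value) else value
        for key, value in definition.items()
    }


if __name__ == "__main__":
    eager = define_eager(datetime.now)        # timestamp is fixed here
    deferred = define_deferred(datetime.now)  # timestamp is not taken yet
    print(run(eager)["as_of"])     # always the definition-time value
    print(run(deferred)["as_of"])  # a fresh value on every run
```

In Prefect 1.x terms, the deferred variant corresponds to putting the datetime.now() call (and the croniter lookup that depends on it) inside a function decorated with @task.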
    Richard Hughes

    7 months ago
    @Kevin Kho - I am struggling with this one. I am using the task decorator with result=PrefectResult() and returning a dictionary. I want to use the dictionary in the tasks that follow. Is there a limitation with this type of setup? What am I missing?
    Kevin Kho

    7 months ago
    The Result should not affect whether or not you can use the value downstream. If you do something like this:
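    from prefect import task, Flow

    @task
    def get_dict():
        return {"a": 1, "b": 2}

    with Flow("example") as flow:  # "example" is a placeholder flow name
        x = get_dict()  # x is a Task here, not a dict
        print(x.keys())  # fails while the flow is built: Task has no .keys()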
    this will break because x is of type Task during registration, and Task does not have a .keys() method. The dict only materializes at runtime, but the content of the Flow block is not deferred, so everything that touches it should be a task. But the following:
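    from prefect import task, Flow

    @task
    def get_dict():
        return {"a": 1, "b": 2}

    with Flow("example") as flow:  # "example" is a placeholder flow name
        x = get_dict()
        x["a"]  # indexing a Task is expected to create another task resolved at run time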
    might work because I think the Task class overrides the __getitem__ method to defer execution.
    But you should be able to pass the dict to downstream tasks no problem.
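As a rough illustration of why x["a"] can work while x.keys() cannot, here is a small plain-Python sketch. `Node` is a hypothetical stand-in written for this example, not Prefect's actual Task class: it defines `__getitem__` so that indexing returns another placeholder, but it defines nothing for `.keys()`, so that call has nothing to land on until the real dict exists.

```python
from typing import Any, Callable


class Node:
    """Hypothetical placeholder for a value that is computed later."""

    def __init__(self, fn: Callable[[], Any]) -> None:
        self._fn = fn

    def __getitem__(self, key: Any) -> "Node":
        # Indexing does not touch the value; it builds a new deferred node.
        return Node(lambda: self._fn()[key])

    def run(self) -> Any:
        # Only here is the underlying function actually called.
        return self._fn()


def get_dict() -> dict:
    return {"a": 1, "b": 2}


x = Node(get_dict)              # placeholder, no dict exists yet
item = x["a"]                   # allowed: returns another Node
has_keys = hasattr(x, "keys")   # the placeholder has no dict methods

if __name__ == "__main__":
    print(type(item).__name__, item.run(), has_keys)
```

The same reasoning suggests the safe pattern for the original question: do the dictionary handling inside a downstream task, where the argument arrives as a real dict.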