Thomas Fredriksen
09/20/2021, 12:29 PM
The Client.get_flow_run_info function seems to be one way of achieving this, but it may not scale as the number of runs for a flow grows large.
Kevin Kho
Thomas Fredriksen
09/20/2021, 5:44 PM
Kevin Kho
Thomas Fredriksen
09/20/2021, 6:26 PM
Thomas Fredriksen
09/21/2021, 12:39 PM
Kevin Kho
watermarks attached to the project and each watermark file was just a timestamp. Then in your Python logic you create it if it does not exist and just update that watermark each time the flow runs.
Kevin Kho
Thomas Fredriksen
09/23/2021, 5:35 PM
Thomas Fredriksen
09/23/2021, 5:37 PM
Kevin Kho
Thomas Fredriksen
09/23/2021, 5:51 PM
Thomas Fredriksen
09/28/2021, 7:03 PM
@task decorator instead, which allows for watermarked inputs:
import prefect

# watermarked_task is our own decorator, used in place of Prefect's @task.
@watermarked_task(watermark_args=["count"])
def say_hello_count(count, store=None):
    logger = prefect.context.get("logger")
    watermark = int(count.watermark or 0)
    logger.info("Hello watermark. watermark={}, count={}".format(watermark, count))
    value = count.value
    do_something_with_value(value)
    # Persist the incremented watermark for the next run.
    store.set(count.key, watermark + 1)
Here, the count and store arguments are amended by the decorator. count is wrapped into a Watermark type, which is just a namedtuple holding the watermark, value, and key. The store, set to None by default to keep the IDE happy, is set to the watermark-store client. We kind of dislike changing inputs this way, but this works fine for now.
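For anyone reading along, here is a minimal sketch of how a decorator like this could behave. It is not our actual implementation: it wraps a plain function rather than a Prefect task, it only handles keyword arguments, and the InMemoryStore class and the "<function name>.<argument name>" key scheme are assumptions made up for the illustration.

```python
import functools
from collections import namedtuple

# Hypothetical container matching the description: watermark, value, and key.
Watermark = namedtuple("Watermark", ["watermark", "value", "key"])


class InMemoryStore:
    """Stand-in watermark store; a deployed flow would use a Redis-backed client."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value


def watermarked_task(watermark_args, store=None):
    """Wrap the named keyword arguments in Watermark tuples and inject the store."""
    store = store if store is not None else InMemoryStore()

    def decorator(func):
        @functools.wraps(func)
        def wrapper(**kwargs):
            for name in watermark_args:
                # Assumed key scheme: "<function name>.<argument name>".
                key = "{}.{}".format(func.__name__, name)
                kwargs[name] = Watermark(store.get(key), kwargs[name], key)
            return func(store=store, **kwargs)

        return wrapper

    return decorator


shared_store = InMemoryStore()


@watermarked_task(watermark_args=["count"], store=shared_store)
def say_hello_count(count, store=None):
    watermark = int(count.watermark or 0)
    store.set(count.key, watermark + 1)
    return watermark


# Each call sees the watermark left behind by the previous call.
first = say_hello_count(count=10)
second = say_hello_count(count=10)
```

In a real flow the wrapper would presumably be handed to Prefect's task decorator so that the function still registers as a task; that part is left out here to keep the sketch free of dependencies.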
On the back end, we have created a watermark-store factory-type which will initialize a watermark storage client based on environment variables (or an optional input to the decorator mentioned above). That way, we can allow local watermark management for developers in the form of in-memory stores or files (also as you suggested), while any deployed flow will use Redis.
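Roughly, the factory idea can be sketched as below. This is an illustration under assumptions, not our production code: the environment variable names WATERMARK_STORE, WATERMARK_DIR, and WATERMARK_REDIS_URL are hypothetical, the file store assumes keys are safe to use as file names, and the Redis branch relies on the third-party redis package, imported lazily so local runs do not need it.

```python
import os
import tempfile


class InMemoryStore:
    """Watermarks kept in a dict; lost when the process exits."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = str(value)


class FileStore:
    """One small file per watermark key, created on first write."""

    def __init__(self, directory):
        self._directory = directory
        os.makedirs(directory, exist_ok=True)

    def _path(self, key):
        return os.path.join(self._directory, key + ".watermark")

    def get(self, key):
        try:
            with open(self._path(key)) as handle:
                return handle.read()
        except FileNotFoundError:
            # No watermark has been written for this key yet.
            return None

    def set(self, key, value):
        with open(self._path(key), "w") as handle:
            handle.write(str(value))


def create_store(kind=None):
    """Pick a store from the explicit argument, else from the environment."""
    kind = kind or os.environ.get("WATERMARK_STORE", "memory")
    if kind == "memory":
        return InMemoryStore()
    if kind == "file":
        default_dir = os.path.join(tempfile.gettempdir(), "watermarks")
        return FileStore(os.environ.get("WATERMARK_DIR", default_dir))
    if kind == "redis":
        import redis  # third-party package, only needed for deployed flows

        url = os.environ["WATERMARK_REDIS_URL"]
        return redis.Redis.from_url(url, decode_responses=True)
    raise ValueError("Unknown watermark store kind: {}".format(kind))


# Local development: an explicit in-memory store.
store = create_store("memory")
store.set("demo", 1)
```

All three clients expose the same get and set calls, so task code does not need to know which one it was given.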
We had to create a separate Prefect agent deployment and disable the one in the Helm chart, because the Helm chart does not allow us to set environment variables in the agent. We also had to create a custom config-map in order to provide a Prefect job_spec.yaml.