# ask-community
t
Hello everyone. I'm trying to figure out what is considered best practice for differential data loads and backfills. I want to obtain the timestamp of the last successful run of a flow so that I can load only data added or changed since that run. The Prefect `Client.get_flow_run_info` function seems to be one way of achieving this, but it may not scale as the number of runs for a flow grows large.
k
Hey @Thomas Fredriksen, what do you think of using the KV store to hold the watermark for the flow? Docs
t
Hi @Kevin Kho - this seems to achieve what I want. Thank you. Is this considered best practice, though? I feel like a function to get the last time a task or flow was run should be built in.
k
In general I think the watermark is a best practice for loading deltas (even outside of Prefect); I used it myself before Prefect. Before the introduction of the KV Store, users were using third-party tools like Firestore or persisting a file in S3. The nice thing about the watermark is that it only gets persisted if the ETL actually succeeded. Getting the last time a task or flow was run is possible through the GraphQL API.
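For example, a query along these lines can fetch the end time of the most recent successful run (a sketch against the Prefect 1.x GraphQL schema; the exact field names and filters should be verified in the Interactive API, and `"your-flow-id"` is a placeholder):

```graphql
query {
  flow_run(
    where: { flow_id: { _eq: "your-flow-id" }, state: { _eq: "Success" } }
    order_by: { end_time: desc }
    limit: 1
  ) {
    end_time
  }
}
```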
t
awesome, thank you!
I did not realize that the KV store is only supported for Prefect Cloud. Are there any solutions for backfills/differential data loads/watermarking for those of us who are hosting our own Prefect servers?
k
Ah ok. You can spin up a Redis cache or a Firestore table. In my experience, I was on Azure and had a folder called `watermarks` attached to the project, and each `watermark` file was just a timestamp. Then in your Python logic you create-if-not-exists and update that `watermark` each time the flow runs.
This user did it with Redis and the Redis tasks from the task library, I think, if you want to explore that.
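The file-based pattern described above can be sketched like this (a minimal illustration; the directory name and helper functions are made up for the example):

```python
import os
import time

# Hypothetical location for per-flow watermark files.
WATERMARK_DIR = "watermarks"


def read_watermark(name, default=0.0):
    """Return the stored timestamp, creating the file with a default if it does not exist."""
    os.makedirs(WATERMARK_DIR, exist_ok=True)
    path = os.path.join(WATERMARK_DIR, name)
    if not os.path.exists(path):
        with open(path, "w") as f:
            f.write(str(default))
        return default
    with open(path) as f:
        return float(f.read().strip())


def write_watermark(name, value=None):
    """Persist the new watermark; call this only after the ETL step succeeds."""
    os.makedirs(WATERMARK_DIR, exist_ok=True)
    with open(os.path.join(WATERMARK_DIR, name), "w") as f:
        f.write(str(value if value is not None else time.time()))
```

In a flow, you would read the watermark at the start to bound the query, and write it as the final task so a failed run leaves the old watermark intact.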
t
Thank you @Kevin Kho, this seems very useful.
Forgive my basic questions, but would it be possible to access a Redis client (or similar) through the Prefect context? I guess what I am asking is: can I add custom objects to the context?
k
Not quite. The context is not mutable. Well, you could mutate it, but the change won't carry over from task to task. You may need to pass that client around instead (which is possible as long as you don't use the DaskExecutor).
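An alternative that also works under the DaskExecutor is to build the client lazily inside each task, so every worker process constructs its own connection instead of one client being passed (and pickled) between tasks. A sketch, using a stand-in class where a real flow would construct something like `redis.Redis(...)`:

```python
import functools


class StoreClient:
    """Stand-in for a real client such as redis.Redis (hypothetical)."""

    def __init__(self):
        self.data = {}

    def get(self, key):
        return self.data.get(key)

    def set(self, key, value):
        self.data[key] = value


@functools.lru_cache(maxsize=1)
def get_store():
    """Create the client once per process instead of passing it between tasks."""
    return StoreClient()


def my_task():
    store = get_store()  # each worker builds (or reuses) its own client
    store.set("watermark", 42)
    return store.get("watermark")
```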
t
thanks 🙂
Hi @Kevin Kho - Once again, thank you for your help. We managed to implement watermarking using Redis as you suggested; however, we chose to create a variant of the `@task` decorator instead, which allows for watermarked inputs:
```python
import prefect

@watermarked_task(watermark_args=["count"])
def say_hello_count(count, store=None):
    logger = prefect.context.get("logger")

    watermark = int(count.watermark or 0)
    logger.info("Hello watermark. watermark={}, count={}".format(watermark, count))

    value = count.value
    do_something_with_value(value)

    store.set(count.key, watermark + 1)
```
Here, the `count` and `store` arguments are amended by the decorator. `count` is wrapped into a `Watermark` type, which is just a `namedtuple` holding the `watermark`, `value`, and `key`. The `store` argument, set to `None` by default to keep the IDE happy, is set to the watermark-store client. We kind of dislike changing inputs this way, but it works fine for now.
On the back end, we have created a watermark-store factory which initializes a watermark storage client based on environment variables (or an optional input to the decorator mentioned above). That way, we can allow local watermark management for developers in the form of in-memory stores or files (also as you suggested), while any deployed flow will use Redis.
We had to create a separate Prefect agent deployment and disable the one in the Helm chart, as the chart does not allow us to set environment variables on the agent, as well as a custom ConfigMap in order to provide a Prefect `job_spec.yaml`.
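For reference, a decorator along these lines could be sketched as follows. This is a minimal illustration, not the implementation from the thread: the names are made up, an in-memory store stands in for the Redis-backed factory, and the Prefect `@task` wrapping is omitted.

```python
import functools
from collections import namedtuple

# Holds the stored watermark, the original input value, and the store key.
Watermark = namedtuple("Watermark", ["watermark", "value", "key"])


class InMemoryStore:
    """Stand-in for the Redis-backed watermark store (hypothetical)."""

    def __init__(self):
        self.data = {}

    def get(self, key):
        return self.data.get(key)

    def set(self, key, value):
        self.data[key] = value


_STORE = InMemoryStore()


def watermarked_task(watermark_args):
    """Wrap the named keyword arguments in a Watermark and inject the store client."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for name in watermark_args:
                key = "{}.{}".format(fn.__name__, name)
                kwargs[name] = Watermark(
                    watermark=_STORE.get(key),
                    value=kwargs.pop(name),
                    key=key,
                )
            return fn(*args, store=_STORE, **kwargs)

        return wrapper

    return decorator
```

With this sketch, calling `say_hello_count(count=3)` would receive `count` as a `Watermark` and `store` as the client; the real version would additionally need to produce a Prefect `Task` (e.g. by applying `@task` inside the decorator) and resolve the store from the environment-driven factory.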