# prefect-community
p
Hi there. I have an Airflow DAG that pulls data from an API. It starts with a sensor that polls the API until the data is ready. I am wondering what the best pattern would be to implement this in 2.0. Instead of having a long-running process that polls, I was thinking I could have a flow that checks for data and reschedules itself up to N times if the data is not ready. When it finds data, it continues the process and does not reschedule itself. Could someone help with a code sample to do this in 2.0? Can you think of a better approach? Thanks!
k
Just to make sure I understand you correctly, do you want to retry for a specific number of times if your code fails to pull data from an API?
j
You could run flows or tasks with
retries=n1, retry_delay_seconds=n2
where n1 and n2 are integers. That's the long-running-process way, which seems preferable to dynamically rescheduling a flow.
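A minimal sketch of that retry approach in Prefect 2.0, assuming the API exposes some "ready" indicator in its response; the URL, field name, and numbers are illustrative, not from the thread:

```python
from prefect import flow, task
import httpx

# Retry up to 5 times, waiting 4 hours (14400 s) between attempts.
@task(retries=5, retry_delay_seconds=14400)
def fetch_metrics(url: str) -> dict:
    resp = httpx.get(url)
    resp.raise_for_status()             # a failed request raises and triggers a retry
    data = resp.json()
    if not data.get("ready"):           # hypothetical "ready" field in the API response
        raise ValueError("Data not ready yet")  # also triggers a retry
    return data

@flow
def pull_api_data():
    data = fetch_metrics("https://api.example.com/metrics")  # placeholder URL
    ...  # continue processing once the data is available

if __name__ == "__main__":
    pull_api_data()
```

Note that this keeps a process alive for the whole retry window, which is exactly the tradeoff being discussed next.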
p
The issue is that there are several different pipelines and the service levels vary a lot. It's not unusual to have to wait several days for the data to be ready. My preference would be to have shorter-lived containers instead of having a bunch of processes running for a long time. How do the flow-level retries work? Is it still a long-running process?
j
You could schedule the flow to run once an hour or so if that meets your needs. The flow would use a task to fetch the data. If the data were not available, you could raise an exception in that task. You would need to have your agent running at the time the flow was scheduled to run. We are working on a serverless option over the next few months, which could make it easier to avoid a long-running process.
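A short sketch of what that could look like, assuming the flow's deployment is given an hourly schedule; the URL and "ready" field are placeholders:

```python
from prefect import flow, task, get_run_logger
import httpx

@task
def fetch_data(url: str) -> dict:
    resp = httpx.get(url)
    resp.raise_for_status()
    payload = resp.json()
    if not payload.get("ready"):                 # hypothetical availability flag
        raise ValueError("Data not ready yet")   # fails this run; the next scheduled run tries again
    return payload

@task
def process(payload: dict) -> None:
    get_run_logger().info("Processing %s rows", len(payload.get("rows", [])))

@flow
def hourly_check():
    payload = fetch_data("https://api.example.com/metrics")  # placeholder URL
    process(payload)
```

Each run is short-lived: it either fails fast because the data is missing or processes the data and finishes.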
p
What happens after it succeeds? It sounds like it would keep running every hour. The desired behavior is that, once the flow succeeds, it should stop checking for data until the next batch (month/week).
j
Yep, doing that would check every hour. Gotcha. It's not super straightforward, but you could write some code, using the Python client API, to create a flow run scheduled for some future time (maybe a day or a week out) and call it when your earlier flow completes successfully.
p
Got it. How can I set the
state
for a future date? I could not find that in the docs.
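A rough sketch of the pattern described above: creating a flow run from an existing deployment with a Scheduled state set to a future time. The deployment ID is a placeholder, and the import path for `get_client` has moved between Prefect 2.x releases, so treat this as an outline rather than copy-paste code:

```python
import asyncio

import pendulum
from prefect.client import get_client     # import path differs in some 2.x releases
from prefect.states import Scheduled

async def schedule_next_check(deployment_id: str, delay_hours: int = 24):
    """Create a flow run for the given deployment, scheduled `delay_hours` from now."""
    async with get_client() as client:
        await client.create_flow_run_from_deployment(
            deployment_id,
            state=Scheduled(scheduled_time=pendulum.now("UTC").add(hours=delay_hours)),
        )

# e.g. call this at the end of a successful flow run:
# asyncio.run(schedule_next_check("your-deployment-uuid", delay_hours=24))
```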
a
@Pedro Machado when you say you poll an API, do you mean an event-driven use case, such as creating a run when something happens, e.g. when a file arrives in S3?
I'd be curious to hear about the problem you're trying to solve - forget Airflow sensors or Prefect for a moment.
p
Hi Anna. It's an API that provides data for mobile apps. The API provides different metrics. Each metric has a different SLA. For example, monthly usage data is available on the Nth day of the month. Weekly downloads are available on the Wednesday following the end of the prior week. However, it's not unusual for the data to be delayed. In these cases, you get a message saying that the data is not ready for that particular week/month. Then, you have to keep calling the API periodically (for example, every 4 hours) until you get the data. When there are delays, it may take days for the data to be ready. I was wondering if there is an approach that would not require the flow to be running for several days.
a
Thanks so much for explaining the use case. I might be missing some important details, but it seems like you could approach this by having a separate flow per metric. Each of these flows could run as often as hourly and query the API to check whether the data you need is available. If not, you just exit. If the data exists, you process it. It looks like the sensor pattern may not be needed if you simply exit the workflow early when the data is not there. Does that make sense?
This means that none of these flows needs to be a long-running process, as long as you do a lookup and finish the flow early if the data you need is not available or the API returns an empty payload (just an example, I don't know your API of course 😄).
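A minimal sketch of that early-exit pattern, one flow per metric; the URL and the "empty payload means not ready" check are illustrative assumptions about the API:

```python
from prefect import flow, task, get_run_logger
import httpx

@task
def data_available(url: str) -> bool:
    resp = httpx.get(url)
    resp.raise_for_status()
    return bool(resp.json())   # hypothetical: empty payload means "not ready"

@task
def process(url: str) -> None:
    ...  # download and load the metric

@flow
def weekly_downloads():        # one flow per metric, deployed on an hourly schedule
    logger = get_run_logger()
    url = "https://api.example.com/weekly-downloads"   # placeholder URL
    if not data_available(url):
        logger.info("Data not ready yet, exiting early.")
        return                 # flow finishes in seconds; no long-running process
    process(url)
```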
p
Yes. The only thing is I have to stop checking hourly once the flow finds the data. For example, once you find data for June (in July), you have to stop until the Nth day of August. Then you start checking for July data.
a
I wouldn't recommend such a pattern purely from a data engineering perspective, regardless of the tools. You could leverage e.g. Prefect blocks to store stateful information about whether the data still needs to be processed and query that block value at runtime, but ideally the database you write to has some sort of UPDATED timestamp you can use to avoid reprocessing the same data, rather than tackling this at the orchestrator level.
👍 1
E.g. create a JSON block with a dictionary recording the last timestamp for which the data was processed - if it has already been processed, don't process it again and exit the workflow early: https://github.com/anna-geller/prefect-deployment-patterns/blob/main/blocks/creation/json.py
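A hedged sketch of that idea using Prefect 2's built-in JSON block; the block name, dictionary keys, and period values are made up for illustration:

```python
from prefect import flow
from prefect.blocks.system import JSON

BLOCK_NAME = "api-watermarks"   # hypothetical block name

@flow
def monthly_usage():
    # Load the lookup of last-processed periods, e.g. {"monthly_usage": "2022-06"}
    watermarks = JSON.load(BLOCK_NAME).value
    current_period = "2022-07"  # in practice, derive this from today's date
    if watermarks.get("monthly_usage") == current_period:
        return                  # already processed, exit the workflow early

    # ... query the API, exit early if the data is not ready, otherwise process it ...

    # Record that this period has been processed so later runs skip it.
    watermarks["monthly_usage"] = current_period
    JSON(value=watermarks).save(BLOCK_NAME, overwrite=True)
```

The block would need to be created once up front (as in the linked example) before the flow can load it.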
p
I'll give it some more thought. Thanks for the suggestions!
🙌 1
a
This would vastly simplify the process for you and would make troubleshooting much easier - you could look at that value at any time to find out whether the data was recently processed and, if not, dig into why, rather than building complex workflows.
100% - take your time; this is definitely not an uncommon challenge and there are lots of tradeoffs.
I think having some stateful info about it, e.g. stored on a block, will make troubleshooting this workflow much easier.
Later this year we will introduce a really cool feature that will make such SLA-based workflows easy to manage, but it may take a couple of months.
p
So do you consider this a valid pattern or are you still leaning towards persisting something in another db and driving the flow from there?
a
I consider driving the flow from persisted lookup information the best approach at this time (and generally a really good pattern - it makes your workflow work well independently of which orchestrator/tool you run it from).
The reason is that this information is tied to your data and business logic rather than to orchestration state. For example, if querying this API returned an error and you wanted to perform some action based on that failure, that's 100% something you should tackle with Prefect states, but the use case you described seems to require a data lookup.
Definitely think about it a bit, build PoCs for the various patterns, and choose the approach that works best for you.