#prefect-community

Avi A

07/07/2020, 9:05 AM
Hey community, I’m trying to figure out a way to control the flow with the caches. Suppose I have a flow with a dependency graph like this: A_heavy_mapped_task --> B_heavy_reduce_task --> C_some_other_task. Now, suppose I have the results for tasks A & B cached in a `Result` and I just want to run task C. In Luigi, C would ask for B’s output, and only fetch that. However, in Prefect, the flow runner will fetch ALL the results of the mapped tasks A, but since we already have B’s result, that’s totally redundant and wastes a lot of time and data exchange. Any idea on how to tackle this issue? i.e. fetch only task B’s results somehow so that C can run?

Zachary Hughes

07/07/2020, 12:50 PM
Hi @Avi A! I think you've identified a special case we need to address in our mapping/caching interactions. Your desired behavior definitely sounds like something we'd like to implement. Could I persuade you to open a GitHub issue so we can track and prioritize this?
TBH I’m not sure there’s something that can be done, given that this goes against the whole way the prefect flow runner works
so I’m looking for a way to overcome this by writing the flow differently. Maybe separating it into different flows? I know there have been some threads on this issue, but I’m not sure what the status of subflows in Prefect is nowadays.

Zachary Hughes

07/07/2020, 1:11 PM
Running subgraphs is also on our radar, so watch this space on that front. In the meantime, decomposing this into several flows could do the trick as well. If you're using an orchestration layer (Server, Cloud), you could use that to check the state of your "upstream" flows.

Avi A

07/07/2020, 1:12 PM
yes I’m using a server. I’m not sure how I can do this, can you point me to the relevant part in the docs?

Zachary Hughes

07/07/2020, 1:20 PM
I'm not sure we have docs on that, but will put that in our queue. The interactive API in the UI is also a solid source for getting more acquainted with our API. If I'm understanding your use case correctly, in this case you'd separate task A into Flow A and tasks B and C into Flow BC. Then if you wanted to rerun just tasks B and C, you'd rerun Flow BC. If you wanted to manage BC in relation to A, you could put logic in Flow BC that queried server for the state of A or whatever criteria you'd like.

Avi A

07/07/2020, 1:25 PM
cool! how do I query the server for the state of A? I’m guessing I need to use the interactive API? can I somehow fetch the whole `state` of Flow A?

Zachary Hughes

07/07/2020, 1:35 PM
Here's an example of the sort of query you'd want to run:
query {
  flow_run(where: {flow_id: {_eq: "flow-a-id-here"}}) {
    state
  }
}
You can play around with this via the interactive API, but if there's something you'd like to query programmatically, you can do so using Core's `client.graphql` method.
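A minimal sketch of running that query programmatically, in case it helps. It assumes the object passed in exposes the `graphql` method mentioned above and that the response mirrors the query shape (`data` -> `flow_run` -> `state`) and supports dict-style access. The helper names `flow_run_states` and `any_run_succeeded`, and the `"Success"` state string, are illustrative assumptions rather than part of Prefect.

```python
# Query text taken from the message above; only the flow ID is substituted.
FLOW_RUN_STATE_QUERY = """
query {
  flow_run(where: {flow_id: {_eq: "%s"}}) {
    state
  }
}
"""


def flow_run_states(client, flow_id):
    """Return the `state` field of every flow run recorded for `flow_id`.

    `client` is assumed to behave like Prefect's Client: `graphql(query)`
    returns a mapping shaped like the query itself.
    """
    response = client.graphql(FLOW_RUN_STATE_QUERY % flow_id)
    return [run["state"] for run in response["data"]["flow_run"]]


def any_run_succeeded(client, flow_id):
    """Hypothetical gate for Flow BC: has any run of Flow A succeeded?"""
    # Assumes successful runs are reported with the state string "Success".
    return "Success" in flow_run_states(client, flow_id)
```

Against a real server you would pass something like `prefect.Client()` as `client`; Flow BC could call `any_run_succeeded` in an early task and skip or fail if it returns `False`.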

Avi A

07/07/2020, 1:44 PM
thanks. I’m not familiar with GraphQL, but I see that there’s a way to use variables. how do I use the `variables` when running a GraphQL query? I mean, what’s the mark for a variable placeholder inside a query?

Zachary Hughes

07/07/2020, 2:04 PM

Avi A

07/07/2020, 2:10 PM
well I was referring to the `variables` argument in the `client.graphql` method
prefect.client.client.Client.graphql(query, raise_on_error=True, headers=None, variables=None, token=None)
anyhow, the query you gave me gives the final state of the task (failed, succeeded, etc.) but I was looking for the `State` object that you get when running locally, i.e.
state = flow.run()
or, at least for the tasks, a way to find where the task results are located. Currently when I use `get_task_run_info` I’m getting very general stuff. example:
>>> client.get_task_run_info("b598dc53-3494-4de4-ae5f-ca48e5b7b636", task_id=task_id)
Out[66]: TaskRunInfoResult(id='56a761f8-0143-47dd-b7d0-f19ae5d2f57a', task_id='cf92e3c2-7868-4910-83aa-826d550b2d89', task_slug='33990e35-d898-43ef-a596-a7242e5a31f3', version=2, state=<Success: "Task run succeeded.">)

Zachary Hughes

07/07/2020, 2:19 PM
Gotcha. `get_task_run_info` is essentially a canned query, so you'll want to specify your own fields. In this case, sounds like you'll want something like this:
query {
  flow_run(where: {flow_id: {_eq: "your-id-here"}}) {
    task_runs {
      states {
        result
        serialized_state
      }
    }
  }
}
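To illustrate what comes back from a query like this, here is a small sketch that flattens the nested response into one row per task-run state. The response shape is an assumption (standard GraphQL responses mirror the query, so `data` -> `flow_run` -> `task_runs` -> `states`), and `collect_task_states` is a hypothetical helper name, not a Prefect function.

```python
def collect_task_states(response):
    """Flatten a flow_run/task_runs/states response into a list of
    (result, serialized_state) tuples, one per recorded state.

    `response` is assumed to be a mapping shaped like the query above.
    """
    rows = []
    for flow_run in response["data"]["flow_run"]:
        for task_run in flow_run["task_runs"]:
            for state in task_run["states"]:
                # Either field may be absent or null, so use .get().
                rows.append((state.get("result"), state.get("serialized_state")))
    return rows
```

The `serialized_state` entries are the place to look for where each task's result was written, which is what the question above was after.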

Avi A

07/07/2020, 2:21 PM
yup, I’m just realizing this by looking at the schemas themselves
I think you meant to write `task_run` inside the query you just wrote

Zachary Hughes

07/07/2020, 2:21 PM
As for using variables, GraphQL allows for the use of input objects. If you'd like to use them, you can specify your query like the one linked, then build your input dict, and provide it like so: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/client/client.py#L877
Well, it depends. In the query provided, we're searching for a flow run matching the provided ID and all of the states for its associated task runs. Does that make sense?
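On the placeholder question: in GraphQL a variable is written with a leading `$`, declared with its type in the query header, and then referenced in the body. A minimal sketch under assumptions: the `uuid!` type is a guess at how the server schema types `flow_id` (check the schema in the interactive API), `query_flow_run_states` is a hypothetical helper, and `client` is anything exposing the `graphql(query, variables=...)` signature quoted earlier in the thread.

```python
# `$flow_id` is the variable placeholder; it is declared once in the
# query header and used inside the `where` filter.
# ASSUMPTION: the schema types flow_id as `uuid`; adjust if yours differs.
FLOW_RUN_STATE_QUERY = """
query FlowRunStates($flow_id: uuid!) {
  flow_run(where: {flow_id: {_eq: $flow_id}}) {
    state
  }
}
"""


def query_flow_run_states(client, flow_id):
    """Run the parameterized query, supplying values via `variables`."""
    # Keys in the dict match the variable names without the `$` prefix.
    return client.graphql(FLOW_RUN_STATE_QUERY, variables={"flow_id": flow_id})
```

Passing values through `variables` avoids string-formatting IDs into the query text, so the same query string can be reused for any flow.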

Avi A

07/07/2020, 2:23 PM
ohh sorry I read it wrong 🙂

Zachary Hughes

07/07/2020, 2:24 PM
All good! Want to make sure I'm being clear/helpful here. 🙂

Avi A

07/07/2020, 2:24 PM
you are! thanks!!~

Marwan Sarieddine

07/07/2020, 4:35 PM
@Avi A @Zachary Hughes - hope you don't mind me jumping in on this thread - but while splitting the flow into two flows sounds like a quick fix for this specific problem, what if the flow was a sequence of many mapped tasks (i.e. instead of A to C, A to Z, where Z is the other task)? Shouldn’t Prefect’s cache be a bit smarter about not having to retrieve all the previous tasks' data?

Zachary Hughes

07/07/2020, 4:37 PM
Hi @Marwan Sarieddine, absolutely, and that's why I encouraged Avi to open an issue so we can track and handle this behavior! In the meantime, the goal was to get Avi unblocked. 🙂

Marwan Sarieddine

07/07/2020, 4:37 PM
ah ok gotcha ! thanks for the quick reply