Laura Lorenz (she/her)
04/20/2020, 1:42 PM
Scott Zelenka
04/20/2020, 6:07 PM
Laura Lorenz (she/her)
04/20/2020, 11:32 PM
Laura Lorenz (she/her)
04/23/2020, 8:10 PM
contrib subdirectories into prefect last week to make it easier to get code into core, and I am interested in everyone’s opinions 🙂
Joe Schmid
04/24/2020, 6:16 PM
CloudFlowRunner):
def execute(  # type: ignore
    self, storage: "Storage", flow_location: str, **kwargs: Any  # type: ignore
) -> None:
    flow_run_id = prefect.context.get("flow_run_id")
    try:
        flow_run_info = self.client.get_flow_run_info(flow_run_id)
    except Exception as exc:
        self.logger.debug(
            "Failed to retrieve flow state with error: {}".format(repr(exc))
        )
        # No `state` variable exists in execute(), so build the Failed state directly
        state = Failed(
            message="Could not retrieve state from Prefect Cloud", result=exc
        )
        raise ENDRUN(state=state)
    updated_parameters = flow_run_info.parameters or {}  # type: ignore
Is there a better way to do this than fetching the flow run info?
Joe Schmid
04/24/2020, 6:20 PM
Laura Lorenz (she/her)
04/24/2020, 8:39 PM
sk4la
04/28/2020, 3:27 PM
Laura Lorenz (she/her)
04/28/2020, 6:51 PM
Laura Lorenz (she/her)
04/30/2020, 6:25 PM
Joe Schmid
04/30/2020, 8:45 PM
[2020-04-30 20:35:05,397] INFO - agent | Found 1 flow run(s) to submit for execution.
[2020-04-30 20:35:05,397] DEBUG - agent | Updating states for flow run f7912fd5-8ce3-4048-8cfe-965a15d46845
[2020-04-30 20:35:05,400] DEBUG - agent | Flow run f7912fd5-8ce3-4048-8cfe-965a15d46845 is in a Scheduled state, updating to Submitted
[2020-04-30 20:35:05,505] INFO - agent | Deploying flow run f7912fd5-8ce3-4048-8cfe-965a15d46845
[2020-04-30 20:35:05,519] DEBUG - agent | Submitted flow run f7912fd5-8ce3-4048-8cfe-965a15d46845 to process PID 46552
[2020-04-30 20:35:05,608] DEBUG - agent | Completed flow run submission (id: f7912fd5-8ce3-4048-8cfe-965a15d46845)
[2020-04-30 20:36:22,277] INFO - agent | Process PID 46552 returned non-zero exit code
Is there a good way to (a) have the flow runner in process or (b) see log output from the subprocess?
Troy Köhler
05/01/2020, 6:07 PM
Laura Lorenz (she/her)
05/01/2020, 9:29 PM
Laura Lorenz (she/her)
05/08/2020, 4:47 PM
Alex Cano
05/11/2020, 2:17 PM
Submitted by the Agent. After this, from what I can tell, each flow handles the transition from Submitted to Running by means of the CloudFlowRunner, so each CloudFlowRunner is calling set_flow_run_states with its own flow run, right? So maybe the conditions below don’t matter?
First, since the server is written using async: if we put the concurrency check in the api.states.set_flow_run_state call, and an API call comes in with N flow runs that are trying to transition into a Running state, and there are M concurrency slots where M < N, how can we guarantee that only M flows will transition into the Running state?
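To make the first question concrete, here is a minimal sketch of one possible approach. It is not how the Prefect server is implemented. The idea is to hold a lock across both the slot check and the state write, so that no other coroutine can run between them. `SlotLimiter`, `try_set_running`, and `set_states_in_order` are hypothetical names used only for illustration, and the in-memory list stands in for whatever the database would hold.

```python
import asyncio
from typing import Dict, List


class SlotLimiter:
    """Toy in-memory stand-in for a server-side concurrency check (hypothetical)."""

    def __init__(self, slots: int) -> None:
        self.slots = slots                # M: number of concurrency slots
        self.running: List[str] = []      # flow runs currently in Running
        self._lock = asyncio.Lock()

    async def try_set_running(self, flow_run_id: str) -> bool:
        # Hold the lock across the check AND the update, so another
        # coroutine cannot pass the check before this one records its slot.
        async with self._lock:
            if len(self.running) >= self.slots:
                return False
            await asyncio.sleep(0)  # simulates an awaited database write
            self.running.append(flow_run_id)
            return True


async def set_states_in_order(
    limiter: SlotLimiter, flow_run_ids: List[str]
) -> Dict[str, bool]:
    # Awaiting each transition before starting the next preserves payload order.
    results: Dict[str, bool] = {}
    for flow_run_id in flow_run_ids:
        results[flow_run_id] = await limiter.try_set_running(flow_run_id)
    return results


async def demo() -> Dict[str, bool]:
    # Create the limiter inside the running event loop.
    limiter = SlotLimiter(slots=1)
    return await set_states_in_order(limiter, ["run-a", "run-b", "run-c"])


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Under these assumptions, at most M transitions succeed however the calls are scheduled. Processing the payload sequentially also means the earliest entries get the slots. An in-process lock only covers a single server process, so a deployment with several processes would need the equivalent guarantee at the database level.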
Second, when we’re creating the “Run Queue” in api.runs.get_runs_in_queue, we’re specifically returning flow runs in order of first scheduled. Related to the above, if we’re submitting flow runs to a Running state, are we guaranteeing that the state changes for the flow runs submitted in the set_flow_run_states mutation occur in order? If there are 3 flow runs and only one concurrency slot, are we just guaranteeing that one of the flow runs in the payload will succeed? Or are we guaranteeing that the first flow run in the payload will succeed?
Alex Cano
05/11/2020, 2:18 PM
Alex Cano
05/11/2020, 2:19 PM
Laura Lorenz (she/her)
05/13/2020, 6:36 PM
Laura Lorenz (she/her)
05/13/2020, 6:38 PM
https://youtu.be/YHqfJwFvTFY
itay livni
05/22/2020, 2:53 PM
Laura Lorenz (she/her)
05/22/2020, 2:53 PM
Laura Lorenz (she/her)
05/22/2020, 10:05 PM
Nate Atkins
05/22/2020, 10:09 PM
Alex Cano
05/26/2020, 8:18 PM
prefect.client.client.Client only work for Cloud?
itay livni
05/29/2020, 2:11 PM
Laura Lorenz (she/her)
05/29/2020, 9:36 PM
itay livni
06/04/2020, 5:06 PM
Flow has one method, update. My concern with continuing this approach is twofold: (1) it adds more code, and thus complexity, to a core piece of Prefect that is not really essential to Flow; (2) doing direct operations on a Flow can lead to undesirable effects (???). On the flip side, it's convenient: node_resources_fl.disjoint(fl)
flow_lst = [get_secondary_sources_fl, get_primary_sources_fl, choose_defintions_fl]
# Test disjoint
node_resources_fl = Flow("node_resources")
for fl in flow_lst:
    node_resources_fl.disjoint(fl)
node_resources_fl.visualize()
The other way would be something along the lines of a module in core.utilities, structured like the networkx operators.
from prefect.utilities.flow_operators import disjoint_union
new_flow = disjoint_union(get_primary_sources_fl, get_secondary_sources_fl)
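A rough sketch of what the function-style option could look like, using a toy stand-in rather than Prefect's actual Flow class. `MiniFlow` and this `disjoint_union` are hypothetical. Raising when two flows share a task is an assumption about what "disjoint" should mean here (networkx relabels nodes instead). The point is only that the operator returns a new object and leaves its inputs untouched.

```python
from dataclasses import dataclass, field
from typing import Set, Tuple


@dataclass
class MiniFlow:
    """Toy stand-in for a flow: a name, a set of tasks, and a set of edges."""

    name: str
    tasks: Set[str] = field(default_factory=set)
    edges: Set[Tuple[str, str]] = field(default_factory=set)


def disjoint_union(*flows: MiniFlow, name: str = "union") -> MiniFlow:
    """Combine flows into a new MiniFlow without mutating any input."""
    combined = MiniFlow(name)
    for fl in flows:
        # Assumption: sharing a task between inputs is treated as an error.
        overlap = combined.tasks & fl.tasks
        if overlap:
            raise ValueError("flows share tasks: {}".format(sorted(overlap)))
        # These update only the sets owned by `combined`.
        combined.tasks |= fl.tasks
        combined.edges |= fl.edges
    return combined


if __name__ == "__main__":
    primary = MiniFlow("primary", {"fetch", "parse"}, {("fetch", "parse")})
    secondary = MiniFlow("secondary", {"scrape"})
    print(disjoint_union(primary, secondary, name="node_resources"))
```

Keeping the operator outside the class means the core object gains no new methods, which addresses concern (1). Returning a fresh object avoids the in-place side effects raised in concern (2).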
itay livni
06/04/2020, 5:11 PM
Scott Zelenka
06/04/2020, 5:14 PM
Alex Cano
06/04/2020, 11:22 PM
Flow state lifecycle, is there any specific reason the Client.set_flow_run_state(flow_run_id, state)
call doesn’t return the new state? Or is it just that, unlike the task version, the flow state lifecycle is much more streamlined and it just hasn’t been needed up until now?
Jeremiah
06/04/2020, 11:24 PM
Alex Cano
06/04/2020, 11:25 PM
Queued as a state, so wanted to make sure I understood why it was the way it was before starting to hit it with a wrench 🙂
Jeremiah
06/04/2020, 11:26 PM