# ask-community
j
Hello everyone, I was wondering if it's necessary to assign the return value of a task to a variable within a Flow's with-block. I have a flow like this:
with Flow('my-flow') as f:
    things = get_things()
    process_thing.map(things)
...where each process_thing returns a value that we don't really need. When Prefect runs this flow, the second task shows that all the individual elements of things have been mapped, yet the flow run remains in the "Running" state. I'm using Prefect 0.14.22.
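(For reference, a self-contained sketch of the pattern above, assuming Prefect 0.14-style imports; the task bodies are placeholders, not the real implementations:)
from prefect import Flow, task

@task
def get_things():
    # placeholder for the real task
    return ["a", "b", "c"]

@task
def process_thing(thing):
    # placeholder; the return value is not consumed downstream
    return f"processed {thing}"

with Flow('my-flow') as f:
    things = get_things()
    process_thing.map(things)  # no assignment needed if nothing uses the result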
k
Hey @Jeremy Phelps, I don’t think skipping that assignment will cause the flow to remain in a “Running” state (and to answer your question: no, you don’t have to assign it). Are you using LocalDaskExecutor or DaskExecutor? Flows hanging in Running like this is, I think, more common with LocalDaskExecutor.
j
It's DaskExecutor.
Using a static cluster.
k
Just confirming that’s the last task of the flow, and I assume the Dask dashboard doesn’t show anything off, like high memory?
j
It is the last task of the flow, and although I haven't checked the Dask dashboard (because it's so heavily firewalled that it's difficult to connect to it), Prefect reports that all the tasks are complete, which should imply that Dask isn't doing anything relating to this flow run.
Just checked Dask. The highest memory usage on any worker is reported as 23%.
k
So the task completions you see could be individual workers hitting the API and marking their tasks Success, while the mapped task still ends with a gather step that collects the results in memory. I think it’s this gather step that might be hanging. Is process_thing returning something large?
j
Could you explain how gather works?
And why would it hang?
I have another flow where the mapped results are used. The downstream tasks have completed, implying that gather must have succeeded if its job is to get the results so they can be passed downstream.
Flow runs in this flow are also stuck in the "running" state.
k
This is my understanding (may be wrong), but I think the way the Prefect code works is that it sends each task as a future to Dask with client.submit(). When you have a mapped task, it calls client.submit() on each element and then collects the futures with client.gather() to bring the results back to the client. So the scheduler orchestrates this operation and could run out of memory if it has to collect a lot of big results.
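(To illustrate the pattern described above, a rough sketch using dask.distributed directly rather than Prefect’s actual internals; the scheduler address and process_thing body are made up:)
from distributed import Client

def process_thing(x):
    # stand-in for the real mapped task
    return x * 2

client = Client("tcp://dask-scheduler:8786")  # hypothetical static-cluster address
things = list(range(100))
futures = [client.submit(process_thing, t) for t in things]  # one future per mapped element
results = client.gather(futures)  # every result is pulled back into the submitting process's memory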
I see what you mean. In the flow where the mapped results are used, is the terminal task a mapped task?
j
No.
And its state is Succeeded.
k
Ah gotcha. Thanks for the info. Will need to check with the team for ideas.
What is your Dask version btw? I think 2021.06 has stuff around memory management that might help (big maybe).
j
I'm using 2021.06.0, which matches the version used in the prefecthq/prefect:0.14.22-python3.7 Docker image.
It could be a problem in the GraphQL API. I have a few Kubernetes pods that have failed with this error:
Flow run 90259fef-86ba-4ddd-80be-ee636da5ef16 not found
Traceback (most recent call last):
  File "/usr/local/bin/prefect", line 8, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/cli/execute.py", line 54, in flow_run
    raise ValueError("Flow run {} not found".format(flow_run_id))
ValueError: Flow run 90259fef-86ba-4ddd-80be-ee636da5ef16 not found
I've debugged this error before. The Prefect client assumes that any GraphQL error can only be caused by the flow run not existing. The real error that the GraphQL server sent is thrown away.
Normally, I've noticed that the Kubernetes agent promptly deletes failed pods, but there are 7 pods that failed over 24 hours ago that have not been deleted.
k
This is Prefect Cloud right? (not Server?)
j
Right.
This error from the agent outside of Kubernetes shows up in syslog:
Sep 27 00:01:37 prefect-agent agent-script.sh[87937]: [2021-09-27 00:01:37,967] ERROR - agent | Error while managing existing k8s jobs
Sep 27 00:01:37 prefect-agent agent-script.sh[87937]: Traceback (most recent call last):
Sep 27 00:01:37 prefect-agent agent-script.sh[87937]:   File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/agent/kubernetes/agent.py", line 384, in heartbeat
Sep 27 00:01:37 prefect-agent agent-script.sh[87937]:     self.manage_jobs()
Sep 27 00:01:37 prefect-agent agent-script.sh[87937]:   File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/agent/kubernetes/agent.py", line 333, in manage_jobs
Sep 27 00:01:37 prefect-agent agent-script.sh[87937]:     flow_run_id
Sep 27 00:01:37 prefect-agent agent-script.sh[87937]:   File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/client/client.py", line 1262, in get_flow_run_state
Sep 27 00:01:37 prefect-agent agent-script.sh[87937]:     return prefect.engine.state.State.deserialize(flow_run.serialized_state)
Sep 27 00:01:37 prefect-agent agent-script.sh[87937]: AttributeError: 'NoneType' object has no attribute 'serialized_state'
k
Does this happen for all Flows with DaskExecutor? I’ve seen this scenario before where they are on Server but the Dask Workers are configured to hit Cloud. Do you think it could be the other way around where you are on Cloud but the Dask Cluster is configured to hit Server?
j
No, the Dask cluster is definitely hitting Cloud.
The problem is intermittent. I had one flow run last night that was correctly marked as complete.
k
Ah ok. Will check if the team has ideas
Can you DM me the flow run id of the run having issues today?
j
In a previous conversation, you claimed you had no ability to use flow run IDs.
unbiased-flamingo        demand-forecasting-delivery-scheduler        Running    2 days ago    2021-09-27 05:00:13  a88c2062-6594-4976-8bbe-8112aac6610d
amber-tody               demand-forecasting-delivery-scheduler        Running    3 days ago    2021-09-27 05:00:14  f9ca5bdd-6e4e-4fa8-aa09-6b76f0a0d35d
unselfish-bug            demand-forecasting-delivery-scheduler        Running    3 days ago    2021-09-26 05:00:11  7ec91aeb-3f35-4c2f-8b99-feeed9ce7469
festive-mantis           demand-forecasting-delivery-scheduler        Running    3 days ago    2021-09-27 05:00:13  38b59b3f-6ee6-4160-a5cf-2ad69bf31042
impossible-giraffe       demand-forecasting-delivery-scheduler        Running    3 days ago    2021-09-26 05:00:11  49556f7c-5e84-4dfe-9d7b-9fdc823c9c9a
impressive-walrus        demand-forecasting-delivery-scheduler        Running    4 days ago    2021-09-25 05:00:10  4773b20c-a1c3-4f28-b94a-fbf2d32da504
sassy-cicada             demand-forecasting-delivery-scheduler        Running    4 days ago    2021-09-26 05:00:11  f798a5d0-64fc-422e-9ae8-0ea6978bbbea
curly-muskrat            one_market_item_ingestion                    Running    5 days ago    2021-09-27 07:00:16  16ae6995-f029-4a32-994a-bedcb51406d6
k
I remember what you are talking about. We have limited views without going to the database, and the dashboards we do have didn’t cover that use case. This is just a general check that we can load basic info and see logs.
z
Hi Jeremy, looking at the logs for flow run a88c2062-6594-4976-8bbe-8112aac6610d, our system is showing several tasks still in a Pending state. For example, I see a run of a task named insert_django_object_groups is still in a Pending state. Are you able to see any tasks still in a Pending state in the UI?
j
That particular flow run shows tasks in different states.
16ae6995-f029-4a32-994a-bedcb51406d6 is an example of a flow run where all the tasks show up as complete.
Looks like somebody manually marked that one as complete, though.
I can't find another example of a task in that state right now. However, ded26dfd-19cf-46df-a24c-f23f735f5805 has a failed task, yet is still running, even though the flow is configured to fail if any task fails. It shouldn't waste resources attempting to run the remaining tasks to completion.
a0c8f568-2986-4ee3-8d31-4e1e7048e63f is a new example of a flow run where all the tasks have completed, yet Prefect still marks the flow run as "Running". There is no task_run in a state other than Success or Mapped.
k
Zach checked this and we see what you see. Will ask the team for ideas.
j
What is responsible for marking a flow run as "complete" after the tasks run? Is that the Agent, or the Prefect API server?
k
The flow itself is responsible for reporting its final state to the API, which is what allows state handlers to define the final state.
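(As a concrete illustration, a minimal flow-level state handler sketch in the 0.14-era API; the handler name and print are made up:)
from prefect import Flow

def report_final_state(flow, old_state, new_state):
    # The flow run calls this on every state change; the state returned here is
    # what gets reported to the API, so a handler can shape the final state.
    if new_state.is_finished():
        print(f"Flow finished in state: {new_state}")
    return new_state

with Flow("my-flow", state_handlers=[report_final_state]) as f:
    ...  # tasks go here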
For ded26dfd-19cf-46df-a24c-f23f735f5805, a Flow with a failed task can continue to run if there are still tasks that don’t depend on the failed tasks. Are other tasks that depend on the failed task continuing to execute? For a0c8f568-2986-4ee3-8d31-4e1e7048e63f, the best idea right now is that the FlowRunner might be dying during your flow run, causing the Flow to hang. You mentioned that the downstream tasks do complete in your other flow. Curious whether adding an empty “reduce” step makes this flow succeed? If it doesn’t, that would indicate the FlowRunner is dying.
j
For ded26dfd-19cf-46df-a24c-f23f735f5805, a Flow with a failed task can continue to run if there are still tasks that don’t depend on the failed tasks.
So I've noticed. But this behavior doesn't make any sense since we'll ultimately have to regard the entire flow as having failed. All that is accomplished by completing the remaining tasks is to waste resources.
For a0c8f568-2986-4ee3-8d31-4e1e7048e63f, the best idea right now is that the FlowRunner might be dying during your flow run, causing the Flow to hang.
What is a FlowRunner and how do I stop it from dying?
k
You can explicitly set those tasks as downstream of the task that determines whether or not they run. This will propagate the FAILED state. If a task can be executed independently of other tasks, it doesn’t make sense to fail it just because an unrelated task failed. Either way, you do have the tools to control the behavior by making the dependency explicit. If the flow is still showing a SUCCESS state even when a task fails, that means the task is not a reference task, and you can set that with flow.reference_tasks = List[Tasks]
The FlowRunner is the Python process that submits the tasks for execution. It looks to be getting stuck somewhere; it can die like any Python process (memory or CPU shortage).
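(A minimal sketch of both suggestions under the 0.14-era API as I understand it; the task names are made up, and flow.set_reference_tasks(...) is used here as the setter:)
from prefect import Flow, task

@task
def gate():
    # hypothetical task whose failure should fail the rest of the flow
    return True

@task
def downstream_work():
    return "done"

with Flow("explicit-deps") as flow:
    g = gate()
    # Explicit dependency: if gate fails, downstream_work is not run and the
    # failed state propagates to it.
    w = downstream_work(upstream_tasks=[g])

# The flow's final state follows its reference tasks (terminal tasks by
# default); set them explicitly if a particular task should decide it.
flow.set_reference_tasks([w])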
j
Where does the FlowRunner run? Is it part of the agent?
If I'm using the KubernetesAgent, does the FlowRunner run inside of Kubernetes or outside of it (on the agent server)?
k
The agent deploys the FlowRunner. So if it’s a LocalAgent it’s a separate process. If it’s a KubernetesAgent, it would run on the pod spun up.
j
I've reported errors from the pod before.
This one happens pretty frequently:
Flow run 90259fef-86ba-4ddd-80be-ee636da5ef16 not found
Traceback (most recent call last):
  File "/usr/local/bin/prefect", line 8, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/cli/execute.py", line 54, in flow_run
    raise ValueError("Flow run {} not found".format(flow_run_id))
ValueError: Flow run 90259fef-86ba-4ddd-80be-ee636da5ef16 not found
k
Ah sorry, I thought that was a separate issue. Is that flow run id different from the flow run we’ve been looking at, or does it match?
j
That error gets reported for all kinds of flow run IDs. I haven't logged into the agent server to track down the logs for this one.
The error above does not appear in the logs on the day that a0c8f568-2986-4ee3-8d31-4e1e7048e63f ran, but this one does:
Oct  1 05:21:11 prefect-agent agent-script.sh[87937]: Traceback (most recent call last):
Oct  1 05:21:11 prefect-agent agent-script.sh[87937]:   File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/agent/kubernetes/agent.py", line 384, in heartbeat
Oct  1 05:21:11 prefect-agent agent-script.sh[87937]:     self.manage_jobs()
Oct  1 05:21:11 prefect-agent agent-script.sh[87937]:   File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/agent/kubernetes/agent.py", line 333, in manage_jobs
Oct  1 05:21:11 prefect-agent agent-script.sh[87937]:     flow_run_id
Oct  1 05:21:11 prefect-agent agent-script.sh[87937]:   File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/client/client.py", line 1262, in get_flow_run_state
Oct  1 05:21:11 prefect-agent agent-script.sh[87937]:     return prefect.engine.state.State.deserialize(flow_run.serialized_state)
Oct  1 05:21:11 prefect-agent agent-script.sh[87937]: AttributeError: 'NoneType' object has no attribute 'serialized_state'
Oh, right, I would need to look in Kubernetes for the other error.
Unfortunately, it's hard to tell which Kubernetes pod is handling a given flow run.
There are 50 pods even though there should be no jobs running right now.
k
Gotcha. Will need to ask the team about this and will look around.
j
Also, I notice the agent doesn't always clean up its pods. Sometimes it does, but other times it doesn't. I wonder if I should write a new agent from scratch that doesn't have all these problems.
k
About the pods not clearing up, that’s a problem we’re aware of. It also happens when Flows are cancelled (flow state handlers that clean up hardware aren’t triggered). It’s on the roadmap (though with no timeline), as it will take designing some new component, potentially something like agent hooks.
I have a meeting later with some engineers and am bringing this up. Will circle back later today
Went over these issues with the team. There are 3 separate things here. 1. The serialized_state error is a Cloud bug and needs to be fixed on our end. 2. The flow stuck in a Running state is likely having problems collecting results after a mapping executes. 3. For the flow run not found, we could not find that flow run id. That may be because it was past the retention period, though I’m not sure what would trigger that. Is the flow being registered or deleted mid-run?
j
The "flow run not found" error has nothing to do with the flow run being there or not. Here's your code that produces that message:
@execute.command(hidden=True)
def flow_run():
    """
    Execute a flow run in the context of a backend API.
    """
    flow_run_id = prefect.context.get("flow_run_id")
    if not flow_run_id:
        click.echo("Not currently executing a flow within a Cloud context.")
        raise Exception("Not currently executing a flow within a Cloud context.")

    query = {
        "query": {
            with_args("flow_run", {"where": {"id": {"_eq": flow_run_id}}}): {
                "flow": {"name": True, "storage": True, "run_config": True},
                "version": True,
            }
        }
    }

    client = Client()
    result = client.graphql(query)
    flow_run = result.data.flow_run

    if not flow_run:
        click.echo("Flow run {} not found".format(flow_run_id))
        raise ValueError("Flow run {} not found".format(flow_run_id))
Notice how it ignores any GraphQL error and just reports "not found" no matter what really happened. "Flow run not found" errors are intermittent.
What are you going to do about the problem collecting results after a mapping?
k
But we checked for that flow run id and it wasn’t in our database. In that snippet you posted, client.graphql does raise errors if there are other errors, so that error message looks right. There is nothing to do on our end about the collecting of results after a mapping; this is just distributed compute. Your driver node needs enough resources to pull the collection of the mapped tasks into memory, otherwise the process can die.
j
What's a driver node, and how does the collection happen?
I doubt it's running out of memory. The only thing being returned is a few hundred paths.
The real data is stored in GCS with explicitly-written code to avoid having Dask and Prefect (mis)handle it.
But without knowing where and how data collection happens, there's nothing I can check for problems.
I see that the flow run ID I gave you for the "flow run not found" error really isn't in the system anymore. What is the retention period? I found that error message on 9/30, at which time it might have been a few days old.
k
The driver node in distributed compute is the one that orchestrates the work. The driver can cumulatively run out of memory from all of the previous tasks. There are ways to tackle unmanaged memory in Dask; maybe an environment variable mentioned in the video can help you. The collection is literally just how Dask operates: it sends work to workers and collects the results (with or without Prefect).
The retention period is 2 weeks for standard tier so it should still be in there
j
My code doesn't call Dask. Your code does.
k
Then don’t use the DaskExecutor?
j
I do use DaskExecutor, but that's a Prefect class. I don't know anything about how DaskExecutor is implemented.
k
I’m not sure what the complaint is here? If you’re not happy with the Prefect Dask interface, you can use Dask directly as a ResourceManager.
And I’m saying results are collected even if you use Dask itself directly. That’s just what Dask does
j
The video suggests calling a particular method on the Dask client object. That would require a modification to DaskExecutor.
k
No, you just didn’t watch all the way through. There is an environment variable you can set to lower unmanaged memory.
j
Actually, it looks like he might be running some code in the cluster. I didn't understand his code because Dask is an internal Prefect dependency.
Is there a way to get the Dask client object from the Prefect context?
k
Kind of like this. It’s not through the context. Also, the cluster can be used as a resource manager.
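(One way, not necessarily what the linked example shows, to reach the Dask client from inside a task running under DaskExecutor is distributed.get_client(); the task below is a made-up illustration:)
from prefect import task
from distributed import get_client

@task
def inspect_cluster():
    # When this runs on a Dask worker, get_client() returns a client connected
    # to the same scheduler the DaskExecutor is using.
    client = get_client()
    return list(client.scheduler_info()["workers"])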
j
According to the documentation, MALLOC_TRIM_THRESHOLD_ defaults to 65536, and if that value is passed directly to malloc_trim(3), then that's in bytes.
k
You can try setting it to 0 or a low number to aggressively trim memory
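(A hedged sketch: the variable has to be in the environment of the dask-worker and dask-scheduler processes before they start, so it belongs wherever the static cluster is launched, not in the flow itself. If the workers are nanny-managed and this config key is honored by your distributed version, something like the following can inject it:)
import dask

# Applied by the nanny to each worker process it spawns; "0" trims aggressively.
dask.config.set({"distributed.nanny.environ": {"MALLOC_TRIM_THRESHOLD_": "0"}})
# ...then start the workers from this same process/config context.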
j
It shouldn't make a difference. The Dask workers are using over 1 GB each.
k
What is the size of the pod with the Dask scheduler?
j
The Dask scheduler is on a dedicated machine.
k
This isn’t about the workers, this is about getting data from the workers
j
I just took the infrastructure down, so I can't check.
k
Oh then can you try bumping the resources up a bit and seeing if the flow completes?
j
I can, but I doubt it will help.
Your video was about optimizing memory usage on the workers.
k
You can also set the env variable on the scheduler to trim unmanaged memory there
j
There shouldn't be any unmanaged memory on the scheduler. Keras runs on the workers.
There shouldn't be anything but pure Python running on the scheduler.
k
You might be right on that but pure Python does leave objects in memory also.
j
It shouldn't cause memory starvation, since the scheduler runs in one process. That memory is free on the heap. malloc_trim is for releasing memory to the OS for other processes.
k
If you really think I’m not helping, I can stop trying cuz we don’t seem to be going anywhere here
I can’t help if you don’t wanna try my suggestions
j
I'm going to try bumping the memory for all Dask nodes, including the scheduler. If that doesn't work, the only place left to go is writing a new Executor that doesn't rely on Dask.
k
We do have the LocalExecutor and LocalDaskExecutor, which don’t rely on Dask distributed.
j
I need distributed functionality of some kind.