# prefect-community
d
Morning all! We’re seeing an issue with mapped tasks on Fargate, where we only get 2 mapped tasks running at a time. Doesn’t seem to matter what the tasks are, Prefect seems to cap the max concurrent tasks at 2. Any ideas? We’re using Core 0.12.0, running on Fargate with DockerStorage and DaskExecutor. And just uncovered another issue. While trying to boil down a sample flow to reproduce, I’m now hitting this problem on every run:
Unexpected error: KilledWorker('run_task-a6bb0ba3-e74f-4e26-8cc4-2bcc8f8c217c', <Worker '<tcp://127.0.0.1:41917>', name: 1, memory: 0, processing: 1>)
Doesn’t seem to matter what the flow is, getting this on all flows, even after restarting Prefect Server and the Fargate Agent.
a
I just encountered
KilledWorker
exceptions yesterday, due to a very large series of tasks I was running. Probably not your case b/c you’re saying it’s just a test flow, but what helped me was tweaking the dask distributed config file to better fit my reqs
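The kind of tweak Avi mentions lives in dask.distributed’s config file. As a hedged sketch (these are real `distributed.worker.memory` keys, but the values shown are illustrative, not a recommendation):

```yaml
# ~/.config/dask/distributed.yaml -- example worker-memory thresholds only
distributed:
  worker:
    memory:
      target: 0.60     # fraction of memory at which to start spilling to disk
      spill: 0.70      # fraction at which to spill aggressively
      pause: 0.80      # fraction at which to pause accepting new work
      terminate: 0.95  # fraction at which the nanny restarts the worker
```

Raising these (or the worker count/memory limits) can stop the scheduler from killing workers under memory pressure, which is a common cause of `KilledWorker`.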
d
Hey Avi, thanks - problem is we’re relying on the on-demand dask cluster that the DaskExecutor creates rather than configuring our own, but so far I haven’t found the magic words to change the number of workers it can spawn
I’m guessing it’s the
adapt_kwargs
arg, I just don’t know what to pass to it
j
@Darragh I believe you can pass
n_workers
to your executor.
Copy code
DaskExecutor(n_workers=4)
The interface to work with environments and executors was improved in
0.12.1
so you can now instantiate the executor object at build time https://docs.prefect.io/orchestration/execution/local_environment.html#examples
d
Thanks @josh, testing that now - will involve upgrading my base_image and local install to 0.12.1 😂
New problem. Building my flow after switching to 0.12.1 now gives me this: ModuleNotFoundError: No module named 'prefect.engine.serializers' Local Prefect installation: 0.12.1 base_image:
prefecthq/prefect:0.12.1-python3.6
Copy code
flow.environment = LocalEnvironment(executor=DaskExecutor(n_workers=partition_count))
j
I believe @Tyler Wanner had encountered this last week. It has to do with versions being out of sync somewhere where you’re building your flow on a version of prefect prior to
0.12.1
and then putting it into an image that has
0.12.1
or vice versa
t
hmm yes but i was using an old version of “latest” that needed to be repulled
d
That’s what I assumed, I’m just not seeing where…
I’m building a custom image to pass as base_image to the Docker Storage that literally only has this in it:
Copy code
FROM prefecthq/prefect:0.12.1-python3.6

COPY scripts /usr/local/scripts
And then I’ve done a pip upgrade of my local version to 0.12.1. I shouldn’t have to upgrade the server side as well yet, right? More accurately, I know that I will but my assumption is that I don’t need to get to that yet just to have my Flow build locally
j
Correct, you don’t need to upgrade the server to build the flow locally. Try one thing, on your Docker storage set
prefect_version="0.12.1"
and see if that fixes it
t
that all sounds right to me
d
Same problem with the
prefect_version="0.12.1"
a
are you registering the flow on one machine and executing on another? The machine that registers the flow should also be the same version
t
i’m in the middle of something atm but i can try to replicate the problem this morning
i don’t know about ur pipenv/conda lifestyle decisions but it seems like something isn’t actually on 0.12.1. I will personally test that base image though
j
Tested ^ @Tyler Wanner it has 0.12.1 in it 🙂
t
🤔
d
Building on local: [MacOS, python 3.6.10, prefect 0.12.1 for both local install and the base_image] Registering against 0.12.0 on AWS, but as far as I understand it I haven’t even hit that as a possible issue yet, it’s still trying to serialize. Wondering are the local imports into the Flow on the wrong API version…
🤔 1
Upgrading my server on EC2 now to see if that affects it
@Tyler Wanner @josh Victory at last - found a stale docker image that was being pulled in as an intermediate stage, hadn’t been updated. Cleaned them all out, building and registering now 👍 BUT WAIT!! A new challenger appears!! 😄
Missing required parameter in containerDefinitions[0].repositoryCredentials: "credentialsParameter"
I don’t have any credentials configured in the containerDefinition section in my Fargate agent, and from looking at the related PR from a few days ago the text suggests that this is an ‘optional’ configuration ??
👏 2
j
d
On the ball as always @josh!! Any idea how I can work around it till it’s merged?
j
It’s merged into master now if you want to use it 🙂
🚀 1
d
Great, I’ll have a dig! What’s the normal timescale for getting something out into a release? You guys were pretty quick going from 0.12.0 to 0.12.1
j
We generally tend to get a release out weekly, some end up being more dense than others. I expect 0.12.2 to go out sometime in the next day or two since it’s mostly small improvements
d
Great! And if I’m using it locally, I need the following: • Keep the latest prefecthq/prefect image and then in docker storage do
prefect_version=master
• Locally/Server
pip3 install prefect@master
???
j
Yeah something like
Copy code
pip install git+https://github.com/PrefectHQ/prefect.git@master
d
Great, thanks Josh!
@josh A new error rears its head: Unexpected error: ValueError('Could not infer an active Flow context.',)
Copy code
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 409, in get_flow_run_state
    with executor.start():
  File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/executors/dask.py", line 212, in start
    with self.cluster_class(**self.cluster_kwargs) as cluster:  # type: ignore
  File "/usr/local/lib/python3.6/site-packages/distributed/deploy/local.py", line 168, in __init__
    threads_per_worker = max(1, int(math.ceil(CPU_COUNT / n_workers)))
  File "/usr/local/lib/python3.6/site-packages/prefect/core/task.py", line 989, in __rtruediv__
    return prefect.tasks.core.operators.Div().bind(other, self)
  File "/usr/local/lib/python3.6/site-packages/prefect/core/task.py", line 520, in bind
    raise ValueError("Could not infer an active Flow context.")
ValueError: Could not infer an active Flow context.
j
What’s your flow code look like? It appears that you’re attempting to bind something to your flow without specifying the flow that it belongs with https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows
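The pattern josh is describing can be illustrated with a minimal stand-in (this is illustrative only, not Prefect’s actual implementation): the library keeps a stack of currently active flows, and binding a task outside any `with Flow(...)` block is exactly what raises `ValueError("Could not infer an active Flow context.")`.

```python
# Minimal sketch of context-based flow inference; illustrative only,
# not Prefect's real code.
_flow_stack = []  # stack of currently "active" flows


class Flow:
    def __init__(self, name):
        self.name = name
        self.tasks = []

    def __enter__(self):
        _flow_stack.append(self)  # entering `with Flow(...)` makes it active
        return self

    def __exit__(self, *exc):
        _flow_stack.pop()


class Task:
    def bind(self, flow=None):
        # If no flow is passed explicitly, infer it from the context stack
        flow = flow or (_flow_stack[-1] if _flow_stack else None)
        if flow is None:
            raise ValueError("Could not infer an active Flow context.")
        flow.tasks.append(self)
        return self


with Flow("ok") as f:
    Task().bind()  # inside the context: bound to f

try:
    Task().bind()  # outside any context: raises
except ValueError as e:
    print(e)  # Could not infer an active Flow context.
```

This is why a stray import that builds or binds tasks at module level (outside any flow context) can blow up a run even though the task is never used.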
d
Found the issue - was importing a task from our custom library that wasn’t being used anymore, but wasn’t embedded directly in the Flow itself. After all that mucking around I finally have a full list of Dask Execution mapped tasks 😂 Thanks a lot, yet again, for all the help guys!
One last question - is there an upper limit to the number of workers I can pass through to n_workers?
j
I believe dask can handle as many workers as you want but it comes down to the amount of CPU and memory available to run each of those workers
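For a rough sense of how a local dask cluster divides CPUs among workers, here is a sketch following the `threads_per_worker = max(1, ceil(CPU_COUNT / n_workers))` formula visible in the traceback earlier in this thread; a simplification, not dask’s full scheduling logic:

```python
import math
import os


def threads_per_worker(n_workers, cpu_count=None):
    """Approximate dask LocalCluster's default thread split:
    each worker gets ceil(cpu_count / n_workers) threads, minimum 1."""
    cpu_count = cpu_count or os.cpu_count()
    return max(1, int(math.ceil(cpu_count / n_workers)))


# With 4 CPUs and 4 workers, each worker gets 1 thread:
print(threads_per_worker(4, cpu_count=4))   # 1
# With 8 CPUs and 2 workers, each worker gets 4 threads:
print(threads_per_worker(2, cpu_count=8))   # 4
# With 16 workers on 4 CPUs, threads bottom out at 1 per worker,
# so the workers oversubscribe the CPUs:
print(threads_per_worker(16, cpu_count=4))  # 1
```

In other words, `n_workers` can be as large as you like, but past the point where workers outnumber cores (and their combined memory limits exceed the Fargate task’s memory), you get contention rather than extra parallelism.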
d
Bewdiful. Thanks Josh!
t
@Darragh this stackoverflow answer pretty much sums up my assumptions about dask workers https://stackoverflow.com/a/49407253/9424837
👏 1
d
Thanks Tyler!