    Darragh

    2 years ago
    Morning all! We're seeing an issue with mapped tasks on Fargate, where we only get 2 mapped tasks running at a time. It doesn't seem to matter what the tasks are, Prefect seems to cap the max concurrent tasks at 2. Any ideas? We're using Core 0.12.0, running on Fargate with DockerStorage and DaskExecutor. And we've just uncovered another issue. While trying to boil down a sample flow to reproduce, I'm now hitting this problem on every run:
    Unexpected error: KilledWorker('run_task-a6bb0ba3-e74f-4e26-8cc4-2bcc8f8c217c', <Worker '<tcp://127.0.0.1:41917>', name: 1, memory: 0, processing: 1>)
    It doesn't seem to matter what the flow is, I'm getting this on all flows, even after restarting Prefect Server and the Fargate Agent.
    Avi A

    2 years ago
    I just encountered KilledWorker exceptions yesterday, due to a very large series of tasks I was running. Probably not your case b/c you're saying it's just a test flow, but what helped me was tweaking the dask distributed config file to better fit my reqs
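    For reference, the distributed settings that live in the dask config file (~/.config/dask/distributed.yaml) can also be overridden from Python. A minimal sketch with purely illustrative values, not Avi's actual changes - the worker memory thresholds are a common source of KilledWorker errors:
    import dask

    # Illustrative overrides of dask.distributed worker memory thresholds;
    # set these before the cluster is created so they take effect.
    dask.config.set({
        "distributed.worker.memory.target": 0.80,     # start spilling to disk at 80% of the limit
        "distributed.worker.memory.spill": 0.90,      # spill more aggressively at 90%
        "distributed.worker.memory.pause": 0.95,      # stop accepting new work at 95%
        "distributed.worker.memory.terminate": False, # don't let the nanny kill the worker
    })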
    Darragh

    2 years ago
    Hey Avi, thanks - problem is we're relying on the on-demand dask cluster that the DaskExecutor creates rather than configuring our own, but so far I don't see the magic words to change the number of workers it can spawn.
    I'm guessing it's the adapt_kwargs arg, I just don't know what to pass to it
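    If it is adapt_kwargs, a minimal sketch of what that might look like, assuming the argument is forwarded to Dask's adaptive scaling (minimum/maximum are Dask's own adapt() parameters; the values here are illustrative):
    from prefect.engine.executors import DaskExecutor

    # Sketch only: assumes adapt_kwargs is passed through to cluster.adapt()
    executor = DaskExecutor(adapt_kwargs={"minimum": 2, "maximum": 8})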
    josh

    2 years ago
    @Darragh I believe you can pass n_workers to your executor:
    DaskExecutor(n_workers=4)
    The interface for working with environments and executors was improved in 0.12.1, so you can now instantiate the executor object at build time https://docs.prefect.io/orchestration/execution/local_environment.html#examples
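    Put together, a minimal sketch of what that could look like at build time (the flow and task here are made up; assumes the 0.12.1 API described in the linked docs):
    from prefect import Flow, task
    from prefect.engine.executors import DaskExecutor
    from prefect.environments import LocalEnvironment

    @task
    def double(x):
        return 2 * x

    with Flow("dask-sizing-example") as flow:
        doubled = double.map(list(range(10)))

    # Attach the executor when the flow is built, not at run time
    flow.environment = LocalEnvironment(executor=DaskExecutor(n_workers=4))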
    Darragh

    2 years ago
    Thanks @josh, testing that now - it will involve upgrading my base_image and local install to 0.12.1 😂
    New problem. Building my flow after switching to 0.12.1 now gives me this: ModuleNotFoundError: No module named 'prefect.engine.serializers'
    Local Prefect installation: 0.12.1
    base_image: prefecthq/prefect:0.12.1-python3.6
    flow.environment = LocalEnvironment(executor=DaskExecutor(n_workers=partition_count))
    josh

    2 years ago
    I believe @Tyler Wanner had encountered this last week. It has to do with versions being out of sync somewhere: you're building your flow on a version of prefect prior to 0.12.1 and then putting it into an image that has 0.12.1, or vice versa
    Tyler Wanner

    2 years ago
    hmm yes but i was using an old version of “latest” that needed to be repulled
    Darragh

    2 years ago
    That’s what I assumed, I’m just not seeing where…
    I’m building a custom image to pass as base_image to the Docker Storage that literally only has this in it:
    FROM prefecthq/prefect:0.12.1-python3.6
    
    COPY scripts /usr/local/scripts
    And then I've done a pip upgrade of my local version to 0.12.1. I shouldn't have to upgrade the server side as well yet, right? More accurately, I know that I will eventually, but my assumption is that I don't need to do that yet just to have my Flow build locally
    josh

    2 years ago
    Correct, you don't need to upgrade the server to build the flow locally. Try one thing: on your Docker storage set prefect_version="0.12.1" and see if that fixes it
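    A sketch of what that looks like on the storage object (the registry URL is a placeholder; assumes the Docker storage class under prefect.environments.storage as of 0.12):
    from prefect.environments.storage import Docker

    flow.storage = Docker(
        registry_url="your-registry.example.com",        # placeholder
        base_image="prefecthq/prefect:0.12.1-python3.6",
        prefect_version="0.12.1",  # pin the version installed into the image
    )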
    Tyler Wanner

    2 years ago
    that all sounds right to me
    Darragh

    2 years ago
    Same problem with prefect_version="0.12.1" set
    Avi A

    2 years ago
    are you registering the flow on one machine and executing on another? The machine that registers the flow should also be running the same version
    Tyler Wanner

    2 years ago
    i'm in the middle of something atm but i can try to replicate the problem this morning
    i don't know about your pipenv/conda lifestyle decisions but it seems like something isn't actually on 0.12.1. I will personally test that base image though
    josh

    2 years ago
    Tested ^ @Tyler Wanner it has 0.12.1 in it 🙂
    Tyler Wanner

    2 years ago
    🤔
    Darragh

    2 years ago
    Building on local: [MacOS, python 3.6.10, prefect 0.12.1 for both local install and the base_image]. Registering against 0.12.0 on AWS, but as far as I understand it I haven't even hit that as a possible issue yet, it's still trying to serialize. Wondering if the local imports into the Flow are on the wrong API version…
    Upgrading my server on EC2 now to see if that affects it
    @Tyler Wanner @josh Victory at last - found a stale docker image that was being pulled in as an intermediate stage and hadn't been updated. Cleaned them all out, building and registering now 👍 BUT WAIT!! A new challenger appears!! 😄
    Missing required parameter in containerDefinitions[0].repositoryCredentials: "credentialsParameter"
    I don't have any credentials configured in the containerDefinitions section of my Fargate agent, and from looking at the related PR from a few days ago the text suggests that this is an 'optional' configuration??
    josh

    2 years ago
    Darragh

    2 years ago
    On the ball as always @josh!! Any idea how I can work around it till it's merged?
    josh

    2 years ago
    It’s merged into master now if you want to use it 🙂
    Darragh

    2 years ago
    Great, I’ll have a dig! What’s the normal timescale for getting something out into a release? You guys were pretty quick going from 0.12.0 to 0.12.1
    josh

    2 years ago
    We generally tend to get a release out weekly, some end up being more dense than others. I expect 0.12.2 to go out sometime in the next day or two since it’s mostly small improvements
    Darragh

    2 years ago
    Great! And if I'm using it locally, I need the following:
    • Keep the latest prefecthq/prefect image, and then in Docker storage set prefect_version=master
    • Locally/Server: pip3 install prefect@master
    ???
    josh

    2 years ago
    Yeah, something like
    pip install git+https://github.com/PrefectHQ/prefect.git@master
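    Putting both steps together, a sketch only (the base image tag is a placeholder, and it assumes prefect_version also accepts a git ref such as "master"):
    # Locally (shell): pip install git+https://github.com/PrefectHQ/prefect.git@master
    from prefect.environments.storage import Docker

    flow.storage = Docker(
        base_image="prefecthq/prefect:latest-python3.6",  # placeholder base image
        prefect_version="master",  # assumption: a branch name works as a git ref here
    )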
    Darragh

    2 years ago
    Great, thanks Josh!
    @josh A new error rears its head:
    Unexpected error: ValueError('Could not infer an active Flow context.',)
    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 409, in get_flow_run_state
        with executor.start():
      File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
        return next(self.gen)
      File "/usr/local/lib/python3.6/site-packages/prefect/engine/executors/dask.py", line 212, in start
        with self.cluster_class(**self.cluster_kwargs) as cluster:  # type: ignore
      File "/usr/local/lib/python3.6/site-packages/distributed/deploy/local.py", line 168, in __init__
        threads_per_worker = max(1, int(math.ceil(CPU_COUNT / n_workers)))
      File "/usr/local/lib/python3.6/site-packages/prefect/core/task.py", line 989, in __rtruediv__
        return prefect.tasks.core.operators.Div().bind(other, self)
      File "/usr/local/lib/python3.6/site-packages/prefect/core/task.py", line 520, in bind
        raise ValueError("Could not infer an active Flow context.")
    ValueError: Could not infer an active Flow context.
    josh

    2 years ago
    What does your flow code look like? It appears that you're attempting to bind something to your flow without specifying the flow that it belongs to https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows
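    A minimal sketch of the difference (the task names are made up), matching the traceback above, where arithmetic on a Task object happened outside any flow context:
    from prefect import Flow, Parameter, task

    @task
    def double(x):
        return 2 * x

    n = Parameter("n")

    # Outside a flow context the next line would raise
    # ValueError: Could not infer an active Flow context.
    # half = n / 2

    # Inside the context, the Div task created by "/" is attached to the flow:
    with Flow("context-example") as flow:
        half = n / 2
        result = double(half)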
    Darragh

    2 years ago
    Found the issue - was importing a task from our custom library that wasn’t being used anymore, but wasn’t embedded directly in the Flow itself. After all that mucking around I finally have a full list of Dask Execution mapped tasks 😂 Thanks a lot, yet again, for all the help guys!
    One last question - is there an upper limit to the number of workers I can pass through to n_workers?
    josh

    2 years ago
    I believe dask can handle as many workers as you want but it comes down to the amount of CPU and memory available to run each of those workers
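    As a rule-of-thumb sketch only (assumes extra keyword arguments are forwarded to Dask's LocalCluster, as with n_workers above):
    import multiprocessing

    from prefect.engine.executors import DaskExecutor

    # One worker process per CPU core, single-threaded workers for CPU-bound task code
    cores = multiprocessing.cpu_count()
    executor = DaskExecutor(n_workers=cores, threads_per_worker=1)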
    Darragh

    2 years ago
    Bewdiful. Thanks Josh!
    Tyler Wanner

    2 years ago
    @Darragh this stackoverflow answer pretty much sums up my assumptions about dask workers https://stackoverflow.com/a/49407253/9424837
    Darragh

    2 years ago
    Thanks Tyler!