prefect-server
  • b

    Brett Jurman

    07/29/2021, 9:35 PM
    Trying to test out Prefect + Coiled with GPUs. I have a model training as a task and got "No heartbeat detected from the remote task; marking the run as failed." Is this something that can happen if the worker becomes too bogged down resource-wise?
    k
    7 replies · 2 participants
  • k

    Krapi Shah

    07/30/2021, 5:39 PM
    Hi all, we have multiple teams set up within our Prefect account. Is it possible to share agents between teams?
    k
    7 replies · 2 participants
  • k

    Kurt Rhee

    07/30/2021, 7:34 PM
    Realistically speaking, how many people does it take to run and maintain Prefect Server? Is it like 0.5 people, or 2 people?
    k
    2 replies · 2 participants
  • l

    Lawrence Finn

    07/30/2021, 7:48 PM
    How does the scheduling component of Prefect actually work? I get that the agent asks for work, but how does the API know which flows should be run? I am guessing it has the next N scheduled times computed, but is that precomputed on some schedule, or calculated on API request?
    k
    7 replies · 2 participants
  • e

    Eric Mauser

    07/30/2021, 10:44 PM
    Hey everyone, I'm using the ECS agent and want to pass a secret from AWS Secrets Manager into each task. I saw the documentation about providing a task template for each task to build upon. Do I have to create a fully defined template for each task, or can I just create the secrets{} part and let Prefect auto-generate the rest of it with defaults?
    k
    1 reply · 2 participants
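A minimal sketch of the partial-template idea being asked about: ECSRun accepts a task_definition dict, so in principle only the containerDefinitions secrets section would need to be supplied. The container name "flow", the environment-variable name, and the secret ARN below are placeholders, and whether the agent fills in the remaining fields with defaults is exactly the open question here.

    from prefect import task, Flow
    from prefect.run_configs import ECSRun

    @task
    def use_secret():
        import os
        # The secret is expected to arrive as an environment variable at runtime.
        print("secret present:", "MY_DB_PASSWORD" in os.environ)

    with Flow("ecs-secret-sketch") as flow:
        use_secret()

    # Placeholder values throughout; only the secrets section is specified here.
    flow.run_config = ECSRun(
        task_definition={
            "containerDefinitions": [
                {
                    "name": "flow",
                    "secrets": [
                        {
                            "name": "MY_DB_PASSWORD",
                            "valueFrom": "arn:aws:secretsmanager:us-east-1:123456789012:secret:example",
                        }
                    ],
                }
            ]
        }
    )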
  • t

    TOMAS IGNACIO ACUÑA RUZ

    08/02/2021, 2:03 PM
    Hello there, I would like to ask how to shut down all processes of my 'prefect server start'. I have found 1. https://github.com/PrefectHQ/prefect/issues/2348 2. https://github.com/PrefectHQ/prefect/issues/3660 but I can't find any documentation for a 'prefect server stop' command. Could you show me how to get behavior similar to docker-compose down? Best regards
    k
    d
    8 replies · 3 participants
  • s

    Sam Cook

    08/03/2021, 3:48 PM
    Is there a utility, GraphQL mutation, or canned SQL query available for cleaning up the Postgres database? I have a Kubernetes deployment and I want to regularly clean up some of the larger tables (logs, task runs) while leaving data from the past 30 days intact, so users don't suddenly lose all of their recent logs, tasks, etc.
    k
    3 replies · 2 participants
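For reference, the rough shape of the retention query being asked about; the table and column names here ("log", "created") are assumptions about the Prefect Server Postgres schema and should be checked against the actual database before running anything.

    -- Sketch only: assumes a "log" table with a "created" timestamp column.
    DELETE FROM log
    WHERE created < NOW() - INTERVAL '30 days';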
  • l

    Lawrence Finn

    08/03/2021, 5:45 PM
    Is there a way to set the api_token in code rather than relying on an env var or config file?
    k
    11 replies · 2 participants
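One possible direction, sketched under the assumption that the Prefect 1.x Client accepts the key directly as a constructor argument (older releases used api_token, newer ones api_key), which avoids the env var / config file for direct API calls; the key value is a placeholder.

    from prefect import Client

    # Assumption: the key can be passed straight to the Client constructor.
    client = Client(api_key="YOUR-API-KEY")  # placeholder key
    print(client.graphql("query { flow { id name } }"))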
  • m

    Matt Klein

    08/03/2021, 7:44 PM
    We have some Prefect flows that sometimes need to run for many hours. For particularly long-running flows, once the flow has been running for more than 12 hours, we’re often seeing that flow fail before it completes -- the Prefect UI shows its last state message as “`Unexpected error: CancelledError()`“. This doesn’t happen as a result of the code we’ve written to launch or monitor flows. It appears to be the result of an action that Prefect (or Dask?) is taking to automatically cancel long-running flows. However, I don’t see anything in the Prefect or Dask docs indicating that this is expected behavior, or how it could be controlled (e.g., disabled, or the allowable duration increased). Can anybody provide any guidance on how to deal with flows failing with this CancelledError? Any clues on how we can configure Prefect or Dask to allow flows to run past the 12-hour mark?
    k
    4 replies · 2 participants
  • l

    Lawrence Finn

    08/03/2021, 8:30 PM
    If I have a flow run in a Scheduled state with one parameter, and then I try to run that flow again with a different param, why does it cancel the first flow run?
    k
    14 replies · 2 participants
  • a

    Alfie

    08/04/2021, 3:34 AM
    Hi Team, about the storage options: can I use any DB for it, such as Postgres? If it's not supported so far, is there any guide I can reference to implement it myself? Thanks
    k
    6 replies · 2 participants
  • r

    Ryan Sattler

    08/04/2021, 6:11 AM
    Hi - I’ve been going through the Prefect tutorials using a free account. I’ve gotten the basic Hello World flow to run using the local agent, but now I’m trying to get it to run via the Kubernetes agent (using macOS Docker Desktop). I’ve run the command-line k8s agent as follows:
    prefect agent kubernetes start
    When I kick off the hello world flow, this indeed starts a job container on my desktop k8s which appears to start up fine and has the values that I configured (e.g. the correct image). However it stays in a “running” state for several minutes until it errors out as follows:
    urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f5455f07710>: Failed to establish a new connection: [Errno -3] Tempor
    Here’s the code for my flow:
    import prefect
    from prefect import task, Flow
    from prefect.run_configs import KubernetesRun
    
    @task
    def hello_task():
        logger = prefect.context.get("logger")
        logger.info("Hello world!")
    
    flow = Flow("hello-flow2", tasks=[hello_task])
    
    # flow.run() We could run our flow locally using the flow's run method but we'll be running this from Cloud!
    
    flow.run_config = KubernetesRun(
        env={"PREFECT__LOGGING__LEVEL": "DEBUG"},
        image="prefecthq/prefect:latest",
        labels=[]
    )
    flow.register(project_name="tester")
    k
    9 replies · 2 participants
  • a

    Alexander van Eck

    08/04/2021, 1:41 PM
    Morning 👋 I’m using DaskExecutor with a custom cluster_class. The logs in the Prefect UI show me:
    15:36:12 INFO DaskExecutor Creating a new Dask cluster with `dask_jobqueue.remote_slurm.RemoteSLURMCluster.with_http_api_client`...
    15:36:13 INFO DaskExecutor The Dask dashboard is available at http://10.240.3.45:8787/status
    But I know for a fact that RemoteSLURMCluster (from dask-jobqueue) also has logger.debug statements. How do I make those log statements show up in either the flow logs (in the agent) or in the Prefect UI?
    k
    32 replies · 2 participants
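For what it's worth, Prefect 1.x has an extra-loggers setting that attaches third-party loggers to Prefect's log handlers; a minimal sketch, assuming the messages of interest come from a logger named "dask_jobqueue" and that setting the env var on the flow's run config reaches the flow-run process.

    from prefect import task, Flow
    from prefect.run_configs import UniversalRun

    @task
    def noop():
        pass

    with Flow("extra-loggers-sketch") as flow:
        noop()

    # "dask_jobqueue" is an assumption about the logger name; adjust as needed.
    flow.run_config = UniversalRun(
        env={
            "PREFECT__LOGGING__LEVEL": "DEBUG",
            "PREFECT__LOGGING__EXTRA_LOGGERS": "['dask_jobqueue']",
        }
    )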
  • a

    Adam Everington

    08/04/2021, 3:27 PM
    Hey guys! I'm really stuck here, but I'm sure I'm missing something easy in the documentation. I have a Prefect server running on an independent Linux machine and I'm developing in a Windows environment. I want to deploy my flow (from my Windows environment) to my Prefect server. Local storage doesn't seem to be an option as my Linux machine has no access to the network that my .py file is on. So... git storage? We use Azure DevOps as our repo, but I'm honestly lost as to how I can deploy or connect that repo to my server. Any help would be greatly appreciated.
    k
    15 replies · 2 participants
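A rough sketch of the generic Git storage route (Prefect 1.x ships a Git storage class alongside the GitHub/GitLab/Bitbucket ones); every value below is a placeholder and the parameter names should be checked against the Git storage docs for the installed version.

    from prefect import task, Flow
    from prefect.storage import Git

    @task
    def hello():
        print("hello from Azure DevOps storage")

    with Flow("devops-git-sketch") as flow:
        hello()

    # Placeholders throughout; the token would come from a Prefect Secret.
    flow.storage = Git(
        repo="my-org/my-project/_git/my-repo",
        repo_host="dev.azure.com",
        flow_path="flows/my_flow.py",
        branch_name="main",
        git_token_secret_name="AZURE_DEVOPS_PAT",
    )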
  • k

    Kurt Rhee

    08/04/2021, 4:01 PM
    Hello, I have reached this point in the Prefect documentation and I was wondering where I should be looking to host my dashboard in the cloud instead of on my local server: https://docs.prefect.io/orchestration/server/deploy-local.html#ui-configuration
    👋 1
    k
    100 replies · 2 participants
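The page linked above essentially comes down to pointing the UI at a publicly reachable Apollo endpoint; a sketch of the relevant ~/.prefect/config.toml entry, with the hostname as a placeholder.

    [server]
      [server.ui]
      apollo_url = "http://your-public-hostname:4200/graphql"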
  • a

    Alexander van Eck

    08/04/2021, 4:06 PM
    Found a potential difficulty/confusion with prefect agent docker start --network host, where the network is added to the container but cannot be used as the primary interface to find PREFECT__SERVER__HOST behind a private DNS. https://github.com/PrefectHQ/prefect/issues/4840 Posting here for visibility.
    k
    1 reply · 2 participants
  • b

    Brett Jurman

    08/04/2021, 4:07 PM
    Sometimes with my Coiled flows it takes long enough that Prefect times out while starting the cluster. Is there a way to extend the timeout?
    k
    7 replies · 2 participants
  • j

    Joe McDonald

    08/04/2021, 6:00 PM
    So I ran into something new today after updating to 0.15.3 on all server components. For simplicity, here is a flow run creation in the interactive UI:
    mutation {
      create_flow_run(
        input: {
          flow_id: "f667e3f3-b5eb-4c52-8f90-36bc54633ac9", 
          parameters: "{\"file_id\": \"42556bd0-f495-11eb-a7a0-8c8590acd4d8\"}"
        }
      ) {
        id
      }
    }
    Which gives this output. We are getting the same output from our code that runs against the GraphQL endpoint to trigger flow runs, which has been working fine for months on 0.14.15.
    {
      "errors": [
        {
          "message": "[{'extensions': {'path': '$.variableValues.insert_on_conflict.constraint', 'code': 'validation-failed'}, 'message': 'unexpected value \"ix_flow_run_idempotency_key_unique\" for enum: \\'flow_run_constraint\\''}]",
          "locations": [
            {
              "line": 2,
              "column": 3
            }
          ],
          "path": [
            "create_flow_run"
          ],
          "extensions": {
            "code": "INTERNAL_SERVER_ERROR",
            "exception": {
              "message": "[{'extensions': {'path': '$.variableValues.insert_on_conflict.constraint', 'code': 'validation-failed'}, 'message': 'unexpected value \"ix_flow_run_idempotency_key_unique\" for enum: \\'flow_run_constraint\\''}]"
            }
          }
        }
      ],
      "data": {
        "create_flow_run": null
      }
    }
    k
    m
    6 replies · 3 participants
  • m

    Margaret Walter

    08/04/2021, 8:09 PM
    Hey guys! Glad to have found this community and hope everyone has had a good week so far! I've been trying to deploy the Prefect server in its own namespace on a shared EKS cluster - the cluster itself has 3 nodes and exists inside a VPC. I'd like to have the UI hosted in the cluster if possible, and have been trying to use DNS to connect it to the apollo endpoint without much luck (it just seems unable to connect at all). I know we have a DNS server up, though it's in a different namespace. This may be more of a k8s/EKS issue, but I figured I'd ask in case anyone else had run into it.
    k
    9 replies · 2 participants
  • r

    Ruslan

    08/05/2021, 5:27 AM
    Hi! Too often something happens and the flow freezes indefinitely. The flow should finish in a few seconds; sometimes it works, but sometimes it freezes forever. It is also impossible to cancel it. How can I debug such a problem? About 10% of flow runs show this behaviour, and it happens only when the server side runs in Docker.
    j
    k
    19 replies · 3 participants
  • a

    Arran

    08/05/2021, 9:42 AM
    Does anyone know of a resource that will get me started with installing Prefect Server and an agent on an Ubuntu VM? All I seem to be able to find are docs for GCP and AWS. I want it to run as a service too, restarting automatically if it crashes out.
    k
    12 replies · 2 participants
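On the run-as-a-service part of the question, a bare-bones systemd unit sketch for the agent; the paths and user are placeholders, and the server itself would still be managed through its docker-compose stack (e.g. with a similar unit wrapping prefect server start).

    # /etc/systemd/system/prefect-agent.service  (sketch; adjust paths and user)
    [Unit]
    Description=Prefect local agent
    After=network-online.target docker.service

    [Service]
    User=prefect
    ExecStart=/home/prefect/venv/bin/prefect agent local start
    Restart=always
    RestartSec=10

    [Install]
    WantedBy=multi-user.target

Enabled with systemctl enable --now prefect-agent, it restarts automatically if the process crashes.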
  • p

    Pierre Monico

    08/05/2021, 12:55 PM
    I have a very generic question about the Prefect architecture, the Storage in particular. My current project is using simple flow.run()s, all packaged in a Docker container. When switching to Prefect Server for orchestration, I have to register flows and thus define a Storage class, RunConfig etc. It seems like I have to write a lot of “infrastructure” code into my repo and reference different systems etc. I am not sure how to properly encapsulate this / separate these concerns from my business logic. I just want to be able to keep developing locally and use flow.run, but then have all the needed registration, storage configuration etc taken care of during deployment. I tried by simply adding a separate file in which I import the flow objects and then configure them before I register them, but e.g. with Local() storage and save_as_script=True, I then need to provide a path to the original flow file, so I am not sure all the flow config made in the separate file will be taken into account. Long story short: I find the step from using core to orchestration very difficult to understand and to structure in terms of code - I have the feeling it’s kind of either-or. Positive closing: my experience with Prefect has been amazing so far 🙂
    k
    2 replies · 2 participants
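For what it's worth, the pattern described above (a deployment-only module that imports the flow objects, attaches storage and run config, and registers them) can be sketched roughly like this; the import path, storage path, labels, and project name are all placeholders.

    # deploy.py -- only run at deployment time; flows keep using flow.run() locally.
    from prefect.run_configs import UniversalRun
    from prefect.storage import Local

    from my_package.flows import flow  # placeholder import path

    # Placeholder path; stored_as_script keeps the flow file as the source of truth.
    flow.storage = Local(path="/opt/flows/my_flow.py", stored_as_script=True)
    flow.run_config = UniversalRun(labels=["prod"])

    if __name__ == "__main__":
        flow.register(project_name="my-project")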
  • p

    patrickd

    08/05/2021, 2:57 PM
    Hey everyone! I am new to Prefect and am loving it so far! I have deployed Prefect Server + Agent alongside a persistent Dask cluster in K8s, using Git storage for the following flow:
    from prefect import task, Flow
    import random
    from time import sleep
    from prefect.run_configs import KubernetesRun
    from prefect.executors import DaskExecutor
    from prefect.storage import GitLab
    
    @task
    def inc(x):
        sleep(random.random() / 10)
        return x + 1
    
    @task
    def dec(x):
        sleep(random.random() / 10)
        return x - 1
    
    @task
    def add(x, y):
        sleep(random.random() / 10)
        return x + y
    
    @task(name="sum")
    def list_sum(arr):
        return sum(arr)
    
    
    with Flow("dask-test") as flow:
        incs = inc.map(x=range(100))
        decs = dec.map(x=range(100))
        adds = add.map(x=incs, y=decs)
        total = list_sum(adds)
    
    flow.storage = GitLab(<git kwargs>)
    flow.run_config = KubernetesRun(image="prefecthq/prefect:latest-python3.8")
    flow.executor = DaskExecutor("tcp://dask-scheduler:8786")
    
    flow.register(project_name="test")
    When running this flow with a LocalDaskExecutor, it works fine. In the Kubernetes cluster, however, the job is never marked as finished on the Prefect server. I can see the job execute to completion on the Dask scheduler UI, but the flow run goes on indefinitely in Prefect. Any help would be greatly appreciated!
    k
    s
    +1
    43 replies · 4 participants
  • b

    Blake List

    08/05/2021, 11:14 PM
    Hi there! Firstly, I'm sorry if this is a repeated question, but I have been banging my head against a wall for some time now. We have Prefect Server deployed to a machine on our company cluster using Docker. I have a flow (example.py) that runs happily on the same machine by running Prefect locally. I can start an agent in the directory of the script (or define the path to my script with -p) and register the flow. Flows are defined in ~/.prefect/flows/ (not where I am running the script nor the agent) and when registering the flow, it appears in the UI just fine. However, when I try to run the flow from within the UI, I get:
    Failed to load and execute Flow's environment: ValueError('No flows found in file.')
    Any ideas what I am doing wrong?
    k
    7 replies · 2 participants
  • j

    John T

    08/06/2021, 5:50 AM
    Hello! I’m having a hard time with containerizing my prefect docker agent. I will post more information in the thread
    n
    k
    +2
    40 replies · 5 participants
  • a

    Arran

    08/06/2021, 8:04 AM
    Sorry to keep posting here, but I'm still having problems getting this to run on a VM. I have tried both inside a venv and outside. Googling doesn't seem to help. Prefect installs with no errors, but running
    prefect server start
    keeps giving me this error. Is there somewhere I can just get a copy of the docker-compose file? I've tried this on two VMs now.
    ERROR: 
            Can't find a suitable configuration file in this directory or any
            parent. Are you in the right directory?
    
            Supported filenames: docker-compose.yml, docker-compose.yaml
    I had a look in the directory mentioned in the traceback for the docker-compose.yml and it is there, so I'm not sure why it isn't being recognised.
    Traceback (most recent call last):
      File "******/venv/lib/python3.8/site-packages/prefect/cli/server.py", line 608, in start
        subprocess.check_call(
      File "/usr/lib/python3.8/subprocess.py", line 364, in check_call
        raise CalledProcessError(retcode, cmd)
    subprocess.CalledProcessError: Command '['docker-compose', 'pull']' returned non-zero exit status 1.
    k
    3 replies · 2 participants
  • a

    Alex Furrier

    08/06/2021, 5:02 PM
    Running Prefect Server on K8s, and the Apollo server pod keeps crashing with this error message:
    PayloadTooLargeError: request entity too large
        at readStream (/apollo/node_modules/raw-body/index.js:155:17)
        at getRawBody (/apollo/node_modules/raw-body/index.js:108:12)
        at read (/apollo/node_modules/body-parser/lib/read.js:77:3)
        at jsonParser (/apollo/node_modules/body-parser/lib/types/json.js:135:5)
        at Layer.handle [as handle_request] (/apollo/node_modules/express/lib/router/layer.js:95:5)
        at trim_prefix (/apollo/node_modules/express/lib/router/index.js:317:13)
        at /apollo/node_modules/express/lib/router/index.js:284:7
        at Function.process_params (/apollo/node_modules/express/lib/router/index.js:335:12)
        at next (/apollo/node_modules/express/lib/router/index.js:275:10)
        at cors (/apollo/node_modules/cors/lib/index.js:188:7)
    Any idea where to start for debugging that?
    a
    2 replies · 2 participants
  • s

    Serdar Tumgoren

    08/06/2021, 5:43 PM
    Hello all, any chance someone can offer advice or reference examples on spinning up a Kubernetes deployment on Google Cloud Platform as a preliminary step to deploying ephemeral DaskExecutor flows? My org has a limited budget and we’re trying to minimize cloud costs by scaling our infrastructure to zero when we’re not running large web scraping jobs once or twice a week. I’ve looked at Dask Cloud Provider, which has GCP support and could work. But we also wanted to explore a k8s solution before settling on a final strategy. I realize this question is not directly related to Prefect, but hoping someone might have experience with this kind of approach (perhaps using something like Terraform?). Any advice is greatly appreciated!
    k
    g
    10 replies · 3 participants
  • r

    Ross Rochford

    08/09/2021, 9:23 AM
    Hi everyone, I'm creating a basic docker-compose setup of Prefect + Dask. I'm able to run flows with LocalExecutor in Prefect, and I'm able to run tasks in Dask directly, but I'm not able to run Prefect flows using DaskExecutor. My flow runs simply stay in a pending state and never execute. I don't see any errors in the logs or dashboard. What are some things I can try in order to debug this? I can share my docker-compose file if that helps. The only possibly unusual thing I can see is that I'm running the agent as 'local' although it is running within a container, but I think that should be fine?
  • r

    Ross Rochford

    08/09/2021, 11:14 AM
    When I inspect the agent logs (run with "prefect agent local start" inside a container), it looks like it is only picking up flow-runs with LocalExecutor. I see there is also LocalDaskExecutor but I don't want the agent to run its own local dask cluster, I want it to send work to the scheduler, which is in a separate container.
    k
    m
    48 replies · 3 participants
r

Ross Rochford

08/09/2021, 11:14 AM
When I inspect the agent logs (run with "prefect agent local start" inside a container), it looks like it is only picking up flow-runs with LocalExecutor. I see there is also LocalDaskExecutor but I don't want the agent to run its own local dask cluster, I want it to send work to the scheduler, which is in a separate container.
k

Kevin Kho

08/09/2021, 1:21 PM
Hey @Ross Rochford, it sounds like Prefect can’t send work to the scheduler of the cluster. Could you share how you define the executor in the Flow?
r

Ross Rochford

08/09/2021, 1:39 PM
with Flow("list-sum-dask") as flow:
    flow.executor = DaskExecutor(address=SCHEDULER_ADDRESS)
k

Kevin Kho

08/09/2021, 1:39 PM
Could you try it outside the Flow block?
with Flow("list-sum-dask") as flow:
     ....
    
flow.executor = DaskExecutor(address=SCHEDULER_ADDRESS)
But if that doesn’t work, would there be any reason the Dask cluster can’t accept incoming requests? If it’s in a container, are the needed ports open?
r

Ross Rochford

08/09/2021, 1:43 PM
That didn't work.
No I don't think so, I'm able to send work to the cluster from the dask client inside the agent's container.
Sorry, I confused two flows, the run seems to have been scheduled this time
k

Kevin Kho

08/09/2021, 1:47 PM
No worries. Just let me know if it’s still not working.
r

Ross Rochford

08/09/2021, 1:47 PM
ok, the tasks are still pending
maybe that didn't make a difference after all
This time the agent stdout showed:
[2021-08-09 13:42:09+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'list-sum-dask2'
which I don't think it did previously (though I'm not 100% sure)
Is it strange that it says "CloudFlowRunner"? I'm using prefect server, not Prefect Cloud
The dashboard says it's "running" but isn't making progress, all the tasks are pending.
k

Kevin Kho

08/09/2021, 1:52 PM
Do you have access to the Dask UI to see if anything was scheduled there?
The CloudFlowRunner is not out of the ordinary. It’s just the name of the class that emits the logs. Maybe you can run the Flow with debug level logs?
r

Ross Rochford

08/09/2021, 1:58 PM
Yes, I have access to the Dask UI, no tasks are shown
trying now with debug logs, you mean on the agent?
[2021-08-09 13:58:54+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'list-sum-dask2'
[2021-08-09 13:58:55,282] DEBUG - agent | Querying for ready flow runs...
DEBUG:agent:Querying for ready flow runs...
/usr/local/lib/python3.8/site-packages/distributed/client.py:1190: VersionMismatchWarning: Mismatched versions found

+---------+----------------+---------------+---------------+
| Package | client         | scheduler     | workers       |
+---------+----------------+---------------+---------------+
| blosc   | None           | 1.9.2         | 1.9.2         |
| lz4     | None           | 3.1.3         | 3.1.3         |
| msgpack | 1.0.2          | 1.0.0         | 1.0.0         |
| numpy   | None           | 1.21.1        | 1.21.1        |
| pandas  | None           | 1.3.0         | 1.3.0         |
| python  | 3.8.11.final.0 | 3.8.0.final.0 | 3.8.0.final.0 |
+---------+----------------+---------------+---------------+
Notes: 
-  msgpack: Variation is ok, as long as everything is above 0.6
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
[2021-08-09 13:58:55,390] DEBUG - agent | No ready flow runs found.
DEBUG:agent:No ready flow runs found.
[2021-08-09 13:58:55,394] DEBUG - agent | Sleeping flow run poller for 2.0 seconds...
DEBUG:agent:Sleeping flow run poller for 2.0 seconds...
[2021-08-09 13:58:57,396] DEBUG - agent | Querying for ready flow runs...
DEBUG:agent:Querying for ready flow runs...
[2021-08-09 13:58:57,456] DEBUG - agent | No ready flow runs found.
DEBUG:agent:No ready flow runs found.
k

Kevin Kho

08/09/2021, 2:01 PM
This seems fine. Set the debug level on both the agent and the flow. On the Flow you can click “Advanced Configuration” in the UI under “Run” and then either add an env variable with
PREFECT__LOGGING__LEVEL="DEBUG"
or you might be able to set the logging level there.
r

Ross Rochford

08/09/2021, 2:18 PM
ok, I tried that, I don't see anything different in the logs
[2021-08-09 14:13:11+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'list-sum-dask2'
[2021-08-09 14:13:12,755] DEBUG - agent | Querying for ready flow runs...
DEBUG:agent:Querying for ready flow runs...
/usr/local/lib/python3.8/site-packages/distributed/client.py:1190: VersionMismatchWarning: Mismatched versions found

+---------+----------------+---------------+---------------+
| Package | client         | scheduler     | workers       |
+---------+----------------+---------------+---------------+
| blosc   | None           | 1.9.2         | 1.9.2         |
| lz4     | None           | 3.1.3         | 3.1.3         |
| msgpack | 1.0.2          | 1.0.0         | 1.0.0         |
| numpy   | None           | 1.21.1        | 1.21.1        |
| pandas  | None           | 1.3.0         | 1.3.0         |
| python  | 3.8.11.final.0 | 3.8.0.final.0 | 3.8.0.final.0 |
+---------+----------------+---------------+---------------+
Notes: 
-  msgpack: Variation is ok, as long as everything is above 0.6
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
[2021-08-09 14:13:12,877] DEBUG - agent | No ready flow runs found.
k

Kevin Kho

08/09/2021, 2:19 PM
Did you check the Flow logs in the UI? The debug level logs of the flow won’t show up on the agent unless you pass the
--show-flow-logs
flag.
r

Ross Rochford

08/09/2021, 2:19 PM
I notice when I run "prefect config" on the agent, there are a lot of default values that look wrong. But I'm assuming these are values that the server would use, not the agent.
ok, one sec
9 August 2021,04:09:20 	agent	INFO	Submitted for execution: PID: 158
9 August 2021,04:09:21 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'list-sum-dask2'
9 August 2021,04:09:21 	prefect.CloudFlowRunner	DEBUG	Using executor type DaskExecutor
9 August 2021,04:09:21 	prefect.CloudFlowRunner	DEBUG	Flow 'list-sum-dask2': Handling state change from Scheduled to Running
and nothing else...
k

Kevin Kho

08/09/2021, 2:23 PM
When you said you were able to send work to the cluster from the same container as the agent, that’s just using Dask commands without Prefect, right? Could you show me what code you used?
r

Ross Rochford

08/09/2021, 2:24 PM
actually, a few more lines just arrived
9 August 2021,04:09:20 	agent	INFO	Submitted for execution: PID: 158
9 August 2021,04:09:21 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'list-sum-dask2'
9 August 2021,04:09:21 	prefect.CloudFlowRunner	DEBUG	Using executor type DaskExecutor
9 August 2021,04:09:21 	prefect.CloudFlowRunner	DEBUG	Flow 'list-sum-dask2': Handling state change from Scheduled to Running
9 August 2021,04:23:01 	prefect-server.Lazarus.FlowRun	INFO	Rescheduled by a Lazarus process. This is attempt 1.
9 August 2021,04:23:10 	agent	INFO	Submitted for execution: PID: 197
9 August 2021,04:23:11 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'list-sum-dask2'
9 August 2021,04:23:11 	prefect.CloudFlowRunner	DEBUG	Using executor type DaskExecutor
9 August 2021,04:23:11 	prefect.CloudFlowRunner	DEBUG	Flow 'list-sum-dask2': Handling state change from Scheduled to Running
9 August 2021,04:23:12 	prefect.DaskExecutor	INFO	Connecting to an existing Dask cluster at dask-scheduler:8890
9 August 2021,04:23:15 	prefect.CloudFlowRunner	DEBUG	Checking flow run state...
9 August 2021,04:23:15 	prefect.CloudFlowRunner	INFO	Flow run RUNNING: terminal tasks are incomplete.
yes, just with dask
import os

import dask
from dask.distributed import Client
import distributed


SCHEDULER_ADDRESS = os.environ['PREFECT__ENGINE__EXECUTOR__DASK__ADDRESS']


def inc(x):
    dask.distributed.get_worker().log_event('inc-log', {'x': x})
    return x + 1

def square(x):
    return x*x


client = Client(SCHEDULER_ADDRESS)
x = client.submit(inc, 10)
print(x.result())

y = client.submit(inc, x)
print(y.result())
L = client.map(inc, range(1000))
list_incremented = client.gather(L)
print(list_incremented)


total = client.submit(sum, L)
print(total.result())

events = client.get_events('inc-log')
print(events)
I don't want to take any more of your time, if you like I can post a repo with my docker-compose setup
k

Kevin Kho

08/09/2021, 2:28 PM
That’s weird. This is basically what the DaskExecutor does as well. I’ll ask the team if there are other ideas.
r

Ross Rochford

08/09/2021, 2:28 PM
or submit it as a github issue
k

Kevin Kho

08/09/2021, 2:30 PM
I’ll have someone else on the team look at this in a bit, so no need to submit a GitHub issue quite yet. I don’t think the repo would help since the Dask cluster stuff is not in it?
r

Ross Rochford

08/09/2021, 2:38 PM
it is all together in one repo
I can document the scenario
m

Michael Adkins

08/09/2021, 2:56 PM
Hey @Ross Rochford -- if you put a reproducible example up I'll give it a go for you
r

Ross Rochford

08/09/2021, 3:19 PM
thanks!
@Michael Adkins here you go: https://github.com/rossrochford/prefect-dask-docker
@Michael Adkins friendly poke in case you forgot :)
m

Michael Adkins

08/10/2021, 11:20 PM
Ah yes; I'll try to get to this in the morning. Sorry about that.
r

Ross Rochford

08/10/2021, 11:31 PM
Thanks! No major rush.
m

Michael Adkins

08/11/2021, 4:40 PM
Giving this a look now
Hi! Your Dask workers were not receiving your Prefect configuration, so when tasks were submitted to them for execution they were attempting to contact Cloud instead of Server.
❯ git diff                                      
diff --git a/docker-compose.yml b/docker-compose.yml
index 793dbde..aaddc0a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -169,11 +169,13 @@ services:
     environment:
       - DASK_SCHEDULER_ADDRESS=dask-scheduler:8890
       - EXTRA_PIP_PACKAGES=prefect
     hostname: dask-worker
     networks:
       - prefect-network
     volumes:
       - './dask/config:/etc/dask'
+      - './agent/prefect_backend_config.toml:/root/.prefect/backend.toml'
     restart: "always"
     depends_on:
       - dask-scheduler
👍 1
prefect run --name list-sum-dask --watch --log-level DEBUG
succeeds now
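For anyone reading along later: the file being mounted there is the one that prefect backend server writes, which switches the client in the worker containers from Cloud to Server; its contents are roughly just the following (the server endpoint itself still comes from the rest of the repo's agent config).

# ~/.prefect/backend.toml
backend = "server"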
r

Ross Rochford

08/11/2021, 5:47 PM
awesome, I have it working now, thank you