Powered by Linen
prefect-server
  •

    Joey Kellison-Linn

    05/14/2021, 4:01 PM
    I'm having trouble running the Prefect server on an AWS Ubuntu instance with docker and docker-compose installed. Big error stack:
    • PermissionError: [Errno 13] Permission denied
    • urllib3.exceptions.ProtocolError: ('Connection aborted.', PermissionError(13, 'Permission denied'))
    • requests.exceptions.ConnectionError: ('Connection aborted.', PermissionError(13, 'Permission denied'))
    • docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', PermissionError(13, 'Permission denied'))
    • subprocess.CalledProcessError: Command '['docker-compose', 'pull']' returned non-zero exit status 1.
    • subprocess.CalledProcessError: Command '['docker-compose', 'down']' returned non-zero exit status 1.
    docker --version: Docker version 20.10.6, build 370c289
    docker-compose --version: docker-compose version 1.29.2, build unknown
    EDIT: Found the solution: https://stackoverflow.com/questions/61698133/docker-py-permissionerror13
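    The `PermissionError(13, 'Permission denied')` raised while docker-py fetches the server API version typically means the current user cannot open the Docker daemon's Unix socket. A minimal stdlib check, assuming the daemon uses the default socket path `/var/run/docker.sock` (an assumption; yours may differ); `can_use_socket` is a hypothetical helper:

```python
import os

# Assumption: the Docker daemon listens on its default Unix socket.
DEFAULT_DOCKER_SOCKET = "/var/run/docker.sock"


def can_use_socket(path: str = DEFAULT_DOCKER_SOCKET) -> bool:
    """Return True if the current user may read and write `path`.

    docker-py (used by docker-compose) needs both permissions on the socket;
    without them the connection fails with Errno 13, as in the stack above.
    """
    return os.path.exists(path) and os.access(path, os.R_OK | os.W_OK)


if __name__ == "__main__":
    if can_use_socket():
        print("Current user can access the Docker socket")
    else:
        print("No access to the Docker socket: check its group and your user's groups")
```

    If it reports no access, a common fix on Linux is adding the user to the `docker` group (`sudo usermod -aG docker $USER`) and logging in again, as described in Docker's post-installation steps.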
  •

    Carlo

    05/14/2021, 7:11 PM
    Hi, I'm on Prefect 0.13.15, trying to migrate to EcsAgent from FargateAgent. I'm specifying the task_definition_path as an s3://url. It's returning a 404 error. Using the same credentials, I'm able to write my own boto3 code to access the file successfully. Any ideas?
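    Since plain boto3 code can read the same object, one thing worth ruling out is how the `s3://` URL is split into bucket and key. A minimal stdlib sketch of the usual split (bucket = host part, key = path without its leading slash); `split_s3_url` and the example URL are hypothetical, not Prefect's own parsing code:

```python
from urllib.parse import urlparse


def split_s3_url(url: str) -> tuple:
    """Split 's3://bucket/some/key.yaml' into ('bucket', 'some/key.yaml')."""
    parsed = urlparse(url)
    if parsed.scheme != "s3":
        raise ValueError(f"not an s3:// URL: {url!r}")
    # The bucket is the network location; the key is the path minus its leading "/"
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = split_s3_url("s3://my-bucket/prefect/task_definition.yaml")
print(bucket, key)  # my-bucket prefect/task_definition.yaml
```

    If the bucket or key printed differs from what your working boto3 code uses (for example a stray character or an extra slash), that would be consistent with the 404.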
  •

    Ian Harvey

    05/15/2021, 9:00 AM
    Hi, a question about databases. We have a prefect server install in kube using helm. If I'm storing data from my Flows, is it expected that I would stand up a fresh database, or is it ok to use the database that prefect is using itself? This question is more about "best practices" than if it is literally possible.
  •

    Lionel

    05/16/2021, 8:34 AM
    Hi, how do we add ENV variables when installing with Helm? Specifically, I want to add GITHUB_ACCESS_TOKEN to use GitHub as the script storage backend.
  •

    Matt Camp

    05/17/2021, 1:35 AM
    I'm trying to set up an agent for my local Prefect Server, and I'm pretty sure I did something wrong. The agent registers itself, but the UI says that the agent hasn't checked for any flows. I think it might have something to do with how I've configured the Prefect server. It's running in a Docker stack with everything routed through Traefik. This is my agent's config.yml:
    # debug mode
    debug = true
    
    # base configuration directory (typically you won't change this!)
    home_dir = "~/.prefect/"
    
    backend = "server"
    
    [server]
    host = "http://apollo.prefect.campground.lan"
    port = "4200"
    endpoint = "${server.host}:${server.port}"
    My Apollo server is at http://apollo.prefect.campground.lan:4200/
    My GraphQL server is at http://graphql.prefect.campground.lan:4200/graphql
    My UI is at http://prefect.campground.lan
    Any glaring problems with what I've done? TIA
  •

    Lionel

    05/17/2021, 4:08 PM
    Is there a way to activate one-way TLS on the MySQL task? I'm using:
    from prefect.tasks.mysql import MySQLFetch
    my_query = 'SELECT * FROM my_db LIMIT 10'
    MySQLFetch("my_db", "user_read_only", "Abcd1234", "mysql.test.domain", port=3306, query=my_query)
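    One-way TLS means only the server presents a certificate, which the client verifies. In Python's standard library that corresponds to a default client `SSLContext`. A minimal sketch; `ca_file` is a hypothetical path to the CA that signed the MySQL server's certificate, and whether `MySQLFetch` lets you pass such a context through in your Prefect version is an assumption to check:

```python
import ssl
from typing import Optional


def one_way_tls_context(ca_file: Optional[str] = None) -> ssl.SSLContext:
    """Client-side context for one-way TLS: verify the server, present no client cert.

    ca_file: optional (hypothetical) path to the CA bundle that signed the
    server certificate; when omitted, the system trust store is used.
    """
    context = ssl.create_default_context(cafile=ca_file)
    # create_default_context already requires a valid server certificate and
    # checks the hostname; no load_cert_chain() call, so no client certificate.
    return context


ctx = one_way_tls_context()
print(ctx.verify_mode == ssl.CERT_REQUIRED, ctx.check_hostname)  # True True
```

    PyMySQL's `connect()` accepts an `ssl.SSLContext` for its `ssl` argument, so with PyMySQL directly this would be `pymysql.connect(..., ssl=ctx)`; if `MySQLFetch` does not expose that argument, a small custom task wrapping PyMySQL is one workaround.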
  •

    Ismail Cenik

    05/17/2021, 7:38 PM
    Hello, is there a way to limit concurrent runs of flows? Assume that I have a flow that can be run from different resources, and we do not want concurrent runs of some flows. We could put a kind of queue on our side to control triggers, but is there a way to limit concurrent runs of a specific flow on the Prefect side? I saw that you can limit concurrent runs of tasks, but I need it for a specific flow.
  •

    Tom Forbes

    05/17/2021, 8:36 PM
    Hey, we’re evaluating Prefect and I had a few questions about how you’re meant to structure mapping tasks that use a Dask executor. I’ve got the following simple workflow:
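    import dask.dataframe
    import imageio
    import skimage.transform
    from prefect import task, Flow
    from prefect.utilities.aws import get_boto_client

    @task()
    def run_export():
        prefix = trigger_export()  # user-defined helper, not shown
        return dask.dataframe.read_parquet(f'{prefix}/*.parquet')

    @task()
    def download_image(uuid):
        s3 = get_boto_client("s3")
        # get_object returns a streaming body; read() gives bytes imageio can decode
        img = imageio.imread(s3.get_object(Bucket="..", Key=f"{uuid}")["Body"].read())
        return skimage.transform.resize(img, (1024, 1024))

    @task()
    def save_to_s3(result_df):
        # Dask DataFrames have no .save(); to_parquet can write to an s3:// path
        result_df.to_parquet("s3://bucket/output_prefix/")

    with Flow("something") as flow:
        df = run_export()
        images_df = download_image.map(df["uuid"])
        save_to_s3(images_df)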
    Basically I want to grab a dataframe from somewhere, download a bunch of images from S3, resize them, and attach them to the dataframe as a new column, then save the frame somewhere else to S3.
  •

    Tom Forbes

    05/17/2021, 8:41 PM
    The above is roughly pseudocode, but the main question was: how would you structure this? You can probably do most of it with df.apply directly - is that what you would do? We’d rather use Prefect for this directly if possible. But I’m not clear on how Prefect works with Dask - would a mapping task like this be the way to go? Would this scale to a high number of tasks (millions?), or would you perhaps map over the Dask partitions instead?
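    One common way to keep the task count manageable is to map over batches instead of single items, so that each mapped task processes many UUIDs. A minimal stdlib sketch of the batching step, assuming the UUIDs fit in memory as a list; `chunked` and the batch size are illustrative choices, and in a flow each batch would be one element of the list passed to `.map`:

```python
from itertools import islice
from typing import Iterable, Iterator, List


def chunked(items: Iterable[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


uuids = [f"uuid-{i}" for i in range(10)]
batches = list(chunked(uuids, 4))
print(len(batches), [len(b) for b in batches])  # 3 [4, 4, 2]
```

    With a million UUIDs and a batch size of 1,000, this gives 1,000 mapped tasks instead of a million; mapping over Dask partitions is the same idea with the partition as the batch.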
  •

    Daniel Davee

    05/18/2021, 1:23 AM
    Hey, if I need help setting up Kubernetes, where can I start looking?
  •

    Lionel

    05/18/2021, 6:02 AM
    I got Prefect-Server and UI deployed using Helm based on the official documentation. It's up and running. Now I'd like to run flows that use 3rd-party dependencies/libraries like pymysql and py-postgresql. Since a flow is started as a Job by the Kubernetes Agent, how do I install the dependencies in the Job? And how do I pass the env variables set on the Kubernetes Agent to the Job?
  •

    Rehan Razzaque Rajput

    05/18/2021, 8:21 AM
    Hi everyone, I have my Prefect server running on an Azure VM and 1 Prefect Kubernetes agent on AKS. I scheduled 3 flows at the same time, but the problem is that they were submitted by the server one after another. I expected all the flows to run in parallel, but that didn't happen. Do I need to increase the number of agents, or is there something in the Prefect server config that I need to fix?
  •

    Maria

    05/18/2021, 12:45 PM
    Hi all, I'm using the Great Expectations task, which validates my raw data and produces artifacts (great feature!). I was wondering if there is a way to access the artifacts from within the code? I'd like to pick up that markdown and send it in an email.
  •

    Nitin Karolla

    05/18/2021, 2:11 PM
    Hi everyone, Prefect is awesome!! I wanted to share my excitement with my team by demoing Prefect running on our k8s cluster. Before I do that, I am trying to get it working on a local k8s. I have followed the Helm instructions in the official documentation, and this is where I stand: I have selected 2 replicas for the agent, and both spin up. But only 1 agent shows up in the UI. Any idea why that would happen and what I am missing here? Thanks in advance!!
  •

    Ismail Cenik

    05/18/2021, 3:56 PM
    Hello, what is the reason for the following error? Pod prefect-job-9e6e019e-2tpvl failed. Container 'flow' state: terminated Exit Code:: 139 Reason: Error
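    Exit code 139 follows the common convention that a process killed by a signal exits with 128 plus the signal number: 139 - 128 = 11, which is SIGSEGV (a segmentation fault) on Linux. A minimal sketch that decodes such codes with Python's `signal` module, assuming a POSIX platform where these signal numbers apply; `describe_exit_code` is a hypothetical helper:

```python
import signal


def describe_exit_code(code: int) -> str:
    """Interpret a container/shell exit code using the 128 + N signal convention."""
    if code > 128:
        try:
            return f"killed by {signal.Signals(code - 128).name}"
        except ValueError:
            return f"exit code {code} (no matching signal)"
    return f"exited with status {code}"


print(describe_exit_code(139))  # killed by SIGSEGV
```

    A segfault in the `flow` container usually points at native code (a C extension or the interpreter itself) crashing, rather than at an ordinary Python exception in the flow.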
  •

    Mdu Keswa

    05/18/2021, 6:31 PM
    Hello, I'm on 0.1.4.19 local Server and UI. I also have a static Dask cluster installed with Helm on MicroK8s. I'm able to successfully execute
    ...
    executor = DaskExecutor(address="tcp://<my-microk8s-dask-svr-ip>:8786")
    flow.run(executor=executor)
    on this remote Dask cluster from the Python command line. What do I need to do to 1) register the same flow with the local Server (I can do this), 2) set up an agent (which one?), and 3) execute this flow from within the local Server UI so that it runs on the static Dask cluster? Thanks.
  •

    Mdu Keswa

    05/18/2021, 6:31 PM
    Meant version 0.14.19
  •

    Mdu Keswa

    05/18/2021, 7:16 PM
    @Kevin, thanks. Will try local agent with static Dask cluster.
  •

    Karl

    05/19/2021, 12:41 AM
    Hello all, I’m attempting to stand up a POC in the AWS cloud using AWS ECS Fargate and Prefect Cloud, and I hit a roadblock. Error: "when calling the RegisterTaskDefinition operation: Role is not valid". It appears to be related to my AWS roles/permissions, but I’m not sure what I am doing wrong.
  •

    Michael Hadorn

    05/19/2021, 1:17 PM
    We're missing the Prefect backend version in the local dev environment. Is it gone? (We updated from 0.14.12 to 0.14.17 [same for 0.14.19].) If you click on "default", the button only displays the UI release date:
  •

    Michael Hadorn

    05/19/2021, 3:35 PM
    Hi all, since I upgraded the GUI to 0.14.19, I can no longer use the Docker agent. I always get this error when it starts a flow:
    May 19 17:30:35 XXX.ch run_dev_docker_agent.sh[3306015]: requests.exceptions.ConnectionError: HTTPConnectionPool(host='host.docker.internal', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f8572e5ecd0>: Failed to establish a new connection: [Errno -2] Name or service not known'))
    (-> full log in the thread.) Before (<=0.14.17) everything was working. On Windows it also works with .19, but not on: Ubuntu 20.04, Docker version 20.10.3, build 48d30b5, docker-compose version 1.28.2, build unknown. Is anyone else seeing similar behaviour?
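    The `[Errno -2] Name or service not known` part means the name `host.docker.internal` did not resolve inside the container. Docker Desktop on Windows and macOS provides that name, while on Linux it is typically not defined unless added explicitly (Docker 20.10 and later support `--add-host=host.docker.internal:host-gateway`). A minimal stdlib check you could run inside the flow container; `resolves` is a hypothetical helper:

```python
import socket


def resolves(hostname: str) -> bool:
    """Return True if `hostname` resolves to an address from this machine/container."""
    try:
        socket.gethostbyname(hostname)
    except socket.gaierror:
        return False
    return True


if __name__ == "__main__":
    for name in ("localhost", "host.docker.internal"):
        print(name, "resolves" if resolves(name) else "does NOT resolve")
```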
  •

    Daniel Davee

    05/19/2021, 9:14 PM
    I am running a Prefect server that executes flows on a Dask Kubernetes cluster. I'm running the hello-world test script and keep getting this error:
    python test.py
    [2021-05-19 21:06:50+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'Run Me'
    [2021-05-19 21:06:50+0000] INFO - prefect.DaskExecutor | Connecting to an existing Dask cluster at tcp://x.x.x.x:8786
    /home/dani1057/anaconda3/envs/faraday3.8/lib/python3.8/site-packages/distributed/client.py:1140: VersionMismatchWarning: Mismatched versions found
    +---------+---------------+---------------+---------------+
    | Package | client        | scheduler     | workers       |
    +---------+---------------+---------------+---------------+
    | python  | 3.8.8.final.0 | 3.8.0.final.0 | 3.8.0.final.0 |
    +---------+---------------+---------------+---------------+
      warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
    [2021-05-19 21:06:50+0000] ERROR - prefect.FlowRunner | Unexpected error: ModuleNotFoundError("No module named 'prefect'")
    Traceback (most recent call last):
      File "/home/dani1057/anaconda3/envs/faraday3.8/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/home/dani1057/anaconda3/envs/faraday3.8/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
        final_states = executor.wait(
      File "/home/dani1057/anaconda3/envs/faraday3.8/lib/python3.8/site-packages/prefect/executors/dask.py", line 414, in wait
        return self.client.gather(futures)
      File "/home/dani1057/anaconda3/envs/faraday3.8/lib/python3.8/site-packages/distributed/client.py", line 1975, in gather
        return self.sync(
      File "/home/dani1057/anaconda3/envs/faraday3.8/lib/python3.8/site-packages/distributed/client.py", line 843, in sync
        return sync(
      File "/home/dani1057/anaconda3/envs/faraday3.8/lib/python3.8/site-packages/distributed/utils.py", line 353, in sync
        raise exc.with_traceback(tb)
      File "/home/dani1057/anaconda3/envs/faraday3.8/lib/python3.8/site-packages/distributed/utils.py", line 336, in f
        result[0] = yield future
      File "/home/dani1057/anaconda3/envs/faraday3.8/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
        value = future.result()
      File "/home/dani1057/anaconda3/envs/faraday3.8/lib/python3.8/site-packages/distributed/client.py", line 1840, in _gather
        raise exception.with_traceback(traceback)
      File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
    ModuleNotFoundError: No module named 'prefect'
    [2021-05-19 21:06:50+0000] ERROR - prefect.Run Me | Unexpected error occured in FlowRunner: ModuleNotFoundError("No module named 'prefect'")
  •

    Daniel Davee

    05/19/2021, 9:15 PM
    I also have a kubernetes agent running on the server
  •

    Daniel Davee

    05/19/2021, 9:16 PM
    Does the agent have to be running on the cluster?
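    The traceback in the first message ends in `distributed/protocol/pickle.py` under `/opt/conda`, i.e. on the Dask cluster side, so the workers (or scheduler) appear to be missing the `prefect` package; the warning also shows a Python version mismatch (3.8.8 on the client, 3.8.0 on the cluster). A minimal sketch of a check you could run on every worker; `has_module` is a hypothetical helper, and with `dask.distributed` you would call it through `Client.run`, which runs a function on each worker and returns a dict keyed by worker address:

```python
import importlib.util


def has_module(name: str) -> bool:
    """Return True if the top-level module `name` can be imported in this environment."""
    return importlib.util.find_spec(name) is not None


# Local quick check. Against a cluster you could use, for example,
#   client.run(has_module, "prefect")
# with a dask.distributed Client (not executed here).
print(has_module("json"), has_module("prefect"))
```

    If any worker reports False, installing Prefect (ideally the same version as on the client) into the Dask worker image is the usual fix.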
  •

    Dean Magee

    05/19/2021, 11:57 PM
    Hi there, I'm trying to use a state handler to send an error message to MS Teams when a task fails...
    import os
    import pymsteams
    def alert_failed(obj, old_state, new_state):
        if new_state.is_failed():
            myTeamsMessage = pymsteams.connectorcard(os.getenv("MSTEAMS_WEBHOOK_URL"))
            myTeamsMessage.title("An error has occurred!")
        # a state handler returns the state Prefect should continue with
        return new_state
    I'm trying to grab the text that Python spits out as an error message and include it in my MS Teams message. Any idea how to do that?
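    To include the error text Python prints, one option is to format the exception with the standard `traceback` module. A minimal sketch, assuming the handler can get hold of the exception object; in Prefect 0.14 a task that raised usually carries it on the failed state (for example `new_state.result`), but treat that attribute as an assumption to verify for your version. `format_error` is a hypothetical helper:

```python
import traceback


def format_error(exc: BaseException) -> str:
    """Return the same text Python prints for an uncaught exception."""
    return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))


try:
    1 / 0
except ZeroDivisionError as err:
    text = format_error(err)

print(text.splitlines()[-1])  # ZeroDivisionError: division by zero
```

    Inside `alert_failed`, the resulting string could then be passed to the card with pymsteams' `text()` method; the state's `message` attribute typically holds a shorter one-line summary.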
  •

    jaehoon

    05/20/2021, 1:30 PM
    I want to pass the result of a task to a flow as a parameter, but an error occurred like this:
    flow_run_id = client.create_flow_run(
      File "c:\users\krims\appdata\local\pypoetry\cache\virtualenvs\data-pipeline-i-bq0ir6-py3.9\lib\site-packages\prefect\client\client.py", line 1108, in create_flow_run
        res = self.graphql(create_mutation, variables=dict(input=inputs))
      File "c:\users\krims\appdata\local\pypoetry\cache\virtualenvs\data-pipeline-i-bq0ir6-py3.9\lib\site-packages\prefect\client\client.py", line 302, in graphql
        params=dict(query=parse_graphql(query), variables=json.dumps(variables)),
      File "C:\Users\krims\AppData\Local\Programs\Python\Python39\lib\json\__init__.py", line 231, in dumps
        return _default_encoder.encode(obj)
      File "C:\Users\krims\AppData\Local\Programs\Python\Python39\lib\json\encoder.py", line 199, in encode
        chunks = self.iterencode(o, _one_shot=True)
      File "C:\Users\krims\AppData\Local\Programs\Python\Python39\lib\json\encoder.py", line 257, in iterencode
        return _iterencode(o, 0)
      File "C:\Users\krims\AppData\Local\Programs\Python\Python39\lib\json\encoder.py", line 179, in default
        raise TypeError(f'Object of type {o.__class__.__name__} '
    TypeError: Object of type TaskMetaclass is not JSON serializable
    Help me!
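    The traceback shows the client calling `json.dumps` on the flow run parameters, so every parameter value must be JSON-serializable; `TaskMetaclass` suggests that a task class, rather than a plain value it produced, ended up among the parameters. A minimal stdlib sketch that reports which parameter values `json.dumps` would reject; `find_unserializable` and `MyTask` are hypothetical names:

```python
import json


def find_unserializable(params: dict) -> list:
    """Return the keys whose values json.dumps cannot encode."""
    bad = []
    for key, value in params.items():
        try:
            json.dumps(value)
        except (TypeError, ValueError):  # unsupported type, or e.g. a circular reference
            bad.append(key)
    return bad


class MyTask:  # hypothetical stand-in for a task class passed by mistake
    pass


print(find_unserializable({"date": "2021-05-20", "rows": [1, 2, 3], "task": MyTask}))
# ['task']
```

    Once this returns an empty list for the parameters you pass, `json.dumps` (and therefore this step of `create_flow_run`) should no longer raise this TypeError.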
  •

    Romain

    05/20/2021, 2:01 PM
    Hi folks, I have an issue running a flow using a Kubernetes agent and Module storage. Here is what I do. The agent Docker image is based on prefecthq/prefect:0.14.19, and in my Dockerfile I have the following folder hierarchy:
    /A/
      B/
         flows/
             my_flow.py
    In my_flow.py, imagine something like this:
    def get_flow():
        with Flow('my_flow', storage=Module("B.flows.my_flow:get_flow")) as flow:
            ....
        return flow
    In the Dockerfile, I ensure that the PYTHONPATH env var holds the folder A, so that I can import B.flows.my_flow, by setting this at the end of the Dockerfile:
    ENV PYTHONPATH "${PYTHONPATH}:/A"
    After that, I register this flow with a KubernetesRun and a `DaskExecutor`:
    flow.run_config = KubernetesRun()
    flow.executor = DaskExecutor()
    flow.register(project_name='my_project',
                  idempotency_key=flow.serialized_hash())
    Then from a prefect server, I trigger the flow run, but I get the following error:
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'A'")
    I don't really get why my module is not found. I have tested a local deployment using docker-compose with a local agent (running in the compose stack), and it was working fine. So I am missing something here. Any ideas?
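    For reference, with `/A` on `PYTHONPATH`, the importable name is `B.flows.my_flow`; a module literally named `A` is only importable if the parent of `/A` is also on `sys.path`, so the error suggests something is resolving a name that starts with `A`, or the job container does not see the same path. It may also be worth checking which image the flow-run job actually uses, since the `KubernetesRun()` above sets no `image`. A minimal stdlib sketch that rebuilds the hierarchy in a temporary directory and imports it the same way; the module body is a hypothetical stand-in for the real flow file:

```python
import importlib
import sys
import tempfile
from pathlib import Path


def build_layout(root: Path) -> Path:
    """Recreate A/B/flows/my_flow.py under `root` and return the A directory."""
    flows_dir = root / "A" / "B" / "flows"
    flows_dir.mkdir(parents=True)
    # __init__.py files make B and B.flows regular packages
    (root / "A" / "B" / "__init__.py").write_text("")
    (flows_dir / "__init__.py").write_text("")
    # Hypothetical stand-in for the real flow module
    (flows_dir / "my_flow.py").write_text("def get_flow():\n    return 'my_flow'\n")
    return root / "A"


a_dir = build_layout(Path(tempfile.mkdtemp()))

# Roughly what ENV PYTHONPATH "${PYTHONPATH}:/A" does: make the A directory a sys.path entry
sys.path.insert(0, str(a_dir))
importlib.invalidate_caches()

module = importlib.import_module("B.flows.my_flow")
print(module.get_flow())  # my_flow
```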
  •

    Daniel Davee

    05/20/2021, 4:08 PM
    I'm being told by my Kubernetes engineer that he cannot pull the image prefecthq/prefect:latest onto GKE. Can Prefect be run on GKE?
  •

    David Glaister

    05/20/2021, 4:49 PM
    Hi, I'm trying to create a local Prefect system to present to my employer. Using a Windows 10 VM, I can get things working without issue. To create the demo I need to interact with other systems on the company network... which is where the problems start. The web UI only shows buttons, which do not respond. These are the steps I take to reach this point:
    • Start Docker
    • Cmd: prefect backend server
    • Cmd: prefect server start
    • Browser (Edge & Chrome): localhost:8080
    The debug output from the browser gives the error: Uncaught (in promise) Error: passed invalid or empty tenant object
    My company laptop connects through a proxy; I'm not sure if this is related to the issue.
  •

    Austin Mackillop

    05/20/2021, 6:44 PM
    Hello, is there a way to have the Prefect agent execute a flow in a single-threaded manner? I converted an existing ML model training script that makes use of some datatypes that cannot be pickled. Returning these types from tasks works fine when executing the flow locally in a single thread, but using the DaskExecutor or running the flow on the agent causes the flow to fail.

Kevin Kho

05/20/2021, 6:45 PM
Hey @Austin Mackillop, you can explicitly specify LocalExecutor, which is the default (or maybe even LocalDaskExecutor). I’ve seen this come up for some people. What Storage are you using?

Austin Mackillop

05/20/2021, 6:47 PM
I tried specifying the executor like so:
with Flow("train-model", executor=LocalExecutor()) as flow:
But in the UI it still appears to be doing things in parallel. I’m using whatever the default storage is.

Kevin Kho

05/20/2021, 6:51 PM
Could you run your flow with debug level logs and see the executor that shows when the flow starts?

Austin Mackillop

05/20/2021, 7:06 PM
How do I do that with the local agent?

Kevin Kho

05/20/2021, 7:08 PM
One sec, let me try.
Add this to the flow code:
flow.run_config = LocalRun(env={'PREFECT__LOGGING__LEVEL':'DEBUG'})

Austin Mackillop

05/21/2021, 9:22 PM
I changed my tasks so that none of them return unserializable objects, and that solved my problem.

Kevin Kho

05/21/2021, 9:23 PM
Glad you got it working @Austin Mackillop!
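The DaskExecutor pickles task results to move them between processes, and runs through an agent likely checkpoint results with the default pickle-based serializer as well, which would explain why unpicklable return values only broke those runs. A minimal stdlib sketch for spotting such values before returning them from a task; `is_picklable` is a hypothetical helper, and `threading.Lock` is just one example of a type `pickle` refuses:

```python
import pickle
import threading


def is_picklable(obj) -> bool:
    """Return True if `obj` survives pickle.dumps."""
    try:
        pickle.dumps(obj)
    except Exception:  # TypeError, PicklingError or AttributeError depending on the object
        return False
    return True


print(is_picklable({"weights": [0.1, 0.2]}))  # True
print(is_picklable(threading.Lock()))          # False
```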