prefect-community

    Azuma

    05/20/2020, 7:01 AM
    Hello everyone! I'm new to Prefect. Can someone tell me, or give an example of, where I can set url_endpoint if I want to be able to access Prefect as http://public_dns_name/prefect on my EC2 VM on AWS?
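    A hedged sketch of where such a setting might live, assuming the Prefect Server UI reads its GraphQL endpoint from ~/.prefect/config.toml (the apollo_url key existed in 0.x server configs; the path-based /prefect routing itself would need a reverse proxy, which is not shown):
    [server.ui]
    # point the UI at the API exposed on the instance's public DNS (placeholder)
    apollo_url = "http://public_dns_name:4200/graphql"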

    Sandeep Aggarwal

    05/20/2020, 1:47 PM
    Is there a way to pass parameters/task outputs from the current task to another task when using FlowRunTask? I get the error below when trying to do so:
    TypeError: Object of type 'Parameter' is not JSON serializable
    Below is a sample snippet:
    from prefect import Flow, Parameter
    from prefect.tasks.prefect import FlowRunTask

    with Flow("sample flow") as sample_flow:
        param = Parameter("task_param")

        FlowRunTask(flow_name="next flow", parameters={"task_param": param})()
    👀 1
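    A sketch of one possible workaround, assuming FlowRunTask also accepts parameters at call time: kwargs passed when calling a task become upstream dependencies, so the Parameter would be resolved at runtime instead of being JSON-serialized when the task is constructed.
    from prefect import Flow, Parameter
    from prefect.tasks.prefect import FlowRunTask

    with Flow("sample flow") as sample_flow:
        param = Parameter("task_param")

        # bind `parameters` when calling the task so it is resolved at runtime
        FlowRunTask(flow_name="next flow")(parameters={"task_param": param})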

    Questionnaire

    05/20/2020, 2:11 PM
    Hello folks, I'm trying to register my flow with the UI. I ran the docker-compose and docker files and added these lines:
    from prefect import Client

    flow.run_agent()
    c = Client()
    c.create_flow_run()
    but I'm facing this error:

    Questionnaire

    05/20/2020, 2:11 PM
    Traceback (most recent call last):
      File "main.py", line 315, in <module>
        flow.register()
      File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/core/flow.py", line 1429, in register
        no_url=no_url,
      File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/client/client.py", line 619, in register
        serialized_flow = flow.serialize(build=build)  # type: Any
      File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/core/flow.py", line 1279, in serialize
        self.storage.add_flow(self)
      File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/environments/storage/local.py", line 100, in add_flow
        flow_location = flow.save(flow_location)
      File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/core/flow.py", line 1346, in save
        cloudpickle.dump(self, f)
      File "/home/tanmay/.local/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 1067, in dump
        CloudPickler(file, protocol=protocol).dump(obj)
      File "/home/tanmay/.local/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 477, in dump
        return Pickler.dump(self, obj)
      File "/usr/lib/python3.6/pickle.py", line 409, in dump
        self.save(obj)
      File "/usr/lib/python3.6/pickle.py", line 521, in save
        self.save_reduce(obj=obj, *rv)
      File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
        save(state)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
        self._batch_setitems(obj.items())
      File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
        save(v)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
        self._batch_setitems(obj.items())
      File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
        save(v)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python3.6/pickle.py", line 876, in save_set
        save(item)
      File "/usr/lib/python3.6/pickle.py", line 521, in save
        self.save_reduce(obj=obj, *rv)
      File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
        save(state)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
        self._batch_setitems(obj.items())
      File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
        save(v)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/tanmay/.local/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 544, in save_function
        return self.save_function_tuple(obj)
      File "/home/tanmay/.local/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 756, in save_function_tuple
        save(state)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
        self._batch_setitems(obj.items())
      File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
        save(v)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
        self._batch_setitems(obj.items())
      File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
        save(v)
      File "/usr/lib/python3.6/pickle.py", line 521, in save
        self.save_reduce(obj=obj, *rv)
      File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
        save(state)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
        self._batch_setitems(obj.items())
      File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
        save(v)
      File "/usr/lib/python3.6/pickle.py", line 521, in save
        self.save_reduce(obj=obj, *rv)
      File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
        save(state)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
        self._batch_setitems(obj.items())
      File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
        save(v)
      File "/usr/lib/python3.6/pickle.py", line 496, in save
        rv = reduce(self.proto)
    TypeError: can't pickle _thread.lock objects

    Darragh

    05/20/2020, 3:43 PM
    Back with another bunch of stupid! Today's questions…
    • I build my flow into Docker storage and register it with the Prefect Server I have running on AWS - what's the simplest execution model I can use to get it running? I'm assuming that if I've built it locally, the registry I push it to needs to be accessible by both the builder [local machine] and the server?
    • Am I right in assuming I want a RemoteEnvironment with LocalExecutor? And does the execution agent need to be a Docker agent?
    • The agent running alongside my server is crashing out because it's trying to connect to http://localhost:4200 rather than the IP of the AWS EC2 instance it's running on - is there an environment variable override for this?
    [2020-05-18 17:52:01,285] ERROR - agent | HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql/alpha (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f52470ae0f0>: Failed to establish a new connection: [Errno 111] Connection refused',))
    Thanks in advance 🙂
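    On the last point, a hedged sketch: the agent reads its API endpoint from Prefect's config, and any config key can be overridden via environment variable - the same PREFECT__CLOUD__API variable that appears in the Kubernetes agent manifest later in this channel. The EC2 address below is a placeholder:
    export PREFECT__CLOUD__API="http://<ec2-public-ip>:4200"
    prefect agent start docker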

    Mark McDonald

    05/20/2020, 5:20 PM
    Is there a way to add all states to a cloud hook via the GraphQL API? Right now I'm having to provide a list of all possible states, and I'd like not to have to maintain this list if states get introduced/deprecated in the future. So instead of this:
    create_cloud_hook = f"""
    mutation {{
      create_cloud_hook(input: {{
        type: WEBHOOK,
        name: "{cloud_hook_name}",
        version_group_id: "{version_group_id}",
        states: ["Cached", "Cancelled", "Failed", "Finished", "Looped", "Mapped", "Paused", "Pending", "Queued", "Resume", "Retrying", "Running", "Scheduled", "Skipped", "Submitted", "Success", "Timedout", "Triggerfailed"],
        config: {{
          url: "{web_hook_url}"
        }}
      }}) {{
        id
      }}
    }}
    """
    something like this:
    create_cloud_hook = f"""
    mutation {{
      create_cloud_hook(input: {{
        type: WEBHOOK,
        name: "{cloud_hook_name}",
        version_group_id: "{version_group_id}",
        states: ["All"],
        config: {{
          url: "{web_hook_url}"
        }}
      }}) {{
        id
      }}
    }}
    """

    tkanas

    05/20/2020, 5:37 PM
    Is it possible to retrieve the flow that was run from a result object?

    james.lamb

    05/20/2020, 8:54 PM
    Hello from Chicago! I'm new to this space, so if this is the wrong channel please let me know and I'll move my question. I have a flow that could benefit from parallelism, and code that can provision / start a Dask cluster in Kubernetes. I know that I want to use a DaskExecutor to speed it up. It's clear from the documentation how to do this if you already have a Dask cluster up and running and just want to use it as an executor for a flow run. For my use case, I'd like to start the Dask cluster at the beginning of a flow run and stop it at the end. Right now I'm running this flow in a standalone Python process (just a script with flow code that ends in flow.run()), not using an agent talking to Prefect Cloud. What is the recommended way to get the behavior I want, where the Dask cluster gets started when the flow run starts and stopped when it stops, without using Prefect Cloud?
    • Somehow use Dask Cloud Provider Environment or Dask Kubernetes Environment without Prefect Cloud
    • Extend DaskExecutor by overriding its setup and teardown to start / stop the cluster
    • Something else that I'm missing
    • It's not possible, use Prefect Cloud
    Thanks very much!
    👋 2
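    A sketch of the second option under stated assumptions: manage the cluster lifecycle around flow.run() yourself, using dask_kubernetes and pointing a DaskExecutor at the scheduler address (worker-spec.yaml is a placeholder for your own worker pod spec; import paths follow Prefect 0.x):
    from dask_kubernetes import KubeCluster
    from prefect import Flow, task
    from prefect.engine.executors import DaskExecutor

    @task
    def inc(x):
        return x + 1

    with Flow("dask-demo") as flow:
        inc.map(list(range(100)))

    # start the cluster only for the duration of this flow run
    cluster = KubeCluster.from_yaml("worker-spec.yaml")
    cluster.scale(5)
    try:
        flow.run(executor=DaskExecutor(address=cluster.scheduler_address))
    finally:
        # tear the cluster down when the run finishes
        cluster.close()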

    Marwan Sarieddine

    05/20/2020, 9:37 PM
    Hello everyone - I have been struggling with DaskKubernetesEnvironment and setting imagePullSecrets to pull images from our private GitLab container registry. Note: I am using a Kubernetes agent polling from Prefect Cloud, on Prefect version 0.11.2. A couple of questions:
    1. I thought that if I created the docker-registry secret using kubernetes, added imagePullSecrets to the podSpec, and specified a custom scheduler_file and worker_spec file to DaskKubernetesEnvironment, this approach wouldn't rely on Prefect secrets and should work fine - but I am getting an empty dict for imagePullSecrets when inspecting the job and pod specs. I thought the Kubernetes agent might be overwriting them (I saw the replace_job_spec_yaml method), so I also specified the secret name in the Kubernetes agent manifest under the IMAGE_PULL_SECRETS environment variable - but I'm still getting an empty dict for imagePullSecrets. Any idea why?
    2. The other approach seems to be to not specify the worker_spec and scheduler file, but to set private_registry to True and docker_secret to the name of the Prefect Secret, then use the client to create the Prefect Secret with a name and value. I don't see where in the code the Prefect Secret's value is taken and a Kubernetes secret created. I tried both a dictionary of docker-server, docker-username, docker-password, and docker-email, and just setting the value to the name of the k8s secret that I created; neither worked, and I am getting an empty dict for imagePullSecrets. Any idea what might be going on here, and what is the best practice for setting k8s imagePullSecrets for DaskKubernetesEnvironment?
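    For reference, a minimal sketch of what the first approach assumes in the custom worker spec - plain Kubernetes, nothing Prefect-specific; the secret name gitlab-registry is a placeholder and must exist in the namespace the worker pods run in:
    kind: Pod
    spec:
      imagePullSecrets:
        - name: gitlab-registry
      containers:
        - name: dask-worker
          image: registry.gitlab.com/org/project/flow:latest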

    itay livni

    05/21/2020, 1:38 AM
    Hi - I am trying to understand (1) whether there is a reason to configure a result on each task as opposed to just on the flow, and (2) whether setting the target on a task is enough if the result is already set on the flow?
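    A minimal sketch of the two levels involved, assuming Prefect 0.11-style Results (the flow-level result acts as the default for all tasks, while a task-level target adds file-based caching on top; paths and checkpointing behavior may vary by configuration):
    from prefect import Flow, task
    from prefect.engine.results import LocalResult

    # target + checkpoint enable file-based caching for this task; the
    # flow-level result below supplies the default storage location
    @task(target="{task_name}.pkl", checkpoint=True)
    def make_data():
        return [1, 2, 3]

    with Flow("results-demo", result=LocalResult(dir="/tmp/prefect-results")) as flow:
        make_data()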

    Avi A

    05/21/2020, 7:34 AM
    Hey! We need to use some retries in our flow. Is there a way to have a variable retry_delay that increases on each retry (e.g. to implement exponential backoff)?
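    For context, a sketch of the fixed-delay form that does exist - retry_delay takes a single timedelta, so an increasing per-retry delay would need something beyond this (to my knowledge exponential backoff was not a built-in task option at the time):
    from datetime import timedelta
    from prefect import task

    # a fixed delay between retries is the built-in behavior
    @task(max_retries=3, retry_delay=timedelta(minutes=1))
    def flaky_call():
        ...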

    Arsenii

    05/21/2020, 9:58 AM
    Hey all! I'm trying to set up a CircleCI job that builds and pushes flows' Docker images to AWS ECR. CircleCI provides a remote docker execution environment, and to connect to it I need to pass TLS configuration to the docker client. Unfortunately, Prefect seems to only expose the base_image of docker.APIClient (https://github.com/PrefectHQ/prefect/blob/1447982cc63a1f87c4a19cc7be18f2b2693fb883/src/prefect/environments/storage/docker.py#L508). Any suggestions on how to do this? Thanks!
    👀 3
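    One possible direction, sketched under assumptions: docker.APIClient does accept a tls argument (a docker.tls.TLSConfig), so a subclass of Prefect's Docker storage could inject it wherever the storage constructs its client. The _get_client override point below is hypothetical - in the version linked above you may instead need to patch each place the APIClient is built:
    import docker
    from prefect.environments.storage import Docker

    class TLSDocker(Docker):
        """Docker storage that threads TLS config through to docker.APIClient (sketch)."""

        def __init__(self, *args, tls_config=None, **kwargs):
            self.tls_config = tls_config  # a docker.tls.TLSConfig instance
            super().__init__(*args, **kwargs)

        def _get_client(self):  # hypothetical hook; see note above
            return docker.APIClient(
                base_url=self.base_url, version="auto", tls=self.tls_config
            )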

    Questionnaire

    05/21/2020, 10:41 AM
    Hello folks, I'm trying to register my flow, but the API status in the local UI remains "Attempting to connect...", and I'm getting an error. I'm sharing my config.toml and a UI screenshot. Please help.

    Avi A

    05/21/2020, 12:17 PM
    Is there a clean way to invoke tasks as functions, e.g. for unit testing outside a Prefect flow? The cleanest way I could think of is to define the function regularly (without the task decorator) and treat it as a task only inside the flow, i.e.:
    from prefect import Flow, task

    def f():
        pass

    # regular call
    foo = f()

    # inside a flow
    with Flow("my-flow") as flow:
        foo = task(f)()
    Any nicer suggestions?
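    For reference, a sketch of the common alternative: a @task-decorated function keeps its body reachable via .run(), so tests can call that directly (names here are illustrative):
    from prefect import task

    @task
    def add(a, b):
        return a + b

    # unit-test the task body directly, no flow required
    assert add.run(1, 2) == 3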

    Avi A

    05/21/2020, 12:44 PM
    Suppose I have a mapped task that connects to a certain DB. I'd like it to run in parallel, but I'd also like to limit the number of concurrently running copies of the task, regardless of the number of available executors in my cluster (e.g. I might have 32 executors but the DB can handle only 4 connections). Is there a way to do that in Prefect?
    👀 1
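    A sketch of how this is typically expressed, assuming Prefect Cloud's tag-based task concurrency limits (the tag lives on the task in core; the numeric limit itself is configured on the Cloud side, so this does not work standalone):
    from prefect import task

    # with a concurrency limit of 4 configured for the "db" tag,
    # at most 4 mapped copies of this task run at once
    @task(tags=["db"])
    def query_db(chunk):
        ...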

    Matias Godoy

    05/21/2020, 1:57 PM
    Hi guys! I just came here to thank you for the implementation of case for conditionals. I was using ifelse until now, and it was really hard to read and to explain to my coworkers. Now with case everything is a lot more readable. Thanks a lot for your excellent work and keep it up! 💪
    🙏 5
    🚀 12

    Geoffrey Gross

    05/21/2020, 6:27 PM
    Hey everyone. I'm new here. I am at the beginning stages of setting up a POC for Prefect on our kubernetes cluster. I see that there is a Kubernetes Agent which makes sense to me. Then I see the Kubernetes Execution Environment which is also cool. I am curious as to which agent you recommend for the Kubernetes Execution Environment? Or I guess how do Execution Environments interact with Agents?
    👀 1
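    For context, a hedged sketch of how the pieces fit together: the agent polls for scheduled flow runs and launches a job in the cluster, and the environment attached to the flow then controls how that job executes. The values below are placeholders modeled on other snippets in this channel:
    from prefect import Flow
    from prefect.environments import DaskKubernetesEnvironment
    from prefect.environments.storage import Docker

    flow = Flow("k8s-poc")
    # the Kubernetes agent starts the run; the environment decides what
    # happens inside the cluster once the job is created
    flow.environment = DaskKubernetesEnvironment(min_workers=1, max_workers=3)
    flow.storage = Docker(registry_url="my-registry.example.com", image_name="k8s-poc")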

    Alex Welch

    05/21/2020, 6:56 PM
    Seems like a very basic question, but I couldn't find it in the docs: where do I put my tasks so that Prefect will pick them up?
    👀 1

    Alex Welch

    05/21/2020, 6:56 PM
    or am I too stuck in the Airflow mentality here

    Ben Fogelson

    05/21/2020, 7:38 PM
    Say a Flow run fails. Is there a recommended way to store the state of the failed run so that I could load it up, raise the exception stored in the failed task's result, and step into a debugger?
    👀 1
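    A sketch of the local-debugging tools that exist for this, assuming the flow is run locally (prefect.utilities.debug.raise_on_exception re-raises the original exception instead of capturing it in a Failed state):
    from prefect import Flow, task
    from prefect.utilities.debug import raise_on_exception

    @task
    def boom():
        raise ValueError("inspect me")

    with Flow("debug-demo") as flow:
        t = boom()

    # option 1: let the exception propagate so a debugger can step in
    with raise_on_exception():
        flow.run()

    # option 2: run normally, then inspect the captured result afterwards
    state = flow.run()
    print(state.result[t].result)  # the stored exception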

    Manuel Mourato

    05/21/2020, 7:43 PM
    Hello everyone! I am trying to run a flow via the UI, which I stored in a Docker image. The Docker image itself builds with no issue, as shown in the code and image below.
    from prefect.environments.storage import Docker

    storage = Docker(image_name="flows-storage", image_tag="v1.0", local_image=True, dockerfile="DockerFile-PrefectDependencies")
    storage.build()
    test_flow.storage = storage
    test_flow.register(build=False, labels=["docker-execs-test"])
    But when I start a docker agent and run the flow, the run stays permanently in the Submitted state. Any idea why this is?
    👀 1
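    A hedged note: a run stuck in Submitted often means no agent with matching labels has picked it up. Since the flow above is registered with the docker-execs-test label, the agent would need the same label, e.g.:
    prefect agent start docker -l docker-execs-test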

    Shreyansh Pandey

    05/21/2020, 8:33 PM
    Hey, community! So I had a question from more of an operational perspective. From what I understand, I am supposed to start the Prefect server, which will bootstrap, provision, and start the dependent services (Postgres, Hasura, etc.). Then we have agents which communicate with the Prefect API to get flows assigned to them and, in turn, execute them. Imagine I have two servers: server A and server B. Server A is responsible only for the Prefect backend, and server B is my agent farm containing 5 agents. For the purpose of this question, assume that they are in the same CIDR/subnet and can communicate with each other. How would I tell the agents what domain name/IP address my Prefect server is running on so that everything can communicate and function like it's supposed to? Another question which arises from this: what would this look like as a standard version-controlled workflow? Imagine I have a repository on my Git servers which contains my flow definition; how will the CI function to automatically register new flows or update existing ones?
    🤔 1
    👀 1

    Shreyansh Pandey

    05/21/2020, 8:45 PM
    From my understanding of the Prefect ecosystem, I reckon that the latter part of what I asked is possible, but I would still love a green light from the community to make sure I got this right. Suppose we have the standard CI/CD flow:
    • an engineer pushes code to the VCS;
    • our CI/CD agents receive a task;
    • they spin up a Docker container to run that task;
    • the Docker container runs python3 flow.py (where flow.py is the entry point);
    • this, in turn, calls flow.register(), which registers/updates the flow upstream.
    If that's the case, what would be the best way to accomplish dynamic configuration of the Prefect API endpoint? (Env var, etc.) I understand that I can use PREFECT__CONTEXT__SECRETS__<NAME> to dynamically serialize secrets.
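    On the endpoint question, a sketch under assumptions: any Prefect config key can be supplied as an environment variable, and the client reads its API endpoint from config - the same PREFECT__CLOUD__API variable shows up in the agent pod spec below. server-a:4200 is a placeholder:
    export PREFECT__BACKEND=server
    export PREFECT__CLOUD__API="http://server-a:4200"
    python3 flow.py   # flow.register() now talks to server A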

    Noah Nethery

    05/21/2020, 9:27 PM
    Hello, I'm getting the following error with DaskKubernetesEnvironment. It has to do with the environment not being able to read the secret in my Prefect k8s agent:
    Docker registry secret default-docker does not exist for this tenant.
    
    Creating Docker registry kubernetes secret from "prefect" Prefect Secret.
    
    Failed to load and execute Flow's environment: ClientError([{'message': 'No value found for the requested key', 'locations': [{'line': 2, 'column': 9}], 'path': ['secret_value'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'errors': [{'message': 'No value found for the requested key', 'locations': [], 'path': ['secret_value']}]}}}])
    Here is my flow:
    from prefect import task, Flow, Parameter
    from prefect import Client
    from prefect.environments.storage import Docker
    from prefect.environments import DaskKubernetesEnvironment
    import json
    
    @task(log_stdout=True)
    def say_hello():
        print("Hello, K8s!".format(name))
        return True
    
    @task
    def print_completed(status):
        print('Success')
    
    flow = Flow("Simple-Flow", 
        environment=DaskKubernetesEnvironment(min_workers=1, max_workers=3, private_registry=True, docker_secret='prefect'), 
        storage=Docker(image_name="hello-prefect", registry_url='my-company-reg.com'))
    
    print_completed.set_upstream(say_hello, flow=flow)
    print_completed.bind(status=say_hello, flow=flow)
    
    flow.register(project_name="Hello K8s")
    And here is the description of my agent’s pod:
    Name:               prefect-agent-7fd7ff499-nzvlr
    Namespace:          default
    Priority:           0
    PriorityClassName:  <none>
    Node:               **********
    Start Time:         Thu, 21 May 2020 15:51:33 -0400
    Labels:             app=prefect-agent
                        pod-template-hash=7fd7ff499
    Annotations:        kubernetes.io/psp: eks.privileged
    Status:             Running
    IP:                 100.64.16.213
    Controlled By:      ReplicaSet/prefect-agent-7fd7ff499
    Containers:
      agent:
        Container ID:  docker://d6a7fd06ea04adbf956d7a4f9aefdaf421929f1bbaf2b6af4e90a9262095fe4f
        Image:         prefecthq/prefect:0.11.2-python3.6
        Image ID:      docker-pullable://prefecthq/prefect@sha256:d0f685016f5f82a373a0b3aeadb4598529e7f31139ca9f585b2077e1f6097c64
        Port:          <none>
        Host Port:     <none>
        Command:
          /bin/bash
          -c
        Args:
          prefect agent start kubernetes
        State:          Running
          Started:      Thu, 21 May 2020 15:51:34 -0400
        Ready:          True
        Restart Count:  0
        Limits:
          cpu:     100m
          memory:  128Mi
        Requests:
          cpu:     100m
          memory:  128Mi
        Liveness:  http-get http://:8080/api/health delay=40s timeout=1s period=40s #success=1 #failure=2
        Environment:
          PREFECT__CLOUD__AGENT__AUTH_TOKEN:     ********
          PREFECT__CLOUD__API:                   https://api.prefect.io
          NAMESPACE:                             default
          IMAGE_PULL_SECRETS:                    prefect
          PREFECT__CLOUD__AGENT__LABELS:         []
          JOB_MEM_REQUEST:                       
          JOB_MEM_LIMIT:                         
          JOB_CPU_REQUEST:                       
          JOB_CPU_LIMIT:                         
          PREFECT__BACKEND:                      cloud
          PREFECT__CLOUD__AGENT__AGENT_ADDRESS:  http://:8080
        Mounts:
          /var/run/secrets/kubernetes.io/serviceaccount from default-token-5nvmj (ro)
    Conditions:
      Type              Status
      Initialized       True 
      Ready             True 
      ContainersReady   True 
      PodScheduled      True 
    Volumes:
      default-token-5nvmj:
        Type:        Secret (a volume populated by a Secret)
        SecretName:  default-token-5nvmj
        Optional:    false
    QoS Class:       Guaranteed
    Node-Selectors:  <none>
    Tolerations:     node.kubernetes.io/not-ready:NoExecute for 300s
                     node.kubernetes.io/unreachable:NoExecute for 300s
    Events:          <none>
    I've verified that the secret named prefect pulls down my image from our company's registry.
    👀 1
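    A hedged sketch of what the error suggests is missing: with private_registry=True and docker_secret='prefect', the environment looks up a Prefect Secret named prefect (stored in Prefect Cloud, distinct from the k8s secret of the same name). One way to create it is via the Client; the dict layout mirrors the docker-registry fields mentioned earlier in this channel and is an assumption:
    from prefect import Client

    client = Client()
    client.set_secret(
        name="prefect",
        value={
            "docker-server": "my-company-reg.com",
            "docker-username": "<username>",
            "docker-password": "<password>",
            "docker-email": "<email>",
        },
    )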

    Marwan Sarieddine

    05/21/2020, 10:56 PM
    Hello everyone - I am running into issues with my tasks getting marked "Failed" by a Zombie Killer process. Basically, I created a simple flow that runs fine when the input data is small, but when I increase the size of the input data (while remaining well below the memory limit of the Dask worker), parts of the flow run and some tasks get marked as "failed" by a Zombie Killer process. Also, the entire flow's status gets stuck at "running" even though the task has been marked as "failed". I am sharing my flow's code and dask-worker spec under this issue (https://github.com/PrefectHQ/prefect/issues/1954) to give a concrete example of the failure.

    Manuel Mourato

    05/22/2020, 1:51 PM
    Hello again, guys. Question: if I run a docker agent like below and run a flow, it works with no issue.
    prefect agent start docker -l docker-execs-test
    If I try to set the network to host, however, like below, the flow stays permanently in the "Submitted" state.
    prefect agent start docker -l docker-execs-test --network host
    Any ideas on why this might be?

    Darragh

    05/22/2020, 1:56 PM
    Hey guys, today's Q&A session - is there such a concept as picking a runtime execution environment during a flow? To be more descriptive:
    • I run task A, which provides some output X.
    • X is provided as input to task B, and based on, say, the number of available executors N running on Fargate, I map/partition the data and designate task B to run with partition X/N of the data, on an environment or worker that I specify during execution?

    Andor Tóth

    05/22/2020, 2:11 PM
    hello

    Andor Tóth

    05/22/2020, 2:11 PM
    Can you give me a hint on how to give a name to each of a series of mapped tasks?