https://prefect.io logo
Docs
Join the conversationJoin Slack
Channels
announcements
ask-marvin
best-practices-coordination-plane
data-ecosystem
data-tricks-and-tips
events
find-a-prefect-job
geo-australia
geo-bay-area
geo-berlin
geo-boston
geo-chicago
geo-colorado
geo-dc
geo-israel
geo-japan
geo-london
geo-nyc
geo-seattle
geo-texas
gratitude
introductions
marvin-in-the-wild
prefect-ai
prefect-aws
prefect-azure
prefect-cloud
prefect-community
prefect-contributors
prefect-dbt
prefect-docker
prefect-gcp
prefect-getting-started
prefect-integrations
prefect-kubernetes
prefect-recipes
prefect-server
prefect-ui
random
show-us-what-you-got
Powered by Linen
prefect-server
  • r

    Robert Hales

    07/15/2021, 9:55 AM
    Hi again 😄 Im renamed a flow run from a task using the
    client.set_flow_run_name
    method, and it sucessfully updates the UI. However, when I then do
    create_flow_run
    the subtasks use the old auto-generated name. Im assuming because the context is stale?
    k
    30 replies · 2 participants
  • r

    Robert Hales

    07/15/2021, 10:26 AM
    Also, it would be useful if
    wait_for_flow_run
    failed if the state of the flow run is failed
  • s

    Scarlett King

    07/15/2021, 3:34 PM
    Hi, we’re configuring persistent volume for the postgres database but having issues connecting to it. What do we need to set in the helm chart?
    k
    m
    +2
    12 replies · 5 participants
  • e

    Eric Mauser

    07/16/2021, 7:54 PM
    Hello! I'm trying to run a DBTShellTask using prefect cloud. I have a lot of my connection details stored as Prefect secrets. It looks like when prefect generates the profiles.yml file for this task it is storing literal values instead of injecting the Prefect secrets. Is there a way to inject these into the dbt_kwargs argument?
    k
    6 replies · 2 participants
  • g

    Garret Cook

    07/16/2021, 11:13 PM
    Hey Team, how do i index a dict of flow.state_handler status results? I get something like this, but I don’t know how to get the keys to index it and get what I want: {<Task: AWSSecretsManager[‘AZ_ACCOUNT_NAME’]>: <Success: “Task run succeeded.“>, <Task: <lambda>>: <Success: “Task run succeeded.“>}
    z
    4 replies · 2 participants
  • g

    Garret Cook

    07/17/2021, 2:19 AM
    Is there any way to get task results in a flow state handler?
    👀 1
    k
    18 replies · 2 participants
  • b

    Bruno Murino

    07/17/2021, 9:17 AM
    Hi everyone, I’m deploy some of my flows via ECS Service with 1 container that registers the flow and starts a local agent. Problem is that no exceptions are thrown on the service, however the flows are stuck on “scheduled” and never run and my hasura container is throwing this error:
    {
      "type": "http-log",
      "timestamp": "2021-07-17T08:43:42.608+0000",
      "level": "error",
      "detail": {
        "operation": {
          "user_vars": {
            "x-hasura-role": "admin"
          },
          "error": {
            "path": "$.selectionSet.insert_project.args.objects",
            "error": "Uniqueness violation. duplicate key value violates unique constraint \"project_tenant_id_name_key\"",
            "code": "constraint-violation"
          },
          "request_id": "c8aedf52-53fc-4e54-9149-a1d62db270f0",
          "response_size": 193,
          "query": {
            "variables": {
              "insert_objects": [
                {
                  "tenant_id": "712db680-3e15-469d-b6e7-eee61a60593d",
                  "name": "Health Monitoring",
                  "description": null
                }
              ]
            },
            "query": "mutation($insert_objects: [project_insert_input!]!) {\n    insert: insert_project(objects: $insert_objects) {\n        returning {\n            id\n        }\n    }\n}"
          }
        },
        "http_info": {
          "status": 400,
          "http_version": "HTTP/1.1",
          "url": "/v1alpha1/graphql",
          "ip": "172.17.0.14",
          "method": "POST",
          "content_encoding": null
        }
      }
    }
    k
    4 replies · 2 participants
  • s

    Son Mai

    07/18/2021, 1:50 PM
    Please show me your project structure of prefect ?
    k
    1 reply · 2 participants
  • g

    Garret Cook

    07/19/2021, 3:13 AM
    When I generate a child flow from inside a running flow using create_flow_run, the child runs and is marked successful in the web interface. However, the following error is logged in the parent flow, and the create_flow_run task is marked as failed.
    Task 'create_flow_run': Exception encountered during task execution!
    Traceback (most recent call last):
    File "/usr/local/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    logger=self.logger,
    File "/usr/local/lib/python3.6/site-packages/prefect/utilities/executors.py", line 327, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
    File "/usr/local/lib/python3.6/site-packages/prefect/tasks/prefect/flow_run.py", line 142, in create_flow_run
    run_url = client.get_cloud_url("flow-run", flow_run_id)
    File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 1217, in get_cloud_url
    as_user=(as_user and using_cloud_api and self._api_token is not None)
    File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 1252, in get_default_tenant_slug
    res = self.graphql(query)
    File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 563, in graphql
    raise ClientError(result["errors"])
    prefect.exceptions.ClientError: [{'path': ['user'], 'message': 'field "user" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.user', 'code': 'validation-failed', 'exception': {'message': 'field "user" not found in type: \'query_root\''}}}]
    1 reply · 1 participant
  • g

    Garret Cook

    07/19/2021, 4:09 PM
    I have a mapped task, it returns a list of lists. I then flatten it and iterate over the results in another mapped task, as recommended in the docs. However, if the first task returns TRIGGERFAIL instead of a list of lists, when I run flatten, I get an error: Traceback (most recent call last): File “/usr/local/lib/python3.6/site-packages/prefect/engine/runner.py”, line 48, in inner new_state = method(self, state, *args, **kwargs) File “/usr/local/lib/python3.6/site-packages/prefect/engine/flow_runner.py”, line 520, in get_flow_run_state mapped_children=children, executor=executor File “/usr/local/lib/python3.6/site-packages/prefect/utilities/executors.py”, line 588, in flatten_mapped_children [executor.submit(lambda c: len(c._result.value), c) for c in mapped_children] File “/usr/local/lib/python3.6/site-packages/prefect/utilities/executors.py”, line 588, in <listcomp> [executor.submit(lambda c: len(c._result.value), c) for c in mapped_children] File “/usr/local/lib/python3.6/site-packages/prefect/executors/local.py”, line 28, in submit return fn(*args, **kwargs) File “/usr/local/lib/python3.6/site-packages/prefect/utilities/executors.py”, line 588, in <lambda> [executor.submit(lambda c: len(c._result.value), c) for c in mapped_children] TypeError: object of type ‘TRIGGERFAIL’ has no len()
    k
    15 replies · 2 participants
  • m

    Marko Jamedzija

    07/20/2021, 3:33 PM
    Hello 🙂 I’m experiencing a problem with some tasks getting stuck in the
    Running
    state even though the underlying k8s
    RunNamespacedJob
    task completed successfully. I’m using prefect
    0.15.1
    and
    LocalDaskExecutor
    . This happens almost always for the longer-running tasks of this kind. Any suggestion how to resolve this? Thanks!
    k
    14 replies · 2 participants
  • l

    Leonardo Rocha

    07/20/2021, 5:59 PM
    Hi, I'm kind of new to prefect, so it's probably a dumb question, but I wanted to know if anyone has tried to do something similar to me. I created a python class to extend the prefect.Flow. Among other things, it overrides the run function (mainly to enforce a few company rules for logging and to setup the context on runtime for each flow), but now I'm having dificulty running my code on a local agent, since the agent does not call flow.run. Is there a way for me to configure those things on my flow class without needing to create my own FlowRunner?
    k
    6 replies · 2 participants
  • a

    Alex Furrier

    07/20/2021, 10:06 PM
    I'm trying to run a "flow of flows" following the documentation on dependent flows here. I'm trying to parametrize this flow and pass those down to the dependent flows. I can pass params to the first flow but when it tries to run the dependent flow with params they all come back as null and the sub flow fails.
    flow_a = StartFlowRun(flow_name="flow a", project_name="my project", wait=True)
    flow_b = StartFlowRun(flow_name="flow b", project_name="my project", wait=True)
    
    with Flow('flow-of-flows') as flow_of_flows:
        param_a = Parameter('param_a', 'a')
        param_b = Parameter('param_b', 'b')
    
        flow_a(parameters={"param_a": param_a})
        flow_b(parameters={"param_b": param_b}, upstream_tasks=[flow_a])
    In the UI if I attempt to run this flow with params like
    {'param_a': 'parametrized_1', 'param_b':'parametrized_b'}
    it will create a task running
    flow_a
    and fail. Viewing the initiated flow for
    flow_a
    it shows the parameters being
    {'param_a': null}
    .
    flow_a
    fails and
    flow_b
    fails before it even runs since it's dependent on
    flow_a
    and says a reference task failed. Is there a proper way to pass parameters between parent and child flows?
    k
    5 replies · 2 participants
  • h

    Hugo Polloli

    07/21/2021, 9:42 AM
    Hi, I'm trying to setup my prefect backend and one agent to be ever-running on my EC2 instance, docker is already automatically starting and since restart is set to true in docker-server's compose.yml, prefect server also automatically starts ever since I launched it once with --detach. But how do I make the agent automatically start on boot too ? according to the docs I could use supervisor with the local agent, is there any alternative using the docker agent ? Should I make my own compose.yml for the agent ?
    m
    k
    +1
    64 replies · 4 participants
  • d

    Daniel Davee

    07/21/2021, 6:37 PM
    If I have a docker agent running on a vm with a local docker repo and am using dockers stoage and a docker run, it should pickup the job?
    k
    6 replies · 2 participants
  • a

    Amogh Kulkarni

    07/22/2021, 8:31 AM
    Hi I want to deploy prefect-server on local docker how we can do that?
    m
    1 reply · 2 participants
  • y

    YD

    07/22/2021, 9:06 PM
    If I have a prefect.io running on a VM and I want to test the cloud server, do I need to stop the agent running on my laptop first, or can I have the agent for the cloud server and the agent for the VM server running at the same time ?
    k
    8 replies · 2 participants
  • j

    Jonas

    07/23/2021, 1:44 PM
    Hello, I am currently trying to register a flow to a local prefect server, but I ran into the following issue:
    TypeError: cannot pickle 'weakref' object (running this with flow.run worked fine)
    I narrowed it down to the use of a sqlalchemy session object I use to send and retrieve data from a database and that the object can't be serialized. Now I'm a bit clueless as to how I should solve this because I'm unsure with what steps to take next.
    from prefect import task, Flow,case, Parameter
    from sqlalchemy import orm
    from sqlalchemy import orm
    
    engine = create_engine("mysql+mysqldb://{USER}:{PASSWORD}@{HOST}:{PORT}/{NAME}".format(**json.load(f)))
    session = orm.Session(bind=engine)
    
    @task(max_retries=3, retry_delay=timedelta(seconds=10))
    def commit_to_db(object_list: List):
    
        session.add_all(object_list)
        try:
            session.commit()
    
        except exc.SQLAlchemyError as error:
            session.rollback()
            #logger.warning(error)
            raise Exception(error)
    
        session.close()
    k
    12 replies · 2 participants
  • c

    Chris Bowen

    07/23/2021, 6:18 PM
    Hello, not sure if this will be helpful for anyone else, but figured I'd share. Not sure where else to post it. A couple weeks ago I posted here asking how to manage the Markdown for flow groups in a more programmatic way than just writing it in the UI (I want to keep mine in
    .md
    files in a repo). @Zach Angell provided me with the GraphQL to start out and I expanded on it to make a solution that works for me. I'm storing mine in a few functions, but I'll just provide the entire thing as a script here. This script assumes a few things (I'll edit/update if I think of any more or anyone points any out): 1. This script is running in a directory that contains all of your
    .md
    files that you want to deploy 2. Your
    .md
    files are named the same thing as your flows (see the
    active_flows
    object returned from the GraphQL query) 3. Any
    \
    characters in your
    .md
    files need to be escaped with a second
    \
    or the deployment won't work 4. I believe including any `"`characters in the
    .md
    files also blew up the API call, but
    '
    was fine CODE IN THREAD I've got this divided up into three functions, but posting it in this format seemed like it might be easier to read. One thing I did notice is that the Prefect UI doesn't seem to handle code blocks very elegantly. I tried using the standard three ticks (`), tildas (~), with/without specifying the code base, and using four spaces. I attached a couple pictures that show the input and output. Not a huge deal, but curious why it doesn't "block" like most other platforms I've seen. Anyways, happy to hear any feedback, suggestions, etc. Also just happy to share it if it helps anyone else at all!
    k
    m
    +1
    8 replies · 4 participants
  • n

    Nathan Walker

    07/23/2021, 8:00 PM
    Ohhhh, I did a bad and I need some help. What I did: I created a Flow that creates other Flows and haven't implemented the "don't do this infinitely" part yet, so it grew and grew and now I have a few dozen Flows in "Running" and "Scheduled" statuses. I took my Agent offline, so it's not creating new Flows any more, but I need to cancel all the Flows. Going through each one and clicking "Cancel" in the UI will take quite some time. What I need: Some GraphQL query to cancel all Running and Scheduled Flows. I can query for them and I can call the mutation to cancel them, but I don't know how to have GraphQL iterate through each returned id and run a mutation on it.
    k
    2 replies · 2 participants
  • m

    Mateusz Pazdzior

    07/26/2021, 10:15 AM
    Hello! Do you know if it's possible to pay via bank wire? I've already contacted sales@prefect.io but I have no answer yet.
    k
    h
    +1
    4 replies · 4 participants
  • a

    Alfie

    07/27/2021, 12:03 AM
    Hi Team, now I’m using (Prefect core 0.13) flow.state_handlers to do some actions once the state is changed, and the call back method takes three params: flow, old_state, new_state. What I want is to access some data generated during the flow execution in the handler and it’s used to assistant the triggered action. Any suggestions about how can I make the data accessible in the handler? Thanks
    k
    9 replies · 2 participants
  • e

    Eric Mauser

    07/27/2021, 8:52 PM
    I'm trying to use ECSRun to run a couple of DBTShellTasks. When I run this
    flow.py
    locally it returns that both tasks were successful, however I can see that it didn't actually run the dbt package. Any thoughts here? The dbt package is just in the same folder as the
    flow.py
    . I'm assuming this all gets packaged up and run on ECS.
    k
    7 replies · 2 participants
  • h

    Hugo Polloli

    07/28/2021, 12:33 PM
    Running
    prefect agent docker start
    then
    docker ps
    , I was thinking I'd see a docker agent as a container but I don't, I only have my prefect server. Did I misunderstand something about the "docker" agent or is something wrong with my setup ?
    g
    3 replies · 2 participants
  • j

    Jonas

    07/28/2021, 2:14 PM
    Hello, I am trying to run a flow (A) on a local prefect server but I am running into the following error:
    Failed to load and execute Flow's environment: StorageError('An error occurred while unpickling the flow:\n  ModuleNotFoundError("No module named \'subpackage1\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
    I'm trying to import package 1 and 2 in flow A . I get that the Prefect environment can't find the module but what is they best way of solving this? folder structure: └── myproject └── system1 ├── flow_A.py └── subpackage1 └── package1.py └── subpackage2 └── package2.py └── random files (that I can import)
    k
    2 replies · 2 participants
  • c

    Constantino Schillebeeckx

    07/28/2021, 2:28 PM
    Hey there! Not sure if this is the right channel, so feel free to point me in the right direction. I just started a new job where they use prefect; I've created an account with my work email but I'm never getting the "confirm your email" email. Am I missing something?
    k
    n
    9 replies · 3 participants
  • e

    Eric Mauser

    07/28/2021, 4:12 PM
    Hello again! I've got a couple of tasks set up to run with ECSRun. I have an error in my container (which I'm aware of). However, it looks like Prefect isn't getting the feedback that the flow even started. Prefect shows it getting stuck in the submitted state. However, ECS has already killed the task. Any thoughts on how to get this feedback to Prefect?
    k
    5 replies · 2 participants
  • p

    Pedro Martins

    07/28/2021, 5:45 PM
    Hey there! Is it possible to run prefect tasks in multiple pods? I have a flow with multiple tasks (fetch_data -> pre-processing -> train_model -> register -> deploy), which I'd like to run in different k8s nodes configuration e.g: pre-processing task in a node with more memory; train_model in a node with GPU.
    k
    m
    +1
    6 replies · 4 participants
  • a

    Alex Furrier

    07/28/2021, 10:13 PM
    The issue I'm having is a continuation of this thread, which is on issues with a task mapped to many subtasks and Dask. The most current issue is that the K8s job that the flow starts with gets an OOM error and dies which causes the flow to fail.
    [2021-07-28 22:06:32+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'My Flow'
    [2021-07-28 22:06:32+0000] INFO - prefect.DaskExecutor | Creating a new Dask cluster with `__main__.`...
    Creating scheduler pod on cluster. This may take some time.
    [2021-07-28 22:06:37+0000] INFO - prefect.DaskExecutor | The Dask dashboard is available at <http://dask-root-78fcc2c9-3.prefect:8787/status>
    distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=1 maximum=2
    [2021-07-28 22:07:04+0000] WARNING - prefect.CloudFlowRunner | Flow run is no longer in a running state; the current state is: Error: pods ['prefect-job-054cbaf7-rt2mr'] failed for this job">
    The last thing I attempted to change was to have all results for the flow be
    AzureResults
    with a connection to blob storage. The environment variables for connecting to our blob storage are available in the docker image the flow runs on, but I didn't see them on the prefect job. They are on the dask workers that spawn. I don't see successful results showing up in our blob storage. Not sure if the two of these are related but thought it's possible.
    k
    9 replies · 2 participants
  • z

    Zhilong Li

    07/29/2021, 2:42 AM
    Hi there! I installed prefect on an on-prem kubernetes cluster and have been trying to secure the services. I noticed that the apollo server has to be accessible from the UI browser, which exposes the apollo graphql endpoint so anyone with the endpoint and send queries and mutations to the server. Is there anyway to add an auth layer to protect the server? I was thinking about using Istio authorizationservice but that solution probably requires hardcoding some tokens and changes to the UI source code which is not ideal. Thanks for any tips shared!
    k
    2 replies · 2 participants
Powered by Linen
Title
z

Zhilong Li

07/29/2021, 2:42 AM
Hi there! I installed prefect on an on-prem kubernetes cluster and have been trying to secure the services. I noticed that the apollo server has to be accessible from the UI browser, which exposes the apollo graphql endpoint so anyone with the endpoint and send queries and mutations to the server. Is there anyway to add an auth layer to protect the server? I was thinking about using Istio authorizationservice but that solution probably requires hardcoding some tokens and changes to the UI source code which is not ideal. Thanks for any tips shared!
k

Kevin Kho

07/29/2021, 2:16 PM
Hey @Zhilong Li, unfortunately an auth layer for server is something we don’t support here as that would be one use case of Cloud. This question has come up though so there are some threads you could look at. Example thread.
🙏 1
We do have some users editing the UI source code, but we don’t provide advice on it unfrotunately.
View count: 4