prefect-server
  • p

    Peter Roelants

    05/21/2021, 1:20 PM
    Hi Prefect, is there a way to limit parallel flows in Prefect Server? For example, I have two flows:
    • flow_B: resource-intensive, long-running flow that is run as a dependent flow of flow_A
    • flow_A:
      ◦ Scheduled to run every t minutes
      ◦ Calls flow_B x times using StartFlowRun
      ◦ Runs with LocalDaskExecutor to limit parallelism to at most y runs at a time. For one scheduled run, the dependent flow_B is limited to at most y runs at a time.
    I noticed that for a single run of flow_A, flow_B is indeed limited to y parallel runs. However, when a previous flow_A run (and its dependent flow_B runs) is still running and a new flow_A run with new dependent flow_B runs is scheduled, more than y flow_B runs can be running at the same time. For example, with parallelism y=2:
    • At time 1 there will be 2 flow_B runs running.
    Running flow_A-1:
      |--dependent flow_B-1: running
      |--dependent flow_B-2: running
      |--dependent flow_B-3: waiting
    • At time 2 there will be 4 flow_B runs running.
    Running flow_A-1:
      |--dependent flow_B-1: finished
      |--dependent flow_B-2: running
      |--dependent flow_B-3: running
    
    Run flow_A-2:
      |--dependent flow_B-4: running
      |--dependent flow_B-5: running
      |--dependent flow_B-6: waiting
    Is it possible to limit the parallelism of flow_B to at most y across all scheduled runs?
    k
    5 replies · 2 participants
  • j

    Jeff Williams

    05/21/2021, 3:11 PM
    Hello all. I have an odd situation that I can't seem to figure out. The objective is to capture information about flow and task runs and save them off to some "external" storage. The environment is ultimately going to be on a Google Cloud instance, but I am not there yet. I have worked through things locally, meaning I have my test script written to run locally. I am taking the next step, which is to register the flow with a local agent (local infrastructure - no problems there) and then run it on a local agent. All of that seems to work fine, for the most part. I am using a custom terminal state handler to capture the final states of the tasks and flow that has just finished executing. The specific issue I am facing right now is that the state.result dictionary is populated when run locally but it is not when it is run using a local agent. Specifically, if I do len(state.result.keys()) I get a non-zero value locally but get zero when run on the agent. Any ideas as to why?
    k
    z
    11 replies · 3 participants
  • m

    Mariusz Olszewski

    05/21/2021, 7:13 PM
    Hi, is there a way to have a task/flow terminated after some X hours?
    k
    2 replies · 2 participants
  • j

    Julio Venegas

    05/23/2021, 7:30 PM
    Hi everyone, I'm trying to get an external Postgres instance running for Prefect 0.14.19. The Postgres instance is running on Azure; it's PostgreSQL 11. Long story short, all Azure Postgres usernames are in the format username@server-name. In order to get Hasura running I have to escape the @ in the username with %40, but that leads to the following graphql error:
    graphql_1  | Could not upgrade the database!
    graphql_1  | Error: invalid interpolation syntax in 'postgres://prefectpgadmin%40sql-dev-prefect:REDACTED_PSWD@sql-dev-prefect.postgres.database.azure.com:5432/sqldb-dev-prefect' at position 25
    Any suggestions about what I can do to avoid the graphql interpolation issue?
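A possible reading of this error, offered as a sketch rather than a confirmed diagnosis: the wording "invalid interpolation syntax in ... at position N" matches the ValueError that Python's standard configparser raises when a value containing a bare % is set, and position 25 is exactly where the % of %40 sits in the URL. Assuming the connection string passes through a configparser-style settings layer somewhere in the database-upgrade step (the message itself does not say so), doubling the percent sign is the usual escape. The section and option names below are hypothetical.

```python
import configparser
from urllib.parse import quote


def build_url(user, password, host, port, database):
    """Percent-encode the credentials so '@' in the username becomes '%40'."""
    return (
        f"postgres://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}"
    )


url = build_url(
    "prefectpgadmin@sql-dev-prefect",
    "REDACTED_PSWD",
    "sql-dev-prefect.postgres.database.azure.com",
    5432,
    "sqldb-dev-prefect",
)

parser = configparser.ConfigParser()
parser.add_section("db")  # hypothetical section name

# A bare '%' is rejected by configparser's basic interpolation.
try:
    parser.set("db", "url", url)
    error_message = None
except ValueError as exc:
    error_message = str(exc)

# Doubling the '%' escapes it; reading the value back yields a single '%'.
parser.set("db", "url", url.replace("%", "%%"))
round_trip = parser.get("db", "url")
```

If this is indeed the mechanism, the value handed to the configuration layer would need %%40, while anything that consumes the URL directly (such as Hasura) would still need %40.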
    c
    26 replies · 2 participants
  • j

    jaehoon

    05/24/2021, 9:40 AM
    Hi prefect family! I use parameterized flow run with 'idempotency_key' like this
    test_flow.run(parameters=dict(account_id=account.id), idempotency_key=str(account.id))
    I want to run flows with a new, unique idempotency_key every time. What's the best practice? Is there a good method in the Prefect module? Please help!
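One way to produce such keys with only the standard library is sketched below. Both helper names are hypothetical and not part of Prefect; how the key is then passed to a flow run is left as in the call above.

```python
import uuid
from datetime import datetime, timezone


def unique_idempotency_key(account_id):
    """Return a key that differs on every call, prefixed with the account id."""
    return f"{account_id}-{uuid.uuid4()}"


def windowed_idempotency_key(account_id, now=None):
    """Return a key that is stable within one UTC minute.

    Calls made within the same minute share a key, so accidental
    duplicates are still collapsed, while later calls get a new key.
    """
    now = now or datetime.now(timezone.utc)
    return f"{account_id}-{now:%Y%m%dT%H%M}"
```

The first helper gives up deduplication entirely; the second keeps it within a time window, which may be closer to what an idempotency key is meant for.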
    k
    z
    3 replies · 3 participants
  • c

    Charles Leung

    05/24/2021, 3:40 PM
    Hey prefect team, Is there a parameter that allows a task to fail, yet still continue with the rest of the flow?
    k
    2 replies · 2 participants
  • n

    Noah Guilbault

    05/24/2021, 6:00 PM
    Hi Prefect -- I'm attempting to submit a job out to a Dask cluster and am encountering an error that seems somewhat obtuse. The Prefect flow is registered and submitted with a LocalAgent and DaskExecutor(address="scheduler:8786"):
    LOCAL_AGENT Submitted for execution: PID: 159
    CloudFlowRunner Beginning Flow run for 'dask_test'
    DaskExecutor Connecting to an existing Dask cluster at scheduler:8786
    CloudFlowRunner Unexpected error: CancelledError('graph_created_address_file_name-d5a25dd2f2624316b837fb11c5077a2e')
    k
    m
    16 replies · 3 participants
  • c

    Charles Leung

    05/24/2021, 8:28 PM
    Hey Prefect Team, i see through automations i can add a cloud hook to send to pagerduty. However, is it possible to PD alert on an individual task fail? e.g., one of our tasks uses triggers to continue even if some optional tasks fail.
    k
    2 replies · 2 participants
  • u

    김응진

    05/25/2021, 9:25 AM
    Hi I have a problem running a test flow. when i run a test code below.
    def test_flow():
        client = prefect.Client(api_token="dddd")
        id = client.register(google_ads_flow, project_name='pipeline-test')
        client.create_flow_run(
            flow_id=id,
            parameters=dict(account_id=46)
        )
    I got the error
    Failed to load and execute Flow's environment: TypeError("missing a required argument: 'date'")
    There is no date parameter or anything like it in my flow.
    k
    12 replies · 2 participants
  • s

    Stéphan Taljaard

    05/25/2021, 10:58 AM
    Hi. I have a Linux VM hosted on GCP that I'm using as a single node for running Prefect Server + a LocalAgent. I added a unit file for each to auto-start them. I SSH'ed into the VM, installed docker + docker-compose, then pip-installed prefect. However, prefect is now only available under my user (GCP SSH logs into your GCP user by default): which prefect == /home/staljaard/.local/bin/prefect. How would you recommend installing/configuring my prefect CLI and the pip-installed prefect (agent) so that they are available to all users SSH'ing into the VM?
    k
    m
    +1
    49 replies · 4 participants
  • r

    Raúl Mansilla

    05/25/2021, 4:38 PM
    Hello, I'm testing with CodeCommit storage and I get the following error:
    Failed to retrieve task state with error: ClientError([{'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: '}}}])
    Traceback (most recent call last):
    File "/home/ubuntu/.local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 154, in initialize_run
    task_run_info = self.client.get_task_run_info(
    File "/home/ubuntu/.local/lib/python3.8/site-packages/prefect/client/client.py", line 1399, in get_task_run_info
    result = self.graphql(mutation)  # type: Any
    File "/home/ubuntu/.local/lib/python3.8/site-packages/prefect/client/client.py", line 319, in graphql
    raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: '}}}
    This only happens when I try to run the flow:
    from prefect import task, Flow, Parameter
    from prefect.executors import LocalExecutor
    from prefect.run_configs import LocalRun
    from prefect.storage import CodeCommit
    @task()
    def say_hello(name):
        print("Hello, {}!".format(name))
    with Flow("flow", run_config=LocalRun(), storage=CodeCommit(repo="prefect_flows", path="flows/hello_world.py", commit="master"), executor=LocalExecutor()) as flow:
        name = Parameter('name')
        say_hello(name)
    flow.run(name='world') # "Hello, world!"
    flow.run(name='Marvin') # "Hello, Marvin!"
    Does anyone have any clue?
    k
    m
    61 replies · 3 participants
  • m

    Matthew Neary

    05/25/2021, 5:53 PM
    Hey everybody, I'm working on a CI/CD pipeline for Prefect using CircleCI. When building the flows I'm getting an error when trying to pull in secrets that are stored on Prefect Cloud. It's looking for them locally but can't find them (because they're not there), is there a way to pull cloud secrets into a CI environment?
    z
    2 replies · 2 participants
  • m

    Mariusz Olszewski

    05/25/2021, 6:11 PM
    Hi, can somebody share an example of how to connect Prefect to an external Postgres? Currently, every single time I restart Postgres I lose all history and tasks :(
    z
    m
    2 replies · 3 participants
  • d

    Daniel Davee

    05/25/2021, 7:04 PM
    How can I find out why a flow failed? All I get is that it didn't work.
    k
    3 replies · 2 participants
  • d

    Diego Alonso Roque Montoya

    05/25/2021, 11:51 PM
    Is there a Warning status in flows? our use case is that we want to mark something as successful for the time being but needing review when an engineer is available
    k
    3 replies · 2 participants
  • a

    Arkady K.

    05/26/2021, 9:41 PM
    Hi all ! any docs we can find on how to create a tenant in a k8s cluster, our cluster got rebooted over the weekend and we lost our default tenant, one of our team mates is out on vacation, so need help creating a default tenant
    k
    n
    10 replies · 3 participants
  • n

    nick vazquez

    05/27/2021, 1:08 AM
    Hi! I am new to using Prefect but I am trying to utilize my Dask cluster and having issues doing so on-prem. I ran into issues where the workers were looking to submit logs to localhost:4200 although the Prefect server/agent were running on a dedicated scheduler box. Am I missing something for configuring the workers to work properly? Do they need to point back at the scheduler's IP for logging? Do I need to run an agent on each machine to pass jobs to the workers?
    k
    m
    15 replies · 3 participants
  • l

    Lukáš Polák

    05/27/2021, 6:24 AM
    Hi everyone! We would like to run our Prefect Server in AWS infrastructure. Right now, we are debating whether to use normal AWS RDS for DB or go with Aurora Serverless. Does anybody have any experience running their Prefect on either of those types of DB? I'm mostly keen to know, if you experience problems with max connections limit.
    k
    1 reply · 2 participants
  • t

    Tom Forbes

    05/27/2021, 11:13 AM
    The docs on output caching say that cached states will be stored in memory when running Prefect Core locally. This seems like a strange limitation: it’s quite handy during debugging to just do flow.run(), but not having any caching is annoying. Is there a way to work around this to enable caching for locally run tasks?
    k
    1 reply · 2 participants
  • t

    Tom Forbes

    05/27/2021, 11:28 AM
    And I’m slightly confused about how Result’s are expected to interact with external libraries like Dask. I’d like to save my Dask dataframe to Parquet somewhere, depending on what result is configured. Each Task has a unique storage location which can be a local directory or a S3 prefix, so how would I get this inside my task? I’d like to do:
    @task()
    def save_results(dataframe):
        dataframe.save_parquet(UNIQUE_TASK_LOCATION)
        return UNIQUE_TASK_LOCATION
    or somesuch. But results seem to be centred around handling the writing/pickling of data for you? Ideally I’d not like to care if it’s a S3 prefix (for production) or a local directory (for debugging).
    k
    m
    +1
    42 replies · 4 participants
  • s

    Stéphan Taljaard

    05/27/2021, 1:46 PM
    Hi. With a brand-new install, the order of operations is:
    1. prefect backend server
    2. prefect server start
    If I don't want "default" displayed on the UI (i.e. I just need one tenant, but don't want it named "default"), I need to add a new tenant. This is done through:
    3. prefect server create-tenant --name "Some Other Name"
    This now creates a new, additional tenant. I only need one... Ideally, the steps would be, from above, 1 then 3 then 2. However, that order doesn't work because the server (database) has to be up for cmd 3 to work. Would it be useful to have the default tenant name as an optional argument to prefect server start? Or am I missing something w.r.t. the creation of my tenant?
    m
    n
    6 replies · 3 participants
  • g

    Garret Cook

    05/27/2021, 6:39 PM
    I have a flow scheduled to run every 15 minutes. Occasionally the run is delayed, and in that case, I’d like Prefect to skip missed runs of this particular flow (rather than running missed runs trying to catch up). Is there a built in way to accomplish this?
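As a library-agnostic sketch of the idea, a run could compare its scheduled start time with the current time and bail out early when it is too late to be useful. The helper name and the 15-minute threshold are assumptions for illustration; how the scheduled time is obtained and how the run is then skipped depend on the Prefect version and are not shown.

```python
from datetime import datetime, timedelta, timezone


def is_stale(scheduled_start, now=None, max_lateness=timedelta(minutes=15)):
    """Return True when a run starts later than max_lateness after its schedule."""
    now = now or datetime.now(timezone.utc)
    return now - scheduled_start > max_lateness
```

With a 15-minute schedule, using one interval as the threshold means a delayed run is dropped as soon as the next scheduled run is due.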
    k
    7 replies · 2 participants
  • c

    Chohang Ng

    05/27/2021, 10:15 PM
    from prefect import Task
    import pandas as pd
    import os,sys
    from prefect.utilities.tasks import task
    import db as db
    from prefect import task, Flow, Parameter
    import prefect
    from prefect.run_configs import LocalRun
    from prefect.executors import LocalDaskExecutor
    class ETL(Task):
        def __init__(self):
            self.df  = self.extract()
        def extract(self):
            read_conn = db.read_conn
            query ="""SELECT b.oproduct_id, p.oproduct_id,p.oproduct_parent_id,b.obundle_parent_id
    from hq.oproducts p 
    JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
    WHERE b.oproduct_id = 5801"""
            df = pd.read_sql(query,read_conn)
            return df
        def load(self):
            self.df.to_csv(r"C:\Users\cho.ng\test\df.csv",index=False)
    with Flow('flow_3',executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
        df = ETL()
        df.load()
    flow.register(project_name="tester")
    k
    18 replies · 2 participants
  • c

    Chohang Ng

    05/28/2021, 3:39 PM
    A colleague of mine asked me whether Prefect will still execute flows when nobody is logged in to the computer. I'm not sure about the answer, but I assume yes: the agent would still be able to monitor upcoming flows as long as the power is on, right? He is comparing it with Windows Task Scheduler, where you can choose whether a task runs only when a user is logged in or regardless of login.
    k
    1 reply · 2 participants
  • c

    Chris Bowen

    05/28/2021, 4:45 PM
    Hello, I'm very new to Prefect. I have it running on a RHL server, but I'm running into a couple of behaviors I can't find answers for via googling, so I'm hoping to get some advice.
    First thing: when I initially did the tutorial/installation and ran prefect server start, the task submitted and didn't run interactively. I'm not sure if I inadvertently changed a setting, but for the past day when I start the server, it locks up the terminal and runs as an interactive job. I don't see a setting anywhere to modify it and run it as a daemon or comparable solution. Any ideas? I realize this might be a Linux issue more than a Prefect one.
    Second thing: I can't get an agent to start. I'm getting a timeout every time I try to fire it up. I can access the UI from a browser on port 8081 outside of the RHL VM that the server is running on. I can also access the utility that runs on port 4200 from a browser. For reference, there is also an Airflow instance running on this RHL server, so I've been using this command to start prefect: prefect server start --postgres-port=5433 --ui-port=8081. I haven't found documentation, troubleshooting guides, or anybody asking online about the agent timing out. I tried running the start agent job as the root user and it still times out, so I don't think it's permissions, but I'm happy to check anything. I did run prefect backend server. Here's the agent command:
    prefect agent local start
    I did try messing around with the various hostname flags for this command; I believe I tried every port displayed in the docker list below. I think the agent should be coming up on 4200? Here's the docker info for the Prefect containers:
    d343f6b8e7a5   prefecthq/ui:core-0.14.20       "/docker-entrypoint.…"   8 minutes ago   Up 8 minutes (healthy)   80/tcp, 0.0.0.0:8081->8080/tcp, :::8081->8080/tcp   tmp_ui_1
    9c993c53daed   prefecthq/apollo:core-0.14.20   "tini -g -- bash -c …"   8 minutes ago   Up 8 minutes (healthy)   0.0.0.0:4200->4200/tcp, :::4200->4200/tcp           tmp_apollo_1
    f17c041a6ac8   prefecthq/server:core-0.14.20   "tini -g -- python s…"   8 minutes ago   Up 8 minutes                                                                 tmp_towel_1
    65cd55a58004   prefecthq/server:core-0.14.20   "tini -g -- bash -c …"   8 minutes ago   Up 8 minutes (healthy)   0.0.0.0:4201->4201/tcp, :::4201->4201/tcp           tmp_graphql_1
    9a251fd9322f   hasura/graphql-engine:v1.3.3    "graphql-engine serve"   8 minutes ago   Up 8 minutes (healthy)   0.0.0.0:3000->3000/tcp, :::3000->3000/tcp           tmp_hasura_1
    fd22ad83e1a5   postgres:11                     "docker-entrypoint.s…"   8 minutes ago   Up 8 minutes (healthy)   0.0.0.0:5433->5432/tcp, :::5433->5432/tcp           tmp_postgres_1
    Here's the command I'm executing for the agent:
    $ prefect agent local start
    /usr/local/lib/python3.8/site-packages/prefect/tasks/__init__.py:8: UserWarning: SQLite tasks require sqlite3 to be installed
      import prefect.tasks.database
    [2021-05-28 16:32:21,594] INFO - agent | Registering agent...
    And here's the error:
    File "/usr/local/lib/python3.8/site-packages/urllib3/connection.py", line 200, in connect
        conn = self._new_conn()
      File "/usr/local/lib/python3.8/site-packages/urllib3/connection.py", line 174, in _new_conn
        raise ConnectTimeoutError(
    urllib3.exceptions.ConnectTimeoutError: (<urllib3.connection.HTTPConnection object at 0x7f1a862b83d0>, 'Connection to localhost timed out. (connect timeout=15)')
    ...
    requests.exceptions.ConnectTimeout: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by ConnectTimeoutError(<urllib3.connection.HTTPConnection object at 0x7f1a862b83d0>, 'Connection to localhost timed out. (connect timeout=15)'))
    Here's the installation guide I followed: https://docs.prefect.io/core/getting_started/installation.html Any help or advice is much appreciated, thank you!
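Since the traceback is a plain TCP connect timeout to localhost:4200, one way to narrow things down is to test that connection from the same shell the agent runs in, independently of Prefect. This is a minimal sketch using only the standard library; the function name is hypothetical.

```python
import socket


def can_connect(host, port, timeout=5.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False


# Example usage from the machine where the agent is started:
# print(can_connect("localhost", 4200))
# print(can_connect("127.0.0.1", 4200))
```

If 127.0.0.1 connects but localhost does not, the name may be resolving to a different address (for example IPv6) on that host; if neither connects while a remote browser can reach the port, a local firewall rule is one thing worth checking.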
    k
    m
    +1
    20 replies · 4 participants
  • a

    Aurélien Vallée

    05/30/2021, 6:08 AM
    I'm having issues when trying to delete a flow with a self-hosted Prefect server. If I check "delete all versions" of the flow, then I get the error message "We could not delete your flow. Please try again. If this problem continues, contact help@prefect.io"
    j
    5 replies · 2 participants
  • a

    Aurélien Vallée

    05/30/2021, 6:08 AM
    and nothing appears in the log to explain what went wrong
  • m

    Marko Jamedzija

    05/31/2021, 1:30 PM
    Hi there! I’m trying to run a KubernetesRun so that it runs jobs in a different namespace than the agent is in. I tried setting the namespace in the job template yaml, but it doesn’t work. Any suggestions?
    k
    t
    10 replies · 3 participants
  • d

    Damien Ramunno-Johnson

    06/01/2021, 4:36 PM
    I am currently having an issue running a lot of short tasks at once
    UnixHTTPConnectionPool(host='localhost', port=None): Read timed out. (read timeout=60)
    Flow run is no longer in a running state; the current state is: <Failed: "UnixHTTPConnectionPool(host='localhost', port=None): Read timed out. (read timeout=60)">
    Is this limit on the cloud api side?
    n
    4 replies · 2 participants
  • d

    Dana Merrick

    06/01/2021, 8:47 PM
    is there a way to manually remove an agent? we have an agent that is old and i cant figure out where it is running and i'd like to just ban it. this is a k8s agent btw
    n
    8 replies · 2 participants
d

Dana Merrick

06/01/2021, 8:47 PM
is there a way to manually remove an agent? we have an agent that is old and i cant figure out where it is running and i'd like to just ban it. this is a k8s agent btw
n

nicholas

06/01/2021, 8:51 PM
Hi @Dana Merrick - since the agent is still polling, you won't be able to remove it (since every time it polls again it'll re-register). You'll need to either 1) revoke the token/key it was created with to stop it from picking up any work or 2) stop the process
d

Dana Merrick

06/01/2021, 8:52 PM
is there a way to get more info on it? the IP address?
n

nicholas

06/01/2021, 8:54 PM
Hm we don't track any identifying info about the agent except its type and config info
d

Dana Merrick

06/01/2021, 8:54 PM
RIP me haha
n

nicholas

06/01/2021, 8:55 PM
Yeah that's a bit of a pickle you're in there, sorry >.<
d

Dana Merrick

06/01/2021, 8:55 PM
salright, ty
update: i manually replaced our oldest k8s nodes and the agent went away. i think we just had a pod in there that had gone rogue. thanks for the help!
n

nicholas

06/02/2021, 3:45 PM
Glad you figured it out! 🤖