Powered by Linen
prefect-server
  • Scarlett King · 06/23/2021, 10:44 AM
    Hi guys, I'm planning to run Prefect Server on AKS. However, to execute our pipelines, the flow needs to read queries from external .sql files. What is the best way to store these SQL files, and how should Prefect access them? I'd appreciate any advice.
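    One common pattern, sketched here under assumptions: ship the .sql files with the flow code (for example in a `queries/` folder baked into the flow's Docker image or kept in the same repo) and read them by name at run time. The folder layout and the `load_query` helper are hypothetical, not a Prefect API.

```python
from pathlib import Path


def load_query(name: str, base_dir: Path) -> str:
    """Read `<base_dir>/<name>.sql` and return its text.

    `base_dir` is assumed to be a directory shipped alongside the flow code,
    e.g. a `queries/` folder copied into the flow's Docker image.
    """
    path = (base_dir / f"{name}.sql").resolve()
    # Refuse names that escape the queries directory (e.g. "../secrets").
    if base_dir.resolve() not in path.parents:
        raise ValueError(f"query name {name!r} escapes {base_dir}")
    return path.read_text(encoding="utf-8")
```

    Inside a task you might then call `load_query("daily_sales", Path(__file__).parent / "queries")`, where `daily_sales` is a made-up query name and the flow file is assumed to sit next to `queries/`.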
    11 replies · 3 participants
  • Lawrence Finn · 06/23/2021, 4:25 PM
    I am just starting to play with Prefect, so maybe I'm doing something dumb. I'm trying to use Prefect with a temporary Dask cluster on Fargate. The tasks run and finish, but the flow gets stuck in the Running state even though every task finished successfully. It works fine with the local executor, but not with the temporary Dask Fargate cluster. Any ideas?
    12 replies · 2 participants
  • Lawrence Finn · 06/23/2021, 7:48 PM
    Is there a safe way to stop an agent? I'm worried about stopping it in the middle of a DAG run.
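    As general background (not a description of how the Prefect agent itself handles shutdown), a "safe stop" for a polling worker usually means: on SIGTERM/SIGINT, stop picking up new work but let the job in hand finish. A minimal stdlib-only sketch of that pattern; `PollingWorker` and `fetch_work` are hypothetical names.

```python
import signal
import threading
from typing import Callable, Optional


class PollingWorker:
    """Toy polling loop that finishes its current job before exiting."""

    def __init__(self, fetch_work: Callable[[], Optional[Callable[[], None]]]):
        self.fetch_work = fetch_work  # returns a job, or None when idle
        self._stop = threading.Event()

    def request_stop(self, signum=None, frame=None) -> None:
        # Signal handler: only sets a flag; the loop checks it between jobs.
        self._stop.set()

    def install_signal_handlers(self) -> None:
        # Must be called from the main thread.
        signal.signal(signal.SIGTERM, self.request_stop)
        signal.signal(signal.SIGINT, self.request_stop)

    def run(self, poll_interval: float = 1.0) -> None:
        while not self._stop.is_set():
            job = self.fetch_work()
            if job is None:
                # Idle: wait, but wake up immediately if a stop is requested.
                self._stop.wait(poll_interval)
                continue
            job()  # a job that has started always runs to completion
```

    Whether stopping a real Prefect agent affects flow runs it has already launched depends on the agent type, which this sketch does not model.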
    16 replies · 3 participants
  • Robert Hales · 06/24/2021, 10:10 AM
    Are there any plans to bring concurrency limiting to Server?
    1 reply · 2 participants
  • Alderson · 06/24/2021, 12:56 PM
    Hey, first question in here: how do I get started if I'd like to deploy exclusively on Docker? Does it really require Python on the host machine to run? I'd prefer a simple docker-compose.yml to start with and not have to install Python, if possible. Thanks!
    27 replies · 3 participants
  • Karim Benjelloun · 06/24/2021, 1:48 PM
    Hello, is there such a thing as a Prometheus exporter to monitor flows (active, successful, failed)? Or what's a good monitoring approach?
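    This sketch does not assume any official exporter exists. It shows what a small custom one would emit: flow-run counts per state, rendered in the Prometheus text exposition format. The metric name `prefect_flow_runs` and the input dict are hypothetical; in practice the counts would come from querying the Server or Cloud GraphQL API.

```python
from typing import Dict


def render_flow_run_metrics(counts: Dict[str, int]) -> str:
    """Render per-state flow-run counts as Prometheus text-format gauge lines."""
    lines = [
        "# HELP prefect_flow_runs Number of flow runs by state.",
        "# TYPE prefect_flow_runs gauge",
    ]
    for state in sorted(counts):
        # Escape backslashes, quotes and newlines in label values.
        label = state.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
        lines.append(f'prefect_flow_runs{{state="{label}"}} {counts[state]}')
    return "\n".join(lines) + "\n"
```

    For example, `render_flow_run_metrics({"Running": 2, "Success": 10, "Failed": 1})` yields one line per state, such as `prefect_flow_runs{state="Failed"} 1`. Serving that string on a `/metrics` endpoint (e.g. with `http.server`) would let Prometheus scrape it.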
    14 replies · 5 participants
  • Lawrence Finn · 06/24/2021, 7:28 PM
    Is there a way to get the output of a Kubernetes RunNamespacedJob and use it as an input for another task?
    11 replies · 2 participants
  • Hugo Kitano · 06/24/2021, 11:03 PM
    Hi, I'm trying to run prefect server start and I'm running into an error:
    {"internal":"could not connect to server: Connection refused\n\tIs the server running on host \"postgres\" (172.22.0.2) and accepting\n\tTCP/IP connections on port 5432?\n","path":"$","error":"connection error","code":"postgres-error"}
    There's a lot more to the error that I can supply. I ran prefect backend server before trying to start the server.
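    The error says the API layer could not reach Postgres on `postgres:5432`. A quick stdlib check of whether a TCP port accepts connections can help narrow this down. Note that the hostname `postgres` only resolves inside the Docker Compose network that `prefect server start` creates; from the host you would check `localhost` and the published port, if it is published (an assumption about your setup).

```python
import socket


def port_accepts_connections(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and DNS failures alike.
        return False
```

    For example, `port_accepts_connections("localhost", 5432)` while the containers are starting tells you whether Postgres is up yet.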
    7 replies · 2 participants
  • Alexander van Eck · 06/25/2021, 11:16 AM
    Hello! 👋 Thank you for the work that you're doing. I've been testing Prefect over the past week and stumbled upon the topic of resource management. I created a GitHub discussion for it: https://github.com/PrefectHQ/prefect/discussions/4700 Please review 🙇
    10 replies · 3 participants
  • jcozar · 06/27/2021, 9:01 AM
    Hi, a few days ago I scheduled a flow to run daily. I use LocalDaskExecutor with 8 workers and the processes scheduler (I need to run tasks in parallel). I'm using Docker as storage and ECS (Fargate) as the run config. The problem is that today's run failed because some mapped tasks stayed in the Queued state, with the following logs:
    ...
    07:17:34 INFO CloudTaskRunner Task 'XXX[12]': Starting task run...
    07:17:35 INFO CloudTaskRunner Task 'XXX[12]': Finished task run for task with final state: 'Queued'
    07:18:05 INFO CloudTaskRunner Task 'XXX[12]': Starting task run...
    07:18:05 INFO CloudTaskRunner Task 'XXX[12]': Finished task run for task with final state: 'Queued'
    ...
    It seems the tasks started but finished in the Queued state again… What could be the problem here? I'm using task concurrency for those tasks, if that helps. Thank you!
    5 replies · 2 participants
  • Aiden Price · 06/28/2021, 8:07 AM
    Hi folks, I'm trying to set up a LocalAgent to trigger DaskExecutor runs on my pre-existing Dask cluster. I have been using KubernetesRuns up until now, but I figured it was inefficient to create a Kubernetes job every time I wanted to start a new Dask run. However, I'm getting a "Failed to load and execute Flow's environment" error when the flow starts. I can see that the Azure storage connection string is missing from the run config. With a KubernetesRun I handled this with a custom job definition, but I don't know what to do with a LocalRun. I have the necessary secrets in the LocalAgent container's environment, but they're coming up null. Am I missing something simple here? Thanks everyone.
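    If the flow-run process isn't seeing variables that are set on the agent container, one option is to pass them explicitly. As I understand Prefect 1.x, agents accept an `env_vars` dict and `LocalRun` accepts an `env` dict; the helper below only builds such a dict from the agent's own environment and fails loudly instead of yielding nulls. The helper name and the variable `AZURE_STORAGE_CONNECTION_STRING` are assumptions for illustration.

```python
import os
from typing import Dict, Iterable, Mapping, Optional


def forward_env(names: Iterable[str],
                source: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Copy the named variables from `source` (default: os.environ).

    Missing variables raise KeyError instead of silently becoming null.
    """
    names = list(names)
    env = os.environ if source is None else source
    missing = [n for n in names if n not in env]
    if missing:
        raise KeyError("not set in agent environment: " + ", ".join(missing))
    return {n: env[n] for n in names}
```

    Calling it where the agent is constructed, e.g. `LocalAgent(env_vars=forward_env(["AZURE_STORAGE_CONNECTION_STRING"]))`, keeps the value on the agent side instead of storing it in the registered flow's run config.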
    3 replies · 2 participants
  • Ruslan · 06/28/2021, 2:23 PM
    Hi! Where can I find examples of how to use private GitHub storage in Prefect? I can't figure out how to use secrets; it seems like the secret isn't being passed. Public repos work, but private ones don't:
    flow.storage = GitHub(secrets=["github"],
                          repo="corp/etl",
                          path="flows/flow.py")
    and the error:
    Repo 'corp/etl' not found. Check that it exists (and is spelled correctly), and that you have configured the proper credentials for accessing it.
    14 replies · 2 participants
  • Daniel Davee · 06/28/2021, 6:11 PM
    Can I use Docker storage to load an entire folder?
    from prefect.storage import Docker
    
    Docker(
        files={
            # absolute path source -> destination in image
            "/Users/me/code/my_mods/": "/my_mods",
        },
        env_vars={
            # append the copied directory to PYTHONPATH (must match the destination above)
            "PYTHONPATH": "$PYTHONPATH:/my_mods"
        },
    )
    13 replies · 2 participants
  • Casey Green · 06/28/2021, 7:34 PM
    Hi! Is there a clean way to limit the number of concurrently executing runs of a flow? For example, let's say I have a flow that runs every 15 minutes. Sometimes a run takes 20 minutes, but I don't want to trigger the next flow run until after the last one succeeds (or until the subsequent 15-minute interval).
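    The behaviour being asked for is "skip a scheduled start while the previous run is still going". A tiny simulation of that rule (not a Prefect feature; start times and durations in minutes are made up) makes the intended semantics concrete:

```python
from typing import List, Sequence


def runs_started(schedule: Sequence[int], durations: Sequence[int]) -> List[int]:
    """Return the scheduled start times that actually run.

    A scheduled start is skipped while the previous run is still in progress.
    `durations[i]` is how long a run starting at `schedule[i]` would take.
    """
    started: List[int] = []
    busy_until = float("-inf")
    for start, duration in zip(schedule, durations):
        if start < busy_until:
            continue  # previous run still active: skip this tick
        started.append(start)
        busy_until = start + duration
    return started
```

    With a 15-minute schedule `[0, 15, 30, 45, 60]` and durations `[20, 10, 10, 20, 10]`, the runs at 0, 30 and 45 start; the ticks at 15 and 60 are skipped because a run is still active.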
    10 replies · 2 participants
  • Ken Farr · 06/28/2021, 10:17 PM
    Hello! I'm working on porting an Argo Workflow to Prefect. All of our Argo workflows are Docker-based, so I'm creating a simple flow that takes 2 Parameters and computes the Docker command to execute. The problem I'm running into is that, in order to build the DAG correctly, I'm referencing a global variable before it is technically defined. This is not a pretty situation, but it does produce the correct DAG (based on DAG visualization and execution). I've included a pared-down snippet. What is the correct/preferred way to accomplish a task like this? Essentially, I need to set a Task's arguments based on the execution of a prior Task.
    ...
    
    cft_container = CreateContainer(image_name="twodub:v0.4")
    cft_start = StartContainer()
    cft_logs = GetContainerLogs()
    cft_status_code = WaitOnContainer()
    
    
    @task
    def set_cft_container_command(cft_name: str, account_id: str):
        logger = prefect.context.get("logger")
        ret = f"python -m twodub cft install --cft-name {cft_name} --account-id {account_id}"
        new_container.command = ret # <--- Referencing new_container before declared
        logger.info(f"command is: '{ret}'")
        return ret
    
    
    with Flow("install-cft-flow") as flow:
        cft_name = Parameter("CFT Name")
        account_id = Parameter("Account ID")
    
        scc = set_cft_container_command(cft_name, account_id)
    
        # I'm creating a new version of this Task here with bind
        # I suspect there is a cleaner way than this, but it wasn't
        # apparent to me in the documentation 
        #
        # If I did not do this, then the container would be created
        # before the set_cft_container_command was called and the
        # correct command argument would not be set
        new_container = cft_container().bind(upstream_tasks=[scc])
    
        start_container = cft_start(container_id=new_container)
    
    ...
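    One way to avoid mutating `new_container` from inside a task is to have the upstream task just return the command and pass it to the container task as an argument, so Prefect infers the dependency from the data flow. Assuming `CreateContainer` accepts `command` at call time (Prefect 1.x's Docker tasks generally let init arguments be overridden at run time, but check your version), the flow would call `cft_container(command=scc)`. The command-building step itself can stay a pure function; this sketch uses `shlex.join` (Python 3.8+) so parameter values containing spaces are quoted:

```python
import shlex


def build_cft_command(cft_name: str, account_id: str) -> str:
    """Build the twodub install command; values are shell-quoted if needed."""
    return shlex.join([
        "python", "-m", "twodub", "cft", "install",
        "--cft-name", cft_name,
        "--account-id", account_id,
    ])
```

    For example, `build_cft_command("my-stack", "123456789012")` returns `python -m twodub cft install --cft-name my-stack --account-id 123456789012`; the stack name and account ID are made-up values.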
    12 replies · 2 participants
  • Bruno Murino · 06/29/2021, 2:24 PM
    Hi everyone, we just deployed Prefect Server as an ECS service and everything looks good except for one thing: it looks like the PREFECT_SERVER__APOLLO_URL environment variable for the UI container is not being used at all. I tried to replicate the issue locally and was able to reproduce it.
  • Bruno Murino · 06/29/2021, 2:26 PM
    i.e., the URL I passed as config did make its way into some other files, but the UI still shows the default URL and fails to connect.
  • Bruno Murino · 06/29/2021, 2:39 PM
    Hi everyone again. I'm struggling to find a way to create the tenant now that I've deployed Prefect Server as an ECS service. Do I need to go into a specific container, or what?
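    Creating a tenant shouldn't require a particular container: any machine that can reach the Apollo GraphQL endpoint can send the mutation (if I recall correctly, the Prefect 1.x CLI also offers `prefect server create-tenant`, run from wherever `prefect` is configured to point at your Server). A stdlib sketch that builds that request; the endpoint URL is a placeholder and the `create_tenant` mutation shape is my understanding of the Server schema, so verify it in the GraphQL playground first.

```python
import json
import urllib.request

CREATE_TENANT = """
mutation($input: create_tenant_input!) {
  create_tenant(input: $input) { id }
}
"""


def create_tenant_request(apollo_url: str, name: str, slug: str) -> urllib.request.Request:
    """Build (but do not send) a GraphQL POST that creates a Server tenant."""
    body = {"query": CREATE_TENANT, "variables": {"input": {"name": name, "slug": slug}}}
    return urllib.request.Request(
        apollo_url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

    Sending it would look like `urllib.request.urlopen(create_tenant_request("http://<apollo-host>:4200/graphql", "default", "default"))`, with `<apollo-host>` replaced by your Apollo service address.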
    4 replies · 2 participants
  • Krapi Shah · 06/29/2021, 6:12 PM
    Hi everyone, is there any way to set a filter for business days when setting the schedule? I know there's a filter for weekdays, but is anything similar possible for checking whether it's a business day?
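    In Prefect 1.x, schedule filters are, as far as I know, plain callables that take a datetime and return a bool (the built-in `is_weekday` is one), so a business-day filter can be written the same way. The holiday set below is a made-up example; in practice you might load it from a calendar library or a config file.

```python
from datetime import date, datetime
from typing import FrozenSet

# Hypothetical holiday calendar; replace with your own.
HOLIDAYS: FrozenSet[date] = frozenset({
    date(2021, 7, 5),    # example: US Independence Day (observed)
    date(2021, 12, 24),  # example: Christmas Day (observed)
})


def is_business_day(dt: datetime) -> bool:
    """True on Monday to Friday unless the calendar date is in HOLIDAYS."""
    return dt.weekday() < 5 and dt.date() not in HOLIDAYS
```

    It would then be passed like the built-in filter, e.g. `Schedule(clocks=[...], filters=[is_business_day])`. For timezone-aware datetimes, `dt.date()` is the date in that datetime's own timezone.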
    6 replies · 2 participants
  • Deckel de Lange · 06/30/2021, 11:59 AM
    Hi all! Does anyone have experience following this guide to running Prefect Server on GCP? https://medium.com/the-prefect-blog/prefect-server-101-deploying-to-google-cloud-platform-47354b16afe2 Everything went well until the last step, prefect server start, where I get the following error:
    ERROR: Couldn't connect to Docker daemon at http+docker://localhost - is it running?
    Any help would be appreciated, thanks!
    7 replies · 3 participants
  • Arkady K. · 06/30/2021, 3:25 PM
    Hi everyone, how does one download the log file for a running job from a Kubernetes Executor?
    7 replies · 4 participants
  • Madison Schott · 06/30/2021, 5:15 PM
    Hi all, I keep getting this error when trying to set up my Fivetran task using the class described in the documentation:
    azure_sql_task = FivetranSyncTask.run(
      File "/usr/local/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 453, in method
        kwargs.setdefault(attr, getattr(self, attr))
    AttributeError: 'str' object has no attribute 'connector_id'
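    The traceback suggests `FivetranSyncTask.run(...)` was called on the class itself rather than on an instance, so the first positional argument (a string) ended up as `self`, and looking up `self.connector_id` failed. A stripped-down, Prefect-free reproduction using a hypothetical `SyncTask` class:

```python
class SyncTask:
    """Stand-in for a task class whose run() falls back to init-time attributes."""

    def __init__(self, connector_id=None):
        self.connector_id = connector_id

    def run(self, connector_id=None):
        # Fall back to the value given at construction time.
        return connector_id if connector_id is not None else self.connector_id


# Wrong: calling run() on the class makes "my_connector" the `self` argument.
try:
    SyncTask.run("my_connector")
except AttributeError as exc:
    print(exc)  # 'str' object has no attribute 'connector_id'

# Right: instantiate first, then call the instance.
print(SyncTask(connector_id="my_connector").run())  # my_connector
```

    In the flow, that means creating the task object once (e.g. `sync = FivetranSyncTask(...)`) and calling the instance, `sync(connector_id=...)`, inside the `Flow` block.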
    4 replies · 2 participants
  • Thomas Opsomer · 06/30/2021, 5:49 PM
    Hi community 🙂 I'm wondering about the best way to handle a long-running flow with several tasks that have different resource requirements, without wasting too many resources. For instance, take a flow with 2 tasks that are both quite long: the first task needs many GBs of memory and the second only a few MBs. Using the k8s agent/executor, Prefect creates a single pod that runs the entire flow, so I need to request GBs of memory for the whole flow 😕 even though they're only needed for the first task.
    5 replies · 2 participants
  • Serdar Tumgoren · 07/01/2021, 12:31 AM
    Hey everyone, I've been banging my head on the keyboard for a bit on this one and would be indebted for any guidance. I'm trying to create a flow that:
    1. adds metadata to FAILED signals via a mapped task
    2. applies a state handler to the flow that can look up metadata from the FAILED task states and alert the user with a final notification that includes detailed metadata from each failed task
    The strategy is based on the notification docs: https://docs.prefect.io/core/concepts/notifications.html#responding-to-state
    When I execute a basic flow run locally (without Server or Cloud), the script works as expected and I get a log message listing the failures (I'm testing with a logger, but the goal is to eventually alert via Slack). But when I run the same code against Server or Cloud, the individual task failures no longer seem accessible from the state handler. The problem seems to be new_state.result inside the state handler: locally, it is a dict populated with (<task>, <state>) items; on Server or Cloud, new_state.result appears to be an empty dict. I've created two basic flow scripts to demonstrate this behavior (one for a local manual run, the other for Server or Cloud) and will attach them below. Each contains a NOTE marking where new_state.result diverges. I'd be grateful if someone could advise on what I'm doing wrong!
    52 replies · 3 participants
  • Dmitry Lyfar · 07/01/2021, 2:31 AM
    Hi there! I'm wondering what the best way is to use Dask primitives, such as a semaphore, to orchestrate things around API rate limits. I'm restricted to 5 API calls executing at any one moment, so I'm using a semaphore shared among tasks, which acquire it before each API call. The semaphore is created in the starting task and then passed to the downstream tasks. My Dask cluster has 8 workers / 8 threads, if that's relevant. Is it good practice to use these https://docs.dask.org/en/latest/futures.html within my Prefect tasks?
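    Whether the limit holds depends on the semaphore's scope: a `threading` semaphore only limits threads inside one process, whereas across 8 Dask workers you need a cluster-wide primitive such as `distributed.Semaphore(max_leases=5, name=...)`, which, as I understand the Dask docs, is shared by name and supports `with sem:`. To show only the limiting logic, here is a stdlib analog with 8 worker threads and at most 5 concurrent "API calls"; `fake_api_call` is a placeholder.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT_CALLS = 5
api_slots = threading.BoundedSemaphore(MAX_CONCURRENT_CALLS)

_lock = threading.Lock()
_active = 0
peak = 0  # highest number of simultaneous calls observed


def fake_api_call(i: int) -> int:
    """Placeholder for a rate-limited API request."""
    global _active, peak
    with api_slots:  # blocks while 5 calls are already in flight
        with _lock:
            _active += 1
            peak = max(peak, _active)
        time.sleep(0.01)  # simulate network latency
        with _lock:
            _active -= 1
    return i


# 8 workers compete for 5 slots, mirroring the 8-thread cluster in the question.
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(fake_api_call, range(40)))
```

    Creating the distributed semaphore by name inside each task, instead of passing the object between tasks, is one way to keep tasks decoupled; that design choice is a suggestion, not something the question confirms.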
    3 replies · 2 participants
  • Bruno Murino · 07/01/2021, 8:38 AM
    Hi everyone, I'm running Prefect Server as an ECS service, and I'm wondering which container I need to add the config.toml file to, or whether I can't use this config file at all because I'm not running "prefect server start"… The point is, I want to automate setting this URL but I'm not sure how. Does anyone know?
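    Prefect 1.x also reads configuration from environment variables named `PREFECT__SECTION__KEY`, which map to the `section.key` setting in config.toml, so on ECS these can be set in each container's task definition instead of mounting a file. The sketch below approximates that naming convention (it is not Prefect's actual loader) and uses `PREFECT__SERVER__UI__APOLLO_URL` as an assumed example key. The UI container itself reads `PREFECT_SERVER__APOLLO_URL`, as mentioned earlier in the channel.

```python
from typing import Any, Dict, Mapping

PREFIX = "PREFECT__"


def env_to_config(environ: Mapping[str, str]) -> Dict[str, Any]:
    """Turn PREFECT__A__B=value variables into a nested {"a": {"b": value}} dict."""
    config: Dict[str, Any] = {}
    for name, value in environ.items():
        if not name.startswith(PREFIX):
            continue  # unrelated variables are ignored
        keys = name[len(PREFIX):].lower().split("__")
        node = config
        for key in keys[:-1]:
            node = node.setdefault(key, {})
        node[keys[-1]] = value
    return config
```

    For example, `{"PREFECT__SERVER__UI__APOLLO_URL": "http://example:4200/graphql"}` becomes `{"server": {"ui": {"apollo_url": "http://example:4200/graphql"}}}`.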
    17 replies · 5 participants
  • Michael Wheeler · 07/01/2021, 6:05 PM
    How does the Apollo service accept configuration for the Hasura admin secret? It looks like services/apollo/src/executors.js reads the Hasura URL from an env var, but not the secret.
    2 replies · 3 participants
  • Chris Leber · 07/02/2021, 11:00 PM
    Greetings all! I'm trying to use Prefect Task Secrets (stored in Prefect Server) with the DbtShellTask, but I'm running into some issues. I want to use secrets to set the dbt_kwargs account, etc.:
    dbt = DbtShellTask(
        return_all=True,
        profile_name="test",
        environment="dev",
        overwrite_profiles=True,
        log_stdout=True,
        helper_script="cd dbtProject",
        log_stderr=True,
        dbt_kwargs={
            "type": "snowflake",
            "account": "${ACCOUNT}",
             ...
    I then retrieve the account name as a Prefect secret and set it as an env variable called ACCOUNT when I run the flow:
    with Flow() as flow:
            account = PrefectSecret("snowflake-account")
            parse = dbt(
                command=f"dbt run --models ./models/{database}",
                env={"ACCOUNT": account}
            )
    I have used this approach with ShellTasks and it works just fine. However, it is not working with DbtShellTask. Instead of using the value of the Prefect Secret for the account name, it tries to connect to Snowflake with "${ACCOUNT}" as the account name. Any ideas? Thanks in advance for the help 😁
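    A plausible explanation, not confirmed here: with `ShellTask` the shell expands `${ACCOUNT}` in the command string, but `dbt_kwargs` is written into the generated dbt profile as literal text, so nothing expands it. One workaround is to substitute the values yourself inside a task, after the secret has been resolved at run time, and pass the result as `dbt_kwargs` (assuming your version accepts it at run time). dbt's own `{{ env_var('ACCOUNT') }}` syntax in profiles may be another option. `resolve_placeholders` is a hypothetical helper:

```python
from string import Template
from typing import Any, Dict, Mapping


def resolve_placeholders(kwargs: Mapping[str, Any],
                         values: Mapping[str, str]) -> Dict[str, Any]:
    """Replace ${NAME} placeholders in string values; other values pass through.

    Unknown placeholders are left untouched (safe_substitute never raises).
    """
    return {
        key: Template(val).safe_substitute(values) if isinstance(val, str) else val
        for key, val in kwargs.items()
    }
```

    For example, `resolve_placeholders({"type": "snowflake", "account": "${ACCOUNT}"}, {"ACCOUNT": account})` inside a task would hand dbt the real account name.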
    2 replies · 2 participants
  • wiretrack · 07/03/2021, 6:31 PM
    Hey guys, I'm trying to deploy Server on Kubernetes (DigitalOcean) with my own manifests and an external DB. Hasura and GraphQL seem to work fine, but I can't manage to get Apollo's service working: I keep getting connection refused in the liveness and readiness probes. Does anybody have any idea what I might be doing wrong?
    7 replies · 2 participants
  • wiretrack · 07/03/2021, 11:07 PM
    Hey guys, I'm still working my way through deploying Server on k8s, but I'm having trouble with the agent. I used mostly the same template generated by prefect agent kubernetes install, but I keep getting a ClientError with "Name or service not known". The only thing I can think of is the lack of a tenant, so I created one with slug=default and name=default. I'm using PREFECT__CLOUD___API=http://prefect-apollo-service:4200/graphql. RBAC is in place, so I'm running out of ideas about what I might be doing wrong. The UI also can't connect to apollo-service, but the health checks for Apollo are OK, and curl localhost:4200/graphql also works inside the apollo pod. I also tried querying apollo-service from within the agent pod with:
    curl \
    -X POST \
    -H "Content-Type: application/json" \
    --data '{ "query": "{ tenant { id } }" }' \
    http://prefect-apollo-service:4200/graphql
    And it works fine; I get the tenant ID. Any input? Thanks!
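    "Name or service not known" is the resolver error for a hostname that could not be looked up, which points at the address the failing process is actually using rather than at Apollo itself. A stdlib check you could run from the agent's Python environment; the hostname to pass is whatever your agent is really configured with:

```python
import socket


def resolves(hostname: str) -> bool:
    """Return True if `hostname` can be resolved to at least one address."""
    try:
        return len(socket.getaddrinfo(hostname, None)) > 0
    except socket.gaierror:
        # e.g. "[Errno -2] Name or service not known" on Linux/glibc
        return False
```

    For example, `resolves("prefect-apollo-service")` run inside the agent pod should return `True` if the service name is reachable through cluster DNS.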
    1 reply · 1 participant
    For anyone who comes across a similar issue in the future: the problem was the env_vars. Even if the health checks are OK, communication between services might not work because of the way the environment variables were passed. There's a more detailed explanation in this issue: https://github.com/PrefectHQ/prefect/issues/4738