# ask-community
    Eli Treuherz

    01/27/2022, 5:14 PM
    I’ve got a task mapping set up, but the child tasks sometimes have to retry until the external data they read is ready. When one of them enters the Retrying state, it seems to hold up the entire flow: the parent doesn’t produce any more tasks until the retrying one is complete. Is there a way around this behaviour? Ideally I’d like the parent to spawn all its tasks right away, and for Prefect to identify which ones can be run, rather than one retry blocking everyone else.
    Kevin Mullins

    01/27/2022, 5:20 PM
    Can tasks accept a Callable argument from another task? For example, I have an upstream task that needs to decide whether to use
    function a
    or
    function b
    in a downstream task (essentially deciding on a strategy function or factory function). Could the downstream task properly get what function to execute from the other? Hopefully this makes sense.
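    A plain-Python sketch of this strategy-selection pattern (all names hypothetical): in Prefect 1.x, a task result can be any picklable object, including a function, so an upstream task can return the callable and a downstream task can invoke it. Each function below would be wrapped with `@task` in a real flow.

```python
def strategy_a(x):
    return x * 2

def strategy_b(x):
    return x + 100

def choose_strategy(use_a: bool):
    # Upstream "task": returns a callable, which is passed along like any
    # other task result.
    return strategy_a if use_a else strategy_b

def run_strategy(fn, x):
    # Downstream "task": receives the callable and invokes it.
    return fn(x)

result = run_strategy(choose_strategy(use_a=True), 21)  # strategy_a(21) -> 42
```

    In a flow you would wire these the same way as any other tasks; the only caveat is that the returned function must be picklable (a module-level function, not a lambda or closure) if checkpointing or a distributed executor is involved.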
    Matthew Webster

    01/27/2022, 6:10 PM
    Hi, I’m looking for some guidance on how to use the same flows across multiple projects. For context: we are doing ETL for customers and some flows are identical. We’re trying ECS Agent/Prefect Cloud with a custom Docker image and S3 storage. I am currently registering the flow with the CLI for multiple customers and passing in customer-specific information as Parameters when scheduling the run. This currently breaks down in a few places: 1. Having some problem with Storage and unpickling flows. Is there project-specific info stored there, so that when a flow is registered/pickled for one project it can’t be unpickled by another? 2. Using secrets. We have some customer-specific API keys that need to be stored as Secrets, and these appear to only be set globally in Prefect Cloud (or AWS SSM). There are no per-project secrets that I can pass in. One option I can think of would be to make a service that takes a Parameter and returns a secret that then gets passed to a task, but maybe there’s a better way? I’m still pretty new to Prefect but couldn’t find answers to these. Hoping for some help from the community!
    Suresh R

    01/27/2022, 6:17 PM
    Hi, will all mapped tasks run in parallel? Can we put some delay between the mapped tasks?
    Vamsi Reddy

    01/27/2022, 9:03 PM
    Hi everyone, I have a task that returns a list of module_names. However, this list is not fixed and can vary in length. I am iterating over this list to get the names and then running those modules in a separate task. Is it possible to specify the output as a list? I tried using nout=1 but it just outputs one element of the list. I get the following error:
    Copy code
    TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
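    For what it’s worth, `nout` is meant for fixed-size tuple returns; for a variable-length list the usual pattern is to return the whole list as a single task result and fan it out downstream with `.map()`. A plain-Python sketch of that shape (hypothetical names; in Prefect 1.x the last line would be `results = run_module.map(get_module_names())` inside the flow):

```python
def get_module_names():
    # Upstream task: returns one result that happens to be a list
    # (its length can vary from run to run).
    return ["mod_a", "mod_b", "mod_c"]

def run_module(name):
    # Mapped downstream task: one child task run per list element.
    return f"ran {name}"

names = get_module_names()
results = [run_module(n) for n in names]  # what run_module.map(names) does
```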
    Parth Patel

    01/28/2022, 12:36 AM
    Hi all! In one of my Prefect tasks I am capturing all stdout because I am importing a class from another module which uses logging.py to log. I am trying to capture the log statements but this isn't working, any thoughts? None of the log levels are being captured by the Prefect task. Wondering if I'll have to import the Prefect logger somehow in this module as well.
    Miroslav Rác

    01/28/2022, 1:35 PM
    Hi. I have a little problem I cannot wrap my head around. I created a flow, nothing complicated, just the “hello world” from the getting started docs. I deployed it to Prefect Cloud, then started a local agent and tried to “quick run” the flow. Everything works, which is great. But then I started a Docker agent on my VPS (not locally on the machine where the flow was created) and tried to run the flow. It wasn’t working. The agent picked up the flow but an error was raised immediately:
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/Users/miro/'")
    … Obviously, when I created my flow my local path was probably pickled into it, so the flow cannot run on another machine. Is this expected behavior? How can I run flows created on a different machine?
    Bruno Murino

    01/28/2022, 1:58 PM
    Hi everyone — I have a tiny flow I’m using to start other flows, and I’m passing a “run_name” argument to the “create_flow_run” method, but the “run_name” depends on a parameter and it’s not working as I expected. Can anyone help? I feel like I need to “delay” the resolution of what the “scope” is, but I’m not sure.
    Andrea Nerla

    01/28/2022, 3:19 PM
    Hi folks, I have a problem of failed code being reported as a success. For instance, in the T part of my ETL (which is simply an os.system call) I've put a test, and even if it fails Prefect still declares the task a success.

    David Yang

    01/28/2022, 4:13 PM
    Hi All, I have a flow that will run in DEV, QA and PROD. The flow connects to a database and I saved the credentials info as secrets in Prefect Cloud. The secret variable names have a suffix for the environment, for example "username_DEV", "username_QA", etc. In the flow, I use this code to get the password and username:
    Copy code
    pasword_var = "password_$FLOW_ENV"
    snowflake_pass = PrefectSecret(pasword_var)
    user_var = "user_$FLOW_ENV"
    snowflake_user = PrefectSecret(user_var)
    and the environment variable is configured in the run config:
    Copy code
    runconfig = DockerRun(image="[imagename]", env={"FLOW_ENV": "TEST"})
    It returns an error that PrefectSecret can't find a secret with the name "password_$FLOW_ENV". Questions: 1. Is this a good approach to get this sensitive data through Prefect secrets? 2. I thought $[environment variable name] gives me the environment variable's value? 3. It seems that PrefectSecret runs as a task and doesn't evaluate the parameter value at run time. Is that right?
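    On question 2: `$FLOW_ENV` is a shell-expansion convention, and Python never substitutes it inside a string literal, which is why the secret is looked up under the literal name "password_$FLOW_ENV". A minimal sketch of building the name at run time instead (hypothetical helper; the lookup itself would then happen inside a task, e.g. via `Secret(name).get()` from `prefect.client` in 1.x — hedged, check the secrets docs):

```python
import os

def secret_name(prefix: str) -> str:
    # "$FLOW_ENV" in a string literal is never expanded by Python; read the
    # environment variable explicitly, inside a task, so it resolves at run
    # time on the agent rather than at registration time.
    env = os.environ["FLOW_ENV"]
    return f"{prefix}_{env}"

os.environ["FLOW_ENV"] = "TEST"   # set by the DockerRun env in the real flow
name = secret_name("password")    # "password_TEST"
```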
    Pedro Machado

    01/28/2022, 4:28 PM
    Hi everyone. What is the best pattern to implement retries/ability to restart when using a flow of flows? I created a function task that combines
    create_flow_run
    and
    wait_for_flow_run
    and I am using this task to start the child flow(s) from the parent. This task is set to retry. Today, I noticed that if the child flow fails, this task won't start a new flow run when it retries. It seems to check for the same
    flow_run_id
    . I suspect it's somehow related to an idempotency key that is set by default. Would it be better to use an idempotency key I explicitly set that will be different across retries? Is there a better way to set this up? The goals are: 1) ability to retry the child flow when it fails and 2) ability to restart the parent flow from failed if needed. Thanks!
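    If `create_flow_run` is indeed matching on a default idempotency key, one option (a sketch under that assumption; check the 1.x task signature before relying on it) is to pass an explicitly unique key per attempt, e.g. `idempotency_key=attempt_key(prefect.context.flow_run_id)`:

```python
import uuid

def attempt_key(parent_run_id: str) -> str:
    # Unique on every invocation, so a retry of the wrapper task starts a
    # fresh child flow run instead of matching the previous one by key.
    return f"{parent_run_id}-{uuid.uuid4()}"

key_1 = attempt_key("parent-123")
key_2 = attempt_key("parent-123")  # differs from key_1
```

    The trade-off is that you lose idempotency protection against accidental double-submission, so scope the uniqueness to retries only if that matters.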
    Suresh R

    01/28/2022, 6:25 PM
    Hi, I raised a skip signal for all upstream mapped tasks; now the downstream mapped task is in a Pending state. What might be the reason?
    Jovan Sakovic

    01/28/2022, 6:41 PM
    👋 Are there any known issues with Prefect and multi-threading? I’m having a difficult time trying to get the logging from a Thread’s target function to actually show up in the UI. I’ve tried a bunch of stuff, and I thought I was now in the right place where I: • only make the logger objects within functions that are tasks (with
    logger = prefect.context.get("logger")
    ) • pass these logger objects from function to function to finally reach the one that is running on threads ◦
    extract_thread = Thread(target=extract_messages, name=f"Extractor #1", args=(extract_queue, load_queue, logger))
    Michael Bell

    01/28/2022, 10:02 PM
    Hi folks. I have a simple example flow that I've written to test using a Dask executor configured to use a Fargate cluster, and I'm encountering an error with S3 permissions that I have not faced with other flows. I'm trying to run the flow below and am getting the error
    Error uploading to S3: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
    . If I run the flow w/out the DaskExecutor, it works just fine. I have previously tested using
    dask_cloudprovider.aws.FargateCluster
    directly within a task to create a cluster, send a dask array calculation to the cluster, and tear down all within one Prefect task and that worked fine as well. Any thoughts as to where the permissions error might be coming from? The execution role I specify in the
    cluster_kwargs
    has s3:* permissions.
    Copy code
    import os
    import sys
    import time
    from typing import List
    sys.path.append('.')
    import prefect
    from prefect import task, Flow
    from prefect.storage.s3 import S3
    from prefect.run_configs.ecs import ECSRun
    from prefect.executors.dask import DaskExecutor
    from dask_cloudprovider.aws import FargateCluster
    from config.build import ECR_IMAGE
    
    @task(log_stdout=True)
    def prefect_task(n: int) -> int:
        print(f'Running task w/ input {n}')
        time.sleep(2)
        return n * 10
    
    @task(log_stdout=True)
    def reduce_task(ns: List[int]) -> int:
        print(f'Running reduction task')
        return sum(ns)
    
    schedule = None 
    account = os.environ.get("AWSENV")
    
    with Flow("dask_poc", storage=S3(bucket=f'prefect-{account}-us-east-1')) as flow:
        results = prefect_task.map([1,2,3,4,5])
        reduced_result = reduce_task(results)
    
    
    flow.run_config = ECSRun(
        image=ECR_IMAGE,
        execution_role_arn=...,
        task_role_arn=...,
        run_task_kwargs={
            "cluster": "PrefectCluster",
        },
        labels=[account]
    )
    
    
    flow.executor = DaskExecutor(
        cluster_class="dask_cloudprovider.aws.FargateCluster",
        cluster_kwargs={
            "image": ECR_IMAGE, 
            "n_workers": 5,
            "cluster_arn": ...,
            "execution_role_arn": ...,
            "vpc": ...,
            "subnets": [..., ],
            "security_groups": [..., ],
        },
    )

    Jovan Sakovic

    01/29/2022, 11:30 AM
    The Slack notifier: I'm trying to get the error message of a dbt run into Slack, and have set up the state_handler to only handle the
    Failed
    state. But the DbtShellTask raises the FAIL signal with the first message that occurs,
    Command failed with exit code 2
    , and the notifier doesn’t get to the error that I actually want: dbt’s error. Is there a way to push these other error messages to the notifier?

    Tara

    01/31/2022, 6:11 AM
    Hi, I'm wondering if it’s possible to run multiple flows using only Prefect Core (and not running prefect-server)?

    Bruno Murino

    01/31/2022, 11:26 AM
    Hi everyone — is it possible to register flows built with Orion on Prefect Cloud currently? I know Orion is still in alpha, but it would be good to test our deployment strategies etc. if possible.

    Mary Clair Thompson

    01/31/2022, 1:23 PM
    Hi folks! We're running the Prefect server on a local machine. After the update to Prefect 0.15.12 we've run into an issue where zombie processes are getting spawned by GraphQL.
    ps ef
    yields ~7k zombie processes on the machine; on closer inspection with
    ps -auxwwf
    we're seeing graph-ql as the culprit for all of those hanging processes. This seems to be an issue elsewhere (e.g. this report), and I'm wondering if anything may have changed in 0.15.12 to cause this, and what mitigation you all would suggest?

    Tim Wright

    01/31/2022, 1:50 PM
    Hey community. I was hoping someone could help with a way to generate API Keys from a single admin account? As part of our deployment, we're trying to limit the number of manual steps and would like to be able to create API Keys for our Service Accounts using an automated script. Is this possible? (We're using Prefect Cloud) Thanks in advance for the help.

    Tim Wright

    01/31/2022, 3:12 PM
    Another question for early Monday - is there a set of RBAC permissions that would allow a user of Prefect Cloud to be able to deploy flows - and change the flow parameters - but NOT be able to change the flow tags? Additionally, is there any way to limit permissions on a per project basis in Prefect cloud? The use case here is that we currently have two Prefect Cloud accounts - one that we share between our Dev/Tst environment, the other between Stg/Prod environments. Each account has two projects- one project for each environment whose flows are tagged in such a way as to associate them with an Agent running infra for that environment. Wondering what our options are for allowing users to access one project and not the other and also to ensure that a user cannot target Tst infra from Dev project by simply changing the flow tags. The agents themselves are environment specific and targeted via Flow Tags.

    shijas km

    01/31/2022, 4:11 PM
    Hi, I have a flow running in Prefect Cloud which has three tasks. Suppose the third task fails due to an issue with the code, so I correct the code and register the flow again with Prefect Cloud. Now if I restart the flow, will it run from the third task or restart from the beginning? How does that work?

    Kevin Mullins

    01/31/2022, 5:02 PM
    I’m wanting to make sure I understand best practices for using secrets between tasks. I see in the documentation that it is preferred to use a secret task passed to other tasks. How does this interact with input caching and checkpointing? I want to make sure I handle things correctly so credentials don’t get leaked.

    Talmaj Marinč

    01/31/2022, 5:10 PM
    Hi, I’m running some flows on prefect cloud and tasks are running sequentially instead of in parallel. The reason I decided to use prefect is to easily parallelize task execution. I am using
    task.map
    . What am I doing wrong?
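    For context: Prefect 1.x’s default LocalExecutor runs mapped children one at a time, and the usual fix is to attach a parallel executor, e.g. `flow.executor = LocalDaskExecutor()` from `prefect.executors` (hedged: verify against your Prefect version). The stdlib sketch below illustrates the same fan-out with a thread pool; it is an analogy for what the executor does, not Prefect’s implementation:

```python
from concurrent.futures import ThreadPoolExecutor

def work(n: int) -> int:
    # Stand-in for a mapped task's body.
    return n * n

# A parallel map: each element is processed concurrently, which is what a
# Dask-backed executor provides for task.map in a flow.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(work, [1, 2, 3, 4]))
```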

    Dev Dasgupta

    01/31/2022, 5:13 PM
    Hi All, I'm new to Prefect and just made a few refactorings to my existing flow.py. Initially, the folder structure looked like the following and the flow was getting registered successfully (
    prefect register --project my_project -p bar
    )
    Copy code
    foo_root_folder
    ├── README.md
    ├── bar
    │   └── flow.py
    ├── pyproject.toml
    ├── requirements.txt
    ├── setup.cfg
    Since the file
    flow.py
    was cluttered with all utils functions required for the flow execution I refactored it into a separate python package and made calls to those functions in
    flow.py
    Copy code
    from bar.utils import get_some_useful_function
    The resultant folder looked like the following:
    Copy code
    foo_root_folder
    ├── README.md
    ├── bar
    │   ├── __init__.py
    │   ├── utils.py
    │   └── flow.py
    ├── pyproject.toml
    ├── requirements.txt
    ├── setup.cfg
    When I try to register now, I get the error
    ModuleNotFoundError: No module named 'bar'
    Can anyone please let me know if anyone faced similar issues before and how they resolved it? Thanks
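    For what it’s worth, registration pickles the flow, but `from bar.utils import ...` still has to be importable wherever the flow is registered and executed, so `bar` must be on `sys.path` (or installed, e.g. with `pip install -e .`) in that environment. A small self-contained demonstration of the import mechanics; the throwaway package is built in a temp dir, and the names mirror the question but are otherwise hypothetical:

```python
import importlib
import os
import sys
import tempfile

# Build a throwaway "bar" package so the example runs anywhere.
root = tempfile.mkdtemp()
pkg = os.path.join(root, "bar")
os.makedirs(pkg)
with open(os.path.join(pkg, "__init__.py"), "w") as f:
    f.write("")
with open(os.path.join(pkg, "utils.py"), "w") as f:
    f.write("def get_some_useful_function():\n    return 'useful'\n")

# Python only imports packages it can find on sys.path; without this line
# the import below raises ModuleNotFoundError: No module named 'bar'.
sys.path.insert(0, root)
utils = importlib.import_module("bar.utils")
value = utils.get_some_useful_function()
```

    The durable fix for a flow is not a sys.path hack but installing the package into the image/environment the agent runs, so the import also resolves at unpickle time.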

    Andrew Lawlor

    01/31/2022, 5:38 PM
    I'm trying out Prefect for our ETL pipelines. We use GCP as our cloud provider. I'd like to use Prefect Cloud and run agents in GCP. Is the best option for doing this the Vertex agent? If I do use that, what is the process for getting started? Do I just create the agents at the beginning via the CLI and then leave them running, or do I have to continuously update them? Is there a way to set up CI/CD for my flow code?

    Keith Veleba

    01/31/2022, 5:53 PM
    Does anyone know how to run an AWS batch job and wait for completion?
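    One approach, sketched under the assumption that you submit with boto3’s `submit_job` (Prefect 1.x also ships AWS Batch tasks in its task library, which are worth checking first): poll `describe_jobs` until the job reaches a terminal state. The stub client here is hypothetical, included only so the loop can run without AWS:

```python
import time

def wait_for_batch_job(batch_client, job_id: str, poll_seconds: float = 30.0) -> str:
    # batch_client is a boto3 "batch" client (or anything with the same
    # describe_jobs shape). Returns the terminal status of the job.
    while True:
        resp = batch_client.describe_jobs(jobs=[job_id])
        status = resp["jobs"][0]["status"]
        if status in ("SUCCEEDED", "FAILED"):
            return status
        time.sleep(poll_seconds)

class _StubBatch:
    # Hypothetical stand-in: reports RUNNING twice, then SUCCEEDED.
    def __init__(self):
        self.calls = 0

    def describe_jobs(self, jobs):
        self.calls += 1
        return {"jobs": [{"status": "RUNNING" if self.calls < 3 else "SUCCEEDED"}]}

final_status = wait_for_batch_job(_StubBatch(), "job-1", poll_seconds=0)
```

    In a flow you would call this inside the task right after submitting the job, and raise/FAIL if the returned status is "FAILED" so Prefect marks the task accordingly.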

    David Yang

    01/31/2022, 6:46 PM
    Hi All, has anyone successfully gotten flows from Azure DevOps using Git storage? I got this error: "Failed to load and execute Flow's environment: HTTPUnauthorized('No valid credentials provided')". It seems that Git storage doesn't support a personal access token for Azure DevOps? This is my code:
    Copy code
    flow.storage = Git(
        repo="xxxxxxx/prefect/_git/prefect",       # name of repo
        flow_path="./flows/azuredevops.py",        # location of flow file in repo
        branch_name="main",
        repo_host="xxxxx@dev.azure.com",           # repo host name
        git_token_secret_name="azuresecret",
    )

    Keith Veleba

    01/31/2022, 7:16 PM
    When you put AWS_CREDENTIALS in via the UI as a Secret, do you use the text or the JSON option, and do you put it in as a JSON snippet?

    Chris Hemphill

    01/31/2022, 8:05 PM
    Hey everyone! Chris Hemphill from Actium Health

    Dave

    01/31/2022, 8:21 PM
    Hi team, our team has recently deployed Prefect on GCP Kubernetes, and it keeps saying
    Couldn't connect to Prefect Server at http://34.82.119.23:4200/graphql
    — can anyone help?