Powered by Linen
prefect-community
  • Jake

    01/26/2022, 8:02 PM
    Hi everyone, I’m running into this error:
    Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n  ModuleNotFoundError("No module named \'cfmeta\'",)\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n  - python: (flow built with \'3.6.15\', currently running with \'3.6.8\')\nThis also may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.',)
    I see that this error is common, and I read this thread. However, our flow specifically needs Python 3.6.15 because some of our dependencies require it, and I do not see an agent that matches this version. I saw that script-based storage was suggested, but we generate our flows dynamically, so that is not an option. Any suggestions on how to resolve this? Thanks in advance!
    4 replies · 2 participants
  • Saurabh Indoria

    01/27/2022, 5:01 AM
    Hi all, in our Prefect deployment we see task logs like "Task finished with final status Pending". Does the Lazarus process handle this? I have seen tasks stuck like this for hours.
    21 replies · 2 participants
  • Sergi de Pablos

    01/27/2022, 6:43 AM
    Hi everyone. I have a process that submits multiple jobs to an external service, and I have to wait for all of them to finish before proceeding. I've found how to wait in a task for a single external process to finish (https://github.com/PrefectHQ/prefect/discussions/3944), but I have no idea how to wait for multiple ones. Thanks
    4 replies · 2 participants
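In Prefect 1.x this polling could live in one task that receives the list of job IDs returned by the submitting task, so that downstream tasks only start once it returns. A minimal plain-Python sketch of the polling itself, where `get_status` (standing in for your service's status call), the state names and the timing defaults are all assumptions:

```python
import time
from typing import Callable, Dict, Iterable

# Hypothetical terminal states; the external service will have its own names.
TERMINAL_STATES = {"SUCCEEDED", "FAILED"}


def wait_for_all(
    job_ids: Iterable[str],
    get_status: Callable[[str], str],
    poll_interval: float = 10.0,
    timeout: float = 3600.0,
) -> Dict[str, str]:
    """Poll every job until all reach a terminal state; return their final states."""
    pending = set(job_ids)
    final: Dict[str, str] = {}
    deadline = time.monotonic() + timeout
    while True:
        # One polling round: check each job that has not finished yet.
        for job_id in sorted(pending):
            status = get_status(job_id)
            if status in TERMINAL_STATES:
                final[job_id] = status
                pending.discard(job_id)
        if not pending:
            return final
        if time.monotonic() >= deadline:
            raise TimeoutError(f"jobs still running: {sorted(pending)}")
        time.sleep(poll_interval)
```

Raising an exception afterwards if any job ended in `FAILED` would make the waiting task fail too, instead of letting downstream tasks run on incomplete results.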
  • massumo

    01/27/2022, 2:00 PM
    Hello everybody, I want to do this: https://github.com/PrefectHQ/prefect/issues/5297. But I didn't see any docs about it. Can I do that? I want to create a dynamic workflow with input.
    1 reply · 2 participants
  • Andreas

    01/27/2022, 3:10 PM
    Hi everyone! I've set up a Prefect server on a Google Compute Engine instance, but I still have problems grasping some concepts. I have the server and a local agent running on the GCE instance, and the UI is working. I can register and run a flow that resides on the GCE machine without errors. Now I want to set up GitLab as the code repository. My initial intuition was that I could configure a connection to my GitLab repo in a config file and then all flows would be synced from there, so that when I commit something, it is updated in Prefect as well. This does not seem to be the case. I'm trying to wrap my head around how to use the Storage options, but I really don't understand them. On my server, do I have to create Python files for each of my flows that reside in GitLab? What sense would the repository make then? Thanks for your help! I'm really lost here...
    5 replies · 2 participants
  • shijas km

    01/27/2022, 3:37 PM
    Hi, I am facing an issue. I have created an extraction flow and run it locally, and it works fine. The program contains one task, and inside this task it calls a function from a different Python module, for example `import abc` and then `@task def fun(): abc.fun1()`. It works locally, but when I register the flow in Cloud, it fails saying `abc` is not found. How can we resolve this? Should I write all the logic inside the task function itself?
    3 replies · 2 participants
  • Suresh R

    01/27/2022, 4:26 PM
    Hi! In Prefect Cloud, does a flow name need to be unique across the whole team or only within a project?
    3 replies · 3 participants
  • Eli Treuherz

    01/27/2022, 5:14 PM
    I’ve got task mapping set up, but the child tasks sometimes have to retry until the external data they read is ready. When one of them enters the Retrying state, it seems to hold up the entire flow: the parent doesn’t produce any more tasks until the retrying one is complete. Is there a way around this behaviour? Ideally I’d like the parent to spawn all its tasks right away and have Prefect identify which ones can be run, rather than one retry blocking everyone else.
    3 replies · 2 participants
  • Kevin Mullins

    01/27/2022, 5:20 PM
    Can tasks accept a Callable argument from another task? For example, I have an upstream task that needs to decide whether to use `function a` or `function b` in a downstream task (essentially choosing a strategy or factory function). Could the downstream task properly receive the function to execute from the upstream one? Hopefully this makes sense.
    9 replies · 3 participants
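Since task results in Prefect 1.x are ordinary Python objects, a function can be returned by one task and passed into another. A plain-Python sketch of that hand-off, without Prefect decorators; `double`, `negate` and the `STRATEGIES` registry are hypothetical names for illustration:

```python
from typing import Callable, Dict, List

Strategy = Callable[[int], int]


def double(x: int) -> int:
    return x * 2


def negate(x: int) -> int:
    return -x


def choose_strategy(use_double: bool) -> Strategy:
    """Upstream step: decide which function the downstream step should apply."""
    return double if use_double else negate


def apply_strategy(strategy: Strategy, values: List[int]) -> List[int]:
    """Downstream step: call whichever function it was handed."""
    return [strategy(v) for v in values]


# Alternative: hand over a key instead of the function itself, so the
# upstream result stays a plain string if results get serialized.
STRATEGIES: Dict[str, Strategy] = {"double": double, "negate": negate}


def apply_named_strategy(name: str, values: List[int]) -> List[int]:
    return apply_strategy(STRATEGIES[name], values)
```

If results are checkpointed or the flow runs on Dask workers, the returned function has to be serializable there; module-level functions usually are, while lambdas and closures depend on the serializer in use (an assumption worth checking), which is one reason to prefer the string-key variant.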
  • Matthew Webster

    01/27/2022, 6:10 PM
    Hi, I’m looking for some guidance on how to use the same flows across multiple projects. For context: we are doing ETL for customers, and some flows are identical. We’re trying the ECS agent with Prefect Cloud, a custom Docker image and S3 storage. I am currently registering the flow with the CLI for multiple customers and passing in customer-specific information as Parameters when scheduling the run. This currently breaks down in a few places:
    1. We're having some problems with Storage and unpickling flows. Is there project-specific info stored there, so that a flow registered/pickled for one project can’t be unpickled by another?
    2. Using secrets. We have some customer-specific API keys that need to be stored as Secrets, and these appear to be settable only globally in Prefect Cloud (or AWS SSM). There are no per-project secrets that I can pass in. One option I can think of would be a service that takes a Parameter and returns a secret that then gets passed to a task, but maybe there’s a better way?
    I’m still pretty new to Prefect but couldn’t find answers to these. Hoping for some help from the community!
    21 replies · 2 participants
  • Suresh R

    01/27/2022, 6:17 PM
    Hi, will all mapped tasks run in parallel? Can we put some delay between the mapped tasks?
    23 replies · 2 participants
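With the default executor, mapped children run one after another; running them in parallel needs LocalDaskExecutor or DaskExecutor. For the delay, one generic option is to stagger start times by position. A standard-library sketch of the idea; inside a Prefect 1.x mapped task the position could presumably come from `prefect.context.get("map_index")`, which is an assumption to verify, and `delay` is an arbitrary parameter:

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def staggered_map(fn: Callable[[T], R], items: List[T], delay: float, workers: int = 4) -> List[R]:
    """Apply fn to every item concurrently, starting item i about i * delay seconds in."""
    start = time.monotonic()

    def run(indexed: Tuple[int, T]) -> R:
        i, item = indexed
        # Sleep until this item's start offset; earlier items are not held up.
        wait = start + i * delay - time.monotonic()
        if wait > 0:
            time.sleep(wait)
        return fn(item)

    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map returns results in input order, whatever order they finish in.
        return list(pool.map(run, enumerate(items)))
```

Because each item only waits for its own offset, a slow item does not push back the ones scheduled after it.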
  • Vamsi Reddy

    01/27/2022, 9:03 PM
    Hi everyone, I have a task that returns a list of module names. However, this list is not fixed and can vary in length. I iterate over this list to get the names and then run those modules in a separate task. Is it possible to specify the output as a list? I tried using `nout=1`, but it just outputs one element of the list. I get the following error:
    TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
    5 replies · 2 participants
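In Prefect 1.x, `nout` tells Prefect how many separate outputs to unpack when the flow is built, so it does not fit a list whose length is only known at run time; the usual alternative is to hand the whole list to a mapped task, e.g. `run_module.map(module_names)`. The per-item logic could look like this plain-Python sketch, assuming each module exposes a `main()` function (a hypothetical convention):

```python
import importlib
from typing import Any, List


def run_module(name: str, entry_point: str = "main") -> Any:
    """Per-item step: import a module by dotted name and call its entry point.

    `main` is a hypothetical convention; adapt it to however your modules
    are actually started.
    """
    module = importlib.import_module(name)
    return getattr(module, entry_point)()


def run_all(names: List[str]) -> List[Any]:
    # The list is consumed at run time, so its length never has to be known
    # when the flow is built (unlike unpacking with a fixed `nout`).
    return [run_module(n) for n in names]
```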
  • Parth Patel

    01/28/2022, 12:36 AM
    Hi all! In one of my Prefect tasks I am capturing all stdout, because I am importing a class from another module which uses `logging` to log. I am trying to capture those log statements, but it isn't working: none of the log levels are being captured by the Prefect task. Any thoughts? I'm wondering if I'll have to import the Prefect logger somehow in that module as well.
    4 replies · 2 participants
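A named logger's records only reach handlers attached to that logger or its ancestors, so capturing stdout does not see them. Prefect 1.x has an `extra_loggers` setting (for example `PREFECT__LOGGING__EXTRA_LOGGERS="['some_library']"`) intended to attach Prefect's handlers to such loggers; check the logging docs for your version. The standard-library sketch below shows the underlying mechanism, with a hypothetical `some_library` logger and an in-memory handler standing in for Prefect's:

```python
import logging
from typing import List


class ListHandler(logging.Handler):
    """Keep formatted records in memory (a stand-in for the task's log handler)."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(self.format(record))


def attach(logger_name: str, handler: logging.Handler, level: int = logging.DEBUG) -> None:
    """Send records from another module's named logger to `handler`.

    Unconfigured `logging` drops INFO/DEBUG and writes WARNING+ to stderr,
    not stdout, which is why capturing stdout does not see these records.
    """
    lib_logger = logging.getLogger(logger_name)
    lib_logger.setLevel(level)
    lib_logger.addHandler(handler)
```

Child loggers such as `some_library.client` propagate to the handler attached to `some_library`, so attaching once at the package level is usually enough.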
  • Miroslav Rác

    01/28/2022, 1:35 PM
    Hi. I have a little problem I cannot wrap my head around. I have created a flow, nothing complicated, just the “hello world” from the getting started docs. I deployed it to Prefect Cloud, then started a local agent and tried to “quick run” the flow. Everything works, which is great. But then I started a Docker agent on my VPS (not locally on the machine where the flow was created) and tried to run the flow. It wasn’t working. The agent picked up the flow, but an error was raised immediately:
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/Users/miro/'")
    … Obviously, when I created my flow, my local path was probably pickled, so it cannot run on another machine. Is this expected behavior? How can I run flows that were created on a different machine?
    14 replies · 2 participants
  • Bruno Murino

    01/28/2022, 1:58 PM
    Hi everyone, I have a tiny flow I’m using to start other flows, and I’m passing a `run_name` argument to the `create_flow_run` method, but the `run_name` depends on a parameter and it’s not working as I expected. Can anyone help? I feel like I need to “delay” the resolution of what ‘scope’ is, but I'm not sure how.
    6 replies · 2 participants
  • Andrea Nerla

    01/28/2022, 3:19 PM
    Hi folks, I have a problem with failing code being reported as a success. For instance, in the T part of my ETL (which is simply an `os.system` call) I've put a test, and even if it fails, Prefect still declares it a success.
    9 replies · 3 participants
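`os.system` returns the command's exit status instead of raising, so a failing command does not, by itself, make the surrounding task fail. A minimal sketch of the usual alternative with `subprocess.run(check=True)`; the command lists are placeholders for the real T step:

```python
import subprocess
import sys
from typing import List


def run_step(cmd: List[str]) -> str:
    """Run a transformation command and raise if it exits non-zero.

    subprocess.run(check=True) raises CalledProcessError on a non-zero exit
    code, and an unhandled exception inside a task is what marks it Failed.
    """
    result = subprocess.run(cmd, check=True, capture_output=True, text=True)
    return result.stdout


if __name__ == "__main__":
    # A passing step returns its output; a failing one raises.
    print(run_step([sys.executable, "-c", "print('transform ok')"]))
```

If the `os.system` call has to stay, checking its return value and raising on anything non-zero achieves the same effect.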
  • David Yang

    01/28/2022, 4:13 PM
    Hi all, I have a flow that will run in DEV, QA and PROD. The flow connects to a database, and I saved the credentials as secrets in Prefect Cloud. The secret names have a suffix for each environment, for example "username_DEV", "username_QA", etc. In the flow, I use this code to get the password and username:
        pasword_var = "password_$FLOW_ENV"
        snowflake_pass = PrefectSecret(pasword_var)
        user_var = "user_$FLOW_ENV"
        snowflake_user = PrefectSecret(user_var)
    and the environment variable is configured in the run config:
        runconfig = DockerRun(image="[imagename]",env={"FLOW_ENV": "TEST"})
    It returns an error that PrefectSecret can't find a secret with the name "password_$FLOW_ENV". Questions:
    1. Is this a good approach to get this sensitive data through Prefect secrets?
    2. I thought $[environment variable name] gives me the environment variable value?
    3. It seems that PrefectSecret runs as a task and doesn't evaluate the parameter value at run time. Is that right?
    8 replies · 2 participants
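Python does not expand `$FLOW_ENV` inside an ordinary string, so `"password_$FLOW_ENV"` reaches `PrefectSecret` literally, which matches the error. And since `FLOW_ENV` is only set inside the container started by the run config, the name has to be built at run time, inside a task, which could then fetch the value with `prefect.client.Secret(name).get()` (Prefect 1.x; treat the exact call as an assumption to verify). A minimal sketch of the name-building step, with `secret_name` as a hypothetical helper:

```python
import os


def secret_name(base: str, env_var: str = "FLOW_ENV") -> str:
    """Build an environment-specific secret name such as "password_DEV".

    Call this at run time (inside a task): FLOW_ENV only exists in the
    container that the run config starts, not where the flow is built.
    """
    env = os.environ.get(env_var)
    if not env:
        raise KeyError(f"environment variable {env_var} is not set")
    return f"{base}_{env}"
```

`os.path.expandvars("password_$FLOW_ENV")` would also expand the variable, but it silently leaves the text unchanged when the variable is missing, whereas the helper above fails loudly.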
  • Pedro Machado

    01/28/2022, 4:28 PM
    Hi everyone. What is the best pattern to implement retries and the ability to restart when using a flow of flows? I created a function task that combines `create_flow_run` and `wait_for_flow_run`, and I am using this task to start the child flow(s) from the parent. This task is set to retry. Today I noticed that if the child flow fails, this task won't start a new flow run when it retries; it seems to check the same `flow_run_id`. I suspect this is related to an idempotency key that is set by default. Would it be better to explicitly set an idempotency key that differs across retries? Is there a better way to set this up? The goals are: 1) the ability to retry the child flow when it fails, and 2) the ability to restart the parent flow from the failed state if needed. Thanks!
    8 replies · 2 participants
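If you go the explicit-key route, the key needs to stay stable within one attempt but change between retries. A minimal sketch of such a key (`child_run_key` is hypothetical); in Prefect 1.x the inputs would presumably come from `prefect.context` (`flow_run_id`, `task_run_id`, `task_run_count`) and the result would be passed as `create_flow_run(..., idempotency_key=...)`, but treat those names, and Cloud's exact de-duplication behaviour, as assumptions to check:

```python
def child_run_key(parent_flow_run_id: str, task_run_id: str, attempt: int) -> str:
    """Hypothetical idempotency key for a child flow run started by a retrying task.

    Same (parent run, task run, attempt) gives the same key, so a duplicate
    submission within one attempt maps to the existing child run; a retry
    increments `attempt` and therefore gets a fresh key and a new child run.
    """
    if attempt < 1:
        raise ValueError("attempt counts start at 1")
    return f"{parent_flow_run_id}-{task_run_id}-attempt{attempt}"
```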
  • Suresh R

    01/28/2022, 6:25 PM
    Hi, I raised a SKIP signal for all upstream mapped tasks, and now the downstream mapped task is in a Pending state. What might be the reason?
    1 reply · 2 participants
  • Jovan Sakovic

    01/28/2022, 6:41 PM
    👋 Are there any known issues with Prefect and multi-threading? I’m having a difficult time getting the logging from the Thread’s target function to actually show up in the UI. I tried a bunch of things, and I thought I had it set up correctly now, in that I:
    • only create the logger objects within functions that are tasks (with `logger = prefect.context.get("logger")`)
    • pass these logger objects from function to function until they reach the one that runs on threads:
        ◦ `extract_thread = Thread(target=extract_messages, name=f"Extractor #1", args=(extract_queue, load_queue, logger))`
    5 replies · 2 participants
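One possible cause, stated as an assumption rather than a confirmed diagnosis: Prefect 1.x keeps `prefect.context` in thread-local storage, so log calls made from a plain `threading.Thread` may run without the task's context even when the logger object is passed in. A generic workaround is to keep all logging on the task's own thread: workers only enqueue text, and the task thread relays it. A standard-library sketch, where `extract_messages` is a simplified hypothetical stand-in for the real worker:

```python
import logging
import queue
import threading
from typing import List


def extract_messages(worker_id: int, log_queue: "queue.Queue[str]") -> None:
    # Hypothetical worker: it never calls the logger, it only enqueues text.
    for step in range(2):
        log_queue.put(f"Extractor #{worker_id}: finished step {step}")


def run_extractors(logger: logging.Logger, n_workers: int = 2) -> List[str]:
    """Start worker threads and log their messages from the calling (task) thread."""
    log_queue: "queue.Queue[str]" = queue.Queue()
    threads = [
        threading.Thread(target=extract_messages, args=(i, log_queue), name=f"Extractor #{i}")
        for i in range(n_workers)
    ]
    for t in threads:
        t.start()

    relayed: List[str] = []
    # Keep draining while any worker is alive or messages are still queued.
    while any(t.is_alive() for t in threads) or not log_queue.empty():
        try:
            message = log_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        logger.info(message)  # emitted on the task's own thread
        relayed.append(message)

    for t in threads:
        t.join()
    return relayed
```

Draining while the workers are still running keeps the messages flowing to the UI during the run instead of all at once at the end.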
  • Michael Bell

    01/28/2022, 10:02 PM
    Hi folks. I have a simple example flow that I've written to test using a Dask executor configured to use a Fargate cluster, and I'm encountering an S3 permissions error that I have not faced with other flows. When I run the flow below, I get the error `Error uploading to S3: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied`. If I run the flow without the DaskExecutor, it works just fine. I have previously tested using `dask_cloudprovider.aws.FargateCluster` directly within a task to create a cluster, send a dask array calculation to the cluster and tear it down, all within one Prefect task, and that worked fine as well. Any thoughts as to where the permissions error might be coming from? The execution role I specify in the `cluster_kwargs` has `s3:*` permissions.
    import os
    import sys
    import time
    from typing import List
    sys.path.append('.')
    import prefect
    from prefect import task, Flow
    from prefect.storage.s3 import S3
    from prefect.run_configs.ecs import ECSRun
    from prefect.executors.dask import DaskExecutor
    from dask_cloudprovider.aws import FargateCluster
    from config.build import ECR_IMAGE
    
    @task(log_stdout=True)
    def prefect_task(n: int) -> int:
        print(f'Running task w/ input {n}')
        time.sleep(2)
        return n * 10
    
    @task(log_stdout=True)
    def reduce_task(ns: List[int]) -> int:
        print(f'Running reduction task')
        return sum(ns)
    
    schedule = None 
    account = os.environ.get("AWSENV")
    
    with Flow("dask_poc", storage=S3(bucket=f'prefect-{account}-us-east-1')) as flow:
        results = prefect_task.map([1,2,3,4,5])
        reduced_result = reduce_task(results)
    
    
    flow.run_config = ECSRun(
        image=ECR_IMAGE,
        execution_role_arn=...,
        task_role_arn=...,
        run_task_kwargs={
            "cluster": "PrefectCluster",
        },
        labels=[account]
    )
    
    
    flow.executor = DaskExecutor(
        cluster_class="dask_cloudprovider.aws.FargateCluster",
        cluster_kwargs={
            "image": ECR_IMAGE, 
            "n_workers": 5,
            "cluster_arn": ...,
            "execution_role_arn": ...,
            "vpc": ...,
            "subnets": [..., ],
            "security_groups": [..., ],
        },
    )
    5 replies · 2 participants
  • Jovan Sakovic

    01/29/2022, 11:30 AM
    The Slack Notifier: I'm trying to get the error message of a dbt run into Slack, and I have set up the state_handler to only handle the `Failed` state. But the DbtShellTask raises the FAIL signal with the first message that occurs, `Command failed with exit code 2`, and the notifier never gets to the error I actually want, which is dbt’s own error. Is there a way to push these other error messages to the notifier?
    10 replies · 3 participants
  • Tara

    01/31/2022, 6:11 AM
    Hi, I'm wondering if it’s possible to run multiple flows using only Prefect Core (without running Prefect Server)?
    8 replies · 3 participants
  • Bruno Murino

    01/31/2022, 11:26 AM
    Hi everyone, is it currently possible to register flows built with Orion on Prefect Cloud? I know Orion is still in alpha, but it would be good to test our deployment strategies etc. if possible.
    4 replies · 3 participants
  • Mary Clair Thompson

    01/31/2022, 1:23 PM
    Hi folks! We're running Prefect Server on a local machine. After the update to Prefect 0.15.12, we've run into an issue where zombie processes are getting spawned by GraphQL. `ps ef` yields ~7k zombie processes on the machine; on closer inspection with `ps -auxwwf`, we're seeing GraphQL as the culprit for all of those hanging processes. This seems to be an issue elsewhere too (e.g. this report), and I'm wondering whether anything changed in 0.15.12 that could cause this, and what mitigation you would suggest?
    2 replies · 2 participants
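To narrow this down, one generic diagnostic (not Prefect-specific) is to group the zombies by parent PID: a zombie stays around until its parent reaps it, so the parent with the most zombies is the process to look at. A minimal sketch, assuming a `ps` that supports `-eo pid,ppid,stat,comm` (as on Linux procps); `current_ps` shells out and is not needed to try the parser on captured output:

```python
import subprocess
from collections import Counter
from typing import Dict, List, Tuple


def parse_zombies(ps_output: str) -> List[Tuple[int, int, str]]:
    """Parse `ps -eo pid,ppid,stat,comm` output into (pid, ppid, comm) for zombies."""
    zombies = []
    for line in ps_output.splitlines()[1:]:  # skip the header row
        parts = line.split(None, 3)
        if len(parts) < 4:
            continue
        pid, ppid, stat, comm = parts
        if stat.startswith("Z"):  # STAT begins with Z for defunct processes
            zombies.append((int(pid), int(ppid), comm.strip()))
    return zombies


def zombie_parents(ps_output: str) -> Dict[int, int]:
    # Count zombies per parent PID; the parent is the one not reaping them.
    return dict(Counter(ppid for _, ppid, _ in parse_zombies(ps_output)))


def current_ps() -> str:
    return subprocess.run(
        ["ps", "-eo", "pid,ppid,stat,comm"], check=True, capture_output=True, text=True
    ).stdout
```

On the affected machine, `zombie_parents(current_ps())` would show which parent PID owns the ~7k zombies, which can then be matched to a container or service.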
  • Tim Wright

    01/31/2022, 1:50 PM
    Hey community. I was hoping someone could help with a way to generate API keys from a single admin account. As part of our deployment, we're trying to limit the number of manual steps and would like to create API keys for our service accounts with an automated script. Is this possible? (We're using Prefect Cloud.) Thanks in advance for the help.
    3 replies · 2 participants
  • Tim Wright

    01/31/2022, 3:12 PM
    Another question for early Monday: is there a set of RBAC permissions that would allow a Prefect Cloud user to deploy flows and change the flow parameters, but NOT change the flow tags? Additionally, is there any way to limit permissions on a per-project basis in Prefect Cloud? The use case: we currently have two Prefect Cloud accounts, one shared between our Dev/Tst environments and the other between our Stg/Prod environments. Each account has two projects, one per environment, whose flows are tagged so as to associate them with an agent running the infrastructure for that environment. We're wondering what our options are for allowing users to access one project and not the other, and for ensuring that a user cannot target Tst infrastructure from the Dev project simply by changing the flow tags. The agents themselves are environment-specific and targeted via flow tags.
    12 replies · 2 participants
  • shijas km

    01/31/2022, 4:11 PM
    Hi, I have a flow running in Prefect Cloud with three tasks. Suppose the third task fails due to an issue in the code, so I correct the code and register the flow again with Prefect Cloud. If I now restart the flow run, will it run from the third task or restart from the beginning? How does this work?
    2 replies · 2 participants
  • Kevin Mullins

    01/31/2022, 5:02 PM
    I want to make sure I understand best practices for using secrets between tasks. I see in the documentation that the preferred approach is to use a secret task and pass its result to other tasks. How does this interact with input caching and checkpointing? I want to make sure I handle things correctly so credentials don’t get leaked.
    3 replies · 2 participants
  • Talmaj Marinč

    01/31/2022, 5:10 PM
    Hi, I’m running some flows on Prefect Cloud, and tasks are running sequentially instead of in parallel. The reason I decided to use Prefect is to easily parallelize task execution. I am using `task.map`. What am I doing wrong?
    7 replies · 2 participants

Kevin Kho

01/31/2022, 5:10 PM
The default executor is a sequential executor that does not parallelize. You need to set a LocalDaskExecutor:
from prefect.executors import LocalDaskExecutor
flow.executor = LocalDaskExecutor()
The docs are here.

Talmaj Marinč

01/31/2022, 5:11 PM
Does it also work with `DaskExecutor`?

Kevin Kho

01/31/2022, 5:12 PM
Yes, if you have a Dask cluster. Either way, you need LocalDaskExecutor or DaskExecutor to parallelize.

Talmaj Marinč

01/31/2022, 5:28 PM
When I do that and re-register the flow to Prefect Cloud, I get `Skipped (metadata unchanged)`, and I still see sequential execution in the Prefect Cloud UI.

Kevin Kho

01/31/2022, 5:29 PM
If you register using the CLI, you can force the change with
prefect register -f ..

Talmaj Marinč

01/31/2022, 5:50 PM
thanks.
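As a rough plain-Python analogy of the answer above (not Prefect code): the default executor behaves like the list comprehension below, running mapped items one after another, while LocalDaskExecutor, assumed here to use a thread pool by default, behaves more like the `ThreadPoolExecutor` version. The 0.2 s sleep is an arbitrary stand-in for I/O-bound task work:

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Tuple


def slow_square(x: int) -> int:
    time.sleep(0.2)  # stands in for I/O-bound work inside a mapped task
    return x * x


def timed(run: Callable[[], List[int]]) -> Tuple[List[int], float]:
    start = time.perf_counter()
    result = run()
    return result, time.perf_counter() - start


items = [1, 2, 3, 4, 5]
# Sequential: each item waits for the previous one to finish.
seq_result, seq_time = timed(lambda: [slow_square(x) for x in items])
# Threaded: all five items sleep at the same time.
with ThreadPoolExecutor(max_workers=5) as pool:
    par_result, par_time = timed(lambda: list(pool.map(slow_square, items)))

print(f"sequential {seq_time:.2f}s, threaded {par_time:.2f}s")
```

For CPU-bound tasks, threads help less because of the GIL, which is where a process-based scheduler or a DaskExecutor cluster comes in.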