prefect-community
  • x

    xyzz

    07/20/2022, 8:49 AM
    Regarding Orion: The concept docs contain the following example for RemoteFileSystem:
    fs = RemoteFileSystem(basepath="s3://my-bucket/folder/")
    fs.write_path("foo", b"hello")
    fs.save("dev-s3")
    What does fs.write_path do? Does it store the actual content on the file system? Also, are secret keys stored in the Prefect database if you pass them to the settings parameter of RemoteFileSystem and then call fs.save? If so, how is it ensured that only I can read them and not, e.g., an admin at Prefect?
    a
    b
    • 3
    • 5
  • a

    Andreas Nigg

    07/20/2022, 10:54 AM
    Hey, I'm using the latest prefect 2.0 b9 and when running "prefect storage ls" all my created blocks are shown - not only storage blocks. Is this expected? (It's actually not a blocking problem, just wanted to report, if not already known).
    🙌 1
    ✅ 1
    a
    b
    l
    • 4
    • 8
  • i

    Isara Ovin

    07/20/2022, 11:30 AM
    I'm trying to use the local Dask executor and add a task dependency on the previous task as shown below (using Prefect 1), but running this gives me an error saying
    add_json_index() missing 1 required positional argument: 'output'
    output = filter_empty_responses(data)
    
        # Parsing Jsons
        indexed_output = add_json_index(output, upstream_tasks=[filter_empty_responses])
    a
    • 2
    • 1
  • d

    David

    07/20/2022, 11:35 AM
    Hello, I am trying to use the Vertex agent, but unfortunately I keep getting
    400 List of found errors:	1.Field: job_spec.worker_pool_specs[0].container_spec.env[12].value; Message: Required field is not set.	2.Field: job_spec.worker_pool_specs[0].container_spec.env[3].value; Message: Required field is not set.	3.Field: job_spec.worker_pool_specs[0].container_spec.env[4].value; Message: Required field is not set.	 [field_violations {
      field: "job_spec.worker_pool_specs[0].container_spec.env[12].value"
      description: "Required field is not set."
    }
    field_violations {
      field: "job_spec.worker_pool_specs[0].container_spec.env[3].value"
      description: "Required field is not set."
    }
    field_violations {
      field: "job_spec.worker_pool_specs[0].container_spec.env[4].value"
      description: "Required field is not set."
    }
    ]
    ✅ 1
    a
    • 2
    • 8
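A note on the Vertex error above: all three field violations point at env[N].value, which suggests (an assumption, since the post does not show the job's environment) that some environment variables were passed to the job with empty values. Below is a minimal sketch for spotting such entries before a job is submitted. The helper names and the sample variables are hypothetical and are not part of Prefect or Vertex AI.

```python
from typing import Dict, List, Optional


def find_empty_env_vars(env: Dict[str, Optional[str]]) -> List[str]:
    """Return the names of variables whose value is None or an empty string."""
    return [name for name, value in env.items() if value is None or value == ""]


def drop_empty_env_vars(env: Dict[str, Optional[str]]) -> Dict[str, str]:
    """Return a copy of env that keeps only variables with a non-empty value."""
    return {name: value for name, value in env.items() if value}


# Hypothetical environment; the real variable names are not shown in the post.
sample_env = {"API_URL": "https://example.invalid", "API_KEY": "", "REGION": None}

empty_names = find_empty_env_vars(sample_env)
clean_env = drop_empty_env_vars(sample_env)

print("variables without a value:", empty_names)
print("environment that would be sent:", clean_env)
```

Whether dropping the empty entries or giving them real values is the right fix depends on what the flow expects, so the sketch only reports them and builds a filtered copy.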
  • c

    Chu

    07/20/2022, 12:56 PM
    Hi community, is there any downside if we do not use the flow of flows framework? From my understanding, flow of flows means using one single flow (an orchestrator) to manage other flows (dependencies, schedules, etc.) instead of defining those relationships in each flow's script. Is that right?
    s
    a
    • 3
    • 10
  • m

    Matthew Seligson

    07/20/2022, 1:35 PM
    In Orion, what goes into the serialized representation of a flow when it’s registered? How can the DAG of a flow be visualized before the flow is actually executed?
    k
    • 2
    • 3
  • h

    haris khan

    07/20/2022, 2:00 PM
    I have a flow with numerous subflows, and subflow B depends on subflow A. Is there a way to use subflowB.set_upstream(subflow_A) in Prefect 2.0?
    k
    m
    • 3
    • 7
  • m

    Matt Delacour

    07/20/2022, 2:37 PM
    Hi, I am running into issues with LocalDaskExecutor and I cannot understand the problem. I am trying to run parallel calls to REST API endpoints. The problem is that the Dask executor runs some calls twice for no apparent reason. I can see that some Dask tasks run on the same endpoint, even though when I log all the endpoints, they are all unique... I will post the logic as a snippet in the thread 🧵
    k
    • 2
    • 12
  • d

    Denys Volokh

    07/20/2022, 2:39 PM
    Hi Community, could you please help me understand where I should place GITHUB_ACCESS_TOKEN? I have an agent running on Kubernetes and defined this environment variable there:
    PREFECT__CONTEXT__SECRETS_GITHUB_ACCESS_TOKEN
    I registered the flow with GitHub storage:
    flow.storage = GitHub(
        repo="company/prefect-workflows",
        path="flows/benchmarks/flow_import_index_data.py",
        ref="master",
        access_token_secret="GITHUB_ACCESS_TOKEN",
    )
    but when I run the flow from cloud.prefect.com, I get this error:
    Failed to load and execute flow run: ValueError('Local Secret "GITHUB_ACCESS_TOKEN" was not found.')
    k
    • 2
    • 2
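A note on the secret name above: as far as I know, Prefect 1.x maps environment variables to config keys using a double underscore as the separator at every level, so a local secret is normally exported as PREFECT__CONTEXT__SECRETS__ followed by the secret name. The variable as posted has a single underscore before GITHUB_ACCESS_TOKEN, which may be why the secret is not found. This is an assumption, since the thread does not confirm the cause. The helper below is hypothetical and only illustrates the naming rule.

```python
def local_secret_env_var(secret_name: str) -> str:
    """Build the environment variable name for a Prefect 1.x local secret.

    Assumes the double-underscore convention described above.
    """
    prefix = "PREFECT__CONTEXT__SECRETS__"
    return prefix + secret_name


expected = local_secret_env_var("GITHUB_ACCESS_TOKEN")

# The name exactly as it appears in the post (single underscore after SECRETS).
posted = "PREFECT__CONTEXT__SECRETS_GITHUB_ACCESS_TOKEN"
names_match = expected == posted

print("expected:", expected)
print("posted:  ", posted)
print("names match:", names_match)
```

The variable also has to be visible to the process that actually runs the flow, which on Kubernetes may be a separate job rather than the agent's own container.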
  • a

    alex

    07/20/2022, 2:46 PM
    I have been running into an issue with Prefect cloud 1.0 where my flow and task concurrency slots are being used up while the UI reports that no flows are running. There are also flows that have been "cancelled" using the UI and still show up as greyed out boxes with "cancelling..." written for many days. Has anyone run into this issue before?
    k
    a
    • 3
    • 7
  • r

    Riccardo Tesselli

    07/20/2022, 3:05 PM
    Hello guys. I'm looking for a suggestion about Prefect 2.0 based on your experience. I was wondering what the ideal use case for workspaces is. Initially I thought that workspaces could be useful to differentiate between environments, such as development and production, with a dedicated workspace for each environment. Then, after exploring Prefect 2.0 features, I started to question this, because one could use a single workspace and set everything up to distinguish between development and production pipelines. So I wonder, what do you suggest for managing environments: different workspaces or one workspace? And what is the ideal use case for a workspace?
    j
    t
    d
    • 4
    • 4
  • c

    Christian Vogel

    07/20/2022, 3:51 PM
    Hi Prefect Community. I am currently trying to run a prefect flow using the DaskTaskRunner on a local kubernetes cluster. Unfortunately the flow is failing because of the following error:
    File "/opt/conda/lib/python3.8/site-packages/distributed/worker.py", line 2742, in loads_function
        result = pickle.loads(bytes_object)
    File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 73, in loads
        return pickle.loads(x)
    ModuleNotFoundError: No module named 'prefect'
    When entering the container, prefect seems to be available though. Do you have any idea what could be the reason?
    k
    • 2
    • 11
  • j

    Jason

    07/20/2022, 4:24 PM
    I wanted to double-check before drafting up the wrong solution, but it looks like a flow of flows doesn't need to be in the same project as long as create_flow_run can find the flow_name within a project? https://www.prefect.io/guide/blog/flow-of-flows-orchestrating-elt-with-prefect-and-dbt/. We're considering organizing all of our API calls in a specific project and DBT flows within a DBT Project.
    k
    • 2
    • 1
  • s

    Sam Maradwi

    07/20/2022, 4:29 PM
    Hi Prefect community, I am facing an issue while registering flows in Prefect and am getting the error below. Can anyone please help me with this?
    botocore.exceptions.ClientError: An error occurred (AccessDeniedException) when calling the GetSecretValue operation: User: arn:aws:sts::xxx:assumed-role/code_deployments-role/iddoc- is not authorized to perform: secretsmanager:GetSecretValue on resource: 4i-adl-config because no identity-based policy allows the secretsmanager:GetSecretValue action
    c
    k
    • 3
    • 13
  • m

    Matt Delacour

    07/20/2022, 6:15 PM
    👋 Is anyone scheduling DBT with Prefect here? How do you deal with dependencies (upstream & downstream)? Do you handle the logic of "This model needs to run before this other model"? On our side, we want to leverage the DBT built-in features as well as the visualization of Prefect with 1 DBT model == 1 Prefect flow
    b
    e
    +2
    • 5
    • 12
  • c

    Chris Reuter

    07/20/2022, 7:00 PM
    Hi all! Come join @Khuyen Tran, @Chris White and myself on PrefectLive starting now!
    ❤️ 2
    🙌 1
    🎉 1
  • t

    Tim Enders

    07/20/2022, 8:30 PM
    Question on Prefect 2.0... how is work on a map operator coming? That is a big stumbling block for us adopting 2.0. Thanks!
    k
    • 2
    • 6
  • k

    kiran

    07/20/2022, 9:46 PM
    Hi y’all. I’m trying to sign up for a 2.0 cloud account with my work email (I already have a cloud 1.0 account). I’m getting the error
    AxiosError: Request failed with status code 401
    So I thought maybe I could sign in with my 1.0 credentials and got
    Error: Invalid username or password.
    Am I unable to sign up for 2.0 if I also have 1.0?
    ✅ 1
    k
    n
    • 3
    • 3
  • m

    Mansour Zayer

    07/20/2022, 10:48 PM
    Hello. I'm using subprocess to run my dbt project locally (Prefect 1.2.2, Windows). I create my command
    dbt run --vars '{data_processing_start_date: 2022-07-20, data_processing_end_date: 2022-07-20}' --profiles-dir ./
    like this:
    command = (
            f"dbt run --vars '{{"
            f"data_processing_start_date: {data_processing_start_date}, "
            f"data_processing_end_date: {data_processing_end_date}}}' --profiles-dir ./ "
        )
    The command is created correctly, but dbt gives me this error:
    dbt: error: unrecognized arguments: 2022-07-20, data_processing_end_date: 2022-07-20}'
    It seems dbt interprets 2022-07-20 as an argument instead of the value of the data_processing_start_date variable. Keep in mind that when I run the same command in my CLI, dbt works fine, but when it's passed to dbt through subprocess, this occurs. This is my subprocess call:
    subprocess.run(
            command,
            check=True,
            stderr=True,
            stdout=True,
            shell=True,
            cwd="dbt",
        )
    Any idea what might cause this, and how to solve this? Thank you
    ✅ 1
    a
    • 2
    • 4
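A note on the dbt subprocess question above: one likely cause (an assumption, not confirmed in this excerpt) is that with shell=True on Windows the command string goes through cmd.exe, which does not treat single quotes as quoting characters, so the --vars value is split at its spaces. Passing the arguments as a list without a shell avoids shell quoting altogether. The sketch below builds such a list and, instead of calling dbt, sends it to a small stand-in Python child process that echoes the arguments it received. The function name build_dbt_command is hypothetical.

```python
import json
import subprocess
import sys
from typing import List


def build_dbt_command(start_date: str, end_date: str) -> List[str]:
    """Return the dbt invocation as an argument list; --vars is one element."""
    vars_arg = (
        f"{{data_processing_start_date: {start_date}, "
        f"data_processing_end_date: {end_date}}}"
    )
    return ["dbt", "run", "--vars", vars_arg, "--profiles-dir", "./"]


command = build_dbt_command("2022-07-20", "2022-07-20")

# Stand-in for dbt: a child Python process that prints its arguments as JSON,
# so we can check that the --vars value arrives as a single argument.
probe = [sys.executable, "-c", "import json, sys; print(json.dumps(sys.argv[1:]))"]
result = subprocess.run(
    probe + command[1:],  # same arguments dbt would receive
    check=True,
    capture_output=True,
    text=True,
)
received = json.loads(result.stdout)
print(received)
```

With dbt installed, the real call would be subprocess.run(command, check=True, cwd="dbt") with no shell=True, assuming the dbt executable can be found on PATH.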
  • r

    R

    07/20/2022, 11:16 PM
    Hello! Could you please check if this issue was fixed - https://github.com/PrefectHQ/prefect/issues/5663 Thanks!
    ✅ 1
    a
    • 2
    • 9
  • p

    Priyank

    07/21/2022, 7:19 AM
    Hi there! Sometimes our tasks hang (screenshots attached): they remain in the Pending state even though their upstream tasks completed hours earlier, and even after cancelling or restarting them, they don't start and stay in Pending. This is the third time this has happened to us. Are there any possible reasons why it happens, or is it a bug in Prefect 1.0?
    ✅ 1
    a
    • 2
    • 2
  • s

    Stefan

    07/21/2022, 8:39 AM
    Did something change in 1.0? My Agent died, and this is how I always started it.
    ✅ 1
    a
    • 2
    • 1
  • d

    Denys Volokh

    07/21/2022, 9:35 AM
    Hi Community! Is there a way to deploy the whole repo and not just 1 flow file with GitHub storage?
    ✅ 1
    a
    • 2
    • 8
  • v

    Vadym Dytyniak

    07/21/2022, 10:07 AM
    Hi. How do I pass a job template to KubernetesRun as a Python dict?
    {
                'apiVersion': 'batch/v1',
                'kind': 'Job',
                'spec': {
                    'template': {
                        'spec': {
                            'nodeSelector': {
                                "topology.kubernetes.io/zone": "us-east-1a",
                                "dask.corp.com/subnet-type": "private",
                                "dask.corp.com/storage": 'true',
                            },
                            'containers': [
                                {'name': 'flow'},
                            ]
                        }
                    }
                }
            }
    s
    • 2
    • 2
  • r

    Rajvir Jhawar

    07/21/2022, 10:48 AM
    Hi Prefect team, is there a way to set a "universal retry" policy for all flows in Prefect 2.0? For instance, if you didn't specify a retry option in your code, the flow would automatically take this universal setting; if a retry policy is set in code, that would override the universal setting. Second question: is there a way to set unlimited retries, besides just setting a really big number for the "retries" parameter?
    ✅ 1
    a
    • 2
    • 4
  • m

    Mahesh

    07/21/2022, 12:24 PM
    Hello Team, I am using Prefect 1.x. What is the best practice for deploying flow code with Git version control, instead of re-registering the flow on every code change?
    ✅ 1
    a
    • 2
    • 1
  • p

    Prass

    07/21/2022, 12:27 PM
    Hi folks,
    1. Is there any way in Prefect 2.0 to say: do task1, but do task2 only if task1 is completed, and task3 only if task1 and task2 are complete?
    2. Does Prefect 2.0 parallelize flows over async for loops?
    3. I write (append) to a file in one of my tasks. Is that prefect safe?
    ✅ 1
    a
    • 2
    • 1
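A note on the file-append question above: whether appending from a task is safe depends mostly on whether several tasks can write to the same file at the same time. Below is a minimal sketch using only the standard library, assuming all writers are threads in a single process. The function append_line and the file name are hypothetical, and the thread pool merely stands in for concurrently running tasks.

```python
import os
import tempfile
import threading
from concurrent.futures import ThreadPoolExecutor

# One lock shared by every writer in this process.
_append_lock = threading.Lock()


def append_line(path: str, line: str) -> None:
    """Append one line to path while holding the shared lock."""
    with _append_lock:
        with open(path, "a", encoding="utf-8") as handle:
            handle.write(line + "\n")


# Simulate 100 concurrent writers appending to the same file.
tmp_dir = tempfile.mkdtemp()
log_path = os.path.join(tmp_dir, "results.txt")

with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(lambda i: append_line(log_path, f"row-{i}"), range(100)))

with open(log_path, encoding="utf-8") as handle:
    lines = handle.read().splitlines()

print(len(lines), "lines written")
```

A threading.Lock only protects threads inside one process. If tasks run in separate processes or on separate machines (for example with a Dask-based task runner), this lock does not help, and writing one file per task or using a cross-process lock would be needed instead.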
  • j

    Justin Trautmann

    07/21/2022, 12:34 PM
    hey prefect team, i'm excited about the new 2.0 releases and was wondering if there is already a release note for 2.0b10 available. which version is running in prefect cloud and are there any breaking changes? unfortunately the admin/version api route doesn't work for me. thanks a lot.
    ✅ 1
    a
    • 2
    • 2
  • c

    Chu

    07/21/2022, 12:59 PM
    Hi Community, using Prefect to schedule dbt jobs, is it necessary to have run_id as a variable in dbt run command?
    ✅ 1
    a
    • 2
    • 1
  • t

    Toby Rahloff

    07/21/2022, 2:19 PM
    Hi, community ✌️ After upgrading to prefect==2.0b10, I get the following error:
    KeyError: "No class found for dispatch key 'S3 Storage:sha256:68ed [...]' in registry for type 'Block'."
    The full error trace and code can be found in the first comment to this post.
    ✅ 1
    👀 1
    k
    a
    • 3
    • 9
flow.py
Untitled.Nginx
k

Kevin Kho

07/21/2022, 2:21 PM
Are you on Cloud or local Orion?
t

Toby Rahloff

07/21/2022, 2:23 PM
Cloud
k

Kevin Kho

07/21/2022, 2:27 PM
Ok will ask the team about this
t

Toby Rahloff

07/21/2022, 2:28 PM
Thanks 🙌 !
k

Kevin Kho

07/21/2022, 3:00 PM
You might need to re-create the storage on this one
a

Anna Geller

07/21/2022, 3:15 PM
to explain why: recreating storage will ensure you're using the most recent block schema version and should fix your issue
🙏 1
t

Toby Rahloff

07/22/2022, 6:51 AM
OK, thanks for the explanation! 🙌
🙌 1