prefect-community
  • n

    Nikita Samoylov

    08/05/2022, 11:05 AM
    Hello community :party-parrot:! Suppose we have 1 queue and 5 agents working with this queue. We can set a concurrency limit for the queue. Can we also set a concurrency limit for each agent, to be sure that every agent works on only 1 flow at a time? 🤷‍♂️
    👀 1
    m
    2 replies · 2 participants
  • m

    Milan Valadou

    08/05/2022, 12:47 PM
    Hi! Trying to create a deployment per the instructions on https://docs.prefect.io/tutorials/deployments/, I was a bit confused by the sentence “and the name of the entrypoint flow function, separated by a colon.“ I thought it was possible to specify the name of the flow (given inside the decorator) after the colon, whereas you actually need to specify the name of the function that contains the main flow. Perhaps the sentence should read something like “and the name of the function containing the main flow, separated by a colon.” 🙂
    k
    1 reply · 2 participants
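For context, the entrypoint's colon separates the file path from the function name; a stdlib-only sketch of that split (the path and function names here are illustrative):

```python
# Sketch: an entrypoint like "flows/etl.py:my_flow" names the *function*
# decorated with @flow, not the flow's display name.
def split_entrypoint(entrypoint: str) -> tuple:
    path, _, func_name = entrypoint.partition(":")
    return path, func_name

path, func = split_entrypoint("flows/etl.py:my_flow")
# path -> "flows/etl.py"; func -> "my_flow" (the function name)
```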
  • m

    Muddassir Shaikh

    08/05/2022, 1:49 PM
    Can someone help with this error?
    k
    3 replies · 2 participants
  • f

    Florian Guily

    08/05/2022, 1:55 PM
    Hey, since yesterday I've been trying to run an ETL flow with Prefect 1 on an EKS Fargate cluster. I've tried it twice, and it always exits with a k8s error 255 after about an hour of runtime. I don't really have any idea why it's happening, as I'm fairly new to k8s and AWS.
    👀 1
    m
    c
    5 replies · 3 participants
  • v

    Viet Nguyen

    08/05/2022, 2:04 PM
    Hi all, I'm trying to use
    prefect-email
    to send a dummy test email (I know Prefect has notification features, but I want to try
    prefect-email
    as well). I get this error every time; the password is hard-coded for the dummy test, so it can't be a wrong password 🤔
    smtplib.SMTPAuthenticationError: (535, b'5.7.8 Username and Password not accepted. Learn more at\n5.7.8  <https://support.google.com/mail/?p=BadCredentials> d6-20020a170903230600b0016efc27ca98sm3023696plh.169 - gsmtp'
    Thank you
    k
    a
    9 replies · 3 participants
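For context, a 535 "BadCredentials" from Gmail usually means the regular account password was used where Google expects an app password (required for SMTP when 2-step verification is on). A minimal stdlib sketch of composing the message, with placeholder addresses and password (the send itself is left commented out):

```python
import smtplib
from email.message import EmailMessage

def build_message(sender: str, to: str, subject: str, body: str) -> EmailMessage:
    # Compose a plain-text email message
    msg = EmailMessage()
    msg["From"], msg["To"], msg["Subject"] = sender, to, subject
    msg.set_content(body)
    return msg

msg = build_message("me@example.com", "you@example.com", "test", "hello")

# Sending (not executed here; addresses and app password are placeholders):
# with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
#     server.login("me@example.com", "sixteen-char-app-password")
#     server.send_message(msg)
```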
  • c

    Chu

    08/05/2022, 2:17 PM
    Hi, a simple question 🙂 is there a way to register flows one by one, in the correct order?
    👀 1
    b
    1 reply · 2 participants
  • e

    Evan Curtin

    08/05/2022, 2:22 PM
    Hey all, I want to implement a custom
    Result
    -like implementation for 2.0, but I can’t find anything in the docs. The closest thing I see is
    FileSystems
    , but I don’t see example usage of passing data between tasks using a custom persistence layer.
    👀 1
    b
    k
    +1
    9 replies · 4 participants
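One pattern that approximates a custom persistence layer without a custom Result class is to persist the payload yourself and pass only a reference between tasks. A stdlib-only sketch of the idea (all names are illustrative; no Prefect APIs involved):

```python
import json
import tempfile
from pathlib import Path

def produce(data: dict, directory: Path) -> Path:
    # "upstream task": write the payload and return only a reference
    target = directory / "result.json"
    target.write_text(json.dumps(data))
    return target

def consume(path: Path) -> dict:
    # "downstream task": receives the reference and loads the data itself
    return json.loads(path.read_text())

with tempfile.TemporaryDirectory() as tmp:
    ref = produce({"rows": 3}, Path(tmp))
    loaded = consume(ref)
# loaded == {"rows": 3}
```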
  • t

    Tony Yun

    08/05/2022, 3:35 PM
    Hi, what’s the best practice for storing a file in local storage and processing it in the same task? I tried to write a file and send it, but it only works locally. When I deployed it to run on the k8s cluster, it always reports a
    file not found
    error. So I can't store it in
    /tmp
    and process it later in the same task?
    k
    3 replies · 2 participants
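For context, a path like /tmp written in one process or container is not visible from another; passing the bytes themselves (or uploading to shared storage) avoids depending on the producer's local filesystem. A stdlib-only sketch of the idea (illustrative, not Prefect-specific):

```python
import tempfile

def read_payload(path: str) -> bytes:
    # read the file where it was written, once, on the producing side
    with open(path, "rb") as f:
        return f.read()

def process(payload: bytes) -> int:
    # downstream work sees the data regardless of where it runs
    return len(payload)

with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"hello")
    name = f.name

size = process(read_payload(name))
# size == 5
```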
  • k

    Keith

    08/05/2022, 3:51 PM
    Hi, I'm trying to upload files generated by my extraction step to GCS. I found examples in
    prefect-gcp
    for doing this, but when I combine them with block information, it seems the info is not in the correct format.
    gcs_block = GCS.load("gcs-dev")
    
    @flow()
    def example_cloud_storage_upload_blob_from_file_flow():
        gcp_credentials = GcpCredentials(service_account_info=gcs_block.service_account_info)
        test_upload_file = "test_upload.txt"
        blob = cloud_storage_upload_blob_from_file(test_upload_file, gcs_block.bucket_path, "test_upload.txt", gcp_credentials)
        return blob
    a
    5 replies · 2 participants
  • s

    Seth Goodman

    08/05/2022, 4:31 PM
    Hi All - I have a question about the expected behavior of flows when using task mapping. When I use "apply_map" to map ~1500 tasks what I see in the UI is only a set of 8 "Constant[list][x]" tasks as in the screenshot below. I originally thought perhaps this was somehow tied to use of DaskExecutor but the number of "Constant" tasks is not the same as the number of dask workers I have running. In addition, each Constant task shows the same x/1500 task running, rather than a subset. Based on runtime, it seems plausible that each Constant task is processing all the data. Still doing testing to confirm this but it felt like I was doing something wrong that would be clear to more experienced users. I've included some simple code to represent my implementation below as well. Thanks for the help!
    from prefect import Flow, task, apply_map
    
    @task
    def actual_task(arg1, arg2, arg3):
        ...  # does stuff
    
    task_list = [
        (1, "a", "b"),
        (2, "c", "d"),
        (3, "e", "f"),
        (4, "g", "h"),
    ]
    
    def task_map(task):
        return actual_task(task[0], task[1], task[2])
    
    with Flow("my_flow") as flow:
        task_results = apply_map(task_map, task_list)
  • b

    Bruno Grande

    08/05/2022, 6:07 PM
    👋 Hello, everyone! I’m new to Prefect 2.0 and I’m trying to figure out the best way to tackle my (likely unconventional) data pipeline. Briefly, I have a pipeline for processing the files in a manifest, and the number of manifests will grow over time. Each manifest should be processed only once unless it’s updated. I’m wondering how to best handle the dynamic nature of my inputs (i.e. the file manifests) and limit the processing of each manifest to once per update. 🧵 I can elaborate a bit more in the thread.
    ✅ 1
    k
    7 replies · 2 participants
  • v

    Viet Nguyen

    08/05/2022, 6:15 PM
    So I have my NetCDF-to-Zarr pipeline orchestrated by Prefect working smoothly, from firing up a temporary Fargate cluster to shutting down the EC2 instance, but I have one question I'm wondering about.
    ✅ 1
    a
    8 replies · 2 participants
  • r

    Rajvir Jhawar

    08/05/2022, 6:17 PM
    @Anna Geller is there any update on this request? I took a look at Discourse and didn't see any topics related to it; maybe I missed them. I have a very similar use case.
    ✅ 1
    a
    2 replies · 2 participants
  • j

    John Kang

    08/05/2022, 7:25 PM
    Question: I'm trying to debug one of my functions that I've decorated with the task decorator, but I get the error below. I call the task with
    task.fn(function_to_call())
    but that doesn't work, as I get this error:
    AttributeError: 'function' object has no attribute 'fn'
    `RuntimeError: Tasks cannot be run outside of a flow. To call the underlying task function outside of a flow use
    task.fn()
    .`
    ✅ 1
    n
    5 replies · 2 participants
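For context, the AttributeError suggests function_to_call itself is not decorated with @task: .fn lives on the object the decorator returns, and the call should be my_task.fn(args). A toy sketch of the wrapping pattern (not Prefect's actual implementation):

```python
class Task:
    """Toy stand-in for a task wrapper: keeps the original callable on .fn."""
    def __init__(self, fn):
        self.fn = fn
    def __call__(self, *args, **kwargs):
        raise RuntimeError("Tasks cannot be run outside of a flow.")

def task(fn):
    return Task(fn)

@task
def double(x):
    return x * 2

result = double.fn(21)  # call the underlying function directly
# result == 42
# double(21) would raise RuntimeError, and a plain undecorated function
# has no .fn attribute -- hence the AttributeError above.
```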
  • a

    Andrew Richards

    08/05/2022, 7:37 PM
    Is there a way to perform retries with prefect-shell tasks? I'm using prefect version 2.0.2 and prefect-shell version 0.1.1. Supplying the
    retries
    parameter to the flow itself doesn't appear to work when I deliberately supply a bad shell command.
    ✅ 1
    a
    2 replies · 2 participants
  • j

    Javier Ochoa

    08/05/2022, 7:39 PM
    Hello, I have a problem. I am using Prefect with Python and a cloud environment. When I try to register workflows to Prefect Cloud with the AWS S3 method, they are "registered", but in Prefect the version does not change (it stays at version 1 even though I registered 3 times). This is causing issues with code sync: the agent has the newest code, but the flow runs a different version.
    flow.storage = S3(
       bucket=DEPLOYMENT_BUCKET, stored_as_script=False, add_default_labels=False
    )
    flow.register(
       PROJECT_NAME,
       add_default_labels=False,
       idempotency_key=flow.serialized_hash(),
    )
    ✅ 1
    a
    2 replies · 2 participants
  • b

    Bruno Grande

    08/05/2022, 8:29 PM
    Should there be a
    .submit
    after my selection in the attached screenshot? This comes up in the docs here. I thought you needed to use
    .submit()
    in order to obtain a future. Just wanted to check if this is a typo.
    m
    m
    5 replies · 3 participants
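For context, calling a task directly returns its result, while .submit() returns a future you resolve explicitly; the stdlib concurrent.futures analogy (purely illustrative, not Prefect code):

```python
from concurrent.futures import ThreadPoolExecutor

def add(x: int, y: int) -> int:
    return x + y

direct = add(1, 2)  # plain call: you get the value back

with ThreadPoolExecutor() as ex:
    future = ex.submit(add, 1, 2)  # submit: you get a future
    submitted = future.result()    # resolve it explicitly
# direct == submitted == 3
```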
  • c

    Corris Randall

    08/05/2022, 8:41 PM
    So, since 2.0 was released, I thought I’d play around with it a little more seriously… First question: can we write our own NotificationBlock implementations? Are there any instructions or examples? I made a test one called “my-email”, but I get the error “No class found for dispatch key ‘my-email’ in registry for type ‘Block’.” when it triggers. (I was able to add the block and add a notification using that block, but when it fires, that’s the error I get.) I register the block with prefect block register --file myemail.py, then add [“notify”] to the block_document row manually in the db.
    from typing import Optional
    from prefect.utilities.asyncutils import sync_compatible
    from prefect.blocks.notifications import NotificationBlock
    
    class MyEmail(NotificationBlock):
    
        _block_type_name = "My Email"
        _block_type_slug = "my-email"
        _block_schema_capabilities = ["notify"]
        
        @sync_compatible
        async def notify(self, body: str, subject: Optional[str] = None):
            print(f"In my email notify subject: {subject}\nbody: {body}")
    a
    j
    5 replies · 3 participants
  • k

    Kevin Grismore

    08/05/2022, 9:18 PM
    having trouble running my gcs-stored flows on kubernetes. I feel like it probably has something to do with how my project is structured:
    project
    ├── flows
    │   ├── flow1.py
    │   └── flow2.py
    └── util
        └── util.py
    if I do
    some/dir/project> prefect deployment build flows/flow1.py:flow_func -n my-flow -ib kubernetes-job/my-job -sb gcs/my-bucket -t k8s
    everything in src ends up in my bucket as expected, but when I run the flow I get:
    FileNotFoundError: [Errno 2] No such file or directory: '/opt/prefect/flows/flow1.py'
    a
    c
    +1
    9 replies · 4 participants
  • k

    Keith

    08/06/2022, 12:59 AM
    Have a general question about migrating from Prefect 1.0 to 2.0. In 1.0 there was a generic
    upstream_tasks
    parameter that you could pass to tasks so that each task knew to wait for the previous one to run. Through my reading of the documentation it seems like this is not necessary anymore b/c everything should run like it would in Python so it basically defaults to a sequential executor. Is this the correct logic? Obviously this story changes a bit when adding in the different
    Task Runners
    but just wanted to confirm that using default code blocks that tasks run in sequence and won't run the next task until the previous one is complete.
    ✅ 3
    n
    5 replies · 2 participants
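The mental model can be sketched without Prefect at all: when each call's result feeds the next, Python's evaluation order is the dependency order (stdlib-only, illustrative):

```python
log = []

def extract():
    log.append("extract")
    return [1, 2, 3]

def transform(rows):
    log.append("transform")
    return [r * 10 for r in rows]

def load(rows):
    log.append("load")
    return len(rows)

# Python evaluates these calls in order, and passing each result
# downstream is what creates the dependency -- mirroring the default
# sequential behavior described above.
count = load(transform(extract()))
# log == ["extract", "transform", "load"]; count == 3
```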
  • b

    Benoit Chabord

    08/06/2022, 7:17 AM
    Hey team, I am doing an RFP for a big company and I am going to use Prefect for the system integration. Are there any existing resources for this kind of document (I am already referencing the case study page): executive summary, list of clients, key features from a business point of view? I am writing my own from scratch, but if something already exists, that would be greatly appreciated.
    ✅ 1
    a
    1 reply · 2 participants
  • j

    Jan Domanski

    08/06/2022, 10:24 AM
    Hi there, I’m having some issues with flow deployments with an S3 block. My prefect agent picks up the flow run and starts it, but fails to retrieve the flow:
    10:21:24.290 | INFO    | prefect.agent - Submitting flow run 'cfc4f262-4f05-4685-882e-364192297107'
    10:21:24.474 | INFO    | prefect.infrastructure.process - Opening process 'blond-mammoth'...
    10:21:24.482 | INFO    | prefect.agent - Completed submission of flow run 'cfc4f262-4f05-4685-882e-364192297107'
    10:21:27.334 | ERROR   | Flow run 'blond-mammoth' - Flow could not be retrieved from deployment.
    Traceback (most recent call last):
      File "/opt/micromamba/envs/main/lib/python3.8/site-packages/prefect/engine.py", line 247, in retrieve_flow_then_begin_flow_run
        flow = await load_flow_from_flow_run(flow_run, client=client)
      File "/opt/micromamba/envs/main/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
        return await fn(*args, **kwargs)
      File "/opt/micromamba/envs/main/lib/python3.8/site-packages/prefect/deployments.py", line 47, in load_flow_from_flow_run
        await storage_block.get_directory(from_path=None, local_path=".")
      File "/opt/micromamba/envs/main/lib/python3.8/site-packages/prefect/filesystems.py", line 373, in get_directory
        return await self.filesystem.get_directory(
      File "/opt/micromamba/envs/main/lib/python3.8/site-packages/prefect/filesystems.py", line 251, in get_directory
        return self.filesystem.get(from_path, local_path, recursive=True)
      File "/opt/micromamba/envs/main/lib/python3.8/site-packages/fsspec/spec.py", line 801, in get
        self.get_file(rpath, lpath, **kwargs)
      File "/opt/micromamba/envs/main/lib/python3.8/site-packages/fsspec/spec.py", line 769, in get_file
        outfile = open(lpath, "wb")
    FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmp3er_ugnvprefect/S3-BUCKET-NAME/alpha/flow.py'
    10:21:27.727 | INFO    | prefect.infrastructure.process - Process 'blond-mammoth' exited cleanly.
    ... 
    $ aws s3 ls s3://S3-BUCKET-NAME/alpha/
    2022-08-06 10:20:49       6473 flow.py
    2022-08-06 10:20:49       3204 example_flow-manifest.json
    Created via
    # prefect deployment build ./flow.py:example_flow --name example-flow-alpha --tag alpha --storage-block s3/S3-BUCKET-NAME
    # prefect deployment apply example-flow-alpha.yaml
    I've had mixed luck reading and searching similar posts with this error message.
    ✅ 1
    a
    j
    11 replies · 3 participants
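For context, the last frame fails because open(lpath, "wb") cannot create missing parent directories; whoever writes the file must create them first. A stdlib sketch of the failure mode and its remedy (paths illustrative):

```python
import os
import tempfile

base = tempfile.mkdtemp()
target = os.path.join(base, "bucket", "alpha", "flow.py")

# open(target, "wb") at this point would raise FileNotFoundError:
# the "bucket/alpha" parent directories do not exist yet.
os.makedirs(os.path.dirname(target), exist_ok=True)

with open(target, "wb") as f:
    f.write(b"# flow code")
# the write succeeds once the parent directories exist
```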
  • r

    Rio McMahon

    08/06/2022, 11:53 PM
    Hi there - I am trying to implement a recursive flow pattern in Prefect 2.0; any tips on how to do this? I’ve seen some methods that leverage async in the code contest submissions, but I’m curious whether it’s possible without async. I’ve included my attempts in the comments.
    n
    6 replies · 2 participants
  • y

    Yardena Meymann

    08/07/2022, 7:09 AM
    Hi, I am using Prefect 1.2.1. How can I obtain the location of the result of the previous task (which uses GCSResult)? I want to pass the location of the data, not the data itself, to the next task.
    ✅ 1
    a
    3 replies · 2 participants
  • v

    Viet Nguyen

    08/07/2022, 1:57 PM
    Not sure if it's just me, but I find it very disorganized when subflows show up at the same level as main flows in the "Flows" section of the UI; it's like sub-folders listed at the same level as the root folder. My main flow creates multiple subflows, and every time the main flow runs, the UI gets ugly. It would be great if the UI displayed just the main flow, with an option to show its subflows. And when I delete a main flow, all its subflows should be deleted too, rather than having to delete the main flow and then the subflows one by one. Something like this...
    ✅ 1
    a
    m
    3 replies · 3 participants
  • h

    Hafsa Junaid

    08/07/2022, 8:48 PM
    How can we create a block from Python code for the Prefect 2.0 UI?
    a
    1 reply · 2 participants
  • r

    Rajvir Jhawar

    08/08/2022, 2:28 AM
    Is it possible to add a description to a flow? None of the API calls gives you the ability to add one, and even in the UI the flow page is essentially just a hyperlink to the deployment page. Are there any restrictions based on the style of docstrings used?
    👀 1
    b
    1 reply · 2 participants
  • f

    Felix Sonntag

    08/08/2022, 7:46 AM
    Hey, I was wondering how to deal with/set more complex parameter structures in Prefect Orion. E.g. when I have lists or nested models, I can’t set them in the UI at all.
    ✅ 1
    a
    m
    8 replies · 3 participants
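For context, nested parameters can generally be supplied as JSON matching the flow's signature; a stdlib sketch of that round trip (the parameter names are illustrative):

```python
import json

# A nested parameter structure serialized as JSON -- the shape list- and
# model-valued flow parameters generally take when passed via an API call.
params = {
    "regions": ["eu", "us"],
    "config": {"retries": 3, "thresholds": [0.1, 0.9]},
}
payload = json.dumps(params)
restored = json.loads(payload)
# restored == params, nesting intact
```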
  • v

    Vadym Dytyniak

    08/08/2022, 7:49 AM
    Hi. We moved from ECS to Kubernetes runs, and sometimes we see a non-descript flow failure. Is it possible to stream the container logs and see the reason in the cloud logs?
    m
    1 reply · 2 participants
  • j

    jaehoon

    08/08/2022, 9:01 AM
    Hello everybody. I was using the GitLab storage setting in version 1. However, it seems that version 2 does not support it. Do you know a way around this? If there is none, when will GitLab storage be added?
    ✅ 1
    a
    a
    +1
    14 replies · 4 participants
Powered by Linen
j

jaehoon

08/08/2022, 9:01 AM
Hello everybody. I was using the GitLab storage setting in version 1. However, it seems that version 2 does not support it. Do you know a way around this? If there is none, when will GitLab storage be added?
✅ 1
a

Anna Geller

08/08/2022, 9:21 AM
We want to integrate with GitLab to let you easily push your code to your repo and then trigger a CI/CD pipeline that deploys your flow from there. So while it's not yet decided whether we will support it as storage, we will certainly provide an integration with GitLab, e.g. via a CI/CD recipe.
🙌 1
a

Anton L.

08/08/2022, 3:50 PM
But what do you offer instead of GitLab storage, now and in the future?
a

Anna Geller

08/08/2022, 7:42 PM
The file system block docs list available storage options
a

Anton L.

08/08/2022, 9:14 PM
DockerStorage is also off the table, right?
a

Anna Geller

08/09/2022, 10:29 AM
As a concept, yes; but as a problem to be solved and as a feature, not at all. You should see a way of packaging both flow code and dependencies into a single image within 1-2 weeks.
🎉 2
b

Ben Hammond

08/31/2022, 2:24 AM
I saw that a GitHub storage block was introduced — particularly one that can be used for deployments. In the context of that, would that create precedent/space for the contribution of a GitLab block?
a

Anna Geller

08/31/2022, 5:01 AM
In theory yes, but there are no plans to add that at the moment. GitHub is mainly meant for incremental adoption to get started easily; for production we still recommend remote storage blocks or packaging code into a container image.
👍 1
b

Ben Hammond

08/31/2022, 6:28 AM
Incremental adoption makes sense. I have seen elsewhere where you’ve talked about the emphasis on production deployment using other storage methods (e.g., we are using SMB, since my organization doesn’t give us much access to cloud storage services, including GitHub). With regard to incremental adoption, I think GitLab makes sense in a similar way to GitHub, since it’s used by such a broad business user base (just less visibly, since so much of it is on-premise and behind firewalls). For that reason, and prior to seeing your response, I experimented and created a proof-of-concept GitLab storage block based on the GitHub one, and it works (including authentication to a private repo). That said, I certainly have no issue shelving the idea. I definitely appreciate you humoring my less-common use cases. EDIT: I also just noticed the newer issue templates introduced a little while back. I can move these kinds of feature and enhancement questions there from now on, since it seems like that’s their intention, rather than bothering you here.
a

Anna Geller

08/31/2022, 10:15 AM
Thanks for explaining more. How are you using Prefect? If you are running your agent e.g. on an on-prem VM, then you don't need storage at all; you can use the local file system on the VM. Similarly, if you don't want to use remote storage blocks, I think the Docker image UX shown here might be a really good option: https://medium.com/the-prefect-blog/prefect-2-3-0-adds-support-for-flows-defined-in-docker-images-and-github-repositories-79a8797a7371
I'm not discouraging a GitLab block, but I'd love to persuade you that there are some potentially much nicer and more robust ways to tackle the problem of "I want to run deployments without remote storage blocks".
🙌 1
b

Ben Hammond

08/31/2022, 1:01 PM
Thanks for the response! My thought was actually more at the contribution level. We are currently using Prefect 1 with Docker for our runtime environments. Most of our flows are pulled from an on-premise GitLab and run in shared images. A handful of complex flows use dedicated images where the flow is baked into the image itself (only because Prefect 1 Git[Lab] storage won’t allow easy use of submodules). I’m currently working on our move to Prefect 2, and for that we are continuing to use Docker as our runtime but are moving to SMB storage for deployments (like I said, I like what you’ve said elsewhere about version control/CI-CD tools remaining version control/CI-CD tools). We can’t use cloud storage, so it was definitely beneficial that I was encouraged to contribute the SMB block to Prefect (whether or not you remember, you engaged with me about this on a Slack account associated with my work’s organization; I’m trying to consolidate my Prefect community interaction away from multiple usernames 🙂). My thought on the GitLab block was mainly about the contribution itself, not for use as a primary storage option ourselves (though options are nice, and there are instances where it might be useful for dev/testing/interim storage during migration, etc.). I know there are many organizations that use on-premise GitLab, especially ones that strongly value their code and data remaining on their own infrastructure (i.e., not even in Azure/AWS, etc.), and I wonder if making GitLab available with the same intention as GitHub might make sense (even if text in the docs and/or Orion UI and/or terminal messages notes it is not intended for full production deployment). But, like I said, if it doesn’t fit the vision for Prefect’s path forward, that’s OK too.
Even if Prefect is good with the idea of a GitLab block for dev/testing/transition, would a more helpful path, connected to Prefect’s intended direction, be to contribute other non-cloud storage blocks (like maybe SFTP or SCP, etc.)?
:thank-you: 1
a

Anna Geller

08/31/2022, 1:13 PM
where the flow is baked into the image itself
I'd recommend checking the latest release: https://medium.com/the-prefect-blog/prefect-2-3-0-adds-support-for-flows-defined-in-docker-images-and-github-repositories-79a8797a7371
would a more helpful path connected to Prefect’s intended path forward be to contribute other non-cloud storage blocks (like maybe SFTP or SCP, etc)?
Yes, 100%, you are spot on here -- SFTP is a storage system we could write to and read from; GitHub and GitLab are really not storage systems, they are version control and engineering collaboration platforms, so SFTP would be a much nicer way of solving the problem for on-prem deployments. I'd definitely love to see a contribution for that if you'd like to submit a PR.
👍 1
b

Ben Hammond

08/31/2022, 1:27 PM
Sounds good. I’ll plan on doing that.
🙌 1
:thank-you: 1