Powered by Linen
prefect-community
  • c

    Cansu Kavılı

    11/21/2022, 8:47 PM
    Hey everyone! 👋🏻 I am running Prefect Orion and the agents on a Kubernetes cluster, and I am looking for a way to define Blocks as code. Deploying Prefect itself with a GitOps approach is super easy (nice Helm chart!), but I do not want to define my Blocks via the UI; I want them to persist and be consistent every time I reinstall Prefect. Is there a way to achieve that? Thank you!
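One way to keep Blocks in version control is a small registration script that a CI job or post-install step runs against the Orion API. The sketch below assumes Prefect 2's `Block.save(name, overwrite=True)` and the built-in `JSON` block from `prefect.blocks.system`; `apply_blocks`, `default_specs`, and the block name `app-config` are hypothetical names chosen for illustration.

```python
from typing import Any, Callable, List, Tuple

# (block name, factory returning an unsaved block) pairs kept in version control
BlockSpec = Tuple[str, Callable[[], Any]]


def apply_blocks(specs: List[BlockSpec]) -> List[str]:
    """Create or overwrite every block so a fresh install converges to the same state."""
    saved = []
    for name, make_block in specs:
        block = make_block()
        block.save(name, overwrite=True)  # overwrite makes reruns idempotent
        saved.append(name)
    return saved


def default_specs() -> List[BlockSpec]:
    """Hypothetical block list; extend it with the blocks your flows need."""
    # Imported lazily so this module can be loaded without Prefect installed
    from prefect.blocks.system import JSON

    return [("app-config", lambda: JSON(value={"env": "prod"}))]
```

Running `apply_blocks(default_specs())` with `PREFECT_API_URL` pointing at the Orion server would then recreate the same blocks after each reinstall.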
  • d

    Devin McCabe

    11/21/2022, 8:54 PM
    Does anyone know why a mapped task might indicate "Ready to proceed with mapping" but it never proceeds? It works fine with LocalExecutor but not with DaskExecutor (via FargateCluster). I'm really frustrated because I know I've encountered and solved this exact issue before...
  • m

    Michael Z

    11/21/2022, 9:34 PM
    Hello, I have a state handler that checks for a failed state and sends an email. However, I want to add retry logic to this handler (because the handler is not a task) in case the email fails. Is there any way to do this using Prefect? This is for Prefect v1.
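One possible approach to the question above is to wrap the email call in a plain-Python retry helper and call that from the state handler. This is a minimal sketch, not built-in Prefect functionality: `with_retries`, `send_email`, and `notify_on_failure` are hypothetical names, and the handler signature `(obj, old_state, new_state)` is assumed from Prefect 1's state-handler convention.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retries(func: Callable[[], T], attempts: int = 3, delay_seconds: float = 1.0) -> T:
    """Call func until it succeeds or `attempts` calls have failed."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:  # narrow this to the errors your mail client raises
            if attempt == attempts:
                raise  # out of tries: surface the last error
            time.sleep(delay_seconds * attempt)  # linear backoff between tries


def send_email(message: str) -> None:
    """Hypothetical placeholder: replace with your real email call."""
    print(f"sending: {message}")


def notify_on_failure(obj, old_state, new_state):
    """State handler: email on failure, retrying the send itself."""
    if new_state.is_failed():
        with_retries(lambda: send_email(f"{obj.name} failed: {new_state.message}"))
    return new_state  # a handler returns the state to keep
```

Because the retry lives inside the handler, it does not depend on task-level retry settings.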
  • a

    Ashley Felber

    11/21/2022, 10:07 PM
    Hello, I need to create an API key and URL for the prefect agent. To confirm, can a service account only be created on the paid tier?
  • m

    Madison Schott

    11/21/2022, 10:40 PM
    Hi all, I'm deploying a Prefect pipeline that uses dbt models. I upgraded to their new 2.0 version and then had to downgrade again to run our production pipeline (my mistake: I needed to create a venv). I changed my Dockerfile to use this image from Prefect:
    FROM prefecthq/prefect:1.4.0-python3.9
    and now I get this error when running `dbt deps`:
    logs.csv
  • a

    Ashley Felber

    11/21/2022, 11:03 PM
    Hello, I am running into an issue in the docker build step. Error: "Step 4/4 : COPY flows/ flows/ 129 COPY failed: file not found in build context or excluded by .dockerignore: stat flows/: file does not exist". I am using the docker image from this repo. cc: @Taylor Curran
  • s

    Steven Wilber

    11/22/2022, 12:02 AM
    Hi, I'm running into what is probably a very simple issue with Docker networking.
    Setup:
    • Orion Server (running locally: 127.0.0.1:4200)
    • Prefect Agent (running locally)
    • Airbyte Server (running locally: 127.0.0.1:8000)
    • Docker Infra Block - Docker running locally
    The agent can see the server. I can create and apply a deployment. On starting the flow:
    • The agent kicks off.
    • The agent creates the docker image.
    • The agent applies any pip requirements from block env setting.
    • It then fails with
    ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 4200)
    I'm assuming the flow running in the Docker container cannot see the Orion server on port 4200; however, I'm not sure what to do to fix that. No doubt the next failure will be accessing the Airbyte server, but I'm not there yet. Any help is much appreciated.
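Inside a container, 127.0.0.1 refers to the container itself, which would explain the refused connection above. A common workaround is to give the flow run an API URL whose host resolves to the Docker host. The sketch below only rewrites the URL; it assumes the hostname `host.docker.internal` (provided by Docker Desktop, and on Linux usually only after adding a host mapping) and Prefect 2's `PREFECT_API_URL` setting. `to_container_url` is a hypothetical helper, and the server may also need to listen on an address reachable from the container.

```python
from urllib.parse import urlsplit, urlunsplit

LOOPBACK_HOSTS = {"127.0.0.1", "localhost"}


def to_container_url(url: str, host_alias: str = "host.docker.internal") -> str:
    """Replace a loopback host in url with a name that resolves to the Docker host."""
    parts = urlsplit(url)
    if parts.hostname not in LOOPBACK_HOSTS:
        return url  # already addressable from inside a container
    netloc = host_alias if parts.port is None else f"{host_alias}:{parts.port}"
    return urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))


# Candidate value for PREFECT_API_URL in the Docker infrastructure block's env
print(to_container_url("http://127.0.0.1:4200/api"))
```

The same rewrite would apply to the Airbyte address (127.0.0.1:8000) once the flow gets that far.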
  • b

    Ben Muller

    11/22/2022, 12:08 AM
    Hey Prefect - I get a heap of these errors nowadays with 2.0: 🧵
  • t

    Tuoyi Zhao

    11/22/2022, 12:31 AM
    Prefect 2.0: in Prefect Cloud, under Flow Run - Date Range, if the range includes today, it doesn't show any of today's flow runs. I think the backend database may use 0 AM of today. Is anyone having the same issue here?
  • f

    Fernando Silveira

    11/22/2022, 3:39 AM
    Hi folks, I'm trying to set up my Prefect v2 flows to run in `KubernetesJob`s, but I'm having trouble figuring out the best way to set up my storage block. In my deployments, I create a Docker image with all my flow code and corresponding dependencies. I wish I could attach something like the `DockerContainer` storage block to my deployment, indicating that it should just pull my Docker image from AWS ECR and run the flow. However, as far as I understand, the `DockerContainer` block is also an infrastructure block and is meant to run the image against Docker directly on a machine, i.e., NOT in a `KubernetesJob`. This means that so far I've been relegated to using the `S3` storage block, which pushes my flow code to S3 only to pull it back into my Docker image at flow run time (when the image already contains all the code it needs to run). This feels wrong, so I'm sure I must be missing something here. How do I configure `KubernetesJob` to just pull a Docker image and not rely on any additional storage block like S3? PS: so far, I'm really enjoying how quickly I'm getting up to speed on Prefect v2. Despite kinks like this, I'm really enjoying the development experience.
  • m

    Mahesh

    11/22/2022, 3:56 AM
    Hello Team, I am new to Prefect 2. I am working on migrating a flow from Prefect 1 to Prefect 2.
  • d

    davzucky

    11/22/2022, 5:09 AM
    We just had an edge case with self-hosted Prefect 1 (v0.15.6), where two agents started with the same tags both started a flow_run at exactly the same time. As we are moving to Prefect 2, can multiple agents listen to the same work queue? This would be for a self-hosted setup on which we have multiple instances of Orion.
  • i

    Ikkyu Choi

    11/22/2022, 5:44 AM
    Hi, I'm trying to send an alert to Slack when my flow fails. In the normal case it works well, but when a flow fails because of a lack of AWS resources (e.g., GPU), I don't get the alert. I'm using the ECS agent now. Could anyone help? Thanks.
  • l

    latif

    11/22/2022, 8:07 AM
    Hello everyone. This is more of a general question than a specific issue. I have a set of requirements and I'm unsure if Prefect is suitable for them (perhaps ETL tools in general aren't). I need to aggregate data from different sources, process it in chunks, and then stream the chunks to a browser for each client. I think conceptually there are 2 ways to map this onto Prefect.
    (1) Have tasks for aggregation, and then a separate long-lived task that handles chunking, processing and streaming. In this case one flow takes care of the entire job.
    (2) Have tasks for aggregation, then a subflow for each chunk, where we have tasks for processing the chunk and a separate one for streaming it back.
    Both approaches do something that maybe Prefect isn't designed for. I don't know enough about Prefect's internals to understand the performance/space cost of a single flow, or the potential problems of a really long-lived task. (1) would have a really long-lived task, maybe even days. (2) would have a huge number of subflows (there could be around 50,000 chunks) for a single job. I prefer (2) just because (1) feels like it doesn't have the task granularity to really take advantage of Prefect. I'm open to alternative approaches. Ordinarily I'd reach for something like Kafka for this, but I like getting the benefits of Prefect (logging, observability, retries, etc.) for free.
  • d

    Deepanshu Aggarwal

    11/22/2022, 8:22 AM
    Hi everyone! I'm facing a weird issue with flow runs. I'm running Prefect on AWS EKS (a self-hosted instance of Orion, with Kubernetes jobs in different namespaces).
    1. I have 9 running flows which I can see in the UI, but I can't find the corresponding jobs on my cluster.
    2. When I check the concurrency limit for the tags that these flows use, I can see 14 task runs (these flows have parallel task runs, so some of them have more than one task stuck in a running state).
    So basically there are 9 running flows with 14 running tasks visible in the UI, but I can't find the jobs on my cluster. My question: how did these flow runs get terminated on the cluster, and even if they did, why was the state not updated in the UI or in the meta DB? Attaching screenshots of the same.
  • t

    Tim-Oliver

    11/22/2022, 8:40 AM
    Hello, Can I get the input arguments of a task-future?
  • n

    Nic

    11/22/2022, 11:00 AM
    For our setup, we're incorporating unittest in the build phase of our CI/CD pipeline, so that it won't deploy to the cloud unless the tests pass in Azure DevOps. I'd like to be able to run unittest on single tasks without starting a flow run, but when running
    import unittest
    from tasks import address_matching as am
    from prefect import flow,task
    
    
    class TestDatahandling(unittest.TestCase):
        
        def test_tilslutning_forsyning(self):
            am.address_matching()
    
    
    
    if __name__ == '__main__':
        unittest.main()
    I get the following error message:
    ERROR: test_tilslutning_forsyning (__main__.TestDatahandling)
    ----------------------------------------------------------------------
    Traceback (most recent call last):
      File "c:\Users\nho\Desktop\git\Prefect\etl\geoserver\tilslutning_forsyning\test.py", line 9, in test_tilslutning_forsyning
        am.address_matching()
      File "C:\Users\nho\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\tasks.py", line 353, in __call__
        return enter_task_run_engine(
      File "C:\Users\nho\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\engine.py", line 674, in enter_task_run_engine
        raise RuntimeError(
    RuntimeError: Tasks cannot be run outside of a flow. To call the underlying task function outside of a flow use `task.fn()`.
    Two questions:
    1. What would the syntax for task.fn() be in my example? I can't get it to run.
    2. Are there better ways or best practices for running tests on flows, or does this setup seem okay?
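Going by the error message above, `.fn` is an attribute of the task object, so with `am` being the imported module the call would presumably be `am.address_matching.fn()` plus whatever arguments the task takes. A minimal sketch of such a test follows; the task body and its `addresses` argument are hypothetical, and the `except ImportError` branch is only a stand-in so the sketch runs where Prefect 2 is not installed.

```python
import unittest

try:
    from prefect import task
except ImportError:
    # Stand-in that mimics only the `.fn` attribute of a Prefect 2 task
    def task(func):
        class _Task:
            fn = staticmethod(func)

        return _Task()


@task
def address_matching(addresses):
    """Hypothetical task body; the real one lives in the tasks package."""
    return [a.strip().lower() for a in addresses]


class TestDatahandling(unittest.TestCase):
    def test_address_matching(self):
        # .fn is the undecorated function, so no flow run is started
        result = address_matching.fn([" Main St 1 "])
        self.assertEqual(result, ["main st 1"])
```

Saved as a test module, this can be run with `python -m unittest` in the build phase.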
  • a

    ash

    11/22/2022, 2:51 PM
    Hello everyone, I am using Prefect with the Dask resource manager, running some tasks within the Dask resource manager context and a few outside the context manager. The flow looks like this:
    with Flow("Test flow") as flow:
       
        with DaskCluster(n_workers=n_workers) as client:
            data = extract()
            processed_data= transform(data)
    
        save_data(processed_data)
    The problem is that the Dask cluster does not shut down until the `save_data` function completes, but I expected the cluster cleanup to happen as soon as the `transform` function completes. Is there any way I can initiate `save_data()` after `transform()` is done and the `cluster_cleanup` is done as well?
  • x

    Xavier Babu

    11/22/2022, 4:19 PM
    Hi Prefect Community, I am running a machine learning model as a workflow using Prefect. If I don't use Flows and Tasks, the model completes within 90 minutes. But if I convert it to a Flow and Tasks, the same model takes 300 minutes to complete. I think it takes more time especially in the dropna() or drop() functions (probably more than that). Is there any way I can allocate more memory to each task to make it run faster, since it is a memory-based process? Or are we aware of any memory allocation restrictions while running a task or flow? Please shed some light. I even tried Dask parallel processing, but it didn't help when running with a Flow and Tasks. BTW, I am using Prefect Orion 2.4.5 on an on-premise Linux server.
  • p

    Philip MacMenamin

    11/22/2022, 4:50 PM
    Hello, I'm trying to implement the following kind of logic within a Prefect1 flow.
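    from prefect import Flow, task
    from prefect.engine import signals

    @task(name="Task A")
    def task_a(x: int) -> int:
        if x == 2:
            raise signals.FAIL
        return x + 2
    
    @task(name="Task B")
    def task_b(x: int) -> int:
        return x + 2
    
    @task(name="check")
    def checker(x: int) -> None:
        # Pseudocode: the three helpers and ID are placeholders for the behavior being asked about
        if anything_upstream_broke():
            this_did_not_work(ID)
        else:
            this_was_ok(x)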
    
    
    with Flow("x example") as flow:
        l = [2,5,7]
        la = task_a.map(x=l)
        lb = task_b.map(x=la)
        lc = checker.map(x=lb)
    That is, I have a list of things I want to run through the workflow, and I `map` these. Sometimes some of the elements in the list won't run properly. I'd like a way to look through all of the upstream tasks, check if any failed, and do a thing for that specific input.
  • d

    Dmitrii Egunov

    11/22/2022, 6:03 PM
    Hi! What is the best way to create a scheduled flow with multiple scheduled subflows? I couldn't find an article or Discourse discussion. We are using Prefect Cloud 1.0. Example of structure:
    • Main flow A, run every week, dependent on:
      ◦ Subflow B, run every day, dependent on:
        ▪︎ Subflow C, run every hour
    So, if any of the subflows fail, all upstream subflows should not start. Is there a way to do so? cc @Taylor Curran
  • j

    Jimmy Le

    11/22/2022, 6:04 PM
    Is it safe to delete the ~/.prefect/storage folder? Mine has ballooned to over 100 GB now. Is there any way we can set some settings to limit how large this folder can get?
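Before deleting anything, it may help to see how much of the folder is old data. The sketch below only reports files that have not been modified for a given number of days and removes nothing; whether those files are safe to delete is not established here. `old_files` is a hypothetical helper and the 30-day threshold is an arbitrary assumption.

```python
import time
from pathlib import Path


def old_files(folder: Path, max_age_days: float):
    """Yield (path, size_bytes) for files under folder not modified in max_age_days."""
    cutoff = time.time() - max_age_days * 86400
    for path in folder.rglob("*"):
        if path.is_file():
            stat = path.stat()
            if stat.st_mtime < cutoff:
                yield path, stat.st_size


storage = Path.home() / ".prefect" / "storage"
if storage.is_dir():
    candidates = list(old_files(storage, max_age_days=30))
    total_gb = sum(size for _, size in candidates) / 1e9
    print(f"{len(candidates)} files older than 30 days, {total_gb:.2f} GB")
```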
  • j

    Joshua Grant

    11/22/2022, 6:47 PM
    What is the Prefect 2 equivalent of `apply_map`? Details in 🧵
  • j

    James Zhang

    11/22/2022, 8:09 PM
    Hi guys, I'm trying to build and apply a deployment from our gitlab-ci pipeline, but I got an `httpx.ConnectTimeout` error. My prefect-orion runs on our own k8s and should be reachable from our gitlab-ci pipeline. Has anyone seen this error? Any idea how I could debug? Thanks!
  • j

    jack

    11/22/2022, 9:27 PM
    Is it safe to stop the prefect agent in order to update the packages installed in the virtualenv? Or would that cause any flow-runs-in-progress to be killed/restarted? Asking for both prefect 1.x and 2.x, as we are using both.
  • g

    geoffc

    11/23/2022, 1:31 AM
    Hi, I'm creating a flow-of-flows and was wondering if anyone had any recommendations for my scenario. Background: My parent flow has the potential to create a lot of child flows, with the precise number (N) determined at run-time. To handle this, I've been using mapping. However, I've noticed `create_flow_run.map` will create all of the flow runs at once, which is clogging up my agent's backlog when N gets too large. This creates a lot of failures, which occur before my child flow runs can even begin. Unfortunately, I am not the admin and don't have control over the agent we're using, so I don't have a way to easily investigate why this is or resolve it. Even if I were, it seems like kicking off this many flows at once probably isn't a good idea. Thus, I'm wondering if there is some way to throttle `create_flow_run` in my flow without pre-defining my N child flows. I'm even OK with running all N flows in sequence, i.e. kick off flow run 1 -> wait for flow run 1 -> kick off flow run 2 -> etc. Here is the structure I'm using now:
    def create_wait_on_child(parameters):
        create_id = create_flow_run(flow_name="GenericChild",
                                    project_name="Project",
                                    parameters=parameters)
        return wait_for_flow_run(create_id, 
                                 raise_final_state=True)
    
    with Flow('Parent Flow') as flow:
        flow_runs_params_list = get_flow_runs_params()
        apply_map(create_wait_on_child, flow_runs_params_list)
    My Question: Is there some way to combine the functionality of `create_flow_run` and `wait_for_flow_run` in the same task? I know I can't just add a `task()` decorator to `create_wait_on_child` in the example above (since that would involve tasks within tasks), but that is the functionality I'm going for. Or does anyone have alternative recommendations? I'd appreciate anyone's input.
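The "kick off, wait, kick off the next" sequence described above can be sketched in plain Python, independent of any Prefect API. Here `start` and `wait` are hypothetical injected callables; in Prefect 1 they might wrap the underlying functions of `create_flow_run` and `wait_for_flow_run` (for example through each task's `.run` method, which is an assumption worth verifying against the installed version).

```python
from typing import Any, Callable, Dict, Iterable, List


def run_children_in_sequence(
    params_list: Iterable[Dict[str, Any]],
    start: Callable[[Dict[str, Any]], str],
    wait: Callable[[str], Any],
) -> List[Any]:
    """Start one child run, wait for it to finish, then start the next."""
    results = []
    for params in params_list:
        run_id = start(params)        # e.g. a call that creates a child flow run
        results.append(wait(run_id))  # blocks until that run reaches a final state
    return results
```

Wrapped in a single, unmapped task that receives the whole parameter list, a loop like this would keep only one child flow run in flight at a time.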
  • t

    Tim Galvin

    11/23/2022, 7:47 AM
    Two questions. What is the collective noun for people who use Prefect? And is there a command that tries to communicate with an Orion server in a "hello world" sense, purely to test for connectivity? I would like to perform an initial test up front before starting my Python workflow. I have a self-hosted Orion server and the aim is purely to make sure this Orion server is accessible.
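A connectivity check like the one asked about can be sketched with the standard library alone. The example assumes the Orion REST API exposes a health route at `<api_url>/health`; that path is an assumption to confirm against the server's own API docs. `orion_is_reachable` is a hypothetical helper name.

```python
import urllib.error
import urllib.request


def orion_is_reachable(api_url: str, timeout: float = 5.0) -> bool:
    """Return True if GET <api_url>/health answers with HTTP 200."""
    health_url = api_url.rstrip("/") + "/health"
    try:
        with urllib.request.urlopen(health_url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        # Covers refused connections, DNS failures, timeouts and HTTP error codes
        return False
```

A workflow script could call `orion_is_reachable("http://127.0.0.1:4200/api")` first and exit early when it returns False.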
  • p

    Pekka

    11/23/2022, 7:49 AM
    Is there some general logic for what code should be @prefect.task'd and what code can be left in the @prefect.flow without tasks? (talking strictly prefect 2)
  • a

    ash

    11/23/2022, 11:17 AM
    Hello everyone, I am using Prefect with the Dask resource manager, running some tasks within the Dask resource manager context and a few outside the context manager. The flow looks like this:
    with Flow("Test flow") as flow:
       
        with DaskCluster(n_workers=n_workers) as client:
            data = extract()
            processed_data= transform(data)
    
        save_data(processed_data)
    The problem is that the Dask cluster does not shut down until the `save_data` function completes, but I expected the cluster cleanup to happen as soon as the `transform` function completes. Is there any way I can initiate `save_data()` after `transform()` is done and the `cluster_cleanup` is done as well?
  • d

    Dan Wise

    11/23/2022, 12:54 PM
    Hi all, in Prefect 2 we are using environment overrides in our deployments. However I cannot see these in the UI. Does anyone know if these can be made visible? If not, are there plans to show these overrides in the UI? thanks!

Anna Geller

11/23/2022, 1:36 PM
We are aware: https://github.com/PrefectHQ/prefect/issues/6635

Dan Wise

11/23/2022, 1:54 PM
Thanks @Anna Geller, although I cannot see any explicit reference in that issue to viewing the overrides in the UI, unless I have missed it. We currently override on the deployment, so those take precedence over the infra. I am also finding that I can override all environment variables specified on the infra, but not just one particular one.

Anna Geller

11/23/2022, 2:54 PM
It's about the same, incl. the UI. Feel free to comment on the issue directly to show you are interested in adding that feature in v2.