Powered by Linen
prefect-community

    Sander

    12/02/2022, 7:03 PM
    Hi! Great to see the enhancements in 2.7! I was wondering whether you can schedule the cancellation of flow runs. That would allow a start and stop time to be configured at the scheduling level.

    Malek Naski

    12/02/2022, 7:38 PM
    Hi all – I've started using Prefect for some data flows and I'm liking it so far! I have a few questions, though, as I want to see whether there's a better way to do things. For a subflow run to also be sent to the queue, the only way I found was to deploy that subflow and then trigger it via the API with deployments.run_deployment(). However, this makes it impossible to manually retry a failed subflow. Is there a better way to do this? My use case is mostly the parent flow fetching inputs, with each subflow run handling one of the inputs (I'm looping over the task that triggers the subflow deployment runs). Thanks!

    Chris McClellan

    12/03/2022, 2:17 AM
    😞 I'm trying to install the prefect library on my new(ish) laptop and pip install -U prefect is failing. How can I diagnose what's actually wrong here?

    merlin

    12/03/2022, 6:51 AM
    Is it possible to manage two separate Prefect environments on the same machine? I've been experimenting with profiles and setting PREFECT_ORION_DATABASE_CONNECTION_URL to define a second SQLite DB, but no luck. Even in different profiles, my prefect commands are aware of deployments from the other environment. Ideally there would be no overlap between the Prefect environments (different DB, etc.).

    Tim Galvin

    12/03/2022, 8:18 AM
    Has the behaviour of PREFECT_LOGGING_EXTRA_LOGGERS changed from 2.6.7 to 2.6.9 at all? I was previously using it successfully in combination with DaskTaskExecutor and dask_jobqueue.SLURMCluster to capture logs from another package. It seems, though, that with an update to 2.6.8 or 2.6.9 I have lost this ability. I do see the logs being printed to my Slurm output files, and I do see they are formatted in the prefect style, but I am not seeing these logs being submitted to my self-hosted Orion server (as presented by the web UI). Any ideas?

    Andreas Nigg

    12/03/2022, 1:33 PM
    Hey! I have a little problem with long-running parent flows: there is one parent flow and two subflows. The subflows each need between 2 and 30 hours (yes, yes, I know...). Quite often, the parent flow enters the "Crashed" state with the state message "Flow run infrastructure exited with non-zero status code -1.". Interestingly, it always happens at around 4 hours of runtime. Fortunately, the subflows still continue to run and also finish, so it does not really impact the pipeline, which simply succeeds, but it's not nice that the parent flows crash. Any ideas what could cause the parent to fail? In the logs of the infrastructure (see thread), everything seems fine. (One thing I have to admit: the flows generate approx. 3 log lines per second. Is this the problem?) (Second thing to admit: there are a looot of tasks, about 2,000.) Prefect 2.0 Cloud, v2.7.0 agents. The infrastructure is a Kubernetes job.
    EDIT: I changed the logs and tasks so there are far fewer of both: in total there are fewer than 100 tasks and about 10 log lines per minute. But it's still the same: the parent flows crashed after 4 hours, and the subflows happily continued.

    Tim Galvin

    12/04/2022, 8:46 AM
    Following up on my previous query two days ago about the change in behaviour I found in PREFECT_LOGGING_EXTRA_LOGGERS, which no longer reports logs through to Orion: I read through prefect/logging/configuration.py and see that #7569 changed some of the logic. Now there is a check against config['incremental'] before the Orion log handler is attached to the logger of the extra module. I am not sure exactly what incremental means in this sense, but when I disable this check to force the Orion handler to attach itself to my extra module, things work as expected and logs are streamed to Orion. It seems that config['incremental'] is set to True by default (at least I have not knowingly set it), and the check against its value is negated when deciding whether to attach the Orion handler. So my questions are: what is this incremental setting, and how does one set it? Is the check against it actually intended to be negated? Once I remove the check, my logs for the extra modules listed in PREFECT_LOGGING_EXTRA_LOGGERS behave the same as before v2.6.9 / #7569. @Anna Geller @Michael Adkins - I see both your names on the change -- please don't hate me for tagging you directly 🙂
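The key name matches the standard library's logging.config.dictConfig schema. There, incremental is an optional top-level key that defaults to False. When it is True, dictConfig ignores formatters and filters. It only applies level settings from the handlers entries and level/propagate settings from the loggers entries, so any handlers listed for a logger are not attached. The stdlib-only sketch below shows that difference. It does not show whether or when Prefect itself sets the key to True. The logger name mylib is a hypothetical stand-in for a module listed in PREFECT_LOGGING_EXTRA_LOGGERS:

```python
import logging
import logging.config


def configure(incremental: bool) -> logging.Logger:
    """Ask dictConfig to attach a handler to the 'mylib' logger."""
    logger = logging.getLogger("mylib")  # hypothetical extra logger
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)  # start from a clean slate each time
    config = {
        "version": 1,
        "incremental": incremental,
        "disable_existing_loggers": False,
        "loggers": {"mylib": {"level": "INFO", "handlers": ["null"]}},
    }
    if not incremental:
        # Handler definitions are only built in non-incremental mode
        config["handlers"] = {"null": {"class": "logging.NullHandler"}}
    logging.config.dictConfig(config)
    return logger


print("incremental=False ->", configure(incremental=False).handlers)
# In incremental mode only the level is applied; no handler is attached
print("incremental=True  ->", configure(incremental=True).handlers)
```

Under these semantics, a guard like `if not config['incremental']` around attaching handlers mirrors what dictConfig itself does in incremental mode.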

    Marwan Sarieddine

    12/04/2022, 2:26 PM
    Hi folks, I am unable to remove an agent in Prefect 1 (Cloud). I tried doing so via GraphQL and the UI, to no avail.

    HAITAM BORQANE

    12/04/2022, 4:09 PM
    Hello everyone, I wanted to ask whether there are any GitHub repos of projects built with Prefect. I'm currently learning it and want to see what a project built with Prefect looks like.

    Yaron Levi

    12/04/2022, 6:57 PM
    Hi 👋 We have several deployments running in production (scheduled with a CRON expression). All of those deployments share a common piece of code coming from a .py file. If I update this .py file, I now need to run the “apply” command for each of the .yaml files. For example:
    prefect deployment apply jobs/selfServiceDaily.yaml
    Are there any shortcuts to apply many yaml files at once?
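One possible workaround is a small script that runs the same apply command shown above once per YAML file. This is a minimal sketch, assuming the prefect CLI is on PATH and all deployment YAMLs sit in one directory (jobs/ here, as in the example). The script name apply_all.py and the --apply flag are hypothetical. By default the script only prints the commands (a dry run).

```python
import subprocess
import sys
from pathlib import Path
from typing import List


def apply_commands(directory: str, pattern: str = "*.yaml") -> List[List[str]]:
    """One `prefect deployment apply <file>` command per YAML file, sorted."""
    return [
        ["prefect", "deployment", "apply", str(path)]
        for path in sorted(Path(directory).glob(pattern))
    ]


def apply_all(directory: str, dry_run: bool = True) -> None:
    for cmd in apply_commands(directory):
        print(" ".join(cmd))
        if not dry_run:
            # check=True stops at the first deployment that fails to apply
            subprocess.run(cmd, check=True)


if __name__ == "__main__":
    # Usage: python apply_all.py jobs --apply   (omit --apply for a dry run)
    args = [a for a in sys.argv[1:] if a != "--apply"]
    apply_all(args[0] if args else "jobs", dry_run="--apply" not in sys.argv)
```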

    João Coelho

    12/04/2022, 8:45 PM
    Hello everyone!! Hope you're all fine.

    datamongus

    12/05/2022, 12:16 AM
    Greetings, I’m aware that the Dask functionality in Prefect allows for running parallel flows/tasks across different agents. I’m curious whether the K8s operator does the same. Ultimately, what I am attempting to do is offload a resource-intensive task to different agents. This particular task was previously done using a native pipeline and a for loop. I’ve refactored this logic (in Python) to use a thread pool and removed the for loop. While this improved performance a lot, the logic is still not fast enough. I am now considering moving this logic to a Prefect flow in the hope of parallelizing it even further across different agents, which are set up as Kubernetes pods.
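For reference, the for-loop-to-thread-pool refactor described above typically looks like the stdlib sketch below. process_item is a hypothetical stand-in for the real per-item work. A thread pool keeps all of the work inside one process on one machine. For CPU-bound pure-Python code, the GIL also limits how much threads help. Both are reasons to consider spreading the work across pods instead.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable, List


def process_item(item: int) -> int:
    """Hypothetical stand-in for the resource-intensive per-item work."""
    return item * item


def run_sequential(items: Iterable[int]) -> List[int]:
    # The original approach: one item at a time in a for loop
    results = []
    for item in items:
        results.append(process_item(item))
    return results


def run_threaded(items: Iterable[int], max_workers: int = 8) -> List[int]:
    # Thread-pool version: pool.map() returns results in input order
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(process_item, items))
```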

    Abhinav Ayalur

    12/05/2022, 1:05 AM
    Hey there, I was thinking of using Prefect to route messages throughout my platform and I was wondering whether there is a way to "join" lists of outputs created by two separate tasks. If task 1 outputs a list of elements and task 2 outputs a list of other elements, and they have the same length, what is the best way to join them together for my third task?
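Independent of Prefect, the join itself is an element-wise pairing. The minimal sketch below assumes both outputs reach the third task as plain Python lists. The helper name join_outputs is illustrative, not a Prefect API. In a flow, the third task would presumably receive both upstream results as arguments and call a helper like this.

```python
from typing import List, Tuple, TypeVar

A = TypeVar("A")
B = TypeVar("B")


def join_outputs(left: List[A], right: List[B]) -> List[Tuple[A, B]]:
    """Pair element i of one task's output with element i of the other's."""
    if len(left) != len(right):
        # zip() alone would silently drop the extra items, so fail loudly
        raise ValueError(f"length mismatch: {len(left)} != {len(right)}")
    return list(zip(left, right))


pairs = join_outputs(["msg-1", "msg-2"], ["route-a", "route-b"])
print(pairs)  # [('msg-1', 'route-a'), ('msg-2', 'route-b')]
```

On Python 3.10+, zip(left, right, strict=True) performs the same length check.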

    Abhinav Ayalur

    12/05/2022, 1:27 AM
    Separate question: I was wondering how GPU jobs are orchestrated. We have some large GPU tasks that we'd like to write in, and were wondering if there is a way to have an init function that instantiates a model so we don't have to instantiate it for each call of a task. Could we maybe also have "warm tasks" running that pick up these jobs faster without having to initialize the model again?
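The "initialize once" part can be done with a plain-Python pattern rather than a Prefect feature: build the model lazily and cache it per process. This minimal sketch assumes consecutive task runs execute in the same worker process. Whether that holds depends on the task runner and infrastructure, so the cache only helps in that case. Model and inference_task are hypothetical names.

```python
import functools


class Model:
    """Hypothetical stand-in for an expensive-to-load GPU model."""

    instances = 0  # counts how many times the "expensive" init ran

    def __init__(self) -> None:
        Model.instances += 1  # imagine loading weights onto the GPU here

    def predict(self, x: float) -> float:
        return x * 2


@functools.lru_cache(maxsize=None)
def get_model() -> Model:
    # Built on the first call, then reused by every later call in this process
    return Model()


def inference_task(x: float) -> float:
    # A task body would call get_model() rather than constructing Model() itself
    return get_model().predict(x)


results = [inference_task(x) for x in range(5)]
print(results, "model built", Model.instances, "time(s)")
```

lru_cache is thread-safe, but if several threads hit the first call at the same moment, the model may still be built more than once. A lock around the first load avoids that.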

    oliverek12

    12/05/2022, 4:25 AM
    Hello, small question. Sorry if I missed this in the documentation, but is there a way to view what is in a work queue? I'm running an agent locally (2.7.0) with a --limit. Threading with context.

    Ben Muller

    12/05/2022, 6:15 AM
    Is there such a thing as task concurrency limits? I have 8 or so tasks all running with .submit with the DaskTaskRunner, without a cluster. It seems like about two tasks run at a time and then another two start. My assumption was that all 8 would start, Dask would handle them, and they would finish when they are ready. I know this would be limited by the threads too, but the machine it is running on should have more than enough threads to handle those 8 tasks.

    Deepanshu Aggarwal

    12/05/2022, 6:56 AM
    Hi! If I trigger a flow run via a deployment, is there a way to get the output of that flow run in return, or via any API?

    iñigo

    12/05/2022, 9:07 AM
    Hi, is it possible to skip a subflow if it fails? I mean, in a flow-of-flows setup, to continue with the next flow. Thanks!

    Joël Luijmes

    12/05/2022, 10:44 AM
    Hi there 👋 I’m working on a flow which (in theory) spawns a lot of tasks in flow runs (about 10-50k task runs each day). I’m sure Prefect can handle such volumes, but there is also some overhead associated with this (database / disk):
    • log records for “Created task run”, “Submitted task run”, “Execution … immediately”
    • task_run record
    • task_run_state record
    • flow_run record
    Right now I have about 380k task runs and 17k flow runs in my database, already taking up 20GB of storage. Understandably, this originates from passing data to each subflow. I suppose my current setup is not ideal (well, unless I want to pay a lot for storage). Regardless of that, is there a way to have Prefect (Orion) perform roll-ups on these tables? Or should I just delete these records from the database myself?

    Vadym Dytyniak

    12/05/2022, 11:17 AM
    Hi. In Prefect 1 we use the ENDRUN exception to fail a task without retrying it. Does it exist in Prefect 2?

    Timo

    12/05/2022, 1:26 PM
    Hi all, is there a parameter that defines when a flow is considered late? When we start a deployment manually, the flow always goes into the "Late" state, which triggers a notification, but a few seconds later it's running. A solution would be to only mark it "Late" after, e.g., 30 sec. Thanks

    Luca Schneider

    12/05/2022, 2:02 PM
    Hi all, I've noticed some weird behaviour lately with Kubernetes jobs. My pipelines are doing fine, but the flow runs end up in the “Crashed” state. The only error message I could find is:
    State Message: Flow run infrastructure exited with non-zero status code -1.

    Justin Trautmann

    12/05/2022, 2:30 PM
    Hello community, hello Prefect team! The flow cancellation feature in 2.7 is very useful and exactly what our internal users were requesting. However, our tasks submit long-running jobs to external services that would continue to run even if the flow-run infrastructure is terminated. To save costs, it would be great to be able to handle SIGTERM appropriately and cancel any external jobs before actually terminating. I played around with Python signal handlers and found that, because Prefect executes the flow in a thread, custom signal handlers can't be set the conventional way. Are there any best practices for dealing with this type of situation? Any input on how to react to SIGTERM from within a flow is very welcome. Thanks
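The thread limitation described above comes from Python itself. signal.signal() may only be called from the main thread of the main interpreter; from any other thread it raises ValueError. Handlers also always run in the main thread. This stdlib-only sketch demonstrates that behaviour. on_sigterm is a hypothetical cleanup hook, not a Prefect API.

```python
import signal
import threading


def on_sigterm(signum, frame):
    # Hypothetical cleanup hook: cancel the external jobs here, then exit
    raise SystemExit(128 + signum)


def try_register() -> str:
    """Try to install the SIGTERM handler from whichever thread calls this."""
    try:
        signal.signal(signal.SIGTERM, on_sigterm)
        return "registered"
    except ValueError as exc:
        # CPython refuses to install handlers outside the main thread
        return f"failed: {exc}"


previous = signal.getsignal(signal.SIGTERM)
results = {}
worker = threading.Thread(target=lambda: results.update(worker=try_register()))
worker.start()
worker.join()
results["main"] = try_register()
if previous is not None:
    signal.signal(signal.SIGTERM, previous)  # restore the earlier handler
print(results)
```

Any SIGTERM handler therefore has to be installed by the code that owns the process's main thread. This sketch does not cover how that maps onto Prefect's flow-run process.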

    Matthew Ferry

    12/05/2022, 3:06 PM
    Hello! I'm working with Prefect to orchestrate some dbt tasks, but I'm struggling with ECS deployment. I have one flow that I want to run for multiple customers, so I need data access to be single-tenant, but it doesn't seem efficient to run an agent for each customer. I was hoping to:
    • Run one agent as a long-running ECS task that listens to one queue
    • When a flow run is requested, have that agent spin up a separate ECS task (with client-specific permissions) to run the entire flow
    • Have that ECS task run a Prefect flow that includes dbt shell-command tasks from prefect-dbt
    Could somebody help me think through how this can be done? Most of the ECS docs describe v1, not v2, and the only thing I found was this discussion in Discourse, which said ECS wasn't really supported.

    Vadym Dytyniak

    12/05/2022, 3:06 PM
    Hi. How do I test a Prefect 2 task that uses get_run_logger? When I try to use task.fn, it fails with no task or flow context.

    Vadym Dytyniak

    12/05/2022, 4:06 PM
    Hello. We're seeing an issue with SequentialTaskRunner. Why is the second task not waiting for the first one?
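    import time

    from prefect import flow, get_run_logger, task
    from prefect.task_runners import SequentialTaskRunner


    @task()
    def copy_file(n: int):
        # Simulate a file copy that takes n seconds
        get_run_logger().info('Start file copying')
        time.sleep(n)
        get_run_logger().info('Completed file copying')


    @flow(task_runner=SequentialTaskRunner())
    def sample() -> None:
        # .map creates one task run per element; with SequentialTaskRunner
        # the 4-second run is expected to start only after the 3-second run
        copy_file.map([3, 4])


    if __name__ == '__main__':
        sample()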
    Logs:
    18:05:29.909 | INFO    | Flow run 'devious-aardwolf' - Created task run 'copy_file-6cc8b0a3-0' for task 'copy_file'
    18:05:29.910 | INFO    | Flow run 'devious-aardwolf' - Executing 'copy_file-6cc8b0a3-0' immediately...
    18:05:29.916 | INFO    | Flow run 'devious-aardwolf' - Created task run 'copy_file-6cc8b0a3-1' for task 'copy_file'
    18:05:29.917 | INFO    | Flow run 'devious-aardwolf' - Executing 'copy_file-6cc8b0a3-1' immediately...
    18:05:30.468 | INFO    | Task run 'copy_file-6cc8b0a3-1' - Start file copying
    18:05:30.534 | INFO    | Task run 'copy_file-6cc8b0a3-0' - Start file copying
    18:05:33.536 | INFO    | Task run 'copy_file-6cc8b0a3-0' - Completed file copying
    18:05:33.726 | INFO    | Task run 'copy_file-6cc8b0a3-0' - Finished in state Completed()
    18:05:34.469 | INFO    | Task run 'copy_file-6cc8b0a3-1' - Completed file copying
    18:05:34.658 | INFO    | Task run 'copy_file-6cc8b0a3-1' - Finished in state Completed()
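The overlap is visible in the timestamps above. The stdlib sketch below measures it, taking each run's interval from its "Start file copying" line to its "Finished in state Completed()" line. The helper names are illustrative. Under strictly sequential execution, the overlap would be zero.

```python
from datetime import datetime

# (start, finish) timestamps copied from the log lines above
RUNS = {
    "copy_file-6cc8b0a3-0": ("18:05:30.534", "18:05:33.726"),
    "copy_file-6cc8b0a3-1": ("18:05:30.468", "18:05:34.658"),
}


def parse(ts: str) -> datetime:
    """Parse an HH:MM:SS.mmm log timestamp (the date is irrelevant here)."""
    return datetime.strptime(ts, "%H:%M:%S.%f")


def overlap_seconds(a: tuple, b: tuple) -> float:
    """Length of the window during which both intervals were running."""
    start = max(parse(a[0]), parse(b[0]))
    end = min(parse(a[1]), parse(b[1]))
    return max(0.0, (end - start).total_seconds())


run0, run1 = RUNS.values()
print(f"runs overlapped for {overlap_seconds(run0, run1):.3f} s")  # 3.192 s
```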

    Tim-Oliver

    12/05/2022, 4:37 PM
    Hello everyone, how can I get the task runs of a flow run via the REST API? I tried:
    tr = await client.read_task_runs(limit=1, flow_run_filter=FlowRunFilter.construct(id="562d1705cfcc48a0b55df98373fd10bd"))

    Christian Vogel

    12/05/2022, 5:14 PM
    Hi Prefect community! We are using Prefect notifications in Prefect Cloud to detect late flows so we can check whether our agents are healthy. If a run is late, we send a notification via Slack. Unfortunately, we encountered some false positives: flows were marked as LATE although the agent was healthy. We saw that there is the setting PREFECT_ORION_SERVICES_LATE_RUNS_AFTER_SECONDS, but it is only configurable on a self-hosted server. Any idea what we could do?

    Ashley Felber

    12/05/2022, 5:39 PM
    Hello, I want to create an event-driven deployment (triggered when files land in S3). Is the best way to do this with a Lambda function? If so, what's the best documentation to follow?

    Jean-Michel Provencher

    12/05/2022, 6:48 PM
    Quick question: do you plan on allowing an additional property when creating a deployment in Prefect 2 that would target a specific directory for the upload? Currently it seems quite limiting to only be able to upload the current folder and ignore files based on the .prefectignore file. I see a dirty workaround where people upload the whole directory on the first deployment creation and then skip the upload for subsequent deployment commands. Ideally, I would like to have the option to target specific files/folders for the upload.
Also, .prefectignore does not seem to support "*" wildcards.

Rob Freedy

12/06/2022, 4:18 PM
Hey @Jean-Michel Provencher!! At the moment, there is no flag to specify exact files to upload to remote storage. There is the --skip-upload flag, which skips uploading all of the files to remote storage. You can submit a feature request for specifying files here: https://github.com/PrefectHQ/prefect/issues/new/choose
The .prefectignore file uses the same syntax as a .gitignore file, which should support wildcards: https://git-scm.com/docs/gitignore