Powered by Linen
prefect-community
  • Vaibhav Shetye

    05/18/2021, 3:33 AM
    Hi, some of our scheduled tasks have started failing with [Errno 11] Resource temporarily unavailable. Any hints on what the underlying issue could be?
    4 replies · 3 participants
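On Linux, errno 11 is EAGAIN ("Resource temporarily unavailable"): the OS declined a request that might succeed later, commonly because a process or thread limit was reached (for example when forking or starting threads) or a non-blocking resource was busy. While the root cause is being tracked down, one generic mitigation is to retry the failing call with backoff. This is a framework-agnostic sketch; `retry_on_eagain` and `flaky` are hypothetical names, not Prefect APIs.

```python
import errno
import time


def retry_on_eagain(func, *args, attempts=5, base_delay=0.1, **kwargs):
    """Call func, retrying with exponential backoff while it raises EAGAIN.

    Any other OSError, or the EAGAIN from the final attempt, is re-raised.
    """
    for attempt in range(attempts):
        try:
            return func(*args, **kwargs)
        except OSError as exc:
            # Only retry "Resource temporarily unavailable" (errno 11 on Linux).
            if exc.errno != errno.EAGAIN or attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)


if __name__ == "__main__":
    calls = {"n": 0}

    def flaky():
        # Hypothetical stand-in: fails twice with EAGAIN, then succeeds.
        calls["n"] += 1
        if calls["n"] < 3:
            raise OSError(errno.EAGAIN, "Resource temporarily unavailable")
        return "ok"

    print(retry_on_eagain(flaky, base_delay=0.01))  # prints "ok"
```

Retrying only hides transient pressure; if the error is caused by hitting a process or thread limit, the limit (or whatever is exhausting it) still needs to be addressed.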
  • Jacob Blanco

    05/18/2021, 5:58 AM
    Hey folks, we use flows in conjunction with parameterized schedules A LOT, and I was wondering if it’s possible to change the name of a flow run to contain an identifier based on the flow's parameters. For example, if one of the flow parameters is a table name, could the flow run name be something like "my_table-intrepid-iguana"? This would make it easier to navigate the flow runs in the Cloud UI.
    6 replies · 2 participants
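The naming scheme asked about above can be sketched as plain string handling: take the value of one parameter, sanitize it, and prefix it to the auto-generated adjective-animal name. `make_run_name` and its sanitizing rule are hypothetical; applying the result to a real flow run would go through whatever rename mechanism your Prefect version offers, which this thread does not name.

```python
import re


def make_run_name(parameters: dict, key: str, generated_name: str) -> str:
    """Prefix an auto-generated run name with the value of one parameter.

    Hypothetical helper: runs of characters other than letters, digits,
    '_' and '-' in the parameter value are replaced with a single '-'.
    """
    value = str(parameters.get(key, "")).strip()
    if not value:
        # Nothing to prefix with: keep the generated name unchanged.
        return generated_name
    prefix = re.sub(r"[^A-Za-z0-9_-]+", "-", value)
    return f"{prefix}-{generated_name}"


print(make_run_name({"table_name": "my_table"}, "table_name", "intrepid-iguana"))
# prints "my_table-intrepid-iguana"
```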
  • Chris McClellan

    05/18/2021, 7:34 AM
    I'm using Prefect Cloud and have a problem with a flow that takes a long time to run (a lot of heavy processing). Is there a way to send a heartbeat from my code to ensure that the flow is not marked as Failed?
    2 replies · 2 participants
  • Domantas

    05/18/2021, 12:32 PM
    Hello guys, I would like to ask if anyone has an example of how to mount CephFS in a Prefect flow. For now I'm trying to use the job_template_path parameter in KubernetesRun, passing a YAML file that looks like this:
    apiVersion: batch/v1
    kind: Job
    metadata:
      name: prefect-job
      labels: {}
    spec:
      template:
        metadata:
          labels: {}
        spec:
          containers:
            - name: flow
              image: test-image:1.2
              imagePullPolicy: IfNotPresent
              command: ["/bin/sh", "-c"]
              args: ["prefect execute flow-run"]
              env:
                - name: PREFECT__CONTEXT__SECRETS__database_psw
                  value: veryStrongPasswrod
              volumeMounts:
                - name: storage
                  mountPath: /tnt/test/domantas_test
          restartPolicy: Never
          imagePullSecrets:
          - name: registry-secret
          volumes:
            - name: storage
              cephfs:
                monitors:
                    - 59.11.129.131
                    - 59.11.129.132
                    - 59.11.129.133
                    - 59.11.129.134
                    - 59.11.129.135
                path: /test
                user: storage
                secretRef:
                    name: ceph-secret
    However, when I try to run it from the Prefect Server UI, the flow seems to get stuck in the "Submitted for execution" state with no useful logs. I took the job template example from here: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/kubernetes/job_spec.yaml and added the parameters CephFS needs.
    16 replies · 2 participants
  • Gabe Grand

    05/18/2021, 4:23 PM
    Hey guys, @Dana Merrick and I are using LocalDaskExecutor in combination with RunNamespacedJob to launch a bunch of trivially parallelizable child processes on Kubernetes. However, it looks like only 2 child jobs are running at a time. Is there a way to increase the parallelism? Are we doing this right? 😄
    19 replies · 4 participants
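One plausible explanation for the "2 at a time" cap, offered as an assumption: a local Dask scheduler runs at most as many tasks at once as it has workers, and Dask's threaded scheduler defaults to one worker per CPU core, so on a 2-CPU pod only 2 RunNamespacedJob tasks (each blocking while its Kubernetes job runs) can be in flight. LocalDaskExecutor accepts a num_workers setting to raise this. The stdlib sketch below uses concurrent.futures as a stand-in for the executor to show that the worker count, not the number of tasks, bounds concurrency; `peak_concurrency` and `job` are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(num_jobs: int, max_workers: int, duration: float = 0.05) -> int:
    """Run num_jobs sleeping jobs on a pool and return the peak number running at once."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def job(_):
        # Stand-in for a task that blocks while waiting on an external
        # Kubernetes job (here it just sleeps).
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(duration)
        with lock:
            running -= 1

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        list(pool.map(job, range(num_jobs)))
    return peak


print(peak_concurrency(num_jobs=10, max_workers=2))  # at most 2
print(peak_concurrency(num_jobs=10, max_workers=8))  # up to 8
```

Because these tasks mostly wait on Kubernetes rather than burn CPU, a thread-based scheduler with more workers than cores is a reasonable fit.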
  • Marko Mušnjak

    05/18/2021, 6:49 PM
    Hi! What would be “the right way” in Prefect to do what Luigi calls an ExternalTask (e.g. a file in S3 that gets created by some external process, without which the rest of the pipeline cannot proceed)?
    8 replies · 2 participants
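A common way to model an external dependency like the one described above, stated here as an assumption rather than an answer from the thread, is an upstream task that checks for the artifact and fails until it exists, relying on the task's max_retries and retry_delay to keep checking. The framework-free sketch below shows just the polling part, with a local path standing in for the S3 key; `wait_for_file` is hypothetical, and checking S3 would need an S3 client instead of os.path.exists.

```python
import os
import tempfile
import time


def wait_for_file(path: str, timeout: float, poll_interval: float = 1.0) -> bool:
    """Poll until `path` exists or `timeout` seconds pass.

    Returns True if the file appeared, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if os.path.exists(path):
            return True
        if time.monotonic() >= deadline:
            return False
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, max(0.0, deadline - time.monotonic())))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        target = os.path.join(d, "ready.csv")
        print(wait_for_file(target, timeout=0.2, poll_interval=0.05))  # False
        open(target, "w").close()  # the "external process" creates the file
        print(wait_for_file(target, timeout=0.2, poll_interval=0.05))  # True
```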
  • Charles Liu

    05/18/2021, 8:12 PM
    Hello all, I just want to rule out one possible cause of the autoscaling issues I'm facing right now. If we're paying for unlimited concurrency (pay per run), is there anything else in Prefect that would prevent all the pods from being scheduled at once?
    43 replies · 5 participants
  • Enda Peng

    05/18/2021, 9:10 PM
    Is there a Python equivalent of the command prefect backend server (or prefect backend cloud)? Right now I am doing os.system("prefect backend cloud"), but I'm not sure whether there is a more elegant way.
    2 replies · 2 participants
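If shelling out to the CLI is acceptable, subprocess.run with an argument list and check=True is a sturdier replacement for the os.system call above: there is no shell parsing, and a non-zero exit raises CalledProcessError instead of being silently ignored. This sketch wraps the same prefect backend command; `set_backend` and its validation are hypothetical, it assumes the prefect CLI is on PATH, and the `runner` parameter exists only so the call can be exercised without Prefect installed.

```python
import subprocess

VALID_BACKENDS = {"server", "cloud"}


def set_backend(backend: str, runner=subprocess.run):
    """Switch the Prefect backend by invoking `prefect backend <name>`."""
    if backend not in VALID_BACKENDS:
        raise ValueError(f"backend must be one of {sorted(VALID_BACKENDS)}, got {backend!r}")
    # Argument list (no shell) + check=True: a failing command raises
    # subprocess.CalledProcessError instead of returning a status code.
    return runner(["prefect", "backend", backend], check=True)
```

Usage: `set_backend("cloud")` or `set_backend("server")`.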
  • Chris McClellan

    05/19/2021, 12:48 AM
    I know it's a problem with my code, but can someone give me some tips on a solution? (I've Googled, but I'm only getting documentation.) The error is: Unexpected error: AttributeError("'FunctionTask' object has no attribute load. Did you call this object within a function that should have beendecorated with @prefect.task?") (Yes, there is no space between "been" and "decorated" in the message.) I've made a few changes, but I didn't think any of them were major enough to cause this error 😞 I've tried backtracking but still can't find the glitch.
    3 replies · 3 participants
  • Ranu Goldan

    05/19/2021, 2:00 AM
    Hi everyone, I want to ask something about RunConfig and Executor. I'm using the KubernetesAgent on GKE. My KubernetesRun config was:
    KubernetesRun(
        labels=["agent:k8s-dev"],
        cpu_limit=8,
        memory_limit="12Gi",
        cpu_request=0.5,
        memory_request="512Mi",
    )
    And my executor was:
    DaskExecutor(adapt_kwargs={"maximum": 16, "minimum": 1})
    When I start a flow run with this config, the Prefect agent creates a new pod with the following config: (image attached). The problem is that the Dask worker is always killed before GKE manages to scale up, and the Prefect logs show a KilledWorker error. Any idea how to properly tune DaskExecutor and KubernetesRun to work together?
    2 replies · 2 participants
  • Zach Schumacher

    05/19/2021, 2:24 AM
    How can I get the state of a task from a task object?
    7 replies · 2 participants
  • Peter Roelants

    05/19/2021, 6:10 AM
    Hi Prefect, I'm running into the following warning when running my flow:
    /venv/lib/python3.9/site-packages/prefect/engine/task_runner.py:865: UserWarning: This task is running in a daemonic subprocess; consequently Prefect can only enforce a soft timeout limit, i.e., if your Task reaches its timeout limit it will enter a TimedOut state but continue running in the background.
    I'm running a scheduled flow via a Docker Agent with a DaskExecutor (multiple processes, due to the CPU-intensive nature of the task). Some of the tasks in the flow can hang, so I've added a timeout to the task. This warning worries me because, if I interpret it correctly, certain processes might hang indefinitely. What is the best way to avoid this and still benefit from running multiple CPU-intensive tasks in parallel with Prefect? (I'm running Prefect 0.14.19 and Dask 2021.5.0.)
    11 replies · 3 participants
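The warning above presumably appears because Dask worker processes are daemonic, and Python's multiprocessing refuses to start child processes from a daemonic process ("daemonic processes are not allowed to have children"), so the task cannot be isolated in a separate process that could be killed. One workaround, sketched here as an assumption rather than a Prefect feature, is to run the hang-prone CPU-bound step as a separate command with subprocess.run(..., timeout=...): that restriction belongs to multiprocessing, not the subprocess module, and on timeout subprocess.run kills and reaps the child before raising TimeoutExpired, which gives a hard limit. `run_with_hard_timeout` and the inline scripts are hypothetical.

```python
import subprocess
import sys


def run_with_hard_timeout(args, timeout: float):
    """Run a command, killing it if it exceeds `timeout` seconds.

    Returns the CompletedProcess on success, or None if it timed out.
    A non-zero exit status raises subprocess.CalledProcessError (check=True).
    """
    try:
        return subprocess.run(args, capture_output=True, text=True, timeout=timeout, check=True)
    except subprocess.TimeoutExpired:
        # subprocess.run has already killed and waited for the child here.
        return None


if __name__ == "__main__":
    # Hypothetical CPU-bound step, run as a separate Python process.
    ok = run_with_hard_timeout([sys.executable, "-c", "print(sum(range(10**6)))"], timeout=30)
    print(ok.stdout.strip())  # 499999500000
    hung = run_with_hard_timeout([sys.executable, "-c", "import time; time.sleep(60)"], timeout=0.5)
    print(hung)  # None
```

The trade-off is that inputs and outputs have to cross a process boundary (arguments, files, or stdout) instead of being passed as Python objects.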
  • Yohann

    05/19/2021, 7:07 AM
    Good morning everyone 🙂 I need some help with Prefect. I'm trying to set up a use case where a flow runs subflows, but I'm getting strange behavior and I don't understand why Prefect works like this. I've put a small example below. My goal is to launch a flow every night that iterates over a list and, for each element, runs a new flow. Each subflow is configured with one value from this list (a URL). At the beginning, I was calling StartFlowRun without the idempotency_key, and subflows were only called for the first item. After that, I set the idempotency_key to the item value. I thought it was working, but I got strange behavior once my_flow was scheduled: the flow worked the first night, but on the following days the subflows were not called (as if they were stuck). Then I switched to a random idempotency_key, and this time everything works correctly. Is this normal? I also don't know if it's a good idea to use Prefect like this, because the UI will be flooded with new flows. Does Prefect rotate the Postgres database to remove old flows?
    from uuid import uuid4

    from prefect import Flow, task
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock
    from prefect.tasks.prefect import StartFlowRun


    @task
    def run_subflow(item):
        # Start "my_sub_flow" for this item and wait for it to finish.
        # A random idempotency key means every call creates a new flow run.
        StartFlowRun(
            flow_name="my_sub_flow",
            project_name="test",
            wait=True,
            parameters={"item": item},
        ).run(idempotency_key=str(uuid4()))


    with Flow("my_flow", schedule=Schedule(clocks=[CronClock("0 0 * * *")])) as flow:
        run_subflow.map(item=["a", "b", "c"])

    # With a schedule attached, flow.run() keeps running the flow on that schedule.
    flow.run()
    7 replies · 2 participants
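A hedged reading of the behavior described above: as I understand idempotency keys, they tell the backend "if a run with this key already exists, return it instead of creating a new one". With the key set to the item alone, every night reuses the same three keys, so later nights get the first night's runs back; a random key disables deduplication entirely. A middle ground, assuming deduplication is only wanted within a single scheduled night, is to combine the item with the scheduled date. `make_idempotency_key` is hypothetical; inside a Prefect task the date would presumably come from the run's scheduled start time in prefect.context, which is an assumption to verify for your version.

```python
from datetime import date


def make_idempotency_key(flow_name: str, item: str, run_date: date) -> str:
    """Build a key that is stable for one item on one night.

    Re-running the same item on the same date reuses the key (so a retried
    parent doesn't start duplicate subflows); the next night gets a new key.
    """
    return f"{flow_name}:{item}:{run_date.isoformat()}"


print(make_idempotency_key("my_sub_flow", "a", date(2021, 5, 19)))
# prints "my_sub_flow:a:2021-05-19"
```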
  • David

    05/19/2021, 10:53 AM
    Hi all, can someone elaborate on this error: “Cannot provide task_definition_arn when using Docker storage”? I have a flow that I want to register with Docker storage (ECR), but I don't want to generate a new task definition (generate_task_definition) on every deploy, because apart from the Docker image itself nothing changes. Why can't I pass a task definition ARN when using Docker storage? Thanks
    11 replies · 2 participants
  • Michael Hadorn

    05/19/2021, 1:13 PM
    Hi all, starting from 0.14.17 (and also in 0.14.19), GitLab storage no longer works:
    Failed to load and execute Flow's environment: GitlabGetError('404 Project Not Found')
    We use it with a self-hosted GitLab instance. Apart from the new base Docker image, we changed nothing:
    prefecthq/prefect:0.14.12-python3.8 -> prefecthq/prefect:0.14.17-python3.8
    prefecthq/prefect:0.14.12-python3.8 -> prefecthq/prefect:0.14.19-python3.8
    Up to 0.14.16 it works:
    prefecthq/prefect:0.14.12-python3.8 -> prefecthq/prefect:0.14.16-python3.8
    The new Git storage, which I tried first, can't handle the company's certificate: ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1131). Are there any known problems? We use it like this in the flow configuration:
    storage=GitLab(
        host="https://gitlab.xxx.ch",
        repo="group/repo",
        path="test_flow.py",
        ref="feature",
        secrets=["GITLAB_ACCESS_TOKEN"],
    ),
    6 replies · 2 participants
  • Peter Roelants

    05/19/2021, 2:04 PM
    Hi Prefect, it seems that a task's on_failure handler is called even when retry logic has been added to the task and the task hasn't yet exhausted its max_retries. (I think I just had a task whose on_failure handler was called even though it succeeded on a retry.) Is there a way to add a failure handler that is called only after the task has exhausted all its retries and has truly failed?
    4 replies · 2 participants
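To make the desired semantics concrete, here is a framework-free sketch (not how Prefect's on_failure works) of a retry loop that calls a failure callback only once every attempt has failed, and never when a retry succeeds. In Prefect itself this would more likely be a state handler that inspects the new state; which states a task passes through during a retry is version-specific, so that mapping is an assumption. `run_with_retries` and `on_final_failure` are hypothetical names.

```python
import time


def run_with_retries(func, max_retries: int, retry_delay: float = 0.0, on_final_failure=None):
    """Call func, retrying up to max_retries times after the first attempt.

    on_final_failure(exc) is invoked only if every attempt fails; a success
    on any retry means it is never called.
    """
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception as exc:
            if attempt < max_retries:
                time.sleep(retry_delay)
                continue
            # Retries exhausted: this is a "true" failure.
            if on_final_failure is not None:
                on_final_failure(exc)
            raise
```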
  • Newskooler

    05/19/2021, 3:43 PM
    Hi Prefect 👋 Is there a way to avoid keeping any data in the flows folder? (Mine is about 200GB from just 2 days' worth of flows.) Ideally I would like to delete the flow data once a flow completes with 100% success.
    32 replies · 3 participants
  • Brian Mesick

    05/19/2021, 4:31 PM
    Hi folks. We’re still trying to move from Docker storage to GitHub storage, but have run into a new snag: our TOML config files no longer seem to be picked up. We’ve always configured them at registration time with PREFECT__USER_CONFIG_PATH=prod.toml, but prefect.config no longer seems to have the values from the TOML file. Any idea how we can work around this?
    13 replies · 4 participants
  • Walee

    05/19/2021, 5:42 PM
    Hey y’all, what’s the best way to batch items after running a map? As an example, let’s say I’m processing rows from a CSV in parallel and writing them into a DB. Writing each row in a separate mapped task is quite inefficient. I can reduce them with a reducer task into batches of 10000K items, but then the reducer task becomes a single bottleneck. Are there any other efficient approaches to batching items together?
    3 replies · 2 participants
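One common alternative to a single reducer is to batch before mapping: split the rows into fixed-size chunks up front and map the write task over the chunks, so each mapped task writes a whole batch and no single task funnels all the data. The helper below shows only the chunking step; `batched` is a hypothetical name (Python 3.12 adds a similar itertools.batched that yields tuples), and the batch size is arbitrary.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items (the last may be shorter)."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


print(list(batched(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

In a flow, the list of batches, rather than the individual rows, would then be what the write task is mapped over.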
  • Braun Reyes

    05/19/2021, 6:10 PM
    Hey y'all! I would like to connect with any leaders who have experience bootstrapping a team focused on Data Governance. We are looking to start an initiative team around this and would love to hear about any experiences out there, e.g. job titles and how the job descriptions differ from standard DE, AE, or DA roles. Let me know if anyone has time to chat and I can DM you.
  • kevin

    05/19/2021, 7:43 PM
    Got a question about the GQL query tool: How difficult would it be to query the execution time of all task instances in a given flow with a given identifier?
    2 replies · 2 participants
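As a sketch of the question above, assuming the GraphQL schema exposes task runs with start_time, end_time, and a task name (worth verifying against the schema for your version), the query could select those fields for one flow run, and each execution time is then end minus start. The query text and the `task_durations` helper are assumptions for illustration; no real API output is shown here.

```python
from datetime import datetime

# Assumed query shape; field and argument names are not confirmed by the thread.
TASK_RUN_QUERY = """
query ($flow_run_id: uuid!) {
  task_run(where: {flow_run_id: {_eq: $flow_run_id}}) {
    task { name }
    start_time
    end_time
  }
}
"""


def _parse(ts: str) -> datetime:
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def task_durations(task_runs):
    """Map task name -> execution time in seconds, skipping unfinished runs.

    If a task has several runs (e.g. mapped children), their durations are summed.
    """
    totals = {}
    for run in task_runs:
        if not run.get("start_time") or not run.get("end_time"):
            continue
        seconds = (_parse(run["end_time"]) - _parse(run["start_time"])).total_seconds()
        name = run["task"]["name"]
        totals[name] = totals.get(name, 0.0) + seconds
    return totals
```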
  • Jeff Williams

    05/19/2021, 7:57 PM
    Have a question about Prefect Context. I read the Context documentation under Core Concepts / Execution as well as API Latest / prefect.utilities Context. Both seem to say that users can store items in the context (being careful not to step on any Prefect items, of course) and retrieve them later. I am attempting to do that but running into trouble. I have created a simple task which receives a "label" under which to store a time value. For example, "Start_Flow" would be used to store a time.time() value in the context. That part seems to work correctly within the task, as I can store and then print the stored value. However, in a subsequent task, when I retrieve the "Start_Flow" item from the context, I get None. Is there something I am missing about Context? I am using code similar to the following:
    To store:
    prefect.context[label] = time.time()
    To retrieve:
    time_val = prefect.context.get(label)
    3 replies · 2 participants
  • Sean Talia

    05/19/2021, 8:18 PM
    Hi all, I've got a question about the workflow for upgrading the version of Prefect you're working with. I'm doing an upgrade right now: my plan is to first upgrade the Prefect version my agents run, and then update the Prefect version in the RunConfig images my flows run with. My main question is what will happen if I update the RunConfig images to a new version of Prefect (in my case, 0.14.17) but don't yet re-register the flows themselves (so they'll still show as registered with the old version of Prefect I've been using). Is there a big cause for concern here? When you upgrade the version of Prefect you're "running", do you also need to re-register all of your flows?
    4 replies · 3 participants
  • Sébastien Arnaud

    05/20/2021, 2:17 AM
    Hi, does anybody know if it's possible to register worker plugins when connecting to a Dask cluster (as client.register_worker_plugin allows)?
    3 replies · 2 participants
  • Gee Xun Chen

    05/20/2021, 3:48 AM
    Hi everybody! I was just wondering why my task isn't running in the correct order when I'm using case.
    7 replies · 3 participants
  • Joël Luijmes

    05/20/2021, 7:26 AM
    Hi there! I’m running into an issue where, even though an exception is raised, my code still continues. Understandably, Prefect doesn't really agree with this. How do I notice? I can see the exception being logged, and Prefect has already registered that the task failed. However, the task keeps running, and I see more logs. Check the thread for the details ➡️
    24 replies · 2 participants
  • Jocelyn Boullier

    05/20/2021, 7:54 AM
    Hey! Quick question regarding automations in Prefect Cloud. It seems the automation isn't triggered if the flow fails to start, for instance if it couldn't be found in the registered storage. Is that expected? Is that kind of automation part of the agent automations rather than the flow run automations?
    25 replies · 3 participants
  • Joël Luijmes

    05/20/2021, 1:35 PM
    Hi there again, I've got a different error. I’m running dbt through Prefect with a custom ShellTask (i.e. not the existing DbtShellTask). I tried a couple of commands and they work fine. However, now I'm running dbt test, which generates loads of logging output, and at that point something seems to break:
    RuntimeError: reentrant call inside <_io.BufferedWriter name='logs/dbt.log'>
    12 replies · 2 participants
  • Newskooler

    05/20/2021, 2:35 PM
    Hi 👋 I was using Prefect locally on localhost:8080 (not Cloud), and for some reason it has now started redirecting me to Cloud every time I go to localhost:8080, and I can’t get back to my local Prefect… Why is that, and how can I fix it? 🤔
    6 replies · 2 participants
  • May

    05/20/2021, 2:58 PM
    HELP - How do I get Prefect Secrets to work on a Prefect instance deployed on Kubernetes via Helm? As you can see below, Secrets seem to be grayed out 😞 along with a bunch of other things.
    3 replies · 2 participants

nicholas

05/20/2021, 3:04 PM
Hi @May - Prefect Secrets aren't available in Server, as they require a distinct piece of managed infrastructure; Prefect Cloud has these on every tier though (including free!)

May

05/20/2021, 3:09 PM
hahaha, thank you @nicholas! It all makes sense now 🙂

nicholas

05/20/2021, 3:09 PM
Glad I could help! 🙂