• Jacob Blanco

    Jacob Blanco

    1 year ago
    Hey folks, we use flows in conjunction with parameterized schedules A LOT and I was wondering if it’s possible to alter the name of a Flow Run to contain some identifier based on Flow parameters? So for example if one of the flow parameters is a table name, is it possible to have the flow name be something like
    "my_table-intrepid-iguana"
    or something like that. This would make it easier to navigate the flow runs in the Cloud UI.
    Jacob Blanco
    Kevin Kho
    6 replies
    Copy to Clipboard
  • c

    Chris McClellan

    1 year ago
    I'm using Prefect Cloud and have a problem with a flow that takes a long time to run (a lot of heavy processing). Is there a way to send a heartbeat (in my code) to ensure that the flow is not marked as Failed ?
    c
    Kevin Kho
    2 replies
    Copy to Clipboard
  • Domantas

    Domantas

    1 year ago
    Hello guys, I would like to ask if anyone have an example how to mount CephFS with prefect flow? For now I'm trying to use
    job_template_path
    parameter in
    KubernetesRun
    by passing yaml file which looks like this:
    apiVersion: batch/v1
    kind: Job
    metadata:
      name: prefect-job
      labels: {}
    spec:
      template:
        metadata:
          labels: {}
        spec:
          containers:
            - name: flow
              image: test-image:1.2
              imagePullPolicy: IfNotPresent
              command: ["/bin/sh", "-c"]
              args: ["prefect execute flow-run"]
              env:
                - name: PREFECT__CONTEXT__SECRETS__database_psw
                  value: veryStrongPasswrod
              volumeMounts:
                - name: storage
                  mountPath: /tnt/test/domantas_test
          restartPolicy: Never
          imagePullSecrets:
          - name: registry-secret
          volumes:
            - name: storage
              cephfs:
                monitors:
                    - 59.11.129.131
                    - 59.11.129.132
                    - 59.11.129.133
                    - 59.11.129.134
                    - 59.11.129.135
                path: /test
                user: storage
                secretRef:
                    name: ceph-secret
    However, when I try to execute in prefect server UI, it seems the flow get stuck with "Submitted for execution" status with no useful logs. I took job_template example from here: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/kubernetes/job_spec.yaml and additionally added cephFS necessary parameters.
    Domantas
    Kevin Kho
    16 replies
    Copy to Clipboard
  • Gabe Grand

    Gabe Grand

    1 year ago
    Hey guys, @Dana Merrick and I are using
    LocalDaskExecutor
    in combination with
    RunNamespacedJob
    to launch a bunch of trivially parallelizable child processes on Kubernetes. However, it looks like only 2 child jobs are getting run at a time. Is there a way to increase the parallelism // are we doing this right? 😄
    Gabe Grand
    Kevin Kho
    +2
    19 replies
    Copy to Clipboard
  • Marko Mušnjak

    Marko Mušnjak

    1 year ago
    Hi! What would be “the right way” in Prefect to do what Luigi refers to as ExternalTask (e.g. a file in S3 that gets created by some external process, and the rest of the pipeline can not proceed without it) ?
    Marko Mušnjak
    Kevin Kho
    8 replies
    Copy to Clipboard
  • c

    Charles Liu

    1 year ago
    Hello all, I just wanted to rule out a possibility with my autoscaling issues I'm facing right now. If we're paying for unlimited concurrency (pay per run), is there anything else in Prefect that would prevent the scheduling of all the pods at once?
    c
    Kevin Kho
    +3
    43 replies
    Copy to Clipboard
  • e

    Enda Peng

    1 year ago
    Is there an equivalent code for command ?
    prefect backend server (cloud)
    Right now I am doing
    os.system("prefect backend cloud")
    , not sure whether there is a more elegant way
    e
    Kevin Kho
    2 replies
    Copy to Clipboard
  • c

    Chris McClellan

    1 year ago
    I know it's a problem with my code, but can someone give me some tips on a solution ? (I've Googled, but I'm only getting documentation) Error is : Unexpected error: AttributeError("'FunctionTask' object has no attribute load. Did you call this object within a function that should have beendecorated with @prefect.task?") Yes, no space between been and decorated. I have made a few changes, but I didn't think it was anything major enough to cause this error 😞 I've tried back tracking but still can't find the glitch
    c
    nicholas
    3 replies
    Copy to Clipboard
  • r

    Ranu Goldan

    1 year ago
    Hi everyone, I want to ask something about RunConfig and Executor. Im using KubernetesAgent on GKE. My KubernetesRun config was:
    KubernetesRun(
        labels=["agent:k8s-dev"],
        cpu_limit=8,
        memory_limit="12Gi",
        cpu_request=0.5,
        memory_request="512Mi",
    )
    And my executor was:
    DaskExecutor(adapt_kwargs={"maximum": 16, "minimum": 1})
    When I start a flowrun with these config, prefect agent create a new pod with this config: (image attached) The problem is: The dask worker always killed first before the GKE pod going for upscale, and prefect logging says error KilledWorker Any idea how to perfectly adjust between DaskExecutor and KubernetesRun?
    r
    Dylan
    2 replies
    Copy to Clipboard
  • Zach Schumacher

    Zach Schumacher

    1 year ago
    how can i get the state of a task from a task object?
    Zach Schumacher
    Kevin Kho
    7 replies
    Copy to Clipboard