• dh

    1 year ago
    Hello community, could anyone help me understand if there’s an easy way to get FlowRunner “NOT” to submit a task to the executor if any of the task’s upstream tasks are not ready (e.g. in Pending states)? I think the current design decision is to submit regardless of the upstream states and let the TaskRunner decide whether that task is runnable, as per this code [1]. Because of this behavior, I am experiencing a UX problem where non-runnable tasks repeatedly get submitted to our SLURM cluster, occupying resources for some time only to confirm those tasks are not actually ready yet due to their upstreams. Would there be a clean solution / workaround for this? [1]: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/flow_runner.py#L593
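    A minimal sketch of one possible direction, assuming Prefect 1.x, where flow.run() accepts a runner_cls argument: a custom FlowRunner subclass could consult upstream states before handing a task to the executor. LazyFlowRunner and its filtering behavior are hypothetical; the stock get_flow_run_state() would need to be adapted rather than simply subclassed away.
    from prefect import Flow, task
    from prefect.engine.flow_runner import FlowRunner

    class LazyFlowRunner(FlowRunner):
        # Hypothetical subclass: the idea is to skip submitting tasks whose
        # upstream tasks are still Pending, instead of submitting everything
        # and letting the TaskRunner sort it out on the SLURM side.
        pass

    @task
    def noop():
        return 1

    with Flow("lazy-submit") as flow:
        noop()

    flow.run(runner_cls=LazyFlowRunner)  # plug in the custom runner class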
    dh
    Jim Crist-Harif
    10 replies
  • Joël Luijmes

    1 year ago
    Hey, how can I provide secrets to the Kubernetes Agent? I noticed that I can set some env vars via KubernetesRun, but these are only plain strings. Is there currently an easy way to provide secrets as env vars? My intuition says I'd have to provide a custom job YAML; is that correct, or am I missing an easier way?
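    A minimal sketch of the custom-job-template route, assuming Prefect 1.x, where KubernetesRun accepts a job_template dict: a Kubernetes secretKeyRef can be expressed there, which plain string env entries cannot. The secret name "my-creds" and key "API_TOKEN" are made up for illustration.
    from prefect.run_configs import KubernetesRun

    job_template = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "spec": {
            "template": {
                "spec": {
                    "containers": [
                        {
                            "name": "flow",  # container name in the agent's default template
                            "env": [
                                {
                                    "name": "API_TOKEN",  # hypothetical env var
                                    "valueFrom": {
                                        "secretKeyRef": {
                                            "name": "my-creds",  # hypothetical k8s Secret
                                            "key": "API_TOKEN",
                                        }
                                    },
                                }
                            ],
                        }
                    ]
                }
            }
        },
    }

    run_config = KubernetesRun(job_template=job_template)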
    Joël Luijmes
    Jim Crist-Harif
    +3
    11 replies
  • Joseph

    1 year ago
    I’m evaluating Prefect for some data engineering pipelines. Our organization uses Bamboo for CI/CD, which has some very obvious weaknesses (such as configuration via the web UI rather than in code), and I’d be interested in trying Prefect as a Bamboo replacement. I understand that Prefect was created for data engineering pipelines. Is anyone aware of any write-ups of people using Prefect for more traditional CI/CD tasks in addition to data engineering? Alternatively, are there obvious reasons why this is a bad idea?
    Joseph
    Chris White
    4 replies
  • Pedro Martins

    1 year ago
    Hey! Following the above thread on secrets in the k8s agent: how can I ensure that the Prefect jobs spawned by the agent contain the secret I specified? I'm running the Aircraft example from a notebook and connecting it to my server in the cluster. Simply passing image_pull_secrets to KubernetesRun does not work; I keep getting Error: ErrImagePull.
    from prefect import Flow, Parameter
    from prefect.run_configs import KubernetesRun
    from prefect.storage import S3

    custom_confs = {
        "run_config": KubernetesRun(
            image="drtools/prefect:aircraft-etl",
            image_pull_secrets=["regcred"],
        ),
        "storage": S3(bucket="dr-prefect"),
    }

    with Flow("Aircraft-ETL", **custom_confs) as flow:
        airport = Parameter("airport", default="IAD")
        radius = Parameter("radius", default=200)

        # tasks below are defined in the Aircraft example
        reference_data = extract_reference_data()
        live_data = extract_live_data(airport, radius, reference_data)

        transformed_live_data = transform(live_data, reference_data)

        load_reference_data(reference_data)
        load_live_data(transformed_live_data)
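    ErrImagePull with image_pull_secrets set usually means the referenced secret does not exist in the namespace where the agent spawns jobs. A minimal sketch for checking that, assuming the official kubernetes Python client and the "default" namespace:
    from kubernetes import client, config

    config.load_kube_config()  # use the local kubeconfig
    v1 = client.CoreV1Api()
    names = [s.metadata.name for s in v1.list_namespaced_secret("default").items]
    print("regcred present:", "regcred" in names)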
    Pedro Martins
    Dylan
    +2
    34 replies
  • Richard Hughes

    1 year ago
    Hi, I am hoping to find some help writing a mutation to delete the flows returned by a query. This is as far as I can get, given my limited understanding of GraphQL:
    # GraphQL can't embed a query inside a mutation input: first look up the
    # flow id, then pass it to delete_flow.
    query {
      flow(where: { name: { _eq: "MyFlowToDelete" } }) {
        id
      }
    }

    mutation {
      delete_flow(input: { flow_id: "<id-from-the-query>" }) {
        success
        error
      }
    }
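    A minimal sketch of running the two operations above from Python, assuming Prefect 1.x's Client.graphql and that delete_flow_input takes a UUID flow_id:
    from prefect.client import Client

    client = Client()
    # look up the ids of the flows to delete
    resp = client.graphql(
        'query { flow(where: { name: { _eq: "MyFlowToDelete" } }) { id } }'
    )
    for f in resp["data"]["flow"]:
        # delete each matching flow by id
        client.graphql(
            """
            mutation($id: UUID!) {
              delete_flow(input: { flow_id: $id }) { success }
            }
            """,
            variables={"id": f["id"]},
        )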
    Richard Hughes
    Dylan
    10 replies
  • Vitaly Shulgin

    1 year ago
    Hi, I'm rewriting some ETL jobs from a bonobo-based implementation to Prefect. Question: does Prefect support functionality like FixedWindow, to limit the number of input elements?
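    There is no built-in FixedWindow equivalent as far as I know, but here is a minimal sketch of one way to get the same effect in Prefect 1.x: batch the input into fixed-size windows with an ordinary task and map downstream work over the batches. The window size of 10 is arbitrary.
    from prefect import Flow, task

    @task
    def make_windows(items, size=10):
        # split the input into fixed-size windows
        return [items[i:i + size] for i in range(0, len(items), size)]

    @task
    def process(window):
        print(f"processing {len(window)} elements")

    with Flow("fixed-window") as flow:
        windows = make_windows(list(range(95)))
        process.map(windows)

    flow.run()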
    Vitaly Shulgin
    Dylan
    +1
    9 replies
  • Phil Glazer

    1 year ago
    Any good guides for integrating/deploying with AWS? Apologies if I missed this elsewhere.
    Phil Glazer
    Michael Adkins
    2 replies
  • Henrik Väisänen

    1 year ago
    Hey, I'm experimenting with Prefect and Dask workers running on multiple servers, and I'm trying to achieve the following: I run a flow from time to time that uses all the workers, and I would like to cache the results for future flow runs so that the servers can access each other's cache. The servers do not have a shared drive, and I cannot bind a task to a specific server either. Based on https://github.com/PrefectHQ/prefect/issues/2636, this kind of distributed cache is not currently possible out of the box with Dask, or am I missing some crucial piece of Prefect knowledge?
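    One workaround, sketched below under the assumption that an S3 bucket is reachable from every server: persist task results in shared remote storage with a templated target, so any Dask worker can hit the cache regardless of which machine produced it. The bucket name is made up, and checkpointing must be enabled (e.g. PREFECT__FLOWS__CHECKPOINTING=true) for results to be written outside Cloud/Server runs.
    from prefect import Flow, task
    from prefect.engine.results import S3Result

    @task(
        result=S3Result(bucket="my-shared-cache"),  # hypothetical shared bucket
        target="{task_name}.pickle",  # cache location template; reused if present
    )
    def expensive_computation():
        return 42

    with Flow("shared-cache") as flow:
        expensive_computation()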
    Henrik Väisänen
    Michael Adkins
    5 replies
  • Diogo Munaro

    1 year ago
    Hey guys, I'm Diogo, using Prefect for the first time and already in love with it ❤️. I'm building a Flow that calls another registered flow with StartFlowRun, but I can't get results from that flow. Looking at the code, create_flow_run ignores the result arg: https://github.com/PrefectHQ/prefect/blob/96ef85470872593268c9498b57ac9f0b5a268e01/src/prefect/tasks/prefect/flow_run.py#L160 Do you know a way to get Flow results? Here's some test code:
    from prefect.tasks.prefect import StartFlowRun
    from prefect import Flow
    from prefect.engine.results.local_result import LocalResult

    graph_building = StartFlowRun(
        flow_name="test_flow",
        project_name="test_project",
        wait=True,
        result=LocalResult("."),
    )

    with Flow("Call Flow") as flow:
        end_flow = graph_building()

    state = flow.run()
    state.result[end_flow].result  # nothing here
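    A minimal sketch of one workaround, assuming the child flow can be edited: give the child's final task a Result with a fixed, known location, then read that location from the calling flow rather than relying on StartFlowRun to carry the result back. The directory "/tmp/test_flow" and filename "final.pickle" are made up.
    from prefect import Flow, task
    from prefect.engine.results.local_result import LocalResult
    from prefect.tasks.prefect import StartFlowRun

    @task
    def read_child_result():
        # read the child flow's persisted result from its known location
        return LocalResult(dir="/tmp/test_flow").read("final.pickle").value

    start_child = StartFlowRun(
        flow_name="test_flow", project_name="test_project", wait=True
    )

    with Flow("Call Flow") as flow:
        done = start_child()
        child_value = read_child_result(upstream_tasks=[done])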
    Diogo Munaro
    Dylan
    +1
    14 replies