prefect-community
  • l

    Lone Pine Account

    05/14/2021, 5:14 PM
    What I'm trying to do is wrap that task with another that will tell me what happened when/if the task failed.
  • l

    Lone Pine Account

    05/14/2021, 5:14 PM
    from prefect import task, Flow
    from prefect.tasks.secrets import PrefectSecret
    from prefect.tasks.databricks import DatabricksRunNow
    import requests
    
    
    conncfg = {"host": "https://xxx.cloud.databricks.com/", "token": "xxx"}
    
    
    job_json = {
          "job_id": 24242,
          "notebook_params": {}
        }
    
    
    @task
    def run_databricks(job_config):
        notebook_run = DatabricksRunNow(json=job_config).run()
        run_id = notebook_run(databricks_conn_secret=conncfg)
        headers = { "Authorization" : "Bearer xxx" }
        r = requests.get('https://xxx.cloud.databricks.com/api/2.0/jobs/runs/get-output?run_id=%s' % run_id, headers=headers)
        results = r.json()
        output = results['metadata']['notebook_output']
        if 'message' in output:
            raise Exception(output['message'])
    
    
    
    with Flow('testflow') as flow:
        run_databricks(job_json)
    25 replies · 2 participants
  • l

    Lone Pine Account

    05/14/2021, 5:15 PM
    the DatabricksRunNow task works when part of a flow, but when part of a task I get:
  • l

    Lone Pine Account

    05/14/2021, 5:15 PM
    Unexpected error: AssertionError('A databricks connection string must be supplied as a dictionary or through Prefect Secrets')
  • l

    Lone Pine Account

    05/14/2021, 5:16 PM
    is there some scope problem here?
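One way to read the error above: a task's `run()` executes as soon as it is called, so `DatabricksRunNow(json=job_config).run()` runs before any connection details have been passed in. The toy class below is a stand-in written for illustration, not the real `DatabricksRunNow`. `RunNowSketch` and its return value are hypothetical, and only the assertion message is copied from the thread. It assumes the real task falls back to constructor arguments when `run()` is called without them.

```python
class RunNowSketch:
    """Hypothetical stand-in for a task class; not the real DatabricksRunNow."""

    def __init__(self, databricks_conn_secret=None, json=None):
        # Values given at construction become defaults for run().
        self.databricks_conn_secret = databricks_conn_secret
        self.json = json

    def run(self, databricks_conn_secret=None, json=None):
        # Fall back to the constructor values when nothing is passed to run().
        conn = databricks_conn_secret or self.databricks_conn_secret
        payload = json or self.json
        assert isinstance(conn, dict), (
            "A databricks connection string must be supplied as a dictionary "
            "or through Prefect Secrets"
        )
        # A real task would submit the job here; the sketch returns a fake run id.
        return {"run_id": 1, "job_id": payload["job_id"]}


conncfg = {"host": "https://xxx.cloud.databricks.com/", "token": "xxx"}
job_json = {"job_id": 24242, "notebook_params": {}}

# Mirrors the pattern in the thread: run() is called with no connection details.
try:
    RunNowSketch(json=job_json).run()
    failed = False
except AssertionError:
    failed = True

# Supplying the connection at construction (or directly to run()) avoids it.
result = RunNowSketch(databricks_conn_secret=conncfg, json=job_json).run()
```

If the real task behaves like this sketch, the connection dictionary would need to be passed either to the constructor or to `run()` itself, rather than to the value that `run()` returns.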
  • r

    Robert Bastian

    05/14/2021, 8:48 PM
    Hello! I’m seeing an issue where mapped tasks are limited to 2 in Prefect Cloud but are unlimited locally. Is this a standard plan limitation?
    5 replies · 2 participants
  • h

    haf

    05/15/2021, 2:01 PM
    I'm trying to find the Secrets page in the cloud UI: I know I've been there, but I can't for the life of me find my way back there... Where is it? Could someone provide a link? Google says nothing and the docs link back to themselves.
    6 replies · 2 participants
  • h

    haf

    05/15/2021, 2:16 PM
    If I may point out a design inconsistency: parameters and secrets are sometimes "values at declaration time" and sometimes "values as output by tasks". This looks like an anti-pattern to me:
    19 replies · 2 participants
  • h

    haf

    05/15/2021, 2:38 PM
    When trying to set parameters in the Cloud UI, the parameters input textarea alternates between the existing value and an empty object once I start to change it (back and forth, making it impossible to insert anything there).
    6 replies · 3 participants
  • h

    haf

    05/15/2021, 2:43 PM
    The docs aren't reflecting the latest release, I just noticed. https://github.com/PrefectHQ/prefect/releases
    2 replies · 2 participants
  • k

    Kha Nguyen

    05/15/2021, 4:42 PM
    I have been looking at the Executor options, and there is a DaskExecutor
    executor = DaskExecutor(
        cluster_class="dask_cloudprovider.FargateCluster",
        cluster_kwargs={
            "image": "prefecthq/prefect:latest",
            "n_workers": 5,
            ...
        },
    )
    Does it mean that Prefect can also create a Dask cluster when there are flows to run, and tear down the Dask cluster on Fargate once all flows have run? I already have a setup using ECSAgent. Does this executor mean that I don't need that ECS agent any longer?
  • k

    Kha Nguyen

    05/15/2021, 4:44 PM
    I mainly use Prefect for forecasting. An important feature that I am looking for is to pass data (large pandas dataframe of about 1.5GB in csv, or numpy array) between tasks. Does Prefect already support serialising significant data between tasks? I saw something called PandasSerializer, but not sure what it is for.
    6 replies · 2 participants
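On the serializer question: a serializer in this sense is an object that turns a task's return value into bytes and back, so the result can be written to and read from storage. The sketch below shows that shape with the standard library only. `RowsCsvSerializer` is a hypothetical name, and it works on a non-empty list of dicts rather than a pandas DataFrame, so it is not Prefect's `PandasSerializer`.

```python
import csv
import io


class RowsCsvSerializer:
    """Hypothetical serializer: list of dict rows <-> CSV bytes."""

    def serialize(self, rows):
        # Assumes at least one row; the first row defines the column order.
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
        return buffer.getvalue().encode("utf-8")

    def deserialize(self, data):
        reader = csv.DictReader(io.StringIO(data.decode("utf-8")))
        # CSV carries no type information, so every value comes back as a string.
        return [dict(row) for row in reader]


serializer = RowsCsvSerializer()
rows = [
    {"day": "2021-05-15", "forecast": "1.5"},
    {"day": "2021-05-16", "forecast": "1.7"},
]
payload = serializer.serialize(rows)
restored = serializer.deserialize(payload)
```

For data in the gigabyte range, a text format like CSV is usually the slow and lossy choice; the sketch only illustrates the serialize/deserialize contract.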
  • m

    Matej

    05/16/2021, 8:10 AM
    Hi, I am going through the tutorials and https://docs.prefect.io/core/examples/mapping.html
    from prefect import task
    
    @task
    def add(x, y):
        return x + y
    
    add(1, 2)  # 3
    this example throws:
    ValueError: Could not infer an active Flow context while creating edge
    and suggests using add.run() instead. In the tutorial, however, they use add(1,2) regularly. Are the tutorials outdated for the newest version of Prefect? Edit: I am also unable to use .map as it is used in the tutorial:
    add.map([1, 10], [2, 3])
    throws the same error. How am I supposed to experiment with tasks without a flow context?
    1 reply · 2 participants
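The error message suggests that calling a task does not execute it but tries to add it to a flow that is currently being built. A toy model of that mechanism, using only the standard library, is sketched below. `FlowSketch`, `TaskSketch`, and this `task` decorator are hypothetical names, not Prefect classes; only the error text is copied from the thread.

```python
_active_flows = []  # stack of flows currently being built


class FlowSketch:
    """Hypothetical stand-in for a flow: it only records task calls."""

    def __init__(self, name):
        self.name = name
        self.calls = []

    def __enter__(self):
        _active_flows.append(self)
        return self

    def __exit__(self, *exc_info):
        _active_flows.pop()
        return False


class TaskSketch:
    def __init__(self, fn):
        self.fn = fn

    def run(self, *args, **kwargs):
        # Direct execution of the wrapped function: no flow needed.
        return self.fn(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        # Calling the task registers it with the flow under construction.
        if not _active_flows:
            raise ValueError(
                "Could not infer an active Flow context while creating edge"
            )
        _active_flows[-1].calls.append((self.fn.__name__, args, kwargs))
        return self


def task(fn):
    return TaskSketch(fn)


@task
def add(x, y):
    return x + y


direct = add.run(1, 2)  # executes immediately

try:
    add(1, 2)  # no flow is active, so this raises
    raised = False
except ValueError:
    raised = True

with FlowSketch("demo") as flow:
    add(1, 2)  # recorded on the flow, not executed
```

Under this model, `add.run(...)` is the way to try a task interactively, while `add(...)` and `add.map(...)` belong inside a flow block.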
  • m

    Matej

    05/16/2021, 8:50 AM
    One more question: does Prefect support looping of a subflow, so that a part of the flow is repeated? It seems loop only works on a single task; what I have in mind is looping through a chain of tasks.
    3 replies · 3 participants
  • m

    Mark McDonald

    05/16/2021, 1:26 PM
    We're seeing duplicate flow runs scheduled in Prefect Cloud. This flow id, for example, is currently duplicated: 5842f8d7-2f86-4c92-a291-3fcc87e6fbf7
    8 replies · 2 participants
  • p

    Pedro Machado

    05/17/2021, 2:42 AM
    Hi there. I am trying to use docker storage with a custom Dockerfile that is based on an image that contains a conda environment. I am having an issue with the health checks failing, but the flow itself runs. Can you tell if I am doing something wrong? More details in thread.
    4 replies · 2 participants
  • s

    Stéphan Taljaard

    05/17/2021, 7:09 AM
    Hi. Any issues with using poetry for dependency management (for Server + Agent on a single node)? There seem to have been issues here and there, but besides potentially this, I can't see any reason not to use poetry.
    1 reply · 2 participants
  • e

    Elliot Oram

    05/17/2021, 9:03 AM
    Hey folks is anyone else getting a deprecation warning on marshmallow? Something like:
    ...
    /home/rof/.pyenv/versions/3.9.1/lib/python3.9/site-packages/marshmallow/fields.py:198
    /home/rof/.pyenv/versions/3.9.1/lib/python3.9/site-packages/marshmallow/fields.py:198: RemovedInMarshmallow4Warning: Passing field metadata as a keyword arg is deprecated. Use the explicit `metadata=...` argument instead.
    ...
    Tried to update to the latest version of prefect and still no joy!
    9 replies · 4 participants
  • d

    Domantas

    05/17/2021, 12:06 PM
    Hello Prefect, I'm trying to use secrets that are stored in Kubernetes. To access them, I'm using the KubernetesRun parameter `job_template_path`, which points to a YAML file that is supposed to create environment variables for use in the Prefect code:
    apiVersion: v1
    kind: Pod
    metadata:
      name: secret-test-pod
    spec:
      containers:
        - name: test-container
          image: k8s.gcr.io/busybox
          command: [ "/bin/sh", "-c", "env" ]
          envFrom:
          - secretRef:
              name: mysecret
      restartPolicy: Never
    The YAML template is taken from the Kubernetes documentation (https://kubernetes.io/docs/concepts/configuration/secret/#use-cases). However, when I try to run it on Prefect Server, I get this error:
    "'spec'"
    (will attach an error log screenshot) Is there anything wrong with the template? Is this approach a good way to define and use secrets with the Kubernetes agent? P.S. I tried to define the `env` parameter with secrets using KubernetesRun, but that is not a good option for this case, since all secrets would be visible as plain text in the Prefect Server UI.
    2 replies · 2 participants
  • p

    Pedro Machado

    05/17/2021, 1:29 PM
    Hi everyone. I am reposting this message from Sunday, which may have been overlooked. Thanks for your help!
    5 replies · 2 participants
  • z

    Zach Schumacher

    05/17/2021, 3:09 PM
    When I specify a `job_template` in my `KubernetesRun` config, I'm still seeing the default set in the UI. I believe previously I would see the template there. Anyone know what is going on? We are running Prefect Cloud and my Core version is `0.14.19`.
    7 replies · 3 participants
  • t

    Trevor Kramer

    05/17/2021, 3:35 PM
    I have a mapped task that needs to wait until a previous mapped task is 100% complete before starting. If I add upstream_tasks=[fp_validation_predicted_locations] it kicks off task 1 when task 1 of fp_validation_predicted_locations finishes. How can I have it wait until they are all finished? Can I do upstream_tasks=[unmapped(fp_validation_predicted_locations)]?
    14 replies · 2 participants
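The difference being asked about can be illustrated without Prefect. When one mapped task depends on another element by element, downstream item i only waits for upstream item i. Treating the upstream as a single dependency means waiting for every upstream item first. The sketch below models only the second behaviour with `concurrent.futures`; `predict_location` and `validate` are hypothetical functions, and whether `unmapped(...)` is accepted inside `upstream_tasks` in a given Prefect version is not confirmed here.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def predict_location(item):
    # Hypothetical upstream step.
    return item * 10


def validate(item, all_predictions):
    # Hypothetical downstream step that needs the complete upstream output.
    return item, len(all_predictions)


items = [1, 2, 3]

with ThreadPoolExecutor(max_workers=3) as pool:
    upstream = [pool.submit(predict_location, i) for i in items]
    # Barrier: block until every upstream future has finished.
    wait(upstream)
    predictions = [f.result() for f in upstream]
    # Only now is the downstream work submitted, one call per item.
    downstream = [pool.submit(validate, i, predictions) for i in items]
    results = [f.result() for f in downstream]
```

Each downstream call receives the full list of upstream results, which is what guarantees that none of them starts early.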
  • j

    Joseph Loss

    05/17/2021, 3:56 PM
    Does anyone have an example of using "on_failure" as a flow argument to re-run the whole flow after a certain period? I see the examples for specific tasks; I'm guessing it would work the same way for a flow?
    28 replies · 2 participants
  • h

    haf

    05/17/2021, 4:13 PM
    Editing schedules isn't loading the schedule information in Cloud.
    6 replies · 3 participants
  • d

    Damien Ramunno-Johnson

    05/17/2021, 4:52 PM
    I am trying out Prefect cloud Enterprise and the account page shows I can add 100 users but when I try to add more members I get
    Your team has no users available; upgrade your plan to get more users and access to more features!
    Any ideas?
    5 replies · 3 participants
  • d

    DK

    05/17/2021, 6:38 PM
    Hi Everyone, I'm looking at switching to Prefect, but I have a question: I currently have a scheduling system that is organized by different "jobs". These jobs are essentially Python classes that can be instantiated with varying parameters. For example, I have a job class that moves files from an SFTP to a local directory, and I create multiple instances of it through the UI and specify the parameters through the UI as well, without creating additional code. Is it possible to do something similar with Prefect? Part of the advantage of the above system is that I can have non-technical people add jobs without adding any code or doing a release. Perhaps I'm not thinking about Prefect's tasks and flows correctly, but any help or clarity would be appreciated.
    9 replies · 2 participants
  • e

    Enda Peng

    05/17/2021, 8:20 PM
    Is there a way to remove the local label added by the `flow.register()` function? I notice that if I use the `prefect register` command line tool, the label is exactly what I provided, but if I use Python code, it will be added. I read the doc and it says this is the desired behavior; I wonder if there is a way to get it removed?
    4 replies · 2 participants
  • d

    Damien Ramunno-Johnson

    05/17/2021, 10:17 PM
    One more question, in the docs:
    A result subclass that matches the storage backend of your `prefect.config.flows.storage` setting will automatically be applied to all tasks, if available; notably this is not yet supported for Docker Storage
    Does this mean we cannot use checkpointing if using Docker Storage?
    22 replies · 2 participants
  • w

    Walee

    05/17/2021, 11:22 PM
    Hey folks, we are trying to move our task execution from ECS to EKS but running into this error as the agent tries to roll out pods for the job:
    TypeError: '<' not supported between instances of 'NoneType' and 'datetime.datetime'
    [2021-05-17 23:14:44,507] ERROR - agent | Error while managing existing k8s jobs
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/agent/kubernetes/agent.py", line 383, in heartbeat
        self.manage_jobs()
      File "/usr/local/lib/python3.7/site-packages/prefect/agent/kubernetes/agent.py", line 236, in manage_jobs
        pod_name
    TypeError: '<' not supported between instances of 'NoneType' and 'datetime.datetime'
    We're using the config generated from the Prefect CLI for the Kubernetes agent. Any idea on how to resolve it?
    4 replies · 2 participants
  • j

    Joseph Loss

    05/18/2021, 2:14 AM
    sorry I know that's a lot, maybe add it to the task example library?