Powered by Linen
prefect-community
  • a

    Alejandro Sanchez Losa

    12/22/2021, 8:51 PM
    Hey!!! What do you think about that? Is it a crazy idea? Impossible? … I am a bit lost
    a
    k
    • 3
    • 31
  • l

    Lucas Hosoya

    12/22/2021, 9:27 PM
    Hi, is there a way to get a Databricks job/run result in Prefect? My goal is to run a Databricks task, get its result (dbutils.notebook.exit), and pass it as a parameter to the next task. Wondering if that's possible
    a
    a
    m
    • 4
    • 13
  • d

    Danny Vilela

    12/22/2021, 9:34 PM
    Hi! I have a `@task`-decorated function with `max_retries=8, retry_delay=dt.timedelta(minutes=15)`. However, I know that for certain kinds of errors, I’d actually want it to wait 30 minutes (or even an hour). Is there a way to implement this? I know I can probably just check for that exception then `time.sleep` for the extra time (for example, to wait for an hour I’d catch the exception, `time.sleep` for 45 minutes, then raise the error so that `retry_delay` kicks in), but I’m wondering if there’s a cleaner way to approach this. The above feels like a code smell but I’m not sure how I’d otherwise set context-specific retry delays on a task 🤔
    a
    k
    • 3
    • 2
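One way to keep the per-error wait times in one place, sketched here with only the standard library: map exception types to delays and look the delay up when the task fails. The exception classes `RateLimitError` and `UpstreamNotReady`, the `DELAYS` table, and the helper `delay_for` are hypothetical names, and the 30-minute and one-hour values are taken from the message above. How the chosen delay is handed back to Prefect is left out because it depends on the Prefect version.

```python
import datetime as dt


# Hypothetical exception types standing in for the "certain kinds of errors".
class RateLimitError(Exception):
    pass


class UpstreamNotReady(Exception):
    pass


# The retry_delay from the message above.
DEFAULT_DELAY = dt.timedelta(minutes=15)

# Longer waits for specific errors; the mapping itself is illustrative.
DELAYS = {
    RateLimitError: dt.timedelta(minutes=30),
    UpstreamNotReady: dt.timedelta(hours=1),
}


def delay_for(exc: BaseException) -> dt.timedelta:
    """Return the retry delay for an exception, falling back to the default."""
    for exc_type, delay in DELAYS.items():
        if isinstance(exc, exc_type):
            return delay
    return DEFAULT_DELAY
```

Keeping the mapping separate from the task body means the task only has to catch the exception and ask `delay_for` how long to wait, instead of hard-coding sleeps per error.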
  • l

    Leon Kozlowski

    12/22/2021, 9:44 PM
    Will a flow restart via the UI use the same `prefect.context` information at time of failure? For example `scheduled_start_time`?
    a
    • 2
    • 2
  • a

    Alejandro Sanchez Losa

    12/22/2021, 10:59 PM
    Hi network, I know, and I am sure, I am the heaviest Prefect user … and I apologize
  • a

    Alejandro Sanchez Losa

    12/22/2021, 11:06 PM
    I am trying to move tasks out of the main flow Python file into ‘custom modules’, following some public blog posts and code. I built a custom module to do that. When the flow is executed inside the GitHub Actions container as part of the CI/CD, it works well. But when I try to start the flow from (for example) Prefect Cloud, it fails, maybe because the flow uses GitHub storage: when the agent tries to run the flow, it only fetches the file referenced by the storage plugin and skips the module. So the question is: what other approach would let me use GitHub Actions to register flows with custom modules and have them run correctly afterwards?
    k
    • 2
    • 3
  • r

    Ryan Sattler

    12/23/2021, 5:24 AM
    I’m using Prefect Cloud with Kubernetes and have an issue with a zombie job pod stuck in a “Pending” state (due to trying to allocate more resources than possible due to a configuration error). I can force delete it (kubectl delete <pod-name> --force) but it comes back immediately (age of the pod resets to 0s so I think it was deleted). There are no Prefect flows or k8s deployments associated with the pod and the link to the flow run in its k8s description gives a 404. How do I get rid of it?
    a
    • 2
    • 2
  • s

    Suresh R

    12/23/2021, 7:37 AM
    Hi, does Prefect have any concurrent task run limit on the Cloud Standard plan?
    a
    • 2
    • 1
  • a

    Alejandro Sanchez Losa

    12/23/2021, 10:31 AM
    Hi all, I am trying to run a task and wait for it … but I am getting an error “AttributeError: ‘FunctionTask’ object has no attribute ‘wait’”. Any ideas why?
    a
    • 2
    • 4
  • h

    Henrietta Salonen

    12/23/2021, 1:23 PM
    Hi everyone, I’m new to Prefect and I’m trying to find information about best practices for implementing production and staging deployments. I’m curious whether it would make sense to handle these as labels for flows & agents, or whether it would make more sense to create separate Prefect projects for each environment. I’m trying to create a CI/CD workflow where a push to certain branches in GitHub registers flows to either Prefect staging or production. I have a CI/CD workflow already running for production but would like to be able to expand this approach.
    a
    k
    • 3
    • 4
  • j

    Jason Raede

    12/23/2021, 4:09 PM
    Hi yall. Working through deploying our first flow to production. We have several flows in a repository and some shared modules that are imported into each flow that contain tasks and other utility code. From the documentation it seems like the only way to ensure that the shared modules are available at flow execution time is to package them up in a Docker image and use a Docker/ECR/K8S agent. That feels a little heavy - is there any way to package up dependencies like that during pickling? Folder structure is below. The flow needs access to stuff in `src/tasks` and the tasks need access to stuff in `src/utils`
    • src
      ◦ flows
        ▪︎ my_flow.py
        ▪︎ my_other_flow.py
      ◦ tasks
        ▪︎ shared_task_1.py
        ▪︎ shared_task_2.py
      ◦ utils
        ▪︎ shared_lib_1.py
    Thanks!
    k
    k
    • 3
    • 6
  • l

    Leon Kozlowski

    12/23/2021, 9:07 PM
    Hey all - I’m trying to build somewhat of a self-healing flow, but I’m having trouble thinking about the best design for the goal I’m trying to achieve (writing details in thread to prevent a large message)
    k
    • 2
    • 5
  • c

    chelseatroy

    12/23/2021, 9:24 PM
    Hi folks, I am trying to save a Snowflake private key passphrase in a PrefectSecret. The issue is, the Python cryptography library expects the passphrase for an rsa_key to be passed in bytes, and my secret has the value as a string. I cannot do `PrefectSecret('PRIVATE_KEY_PASS_PHRASE').encode()` because the encode method is on `str` and `PrefectSecret` returns a `PrefectSecret`. Anyone know how to do this?
    k
    • 2
    • 6
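A possible way around this, assuming the secret resolves to a plain `str` at run time: do the conversion inside a separate function that receives the resolved value, rather than calling `.encode()` on the `PrefectSecret` object while the flow is being built. `to_bytes` is a hypothetical helper name; in a flow it would itself need to be wrapped as a task, which is not shown here.

```python
def to_bytes(passphrase: str, encoding: str = "utf-8") -> bytes:
    """Convert a resolved secret string to the bytes a crypto library expects."""
    return passphrase.encode(encoding)
```

The point of the extra step is timing: the encoding happens on the actual string value during the run, not on the placeholder object at flow-definition time.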
  • c

    Chun Shen Wong

    12/24/2021, 5:03 AM
    hi, would like to ask if it is possible to set prefect logging level through python. Thanks!
    k
    • 2
    • 2
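A minimal sketch using only the standard library, assuming Prefect's loggers live under the logger name `prefect` (worth checking for your version). `set_log_level` is a hypothetical helper, and whether this overrides a level set through Prefect's own configuration is not confirmed here.

```python
import logging


def set_log_level(level: int, logger_name: str = "prefect") -> logging.Logger:
    """Set the level on a named logger and return it."""
    logger = logging.getLogger(logger_name)
    logger.setLevel(level)
    return logger


# Ask for DEBUG output from everything under the assumed "prefect" logger.
set_log_level(logging.DEBUG)
```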
  • a

    Anyelin Calderon

    12/24/2021, 12:24 PM
    Hi, I am executing a flow at 12:05 to run on 12/24 at 08:00 a.m. but when the schedule is about to run, the flow starts 1 second before and the schedule changes to the next day 12/24 08hr. I use prefect == 0.15.6
    a
    k
    • 3
    • 6
  • d

    Dohyeok Kim

    12/24/2021, 4:36 PM
    Can I get alerted when any agent becomes offline? I saw cloud querying to agents on UI
    k
    • 2
    • 5
  • h

    Hammad Ahmed

    12/24/2021, 7:53 PM
    Hey everyone, hope everyone is having a great day, I am not able to figure out if persisting and caching can be used together or not or if cache can cache results in GCS or other repositories
    k
    • 2
    • 5
  • j

    Jason Raede

    12/24/2021, 9:08 PM
    Is there a simple method of grabbing a property of an object returned by a task to pass as the input to a downstream task, without having a task explicitly to just pull out that property? E.g.
    from prefect import Flow, task

    # Object is a placeholder for any class with a `foo` attribute.

    @task
    def task1():
        return Object(foo='bar')

    @task
    def task2(foo: str):
        # Do something
        pass

    @task
    def getfoo(o: Object):
        return o.foo

    with Flow('my-flow'):
        task1_result = task1()

        # This fails
        task2(task1_result.foo)

        # This works, but is annoying
        task2(getfoo(task1_result))
    k
    • 2
    • 2
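If a helper task turns out to be necessary, one way to avoid writing a new one per property is a single generic getter. The sketch below is plain Python; `get_attr` is a hypothetical name, `SimpleNamespace` stands in for the `Object` from the message above, and the function would still need to be wrapped as a task to be used in a flow.

```python
from types import SimpleNamespace
from typing import Any


def get_attr(obj: Any, name: str) -> Any:
    """Return obj.<name>; raises AttributeError if the attribute is missing."""
    return getattr(obj, name)


# Stand-in for the Object(foo='bar') returned by task1 in the message above.
result = SimpleNamespace(foo="bar")
foo = get_attr(result, "foo")
```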
  • t

    tas

    12/25/2021, 1:53 PM
    Hi all, I am pretty new to prefect and I have come across the Orion announcement. Is it fine for someone totally new to start learning on Orion ?
    k
    u
    x
    • 4
    • 6
  • a

    Amruth VVKP

    12/26/2021, 6:40 PM
    I need some help with the below queries - 1. Does anyone know if Prefect Orion preview supports authentication? 2. Is there a better way to debug Prefect Orion based flows? I usually use VS Code debugger for existing Prefect stable release based project.
    k
    • 2
    • 3
  • a

    Andrey Tatarinov

    12/26/2021, 8:25 PM
    Hi everyone. Need some help with custom K8s Job Template. We need one of the PVCs to be accessible for a Flow run. I provided (what I thought a reasonable job template). And currently I get a behavior that looks like Job starts and exits immediately.
    k
    • 2
    • 9
  • v

    Varuna Bamunusinghe

    12/27/2021, 4:32 AM
    I have a pipeline with many steps to be implemented in Prefect. Is it possible to start the pipeline from a specific step in the middle if necessary? For example, if there's an error in a step, I'd like to fix the step and start from there instead of running from the start.
    k
    • 2
    • 3
  • a

    Akharin Sukcharoen

    12/27/2021, 6:38 AM
    How can I cancel all of the jobs that are submitted or pending at one time? Could someone help me, please?
    k
    • 2
    • 7
  • m

    M. Siddiqui

    12/27/2021, 12:46 PM
    Hello guys, Wondering if someone has attempted something similar with DBT + Prefect. https://www.astronomer.io/blog/airflow-dbt-1
    k
    a
    • 3
    • 6
  • e

    Eduardo Apolinario

    12/27/2021, 11:52 PM
    Hi, everyone, happy holidays! I have a question about flow registration in Orion. I couldn't find anything about this in the docs. Am I wrong in assuming that this step will no longer be required in Orion? Instead now we "register" flows via Deployments, right?
    k
    • 2
    • 2
  • j

    Jason Raede

    12/28/2021, 1:58 AM
    Anyone know if the `LocalAgent` can run using the `LocalDaskExecutor`? It’s parallelizing stuff fine locally when I just run `flow.run` but on a server running a local agent it looks like it’s going one task at a time (I’m just watching the logs in prefect cloud). I’m setting `flow.executor` before registering it, if that matters. I don’t see any indication in Prefect Cloud of what the flow’s executor is.
    👀 1
    k
    • 2
    • 9
  • a

    Andrea Nerla

    12/28/2021, 12:53 PM
    Hi folks, I'm having this problem: prefect doesn't run the flow functions in the order I want it to. In this case, it runs "transform" before "extract_load" (see code in the answers)
    a
    • 2
    • 6
  • t

    Tom Shaffner

    12/28/2021, 6:34 PM
    Does a local agent delete cached files when it's done with them? I have a machine with a local agent running designed for some larger, high-memory processes, and the disk keeps filling up with the ~/.prefect/results folder. Do I need to do something for cached files to be cleaned up automatically?
    k
    • 2
    • 15
  • b

    Bradley Hurley

    12/28/2021, 8:58 PM
    Hi All - I am working on a proof of concept to run Prefect in K8 via `microk8s` for local development alongside `localstack`. Ideally, this would allow us to port our existing prefect deployment to k8
    k
    • 2
    • 3
  • c

    Chris McLaughlin

    12/28/2021, 9:48 PM
    Hi everyone, I am just getting started with prefect and am struggling with what is probably a very basic concept. Is it possible to use a @task decorator with tasks from the task library? My primary use case is orchestrating dbt jobs and I would like to add a state handler for my tests to send a notification on task failure.
    k
    • 2
    • 3
k

Kevin Kho

12/28/2021, 9:51 PM
Hey @Chris McLaughlin, no worries! You don’t need a task decorator with those.
from prefect.tasks... import SomeTask

some = SomeTask(state_handlers=[mystatehandler])

with Flow(...) as flow:
    some(x=1, y=2)
c

Chris McLaughlin

12/28/2021, 9:52 PM
Ah, way easier than I thought! I'm assuming you could define a trigger in the same manner?
k

Kevin Kho

12/28/2021, 9:54 PM
Yes, the tasks in the task library take in the same arguments as `@task()`, including `trigger`. If your task has inputs, you can still stick them in there.
some = SomeTask(x=1, y=2, state_handlers=[mystatehandler], trigger=...)
with Flow(...) as flow:
    some()
🙌 1
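For reference, a state handler in the Prefect releases discussed here is a function that receives the task, the old state, and the new state, and returns the new state. The sketch below shows what a `mystatehandler` that notifies on failure could look like. It is written against a stand-in state class so it runs without Prefect installed; `FakeState`, `notify`, and the `sent` list are hypothetical, and a real handler would call a Slack or email client instead.

```python
from typing import List

sent: List[str] = []  # stand-in for a real notification channel


def notify(message: str) -> None:
    """Record a notification; a real version would post it somewhere."""
    sent.append(message)


def mystatehandler(task, old_state, new_state):
    """Send a notification when the task enters a failed state."""
    if new_state.is_failed():
        notify(f"Task {getattr(task, 'name', task)!r} failed")
    # Handlers return the state the run should continue with.
    return new_state


class FakeState:
    """Minimal stand-in exposing the one method the handler uses."""

    def __init__(self, failed: bool) -> None:
        self._failed = failed

    def is_failed(self) -> bool:
        return self._failed
```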