prefect-community
  • r

    Ramzi A

    12/29/2021, 2:35 AM
    Hey everyone, I am wondering: is there a way to limit versioning in Prefect to when actual changes are made in the code, rather than every time a flow is registered? For example, I have a flow factory, and any time I add a new flow I just register the flow factory. This creates new versions for all my flows even though none of the code has changed.
    k
    9 replies · 2 participants
  • a

    Aqib Fayyaz

    12/29/2021, 7:27 AM
    Hi, I am getting this error on Prefect Cloud when I try to run the flow.
    k
    20 replies · 2 participants
  • m

    Michal Baumgartner

    12/29/2021, 2:29 PM
    👋 everyone, is there a way to restart the flow (up to N times) if any task throws a specific exception (e.g. the underlying dask cluster/scheduler dies)? (preferably without storing task results and with checkpointing disabled)
    k
    10 replies · 2 participants
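A generic way to express the "restart up to N times on a specific exception" idea from the question above is an outer loop around whatever starts the run. This is a plain-Python sketch, not a Prefect feature: `run_with_restarts` and `flaky_run` are hypothetical names, and it assumes the wrapped callable raises the exception of interest (a flow run that reports failure through a returned state would first need to be translated into an exception).

```python
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def run_with_restarts(
    run: Callable[[], T],
    max_restarts: int,
    restart_on: Tuple[Type[BaseException], ...],
) -> T:
    """Call run(); if it raises one of restart_on, call it again.

    At most max_restarts extra attempts are made. Any other exception
    propagates immediately without a restart.
    """
    restarts = 0
    while True:
        try:
            return run()
        except restart_on:
            if restarts >= max_restarts:
                raise  # restart budget exhausted: surface the last error
            restarts += 1


# Hypothetical stand-in for "run the flow": fails twice, then succeeds.
calls = {"count": 0}


def flaky_run() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("scheduler died")
    return "ok"


result = run_with_restarts(flaky_run, max_restarts=5, restart_on=(ConnectionError,))
print(result, calls["count"])
```

Because nothing is carried over between attempts, each restart begins from scratch, which matches the stated preference for not storing task results.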
  • c

    Carlos Paiva

    12/29/2021, 5:41 PM
    Hi All, Another noob question: having the given Flow
    with Flow(name="name", storage=storage, run_config=run_config) as flow:
        # Pipeline parameters
        event = Parameter('event', required=True, default={})
    
        x = event.get("body", None)
    How can I access the dictionary data? I am getting
    AttributeError: 'Parameter' object has no attribute 'get'
    n
    4 replies · 2 participants
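One reading of the error above: inside the `with Flow(...)` block, `event` is still a `Parameter` task object, not a dictionary, so dictionary methods such as `.get` only become usable once the value exists at run time, inside a task. A minimal sketch of that idea, assuming Prefect 1.x semantics. `get_body` is a hypothetical helper name, and the Prefect wiring is shown only in comments so the snippet runs without Prefect installed.

```python
from typing import Any, Optional


def get_body(event: Optional[dict]) -> Any:
    """Return event["body"] if present, else None.

    Hypothetical helper: in a Prefect 1.x flow this function would be
    wrapped as a task so that it receives the resolved parameter value
    (a real dict) at run time instead of the Parameter object.
    """
    return (event or {}).get("body")


# Assumed wiring inside the flow (not executed here):
#     with Flow(name="name", storage=storage, run_config=run_config) as flow:
#         event = Parameter("event", default={})
#         x = task(get_body)(event)

print(get_body({"body": "hello"}))  # the value stored under "body"
print(get_body({}))                 # None, because the key is absent
```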
  • e

    Enda Peng

    12/30/2021, 1:46 AM
    Is the feature that loads api_key from
    ~/.prefect/config.toml
    deprecated? I tried with
    prefect agent local start -t <my-token>
    which succeeds, however, after I save it under config.toml and restart without
    -t
    , it complains about missing API key, this is my config.toml file
    [xxx]# cat ~/.prefect/config.toml 
    [cloud]
    api_key = "*******"
    k
    m
    +1
    20 replies · 4 participants
  • a

    Anh Nguyen

    12/30/2021, 4:10 PM
    I want to activate flow A, but flow B is in progress and has the same function as flow A. So I want flow A to complete first, and then flow B to become available automatically.
    k
    3 replies · 2 participants
  • s

    Sean Leakehe

    12/30/2021, 4:32 PM
    Can someone help me figure out what causes these
    UnixHTTPConnectionPool
    errors in Prefect Cloud? These happen seemingly at random. I have this particular flow set to retry 3 times and I got this on all 3 retries.
    k
    17 replies · 2 participants
  • c

    chelseatroy

    12/30/2021, 5:17 PM
    Hi folks! We want to run an incremental flow in Prefect. I.e., we move some data from Snowflake to an AWS SageMaker feature store. We have this part working. However, what we want to do is run the flow on a regular schedule, and each time, only move the rows in Snowflake that were not already moved to AWS by the previous run. Is there a good way to schedule incremental runs like this in Prefect itself? If someone knows of an example to point us to of how this is done, we'd be grateful 🙂
    k
    3 replies · 2 participants
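A common, tool-independent way to get "only the rows not moved by the previous run" is a persisted high-watermark. The sketch below is not a Prefect feature and not taken from the thread: it assumes each row carries a monotonically increasing id (a timestamp would work the same way), and it uses a local JSON file as a stand-in for wherever the watermark is actually kept. All function names are hypothetical.

```python
import json
import tempfile
from pathlib import Path
from typing import Iterable, List, Tuple

Row = Tuple[int, str]  # (monotonically increasing id, payload)


def load_watermark(path: Path) -> int:
    """Return the highest id moved so far, or 0 on the very first run."""
    if not path.exists():
        return 0
    return json.loads(path.read_text())["last_id"]


def save_watermark(path: Path, last_id: int) -> None:
    path.write_text(json.dumps({"last_id": last_id}))


def incremental_run(source: Iterable[Row], state_file: Path) -> List[Row]:
    """Select rows newer than the watermark, then advance the watermark."""
    last_id = load_watermark(state_file)
    new_rows = [row for row in source if row[0] > last_id]
    if new_rows:
        # A real flow would write new_rows to the destination here and
        # only advance the watermark after that write succeeds.
        save_watermark(state_file, max(row[0] for row in new_rows))
    return new_rows


with tempfile.TemporaryDirectory() as tmp:
    state = Path(tmp) / "watermark.json"
    table = [(1, "a"), (2, "b")]
    first = incremental_run(table, state)   # both rows are new
    second = incremental_run(table, state)  # nothing new since the last run
    table.append((3, "c"))
    third = incremental_run(table, state)   # only the appended row

print(first, second, third)
```

Each scheduled run then only needs to call the equivalent of `incremental_run`; the schedule itself does not have to know which rows are new.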
  • c

    Christoph Deil

    12/30/2021, 5:21 PM
    Are flow objects mostly similar or fundamentally different in Prefect Core vs Prefect Orion? I’d be specifically interested in why in Prefect Core you decided to make Flow a context manager and not a decorator like it is now in Prefect Orion. Was that a more or less arbitrary choice, or is there a deeper reason why in the old API a decorator wasn’t possible or desirable? There’s the statement at https://www.prefect.io/blog/announcing-prefect-orion/ that now flows aren’t DAGs any more. But I couldn’t find an explanation what they are now. I presume the prefect.flow decorator still does some analysis and generates a DAG-like data structure that represents task dependencies? Is this change in what a flow object is related to the change context manager -> decorator? https://docs.prefect.io/core/concepts/flows.html https://orion-docs.prefect.io/api-ref/prefect/flows/
    j
    d
    +1
    5 replies · 4 participants
  • t

    Tom Shaffner

    12/30/2021, 8:38 PM
    I've got a case where I start a local flow with num_workers in a LocalDaskExecutor set to 6, but the flow seems to start many more tasks than that. For example, that flow is currently showing 15 tasks running simultaneously. Is there something else needed to limit the number of simultaneous tasks? It overloads the machine as is.
    k
    32 replies · 2 participants
  • a

    An Hoang

    01/02/2022, 1:24 AM
    hi there,
    task_run_name
    when templated with input names can be very helpful in debugging mapped tasks. Are they only available on Prefect backend (Cloud and Server) and not Core? If I'm doing local debugging with
    flow_result = flow.run()
    , let's say I have a flow with
    task_a
    mapped 1000 times and
    task_a[25]
    failed but the other indices succeed. What's the quickest way to find out which input caused it to fail? I don't think I can access the result of
    flow_result.result[task_a].result[25]
    k
    2 replies · 2 participants
  • f

    Fina Silva-Santisteban

    01/02/2022, 7:28 PM
    Hi everyone, I’m looking for suggestions for how to use Prefect together with a Spark cluster running on AWS. Is there an equivalent to Airflow’s SparkSubmitOperator? I’ve only found a DataBricksTask in the docs, but I’m not interested in getting a Databricks subscription at this point.
    a
    k
    7 replies · 3 participants
  • t

    Tom Klein

    01/02/2022, 7:55 PM
    what's the proper way to make a Kubernetes agent (and a Kubernetes Run) use an S3 storage for the flows? we tried the k8s
    service account
    but it seems insufficient and we noticed some warning in the docs about that but couldn't really decipher what it would mean for us since we don't use these methods to define permissions:
    a
    20 replies · 2 participants
  • j

    John Muehlhausen

    01/02/2022, 11:06 PM
    I'm on "account settings" -> "account" and I don't see a way to see my past payment activity. Where do I see a recap of each credit card charge?
    a
    3 replies · 2 participants
  • o

    Ovo Ojameruaye

    01/03/2022, 6:35 AM
    Hi everyone, I am having issues with one task in my flow and I'm not sure why. I see this in the logs when the task runs. Other tasks run fine, and I have tried a different flow on the same agent with this error. I have also tried to run the flow as a process with flow.run() and it completes successfully.
    a
    18 replies · 2 participants
  • j

    jcozar

    01/03/2022, 7:57 AM
    Hi everyone! First of all, happy new year everybody 🙂 I feel that I don’t fully understand the Docker Storage. As far as I know, you need to serialize the flow (using cloudpickle), and when the Prefect agent requests a flow run, it downloads the flow and runs it. However, if you are using the Docker Storage, I don’t understand why cloudpickle is still used, as I expect that the whole flow is pushed as a Docker image to some repository. My question comes from a project where I use Docker storage (the python 3.8 image
    prefecthq/prefect:0.15.11-python3.8
    ), but I am using python 3.9 to register the flow to Prefect Cloud. It fails because I am using different python versions. Could you please clarify how it works? Thank you very much!
    a
    5 replies · 2 participants
  • i

    Ido Slonimsky

    01/03/2022, 7:58 AM
    Hey all, happy new year! I have an odd failure I'm having trouble understanding. I have a flow with a single parameter input that runs on a Fargate cluster with a Dask executor. With some parameters it runs prefectly 😉 While with others, I receive the following error:
    botocore.errorfactory.InvalidParameterException: An error occurred (InvalidParameterException) when calling the RunTask operation: TaskDefinition is inactive
    after this:
    Creating a new Dask cluster with `dask_cloudprovider.aws.ecs.FargateCluster`...
    In the run config, both when it succeeds and when it fails the task_definition is null, and all the other settings are the same, has anybody encountered a similar issue and has any idea what could go wrong?
    a
    4 replies · 2 participants
  • s

    Suresh R

    01/03/2022, 8:01 AM
    Hi! Is there an option in Prefect similar to the smart sensor in Airflow?
    a
    c
    13 replies · 3 participants
  • k

    Kostas Chalikias

    01/03/2022, 8:06 AM
    Hi everyone, we are cloud users and are getting a 'Usage Exceeded' message in our agent logs as of last night ~9pm GMT. Could this be some kind of new year related issue? I don't remember hearing about a hard limit on runs but that if our usage was higher than planned we'd get a notification to discuss new limits/rates?
    a
    3 replies · 2 participants
  • a

    Andrey Vinogradov

    01/03/2022, 10:25 AM
    Hi everyone! I see that there is an option “max_scheduled_runs_per_flow” in prefect server that was added in August, but what about PrefectHQ/prefect? When will it be added there?
    a
    k
    2 replies · 3 participants
  • i

    Isara De Silva

    01/03/2022, 11:37 AM
    Hi, is it possible to auto-scale Prefect ECS agents, and if so, can someone point me in the right direction? I'm struggling to understand how a new agent would register with Prefect (and with existing flows).
    a
    3 replies · 2 participants
  • t

    Thomas Opsomer

    01/03/2022, 3:31 PM
    Hello Prefect community 🙂, we're using Prefect with K8S. Sometimes tasks fail because the pods that run them get killed (for OOM or anything else, it doesn't matter), which leads to a "pod missed heartbeat..." message. The issue is that when a flow/task fails like this, the retry mechanism and the Slack handler don't work. Is there a way to retry tasks on this kind of failure and/or to get a notification about the status?
    k
    3 replies · 2 participants
  • k

    Kevin Weiler

    01/03/2022, 4:01 PM
    good morning and happy new year! Is it possible to have a task that runs if: 1. some upstream tasks are successful AND 2. some upstream tasks are successful or failed (just completed)
    k
    17 replies · 2 participants
  • a

    An Hoang

    01/03/2022, 4:34 PM
    When testing flows locally, because of breadth-first execution, my flow's mapped task stores a large number of high-memory objects at the same time, which causes a memory error/segmentation fault. Is there a way to fix this?
    k
    1 reply · 2 participants
  • c

    Constantino Schillebeeckx

    01/03/2022, 5:37 PM
    Happy New Year everyone! Is there a way, from within an active Flow, to know whether the flow was triggered by cron vs Quick run vs Run?
    k
    a
    6 replies · 3 participants
  • a

    An Hoang

    01/03/2022, 6:06 PM
    Is there a way to set terminal task for the flow after its creation? I see that the flow object has
    set_reference_task()
    but not
    set_terminal_task()
    . I'm trying to debug by stopping the flow early and inspecting the results
    k
    3 replies · 2 participants
  • d

    dammy arinde

    01/03/2022, 6:10 PM
    Happy New Year everyone! When I run a GE checkpoint in GE it works, but in prefect task, I'm getting the error:
    TypeError: 'Checkpoint' object is not subscriptable
    k
    a
    43 replies · 3 participants
  • c

    Chris McLaughlin

    01/03/2022, 8:40 PM
    Hey everyone - I'm a relatively new user and am struggling with my development workflow. I have found good introductory resources for running both locally and using prefect cloud but what I am struggling with is how to develop/debug locally and deploy to the cloud. For some context, we currently just have 1 flow that will simply be executing dbt commands. We are deploying to azure and using docker. I have a task that clones our dbt repo from github before executing the commands. We also have an action to register our flow each time the flow is updated in github. Can anyone point me in the direction of best practices for developing and testing locally while using cloud for production?
    k
    1 reply · 2 participants
  • j

    Josh

    01/03/2022, 9:07 PM
    how do i view previous versions of a flow and see their creation date and by whom? https://docs.prefect.io/orchestration/concepts/flows.html#ui
    k
    3 replies · 2 participants
  • s

    Sam Werbalowsky

    01/03/2022, 10:26 PM
    Parameters and Case statements are appearing to be less intuitive than I had thought…I can’t figure out a simple way to do something like the following:
    val = Parameter("param", default=False)
    
    with case(val, True):
        do_stuff()
    
    with case(val, False):
        dont_do_stuff()
    If my parameter is an actual value, i.e. “value”, it skips both because it’s neither True nor False. So I’m left to make another task to check the value. Is this correct?
    k
    5 replies · 2 participants
s

Sam Werbalowsky

01/03/2022, 10:26 PM
Logs just show this
[2022-01-03 17:21:27-0500] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "VALUE" did not match "False"')
[2022-01-03 17:21:27-0500] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
[2022-01-03 17:21:27-0500] INFO - prefect.TaskRunner | Task 'case(False)': Finished task run for task with final state: 'Skipped'
[2022-01-03 17:21:27-0500] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "VALUE" did not match "True"')
k

Kevin Kho

01/03/2022, 10:35 PM
I think that is right, except for the fact that the second input of
case
is not limited to True or False. It can be a constant number for example:
from prefect import Flow, task, case, Parameter
import prefect 

@task
def do_something():
    prefect.context.logger.info("this ran")
    return 1

with Flow("conditional-branches") as flow:
    cond = Parameter("cond", 3)

    with case(cond, 3):
        val = do_something()

flow.run()
And this is why in Orion you can also use the native Python if (and there is also type checking).
s

Sam Werbalowsky

01/03/2022, 10:38 PM
gotcha - so basically if I want the task to run for ANY non-false parameter value, I’ll need the helper. Waiting for Orion!!
k

Kevin Kho

01/03/2022, 10:39 PM
Yes I think that’s right
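The helper agreed on above can be very small: a function that collapses any parameter value to a strict `True` or `False`, so that the two `case` branches always match one of them. A minimal sketch, assuming Prefect 1.x semantics. `is_truthy` is a hypothetical name, and the Prefect wiring appears only in comments so the snippet runs without Prefect installed.

```python
from typing import Any


def is_truthy(value: Any) -> bool:
    """Collapse any value to a strict bool using Python truthiness."""
    return bool(value)


# Assumed wiring inside the flow (not executed here):
#     with Flow("conditional-branches") as flow:
#         val = Parameter("param", default=False)
#         flag = task(is_truthy)(val)
#         with case(flag, True):
#             do_stuff()
#         with case(flag, False):
#             dont_do_stuff()

print(is_truthy("VALUE"))  # True: a non-empty string
print(is_truthy(False))    # False
print(is_truthy("False"))  # True: the string "False" is non-empty
```

The last line is the main caveat: a parameter passed as the string "False" still counts as truthy, so a stricter helper would be needed if string inputs are expected.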