prefect-community
  • p

    Peter Roelants

    01/11/2021, 7:20 PM
    Hi Prefect Community, I'm new to Prefect and am trying to learn how to work with the tool. However, I'm struggling to understand how to properly decouple building Storage artifacts from registering/running those artifacts at a later time. It seems that with `flow.register` the creation and registration need to happen in the same call. Is there an example somewhere of how to decouple these steps? For example, how to create and store a Docker build artifact that encapsulates a flow, and then run/register the flow stored in the Docker artifact at a later time without access to the original flow file.
    7 replies · 4 participants
  • j

    Joël Luijmes

    01/11/2021, 7:33 PM
    Hey, got another question concerning retries.
    • What happens if I don’t specify a way to persist results? Is it possible that an upstream task is rerun to get the input? Or are the inputs stored in memory?
    • And if the setup of a context manager is the upstream task, does that yield the same behavior?
    • And does anything change if I run the flow on my Kubernetes agent or locally?
    I noticed, for example, that when running locally my context manager setup task was run again on a task failure. I’m trying to figure out why exactly, and how to mitigate it 😅 Also, what is the best way to persist intermediate results for Kubernetes? A local volume, such that all results are deleted after completion?
    2 replies · 2 participants
  • j

    Jeremy Phelps

    01/11/2021, 10:16 PM
    Hi all, what version of the `google.cloud` Python library does Prefect depend on?
    2 replies · 2 participants
  • j

    Jeremy Phelps

    01/11/2021, 11:10 PM
    How do you get your Google credentials into Kubernetes? I have a Google credentials JSON file on the Agent machine, but when my flow tries to run in Kubernetes, this error happens:
    Failed to load and execute Flow's environment: DefaultCredentialsError('Could not automatically determine credentials. Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials and re-run the application. For more information, please see <https://cloud.google.com/docs/authentication/getting-started>')
    4 replies · 2 participants
  • v

    Vincent

    01/11/2021, 11:47 PM
    Has anyone ever seen these null labels in the Prefect web UI (grey bars)?
    6 replies · 2 participants
  • p

    Philip MacMenamin

    01/12/2021, 1:10 AM
    Hi, I'd like to write a task that is triggered on triggers.always_run, but takes two different actions depending on whether any tasks failed or all tasks were successful. Any pointers? As in something similar to:
    @task(trigger=prefect.triggers.always_run)  # one new line
    def tear_down_cluster(cluster):
        if all_successful:
            print(":)")
        else:
            print(":-/")
    9 replies · 2 participants
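A minimal sketch of the branching asked about above, written in plain Python so it runs without Prefect. It assumes the teardown step receives the upstream results as arguments and that a failed upstream shows up as an exception object. Both that assumption and the `teardown_message` helper are illustrative and should be checked against the Prefect version in use.

```python
from typing import Any, Iterable


def teardown_message(upstream_results: Iterable[Any]) -> str:
    """Return ':)' when no upstream result is an exception, ':-/' otherwise."""
    # Assumption: a failed upstream task is represented by an exception instance.
    any_failed = any(isinstance(r, BaseException) for r in upstream_results)
    return ":-/" if any_failed else ":)"


if __name__ == "__main__":
    print(teardown_message([1, "ok"]))
    print(teardown_message([1, ValueError("boom")]))
```

The helper only decides which branch to take; the actual cluster teardown would run in both branches.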
  • a

    Amanda Wee

    01/12/2021, 4:29 AM
    Just a heads up: the docs currently state that "After registration, the flow will be stored in the specified bucket under `s3-flow/<slugified-current-timestamp>`." over here: https://docs.prefect.io/orchestration/flow_config/storage.html#aws-s3 and also: "If this key is not provided the Flow upload name will take the form `slugified-flow-name/slugified-current-timestamp`." over here: https://docs.prefect.io/api/latest/storage.html#s3 It turns out that the latter is correct (which was not what I expected grr)
    9 replies · 3 participants
  • k

    Klemen Strojan

    01/12/2021, 8:55 AM
    Maybe this is a dumb question, but Prefect Cloud UI should be compatible with Microsoft Chromium Edge, correct?
    1 reply · 1 participant
  • s

    Sven Teresniak

    01/12/2021, 9:14 AM
    What is the most elegant way to restart a (formerly failed) flow run from zero? I know the "resume button" in the UI to restart beginning with the first failed task. But I want to restart from scratch but with the SAME parameters and without copy&paste of parameter jsons. Can I do this with the STATE system?
    2 replies · 2 participants
  • s

    Sven Teresniak

    01/12/2021, 9:15 AM
    I just stumbled upon the artifact API. That's awesome. The direction Prefect is heading is great. Thanks for the good work, guys!
    🚀 3
  • t

    Tim Pörtner

    01/12/2021, 1:18 PM
    Hi, is there a way to explicitly add a Parameter to a flow? I have a flow whose tasks are defined by a variable. This variable is already a Parameter, but since I'm not using it in one of the tasks it doesn't show up on the web frontend. A very simple example of that would be this:
    with Flow("test-flow") as flow:
        tasks_to_run = {
            (here are information on which tasks should be added to the flow)
        }
        parameter_tasks_to_run = Parameter("tasks_to_run", default=tasks_to_run)
    Can I force this Parameter to show up on the web frontend without actually using it in a task within the flow?
    5 replies · 2 participants
  • a

    Adam

    01/12/2021, 1:21 PM
    Hello friends, we’re using a paid plan on Prefect Cloud (1 paid seat, $100 / month) but we are now seeing this error when trying to register new flows: `This tenant already has the maximum number of flows`. We have 25 flows in 1 project, and I can’t find any reference to an upper limit on flows on the pricing page. Any help would be much appreciated
    2 replies · 2 participants
  • d

    Dolor Oculus

    01/12/2021, 2:08 PM
    I'm using the `Docker` flow storage with `base_image`, and need to add a file to the image which can only be determined at registration time (flow.storage assignment). I've tried various combinations of `build_kwargs` and `extra_dockerfile_commands` but am not having any luck. Would appreciate any feedback or pointers!
    my_new_file = "/valid/path/to/file.json"
    flow.storage = Docker( 
        base_image=valid_image_name,
        local_image=True,
        ignore_healthchecks=True,
        build_kwargs={'my_new_file': my_new_file}, 
        extra_dockerfile_commands=['ARG my_new_file', 
                                   'COPY $my_new_file .'])
    Once it gets to the COPY command I get the error
    COPY failed: stat /data/docker/tmp/docker-builder092360870/valid/path/to/file.json: no such file or directory
    I've also tried passing just the basename of the file in build_kwargs, to no avail. How do I get a file into that temporary docker builder dir? Cheers
    9 replies · 2 participants
  • s

    Sean Talia

    01/12/2021, 5:33 PM
    does anyone know if it's possible to access the labels passed to a flow during the flow's execution / within the flow context? I'd like to implement some logic that says something like:
    with Flow("test-flow") as flow:
        if "label1" in flow.labels:
            function1()
        elif "label2" in flow.labels:
            function2()
    7 replies · 3 participants
  • z

    Zach

    01/12/2021, 6:13 PM
    Were there any breaking changes between prefect 12 and prefect 13? Also between prefect 13 and 14?
    2 replies · 2 participants
  • a

    Alex Rud

    01/12/2021, 6:21 PM
    👋
    👋 1
    1 reply · 2 participants
  • a

    Alex Rud

    01/12/2021, 6:26 PM
    Hi all... Is there a way to implement a dynamic `manual_only` trigger? I'm trying to do the following: read file into dataframe -> run GreatExpectations validation on dataframe -> if validation passes, continue with the next task; if not, allow the next task to run only when manually triggered
    9 replies · 3 participants
  • v

    Vincent

    01/12/2021, 6:42 PM
    Hi, does anyone know how to diagnose a k8s job failure with no error messages? After I submit a job, the 300 worker pods successfully connect to the scheduler and the jobs are set to running in the UI. However, after 8-12 minutes the pod running the scheduler/flow-runner exits (and is resubmitted because the job is still active). The Cloud UI still shows the job as running. Thanks for any advice!
    4 replies · 2 participants
  • a

    Arnulfo Soriano

    01/12/2021, 7:50 PM
    Hi, I'm trying to migrate some code from Prefect environments to Prefect executors, since environments have been deprecated. When using DaskKubernetesEnvironment, the code below registers just fine and works the way we want.
    DaskKubernetesEnvironment(metadata={'image': 'someImage'},
                                             worker_spec_file=os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                                                           f'./{worker_spec}'),
                                             min_workers=min_workers,
                                             max_workers=max_workers,
                                             labels=['DASK'])
    However when using DaskExecutor:
    cluster = KubeCluster(make_pod_spec(image = 'someImage')).from_yaml(os.path.join(os.path.dirname(os.path.abspath(__file__)),f'{worker_spec}'))
    #somethings i've tried
    #cluster = KubeCluster.from_yaml(os.path.join(os.path.dirname(os.path.abspath(__file__)),f'{worker_spec}'))
    DaskExecutor(cluster_class=lambda: cluster,adapt_kwargs={"minimum": min_workers, "maximum": max_workers},)
    I keep getting this error at the registration step:
    File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/core.py", line 512, in _start
        await ClusterAuth.load_first(self.auth)
      File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/auth.py", line 85, in load_first
        raise auth_exc
      File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/auth.py", line 71, in load_first
        await auth_instance.load()
      File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/auth.py", line 125, in load
        kubernetes.config.load_kube_config(
      File "/usr/local/lib/python3.8/site-packages/kubernetes/config/kube_config.py", line 661, in load_kube_config
        loader = _get_kube_config_loader_for_yaml_file(
      File "/usr/local/lib/python3.8/site-packages/kubernetes/config/kube_config.py", line 624, in _get_kube_config_loader_for_yaml_file
        raise ConfigException(
    kubernetes.config.config_exception.ConfigException: Invalid kube-config file. No configuration found.
    4 replies · 3 participants
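One reading of the traceback above is that the cluster object is constructed while the flow file is being imported for registration, on a machine that has no kube-config. The sketch below uses stand-in classes (`FakeCluster` and `FakeExecutor` are hypothetical and not part of Prefect or dask-kubernetes) to show the difference between building the cluster eagerly and handing over a factory that is only called when the executor starts.

```python
from typing import Callable, List

# Records every cluster construction so the timing can be observed.
created: List[str] = []


class FakeCluster:
    """Stand-in for a cluster whose constructor needs cluster credentials."""

    def __init__(self, spec_path: str) -> None:
        created.append(spec_path)


class FakeExecutor:
    """Stand-in for an executor that accepts a zero-argument cluster factory."""

    def __init__(self, cluster_class: Callable[[], FakeCluster]) -> None:
        self._cluster_class = cluster_class

    def start(self) -> FakeCluster:
        # The cluster is only built here, at run time.
        return self._cluster_class()


def make_cluster() -> FakeCluster:
    # Nothing in this body runs until the executor calls the factory.
    return FakeCluster("worker_spec.yaml")


# Registration-time code: no cluster has been created yet.
executor = FakeExecutor(cluster_class=make_cluster)
```

By contrast, `lambda: cluster` with a module-level `cluster = ...` still constructs the cluster at import time, because the assignment runs before the lambda is ever called.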
  • j

    john Pedersen

    01/12/2021, 7:54 PM
    Is there a way to monitor Prefect with Zabbix, to get the state of failed flows and other useful metrics? Sorry for the noob question, but I am unable to find information on this topic in the docs.
    1 reply · 2 participants
  • a

    Akshay Deodhar

    01/12/2021, 10:23 PM
    Hi everyone, I am currently evaluating Prefect as a workflow orchestration platform for our team. Reading through some docs, I got the sense that Prefect Cloud is not fully managed. Is there a fully managed version of Prefect Cloud, similar to what MWAA (AWS) or Astronomer provide for Apache Airflow?
    3 replies · 2 participants
  • m

    matta

    01/13/2021, 1:23 AM
    If you use map on a Local Executor, does it necessarily execute in the order in which the arguments are provided?
    1 reply · 2 participants
  • m

    M Taufik

    01/13/2021, 4:22 AM
    Hi, I'm trying to create an ingress for GraphQL in a K8s cluster, but the Prefect UI can't reach the GraphQL ingress IP. Has anyone successfully exposed GraphQL using a K8s ingress? Thank you
    4 replies · 1 participant
  • k

    Krishna Devenda

    01/13/2021, 9:47 AM
    Hi, TL;DR: How can I overwrite context in `config.toml` with context provided in the Prefect UI's context section while running a flow? I am facing a problem with updating/overwriting context values from the Prefect UI. I have defined a user configuration file for the agent at `~/.prefect/config.toml` with the following syntax
    [context]
    postgresdb = "production"
    Now, I want to update this attribute `postgresdb` from the UI by passing context with the same name and a different value, like the following
    {
      "postgresdb": "test"
    }
    When I obtain this context value in a task, it gives me the value from `config.toml`, not the updated value from the context provided in the UI
    print(prefect.context.get('postgresdb'))
    >>>production
    Can someone guide me through this, please? I have read the documentation multiple times and have also gone through the Prefect source code in an attempt to understand how contexts get merged, but I could not understand it. From the code, it appears that whatever context is passed through `**kwargs` overwrites the context in `config.toml`. I believe the Prefect UI passes context via `**kwargs` only.
    4 replies · 2 participants
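The precedence the message above expects can be written down as a plain dictionary merge. This is only a sketch of the expected behaviour, not Prefect's actual merging code: `merge_context` and both dictionaries are hypothetical stand-ins for the `[context]` section of `config.toml` and the context entered in the UI.

```python
from typing import Any, Dict


def merge_context(config_context: Dict[str, Any],
                  run_context: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new context where run-time values win over config values."""
    merged = dict(config_context)  # copy so the config defaults stay untouched
    merged.update(run_context)     # later values overwrite earlier ones
    return merged


config_context = {"postgresdb": "production"}  # from config.toml
run_context = {"postgresdb": "test"}           # supplied for one flow run
merged = merge_context(config_context, run_context)
print(merged["postgresdb"])
```

If the observed value is still `production`, the two sources are presumably being merged in the opposite order somewhere, or the config is being loaded again on the machine that executes the flow.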
  • e

    Eric

    01/13/2021, 10:16 AM
    Hi Prefect, I'm having a problem registering a flow with a schedule. The webpage shows I've set the schedule correctly, but no scheduled flow runs are created. Is there any step I missed here? Thank you!
    4 replies · 3 participants
  • a

    Arsenii

    01/13/2021, 11:57 AM
    Heyo! The new (v0.14) "passing `flow.serialized_hash` to `flow.register` as the `idempotency_key`" way of registering flows is very useful for CI pipelines; however, it's not really viable if you have to spend a lot of time building a `Docker` flow storage object before calling `flow.register()`. Can anyone please share a quick way of getting the idempotency key for a specific flow name through GraphQL, if there is one? I am thinking of looping through all the available flows and using the serialized hash to automatically filter out changed flows. Right now we are using a clunky `gitpython` solution that basically does `git diff` on flow source files between tagged releases on a GitHub repo. Thank you.
    3 replies · 2 participants
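The "filter out changed flows before building storage" idea above can be sketched without Prefect. Here `content_hash` is a hypothetical stand-in for `flow.serialized_hash()`, and `known_hashes` represents whatever record of previously registered hashes is available (for example a file kept by the CI pipeline); how to obtain it from the backend is left open, since the thread does not say.

```python
import hashlib
import json
from typing import Any, Dict, List


def content_hash(serialized_flow: Dict[str, Any]) -> str:
    """Hash a JSON-serializable flow description in a key-order-independent way."""
    payload = json.dumps(serialized_flow, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def changed_flows(current: Dict[str, Dict[str, Any]],
                  known_hashes: Dict[str, str]) -> List[str]:
    """Return the names of flows whose hash differs from the recorded one."""
    return sorted(
        name
        for name, flow in current.items()
        if known_hashes.get(name) != content_hash(flow)
    )


if __name__ == "__main__":
    flows = {"etl": {"tasks": ["extract", "load"]}, "report": {"tasks": ["render"]}}
    known = {"etl": content_hash(flows["etl"])}  # "report" was never registered
    print(changed_flows(flows, known))
```

Only the flows returned by `changed_flows` would then need the expensive Docker storage build and a call to `flow.register()`.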
  • s

    Slackbot

    01/13/2021, 4:50 PM
    This message was deleted.
    1 reply · 2 participants
  • d

    Dane Gardner

    01/13/2021, 4:51 PM
    Hi folks! I'm trying to determine best practices/idioms for deploying and executing code shared between Flows without having everything in a single file. I'm using ECS for running my flows, and everything works great with S3/CloudPickle serialization as long as the code is dead simple (i.e. hello-world), but everything breaks down if I reference another file in my code. I have yet to find an example that uses more than one file to define a Flow, but surely there must be a way to structure code more sanely for large, complicated Flows. The various `Storage` classes seem to only be able to reference a single file, not entire modules/directories. Are there any examples of a somewhat complicated suite of flows with code shared between them?
    11 replies · 4 participants
  • f

    Felix Schran

    01/13/2021, 6:02 PM
    Hi all, I have the following problem. Currently I have two tasks defined similar to the following:
    @task(
        name="Extract",
        checkpoint=True,
        result=LocalResult(cache_dir),
        target="{task_name}--{today}"
    )
    def extract(x):
        return 1

    @task(
        name="Transform",
        checkpoint=True,
        result=LocalResult(cache_dir),
        target="{task_name}--{today}"
    )
    def transform(x):
        return x * 2
    Caching works with this example. With each run of the flow, Prefect looks for a file "transform--2021-01-13" and, if it exists, uses the cached result. I want to add the following features:
    1. Whenever I change the source code of the `extract` task (say to `return 20` instead of `return 1`) I obviously don't want to use the cached result (i.e. `1`) as an input for the next task. Instead I want to make `extract` recompute whenever I change the content of `extract`. How can I do that?
    2. When the result of an upstream task changes, I want to execute the part of the DAG that follows from that point downwards (although the downstream tasks might also be tasks with a cache). For instance, in this example, I want `transform` to take the new input of 20 and compute again with that input (although the result is already cached).
    6 replies · 2 participants
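One possible approach to the first feature, sketched in plain Python: derive a short fingerprint from the task function's compiled code and make it part of the cache file name, so that editing the function changes the target. `code_fingerprint` and `cache_target` are hypothetical helpers, not Prefect APIs, and wiring the resulting string into a task's `target` is left as an assumption to verify.

```python
import hashlib
from typing import Callable


def code_fingerprint(func: Callable) -> str:
    """Short hash of a function's bytecode, constants and referenced names."""
    code = func.__code__
    digest = hashlib.sha256()
    digest.update(code.co_code)
    digest.update(repr(code.co_consts).encode("utf-8"))
    digest.update(repr(code.co_names).encode("utf-8"))
    return digest.hexdigest()[:12]


def cache_target(task_name: str, today: str, func: Callable) -> str:
    """Build a cache file name that changes when the function body changes."""
    return f"{task_name}--{today}--{code_fingerprint(func)}"


def extract_v1(x):
    return 1


def extract_v2(x):
    return 20


if __name__ == "__main__":
    print(cache_target("Extract", "2021-01-13", extract_v1))
    print(cache_target("Extract", "2021-01-13", extract_v2))
```

The fingerprint is only stable for simple functions: it can differ between Python versions, and functions containing nested functions or lambdas embed code objects whose `repr` includes a memory address. The second feature could follow the same pattern by also hashing the task's input into the file name.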
  • m

    Marc Lipoff

    01/13/2021, 8:22 PM
    After a `RunGreatExpectationsValidation` task, what's the best way to grab the validation results and log them? It doesn't appear I can do it with a state handler.
    9 replies · 2 participants
m

Marc Lipoff

01/13/2021, 8:22 PM
After a `RunGreatExpectationsValidation` task, what's the best way to grab the validation results and log them? It doesn't appear I can do it with a state handler.
I'd like something like: if the validation is successful, continue (insert the dataframe into a database); if it fails to validate, log the validation results.
m

Michael Adkins

01/13/2021, 9:12 PM
Are you looking for `case`? https://docs.prefect.io/api/latest/tasks/control_flow.html#case
m

Marc Lipoff

01/13/2021, 9:13 PM
`RunGreatExpectationsValidation` raises a signal.FAIL. How do I deal with that gracefully and branch (via case or something else)?
m

Michael Adkins

01/13/2021, 9:18 PM
Ah I see. It may be reasonable to add a config option to that task that returns the result in failure cases as well as successes. However, you should be able to just use a state handler.
m

Marc Lipoff

01/13/2021, 9:18 PM
ok awesome
How do I make the "database insert" task dependent on the GE validation?
m

Michael Adkins

01/13/2021, 9:21 PM
I’m looking into an example
import prefect


PRETEND_VALIDATION_PASSED = True


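def handle_failed_validation(task, old_state, new_state):
    # Convert a failed validation into a Success that still carries the
    # validation result, so downstream tasks can inspect it and branch.
    if isinstance(new_state, prefect.engine.state.ValidationFailed):
        return prefect.engine.state.Success(result=new_state.result)
    return new_state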


@prefect.task(state_handlers=[handle_failed_validation])
def fail_task():
    class FakeResult:
        success: bool = PRETEND_VALIDATION_PASSED

    if PRETEND_VALIDATION_PASSED:
        return FakeResult()

    raise prefect.engine.signals.VALIDATIONFAIL(result=FakeResult())


@prefect.task()
def check_ge_result(result):
    return result.success


@prefect.task(log_stdout=True)
def log(message):
    print(message)


with prefect.Flow("test") as flow:
    result = fail_task()
    validation_passed = check_ge_result(result)
    with prefect.case(validation_passed, True):
        log("Handle validation success using result")

    with prefect.case(validation_passed, False):
        log("Validation failed, log it or whatever")

flow.run()
m

Marc Lipoff

01/13/2021, 9:30 PM
ok great. I'll give this a try