Powered by Linen
prefect-community
  • Tim Enders

    08/24/2021, 9:09 PM
    How do I set up a set of mapped runs so that they don't fail when a single one fails?
    4 replies · 2 participants
  • Blake List

    08/24/2021, 11:33 PM
    Hi there, I have a Redis kv store set up and running, and I would like to set and get timestamp values using the conditional `ifelse` task. I am running into trouble getting the False case:
    import time
    import prefect
    from prefect import task, Flow
    from prefect.tasks.redis import RedisSet, RedisGet
    from prefect.tasks.control_flow import ifelse
    
    redis_get = RedisGet(...)
    redis_set = RedisSet(...)
    
    with Flow("flow_p") as flow:
        t1_get = redis_get(redis_key='t1_key')
    
        # true case
        t1_init = redis_set(redis_key='t1_key', redis_val="0000-00-00 00:00:00")
        # false case
        t1_set = redis_set(redis_key='t1_key', redis_val=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
        
        ifelse(case(t1_get, None), t1_init, t1_set)
    
    flow.run()
    Running the flow once and then calling `RedisGet(...).run(redis_key='t1_key')` gives `b'0000-00-00 00:00:00'`, but running the flow again gives the same value. Any ideas why I am not getting the False case on the second run?
    13 replies · 2 participants
  • Jacob Blanco

    08/25/2021, 8:12 AM
    1. Is there any way to set up Flow Concurrency control to simply skip scheduled runs if another flow is already running with the same labels?
    2. Does the Agent also have to be tagged with the Flow Concurrency label? If I have 20 different Flow Concurrency labels, do all 20 need to be attached to the Agent?
    2 replies · 2 participants
  • st dabu

    08/25/2021, 8:55 AM
    I am running `python prefect_pipeline.py`, which creates a flow and calls `register()` and `run()` on it. When I do this, the flow is registered in the Prefect UI. However, it doesn't actually run on the agent. It runs only on the local machine where I invoke `python prefect_pipeline.py`.
    7 replies · 3 participants
  • Zac Chien

    08/25/2021, 9:49 AM
    Hi everyone, are there any suggestions for making task2 wait until task1 has finished when the two have no dependency?
    with Flow('feature-factory') as flow:
        a, b = pre_task()
        task1.map(a)
        task2.map(b)
    7 replies · 3 participants
  • Eddie Atkinson

    08/25/2021, 10:01 AM
    Would anyone be able to share what `ECSRun` is doing under the hood to run tasks? I have set a subnet for the service I am running my Prefect tasks in and have disabled public IPs. However, when I call `ECSRun`, my tasks run with a public IP address in a different subnet. The documentation for `ecs.run_task` suggests that you must specify a subnet ID when running a task on Fargate, which I am doing. So I'm wondering which subnet ID Prefect is using, and whether it can be forced to use the default subnet ID of the service. I am aware that `ECSRun` lets you override the arguments passed to `run_task`. I don't want to do that, though, because I want my config to be agnostic to the account it is deployed in. I am deploying the same infra to different AWS accounts and don't want to hard-code the subnet ID passed to `ECSRun`.
    28 replies · 2 participants
  • Dotan Asselmann

    08/25/2021, 11:54 AM
    Hi! Can I force Prefect to re-run the entire flow when a task fails, instead of continuing from the failed task on retry?
    2 replies · 3 participants
  • Bastian Röhrig

    08/25/2021, 12:09 PM
    Hey, quick question: Is there a way to access results as documented here https://docs.prefect.io/core/idioms/task-results.html when starting a flow with StartFlowRun?
    4 replies · 2 participants
  • Vincent Chéry

    08/25/2021, 12:14 PM
    Hi! I've been running a Prefect Server instance (version 0.14.11) in production for a few months. It runs a dozen flows, some of them every minute. The UI is getting extremely slow when I click on a flow run or task run. Is this a known issue? Should I delete old flow and task runs to limit the size of the flow_runs and task_runs tables? If so, what's the best way to do it? Thanks in advance!
    10 replies · 2 participants
  • Tense Fragile

    08/25/2021, 12:33 PM
    Hello! I have a Linux server that runs the agent, which schedules new runs of our Prefect flows. I also have a GitLab CD pipeline that, on every push to master, connects to this server and pulls master. After a push the server has the latest version of the code, but the agent keeps scheduling jobs with the old code. I want it to start using the latest code. Is there any kind of autoreload feature for the Prefect agent? Maybe some of you have faced this problem, so I'm curious how you solved it.
    1 reply · 2 participants
  • Mehdi Nazari

    08/25/2021, 3:26 PM
    Hi folks, I have a Flow defined with a few tasks. The plan is to call a task from another task that is itself called inside the flow's `with` context. Has anyone been able to do that successfully?
    19 replies · 2 participants
  • st dabu

    08/25/2021, 3:31 PM
    I run a prefect pipeline like this.
    prefect run -n hello-flow --param args={'a':'alpha','b':bravo'}
    Why can't I pass a dictionary to the param option instead of individual params?
    Error: No such command 'args ..blah
    7 replies · 2 participants
  • Brett Naul

    08/25/2021, 5:07 PM
    If I return a lot of data serialized with GCS results, is it expected that Dask transfers only the result metadata and not the data itself? For example, say I do:
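    from prefect import task
    from prefect.engine.results import GCSResult
    from prefect.engine.serializers import PandasSerializer
    
    lots_of_dataframes_in_gcs = task(
        process_data,
        result=GCSResult(
            serializer=PandasSerializer(file_type="parquet"),
        ),
    ).map(input_files)
    task(notify_data_is_ready).set_dependencies(upstream_tasks=[lots_of_dataframes_in_gcs])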
    I don't actually want the data passed to the downstream task. I only want to enforce the ordering. But judging by my exploding Dask worker memory, the downstream task seems to be collecting all of the outputs themselves(!). This happens even though each transferred object shows up in the Dask dashboard as a 48-byte `Cached` state object, so 99.999% of the memory is untracked by Dask. I can imagine a workaround that skips Results altogether and returns True or a file path explicitly. Still, this looks like it might be a bug: shouldn't `state.result` be purged before the state is shipped around? @Chris White @Michael Adkins or anyone else, do you have thoughts on whether this is expected?
    9 replies · 2 participants
  • Jean Da Rolt

    08/25/2021, 7:02 PM
    Folks, I need help with this warning message:
    Flow run RUNNING: terminal tasks are incomplete.
    
    Waiting for next available Task run at 2021-08-25T18:52:50.186382+00:0
    This apparently happens at the end of the flow, but then the flow restarts and executes everything again...
    4 replies · 2 participants
  • Andre Muraro

    08/25/2021, 7:02 PM
    A couple of issues came up when I tried to register a new flow. I develop my flows in a remote Docker container, with the local path /workspaces/prefect-dev/flows. I try to register from the working directory prefect-dev using:
    prefect register --project "some-project" --path flows/some_flow.py
    The flow gets registered with the entire path (/workspaces....). My agent searches for flows at ~/.prefect/flows, so I get a "Failed to load and execute Flow's environment" error. Also, the flow gets created with one label that looks like an ID of some sort (i.e. 447c6079d9b5). This happens even though I did not specify any labels, and no part of the source code references any label. My local agent is created using:
    prefect agent local start --name $$(hostname) --no-hostname-label
    Both of these issues came up with my latest flow, but I have used the same setup for a few months now without any of these issues. Any idea on what might be going on?
    40 replies · 3 participants
  • Fina Silva-Santisteban

    08/25/2021, 11:26 PM
    Hi everyone, I'm trying to rewrite a flow from the functional API into the imperative API, but I'm having a few issues with it.
    Functional API flow:
    a_task = ATaskClass()
    
    with Flow("Foo") as flow:
        name = Parameter('name')
    
        a_task = a_task(name=name)
    Imperative API flow (so far):
    a_task = ATaskClass()
    name = Parameter('name')
    flow = Flow("Foo")
    
    flow.set_dependencies(
        task=a_task()
    )
    
    a_task.bind(name=name)
    The error complains about the line where I’m trying to bind the flow param to the task:
    raise ValueError(
    ValueError: Could not infer an active Flow context while creating edge to <Task: ATaskClass>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `ATaskClass(...).run(...)`
    What's the right way to go about this? 🤔 Some background info: our flows are getting quite large, so we're looking for ways to make them easier to work with and more reusable. We considered the flow-of-flows strategy, but we never need to run any single part as an independent flow. Splitting things into X flows would only slow down our development cycle, because we would need to register and re-register all those additional flows instead of the one big one. The imperative API seemed like a better choice: its explicitness makes it easier to read, especially when the flow is large, and we keep registration to just that one flow. Many thanks!
    6 replies · 3 participants
  • Ken Nguyen

    08/26/2021, 1:46 AM
    Hey everyone, I'm currently learning my way around Prefect (and a lot of the other tools involved with Prefect) and tried to set up a simple hello task using GitHub storage. The task failed and resulted in this error in the log, could someone help me decipher it?
    Failed to load and execute Flow's environment: GithubException(404, {'data': 'Not Found'}, {'server': 'GitHub.com', 'date': 'Thu, 26 Aug 2021 01:20:16 GMT', 'content-type': 'text/plain; charset=utf-8', 'transfer-encoding': 'chunked', 'vary': 'X-PJAX, X-PJAX-Container, Accept-Encoding, Accept, X-Requested-With', 'permissions-policy': 'interest-cohort=()', 'cache-control': 'no-cache', 'set-cookie': 'logged_in=no; domain=.github.com; path=/; expires=Fri, 26 Aug 2022 01:20:16 GMT; secure; HttpOnly; SameSite=Lax', 'strict-transport-security': 'max-age=31536000; includeSubdomains; preload', 'x-frame-options': 'deny', 'x-content-type-options': 'nosniff', 'x-xss-protection': '0', 'referrer-policy': 'origin-when-cross-origin, strict-origin-when-cross-origin', 'expect-ct': 'max-age=2592000, report-uri="https://api.github.com/_private/browser/errors"', 'content-security-policy': "default-src 'none'; base-uri 'self'; connect-src 'self'; form-action 'self'; img-src 'self' data:; script-src 'self'; style-src 'unsafe-inline'", 'content-encoding': 'gzip', 'x-github-request-id': 'EA90:30AB:16BB88:2112BC:6126EC50'})
    98 replies · 2 participants
  • An Hoang

    08/26/2021, 11:39 AM
    Good morning! What is the best way to parameterize a Result location using a Flow Parameter? Currently I have the following at both the flow and task level:
    from functools import partial
    
    from prefect import Flow, Parameter
    from prefect.engine.results import LocalResult
    from prefect.engine.serializers import PandasSerializer
    
    # task_no_checkpoint, search_result_df_file_name and another_file_name are defined elsewhere (not shown)
    tsv_result_partial = partial(LocalResult, dir="./test_prefect", serializer=PandasSerializer("csv",
                                                                             serialize_kwargs={"sep": "\t", "index": False},
                                                                             deserialize_kwargs={"sep": "\t"}))
    
    parquet_result_partial = partial(LocalResult, dir="./test_prefect", serializer=PandasSerializer("parquet"))
    
    @task_no_checkpoint(target=search_result_df_file_name, result=tsv_result_partial())
    def example_tsv_result_task():
        ...
    
    @task_no_checkpoint(target=another_file_name, result=parquet_result_partial())
    def example_parquet_result_task():
        ...
    
    with Flow("test", result=LocalResult("./test_prefect")) as flow:
        param1 = Parameter("param1")
        param2 = Parameter("param2")
    I want the outermost directory to be `result_folder/param1/param2`. How can I achieve this?
    6 replies · 2 participants
  • Bruno Murino

    08/26/2021, 1:50 PM
    Hi everyone, I have a flow that I want to run just once, immediately after I deploy it. Is there a way to specify that in a schedule? I'm thinking about a `Schedule(clocks=[DatesClock([ pendulum.now() ])])`. However, I suspect the lag between evaluating that and registering/deploying the flow would mean the flow never runs. I'm also hesitant to hardcode an "add 10 minutes" offset or something similar, as that doesn't feel very robust.
    2 replies · 2 participants
  • Jocelyn Boullier

    08/26/2021, 2:03 PM
    Hi! I'm using Prefect Cloud, and all of my previous runs have disappeared. They don't appear in the main dashboard, in the Flows tab, or in the runs section of each flow. One of the flows that just ran is shown correctly, though.
    16 replies · 3 participants
  • Zach Schumacher

    08/26/2021, 2:29 PM
    Is there any way to attach a README to a flow while registering it, or can you currently only do it via the UI?
    7 replies · 5 participants
  • Italo Barros

    08/26/2021, 2:42 PM
    Is it possible to create custom states? Also, is it possible to aggregate (or search) runs by an error string (or by state) in Cloud, instead of debugging the log of each run?
    8 replies · 2 participants
  • Evan Curtin

    08/26/2021, 3:34 PM
    Hey all, I feel like I'm missing something really simple:
    from prefect import Flow, task
    from prefect.engine.results import LocalResult
    
    
    @task(result=LocalResult(location="initial_data.prefect"))
    def root_task():
        return [1, 2, 3]
    
    
    @task(result=LocalResult(location="{date:%A}/{task_name}.prefect"))
    def downstream_task(x):
        return [i * 10 for i in x]
    
    
    def rating_model_backtest_workflow(opts: dict):
        with Flow("local-results") as flow:
            downstream_task(root_task)
    
        flow.run()
    I'm running this from the docs and expect files to show up at `./initial_data.prefect` and at the other location, but nothing appears. Am I missing something obvious?
    8 replies · 2 participants
  • wiretrack

    08/26/2021, 3:44 PM
    Hey guys. I'm trying to set the local configuration path, but it is not being found. I'm following the docs with `export PREFECT__USER_CONFIG_PATH="path/to/config.toml"`, and I tried both an absolute and a relative path, with no luck. I also tried setting the config (the Slack webhook URL) directly in the code, as in this discussion. I still get the same `ValueError` saying it can't find the Slack webhook URL.
    25 replies · 2 participants
  • Horatiu Bota

    08/26/2021, 3:51 PM
    Hi guys, thanks for the awesome tool! I'm missing something really simple 😅 Is there a way to access run parameters in the target template? That is, how do I access the `company_name` run parameter when setting the target below? (Note: this doesn't work, because of `dict object has no attribute company_name`.)
    @task(target="{parameters.company_name}/{today}-{task_name}", ...)
    I tried `parameters.get("company_name")` instead, but still no luck. It must be a really simple thing I'm missing (sorry for the spam).
    6 replies · 2 participants
  • YD

    08/26/2021, 4:32 PM
    Prefect team, thank you for this great product…
    4 replies · 5 participants
  • Abhas P

    08/26/2021, 7:58 PM
    Hey guys, I am exploring Prefect to implement the various parts of an ML application (with multiple models) as a flow. Here are my questions about Prefect tasks:
    1. Are Prefect flows a wise choice for productionizing online, streaming-style ML models? If so, what's the recommendation for mirroring the business logic as flows? Is a flow of mapped flows standard, or is a master flow that orchestrates other flow runs the wiser choice?
    2. Implementing an online streaming ML model as a task: right now I can load the model inside a task for prediction, but loading the model takes most of the task run's time. Does Prefect have a mechanism for persisting the ML model in memory?
    2 replies · 2 participants
  • Ryan Smith

    08/26/2021, 9:46 PM
    Hi all, has anyone else noticed that the Prefect documentation website becomes unresponsive after it's been open for several hours (or days?)? In my case, I can still scroll and click links on the docs site, and the URL in the address bar changes. The page content, however, stays on whatever it was showing when I left it, and I can't navigate away. To restore navigation I need to refresh the page. I've also noticed the Prefect UI freezing up when it's left open for a long time (both the Cloud and self-hosted versions). That one is more of a hard lock, where I can't seem to do anything. Is anyone else experiencing one or both of these issues?
    10 replies · 6 participants
  • Gabriel Montañola

    08/27/2021, 12:11 AM
    Hi there folks, how are you doing? I have a GitHub Actions deployment question. I'm trying to improve our data team's workflow with Prefect, and our flows use `Github Storage`. I'm facing a problem when "developing" the flows. Because we use GitHub Actions, flows are registered to a `development` project when a Pull Request is opened (and updated). But `Github Storage` uses `main/master` as the ref argument. That works fine for production, but not while I'm on a development branch. The workaround I've found uses this pattern:
    • GitHub Actions workflow with different steps for Push and Pull Request:
        ◦ Push to main: generate the project folder and register the flow
            ▪︎ I use `prefect register --project $project --path $path` to do this
        ◦ Pull Request: register the flow, with its schedule disabled, in the `development` project
            ▪︎ I use the `if __name__ == "__main__"` pattern to pass a dynamic `GITHUB_HEAD_REF` to the Github Storage ref argument
    It works, but I'm wondering whether there is a way to do this with just the `prefect cli` and avoid writing boilerplate code. The thread has some code snippets and more details!
    7 replies · 2 participants
  • Wilson Bilkovich

    08/27/2021, 12:15 AM
    Hi folks. I'm trying to upgrade the Helm chart to get it to re-create the default tenant for me. It tells me `'postgresqlPassword' must not be empty`, even though I am passing it with `--set` on the `helm upgrade` command line.
    9 replies · 2 participants
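In st dabu's `prefect run` thread above, shell quoting is one likely culprit. The shell interprets unquoted braces and quotes, so the CLI may not receive `args=...` as a single argument. The pasted value is also missing an opening quote before `bravo'`. The standard-library sketch below builds a safely quoted argument. It assumes that `--param` accepts `KEY=VALUE` and parses `VALUE` as JSON when it can; check that assumption against `prefect run --help` for your version.

```python
import json
import shlex

# Parameter name and values taken from the question.
params = {"a": "alpha", "b": "bravo"}

# Assumption: the CLI accepts KEY=VALUE and parses VALUE as JSON.
param_arg = "args=" + json.dumps(params)

# shlex.quote wraps the argument so a POSIX shell passes it as one token.
command = " ".join(
    ["prefect", "run", "-n", "hello-flow", "--param", shlex.quote(param_arg)]
)
print(command)
# prefect run -n hello-flow --param 'args={"a": "alpha", "b": "bravo"}'

# Round-trip check: split the command the way a POSIX shell would.
tokens = shlex.split(command)
key, _, value = tokens[-1].partition("=")
print(key, json.loads(value))
```

Encoding with `json.dumps` gives double-quoted keys and values. A hand-typed Python dict literal with single quotes is not valid JSON.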
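Three things are worth checking in Evan Curtin's `LocalResult` question above. First, as pasted, the snippet defines `rating_model_backtest_workflow` but never calls it, so `flow.run()` may never execute. Second, in Prefect 1.x, checkpointing is disabled by default for local `flow.run()` calls, and results are not written unless it is enabled (for example with the environment variable `PREFECT__FLOWS__CHECKPOINTING=true`). Third, unless `dir` is set, `LocalResult` may resolve a relative location against its default results directory (typically `~/.prefect/results`) rather than the current directory. The standard-library sketch below only shows what the two location templates would render to. It assumes Prefect fills `{date}` and `{task_name}` from the run context via `str.format`. The date is pinned to the question's date, a Thursday, so the output is reproducible.

```python
import datetime

# Location templates copied from the question.
templates = {
    "root_task": "initial_data.prefect",
    "downstream_task": "{date:%A}/{task_name}.prefect",
}

# Assumption: the run context supplies `date` (a datetime) and `task_name`.
run_date = datetime.datetime(2021, 8, 26, 15, 34)

rendered = {}
for task_name, template in templates.items():
    # Extra keyword arguments are ignored by str.format, so a template
    # without placeholders renders unchanged.
    rendered[task_name] = template.format(date=run_date, task_name=task_name)
    print(f"{task_name}: {rendered[task_name]}")
# root_task: initial_data.prefect
# downstream_task: Thursday/downstream_task.prefect
```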
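Horatiu Bota's target question above is consistent with one assumption about Prefect's internals: that the target string is rendered with Python's `str.format` against the run context. If so, `parameters` is a plain dict, and format fields look up dict keys with `[key]` (key unquoted), not with attribute access. The standard-library sketch below uses hypothetical context values.

```python
# Hypothetical context values standing in for what Prefect would supply.
context = {
    "parameters": {"company_name": "acme"},
    "today": "2021-08-26",
    "task_name": "load_company",
}

# Attribute access on a dict fails, reproducing the error from the question.
try:
    "{parameters.company_name}/{today}-{task_name}".format(**context)
except AttributeError as exc:
    print(exc)  # 'dict' object has no attribute 'company_name'

# Index syntax works; inside a format field the key is written without quotes.
target = "{parameters[company_name]}/{today}-{task_name}".format(**context)
print(target)  # acme/2021-08-26-load_company
```

This also explains why `parameters.get("company_name")` fails: format fields can look up attributes and indexes, but they cannot call methods.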
Wilson Bilkovich

08/27/2021, 12:15 AM
Hi folks. I'm trying to upgrade the Helm chart to get it to re-create the default tenant for me. It tells me `'postgresqlPassword' must not be empty`, even though I am passing it with `--set` on the `helm upgrade` command line.
% CHART_NS=prefect
CHART_NAME=prefect-server-initial
helm upgrade \
--set ui.apolloApiUrl=http://${CHART_NAME}-apollo:4200/graphql \
--set agent.enabled=true \
--set jobs.createTenant.enabled=true \
--set postgresqlPassword=$POSTGRESQL_PASSWORD \
${CHART_NAME} \
prefecthq/prefect-server
Error: UPGRADE FAILED: template: prefect-server/charts/postgresql/templates/NOTES.txt:59:4: executing "prefect-server/charts/postgresql/templates/NOTES.txt" at <include "common.errors.upgrade.passwords.empty" (dict "validationErrors" (list $passwordValidationErrors) "context" $)>: error calling include: template: prefect-server/charts/postgresql/charts/common/templates/_errors.tpl:18:48: executing "common.errors.upgrade.passwords.empty" at <fail>: error calling fail: HELM_ERR_START
PASSWORDS ERROR: you must provide your current passwords when upgrade the release
    'postgresqlPassword' must not be empty, please add '--set postgresqlPassword=$POSTGRESQL_PASSWORD' to the command. To get the current value:

        export POSTGRESQL_PASSWORD=$(kubectl get secret --namespace prefect prefect-server-initial-postgresql -o jsonpath="{.data.postgresql-password}" | base64 --decode)
HELM_ERR_END
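The `kubectl ... | base64 --decode` command in the error output works because Kubernetes stores Secret `data` values base64-encoded. The `jsonpath` expression extracts the encoded string, and `base64 --decode` turns it back into the password. Below is a standard-library equivalent of the decode step. It uses a made-up placeholder instead of a real secret.

```python
import base64

# Made-up stand-in for the string printed by
# kubectl get secret ... -o jsonpath="{.data.postgresql-password}"
encoded = base64.b64encode(b"example-password").decode("ascii")

# Equivalent of piping that output through `base64 --decode`.
password = base64.b64decode(encoded).decode("utf-8")
print(password)  # example-password
```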
nicholas

08/27/2021, 12:20 AM
Hi @Wilson Bilkovich - this is after you added the password as an environment variable, right?
Wilson Bilkovich

08/27/2021, 12:51 AM
Yeah, I exported it like it suggested.
(I checked the env var afterward to make sure it got a reasonable-looking value)
nicholas

08/27/2021, 1:48 AM
Hm, I think that to set the password you might need to pass `postgresql.postgresqlPassword` instead of just `postgresqlPassword` after the export.
Wilson Bilkovich

08/27/2021, 2:10 AM
I’ll try that, thanks!
nicholas

08/27/2021, 6:30 PM
Just to follow up - were you able to get the upgrade working?
Wilson Bilkovich

08/27/2021, 6:36 PM
I ended up nuking the install and doing it afresh before you pointed out the correct spelling. But after looking at `values.yaml`, I am convinced it would work when spelled with the `postgresql.` prefix.
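nicholas's suggestion follows from how Helm scopes subchart values. The bundled PostgreSQL subchart receives only the values nested under its own name (plus `global`, which this sketch ignores), so a top-level `postgresqlPassword` never reaches it. The Python below is a deliberately simplified, hypothetical model of `--set`: dotted keys become nested maps, and list indexes, escaping and type coercion are ignored. It shows where each spelling lands. `example-password` is a placeholder.

```python
def apply_set(values: dict, expr: str) -> None:
    """Simplified model of `helm --set a.b.c=value` (no lists, escapes, or types)."""
    path, _, raw = expr.partition("=")
    keys = path.split(".")
    node = values
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = raw


def subchart_view(values: dict, subchart: str) -> dict:
    """Values a subchart receives in this model: only the block under its name."""
    return values.get(subchart, {})


top_level: dict = {}
apply_set(top_level, "postgresqlPassword=example-password")

nested: dict = {}
apply_set(nested, "postgresql.postgresqlPassword=example-password")

print(subchart_view(top_level, "postgresql"))  # {}
print(subchart_view(nested, "postgresql"))  # {'postgresqlPassword': 'example-password'}
```

Applied to the original command, the corrected flag would be `--set postgresql.postgresqlPassword=$POSTGRESQL_PASSWORD`. That matches the `postgresql.` prefix Wilson found in `values.yaml`.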