prefect-community
  • z

    Zach Schumacher

    04/26/2021, 6:16 PM
    Trying to do something SUPER basic, not sure why it's not working though (I'd imagine I'm missing something very obvious).
    • I have the k8s agent deployed and querying for flow runs
    • I have my flow registered with GitHub storage and can see it in the UI
    When I try to run it, however, it just stays in the Scheduled state forever. What am I doing wrong? (Flow below.) I realize I'm not specifying a Docker image, but figured that since it's a kwarg there was a default it would fall back on. Could that be what I'm missing?
    from prefect import task, Flow
    from prefect.run_configs import KubernetesRun
    from prefect.storage import GitHub
    
    
    @task()
    def print_hello(name: str):
        print(f"hello {name}!")
    
    
    with Flow(
            "Example", 
            storage=GitHub(
                repo="Simplebet/sbprefect", 
                path="sbprefect/flows/example.py", 
                access_token_secret="GITHUB_ACCESS_TOKEN")
    ) as flow:
        example_task = print_hello("world")
        flow.run_config = KubernetesRun()
    k
    12 replies · 2 participants
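    A minimal sketch of the run-config piece for the question above, assuming the image tag and label are placeholders you would adjust (KubernetesRun does fall back to a default prefecthq/prefect image, and an agent only picks up runs whose labels it also carries):
    from prefect.run_configs import KubernetesRun

    # `flow` is the Flow object defined above
    flow.run_config = KubernetesRun(
        image="prefecthq/prefect:0.14.16",  # hypothetical tag; pin to your Prefect version
        labels=["k8s"],                     # must be a subset of the agent's labels
    )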
  • a

    Andy

    04/26/2021, 8:29 PM
    prefect server start
    throws an error for me. I think it's because Docker requires root. However, I can't run
    sudo !!
    — I get
    sudo: prefect: command not found
    . Is there a way to resolve this issue?
    m
    5 replies · 2 participants
  • e

    eli

    04/26/2021, 9:02 PM
    Has anyone had trouble with required parameters and the
    StartFlowRun
    Task? I've been trying to run a flow of flows, but I get validation errors despite passing the required parameters to the flow. A mocked-up example:
    from prefect import task, Flow, Parameter
    from prefect.tasks.prefect import StartFlowRun

    param_a = Parameter(name="param_a")
    param_b = Parameter(name="param_b", required=True)

    @task(log_stdout=True)
    def print_param(param):
        print(param)

    # start flow run task instance
    start_flow_run = StartFlowRun(project_name="required_test", wait=True)

    with Flow("flow1") as f1:
        print_param(param_a)

    with Flow("flow2") as f2:
        print_param(param_a)
        print_param(param_b)  # the required one

    with Flow("main") as f:
        # this works, ignoring the extra unused param
        start_flow_run(flow_name="flow1", parameters={"param_a": "a", "param_b": "b"})
        # this fails, raising a validation error in the logs
        start_flow_run(flow_name="flow2", parameters={"param_a": "a", "param_b": "b"})
    The error:
    prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': "Required parameters were not supplied: {'param_b'}", 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    k
    m
    7 replies · 3 participants
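    One thing worth ruling out (a sketch, not a confirmed fix): bind a separate, fully configured StartFlowRun instance per child flow, since parameters can also be set when the task is constructed:
    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun

    run_flow1 = StartFlowRun(flow_name="flow1", project_name="required_test", wait=True,
                             parameters={"param_a": "a"})
    run_flow2 = StartFlowRun(flow_name="flow2", project_name="required_test", wait=True,
                             parameters={"param_a": "a", "param_b": "b"})

    with Flow("main") as f:
        run_flow1()
        run_flow2()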
  • r

    Rajdeep Rao

    04/26/2021, 10:08 PM
    Hey y'all! I had a question regarding IAM permissions on the ECS task that the Prefect agent creates to run a flow. What's the best way to add a task role and a security group to it? I tried using https://docs.prefect.io/orchestration/flow_config/run_configs.html#ecsrun but to no avail. I see the values in Prefect Cloud's UI under advanced settings, but I don't see anything on the actual task definition that gets created. Is this the correct way to accomplish something like this? Or am I on the wrong track here?
    k
    13 replies · 2 participants
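    A sketch of the ECSRun knobs that are intended to flow through to the registered task definition and the RunTask call; the ARNs, subnet and security-group IDs below are placeholders:
    from prefect.run_configs import ECSRun

    # `flow` is the existing Flow object
    flow.run_config = ECSRun(
        task_role_arn="arn:aws:iam::123456789012:role/my-task-role",       # placeholder
        execution_role_arn="arn:aws:iam::123456789012:role/my-exec-role",  # placeholder
        run_task_kwargs={
            "networkConfiguration": {
                "awsvpcConfiguration": {
                    "subnets": ["subnet-xxxx"],
                    "securityGroups": ["sg-xxxx"],
                    "assignPublicIp": "ENABLED",
                }
            }
        },
    )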
  • v

    Vladislav Bogucharov

    04/26/2021, 10:51 PM
    Does anyone know how to display, inside a task, information about another task that failed, or the error that caused the failure? I have a very simple ETL flow with one alert task. This alert task is executed if any of the ETL tasks fail. I want to use this alert task to send a Telegram notification, and I want to include more meaningful information in that notification than just an "Error" message. Maybe I should use a completely different approach? I tried to get the information through context, but that information is not what I need at the moment.
    from prefect import task, Flow
    from prefect.triggers import any_failed
    from random import randint
    
    @task()
    def extract():
        # this task generates random integers from 0,1,2,3,4,5.
        random_number = randint(0, 5)
        return random_number
    
    
    @task()
    def transform(random_number):
        # this task is potentially dangerous, since the random number can be zero, and then there will be division by zero
        data = 100 / random_number
        return data
    
    
    @task()
    def load(data):
        # just print data
        print(data)
    
    
    @task(trigger=any_failed)
    def telegram_alarm():
        # this task is needed to send notifications about fails in telegram.
        print('ERROR')  # How can I display a more informative message? For example, which task broke? Or what error caused the fail? (in my case ZeroDivisionError: division by zero)
    
        # import prefect
        # print(prefect.context.to_dict())  # tried this method but didn't find the needed logs
    
    
    with Flow('test_etl') as flow:
        random_number = extract()
        data = transform(random_number)
        load(data)
        telegram_alarm(upstream_tasks=flow.get_tasks())  # pass the tasks themselves, not a list wrapping the set
    
    flow_state = flow.run()
    k
    4 replies · 2 participants
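    One common pattern for getting the failed task's name and exception into a notification is a task-level state handler instead of (or alongside) the alert task; a sketch, with the Telegram call left as a placeholder:
    from prefect import task
    from prefect.engine import state

    def alert_on_failure(task, old_state, new_state):
        # fires only for tasks that actually failed, not for trigger-failed ones
        if isinstance(new_state, state.Failed) and not isinstance(new_state, state.TriggerFailed):
            message = f"Task '{task.name}' failed: {new_state.message!r} ({new_state.result!r})"
            print(message)  # replace with the Telegram notification call
        return new_state

    @task(state_handlers=[alert_on_failure])
    def transform(random_number):
        return 100 / random_number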
  • z

    Zach Schumacher

    04/27/2021, 1:19 AM
    what should the entrypoint be for custom docker images in a
    KubernetesRun
    ?
    k
    30 replies · 2 participants
  • l

    Louis Burtz

    04/27/2021, 5:20 AM
    Hello Prefect community! I'm starting out with Prefect and have many similar tasks that have been a pleasure to automate -> I attach a sample code snippet of my flow in the thread 🧵 to avoid spamming here. My questions are: 1. I couldn't figure out whether, instead of the three for loops, I could/should be using map/iterated mapping (I've made .map() work well with the equivalent of 1 for loop, but not with the three nested ones to go over all my parameters). 2. I couldn't figure out how to execute 'depth first' (= execute the task_inference task after each long task_training is complete). I've read in the Prefect docs that Depth First Execution comes with Dask executors. In my case I don't want parallel execution, since each task takes the entire available GPU memory of the local machine I'm running these on. Many thanks for any pointers and corrections of my misunderstandings!
    k
    26 replies · 2 participants
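    A sketch of collapsing the three nested loops into one mapped pipeline by building the parameter cross-product in a task (task and parameter names are hypothetical); with a single-worker LocalDaskExecutor the mapped chains should proceed one branch at a time rather than breadth-first, though that is worth confirming on your version:
    from itertools import product
    from prefect import task, Flow
    from prefect.executors import LocalDaskExecutor

    @task
    def build_configs(models, datasets, seeds):
        # replaces the three nested for loops with one flat list of configs
        return [{"model": m, "dataset": d, "seed": s}
                for m, d, s in product(models, datasets, seeds)]

    @task
    def task_training(config):
        return {"config": config, "weights": "trained"}  # placeholder body

    @task
    def task_inference(trained):
        print(trained["config"])  # placeholder body

    with Flow("train-then-infer") as flow:
        configs = build_configs(["m1", "m2"], ["d1"], [1, 2])
        trained = task_training.map(configs)
        task_inference.map(trained)

    # a single worker avoids parallel GPU contention
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=1)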
  • t

    Thomas Hoeck

    04/27/2021, 10:15 AM
    Hi, I'm working with a larger-than-memory dataset. The operations needed can easily be parallelized and split into chunks, and I have done that with
    .map()
    . The problem is that when I run it, the results are all still kept in memory (
    bigger_than_mem
    runs twice and keeps its result in memory). Is it possible to have the flow write the data to a file and clear the memory? Here is a sample flow:
    from prefect import task, Flow

    @task
    def get_chunks():
        return [[1, 2], [3, 4, 5]]

    @task
    def bigger_than_mem(x):
        return x * 100000000

    @task
    def dump_to_db(x):
        dump(x)  # placeholder for the actual database write

    with Flow("my_flow") as flow:
        x = get_chunks()
        x_trans = bigger_than_mem.map(x)
        dump_to_db.map(x_trans)
    k
    j
    5 replies · 3 participants
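    The replies aren't shown here, but one common workaround is to pass file paths between tasks instead of the data itself, so only small strings stay in memory; a sketch (the paths are placeholders):
    import pickle
    from prefect import task, Flow

    @task
    def get_chunks():
        return [[1, 2], [3, 4, 5]]

    @task
    def bigger_than_mem(x):
        data = x * 100000000
        path = f"/tmp/chunk_{x[0]}.pkl"  # placeholder location
        with open(path, "wb") as f:
            pickle.dump(data, f)         # spill the big object to disk
        return path                      # only the small path travels between tasks

    @task
    def dump_to_db(path):
        with open(path, "rb") as f:
            data = pickle.load(f)
        # write `data` to the database here, then the file can be deleted

    with Flow("my_flow") as flow:
        x = get_chunks()
        paths = bigger_than_mem.map(x)
        dump_to_db.map(paths)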
  • v

    Vladislav Bogucharov

    04/27/2021, 1:10 PM
    As far as I found out,
    state.TriggerFailed
    is a child of
    state.Failed
    . When one of my tasks fails, then I get the
    state.Failed
    from this task and then from the next task, which actually didn't break, I get
    state.TriggerFailed
    with the message TRIGGERFAIL('Trigger was "all_successful" but some of the upstream tasks failed.',). I would like to receive information only about the task that actually failed. At the moment I have implemented the following logic; is this the best practice?
    if isinstance(new_state, state.Failed) and not isinstance(new_state, state.TriggerFailed):
    ➕ 1
    z
    j
    7 replies · 3 participants
  • c

    ciaran

    04/27/2021, 1:43 PM
    I feel like we need a #kudos channel but anyway: Big #kudos to @Tyler Wanner and @Kevin Kho for putting up with all my constant questions and queries and being really quick to help out 💯 🥇
    :marvin: 7
    👍 10
    🚀 4
    👍🏼 1
    k
    1 reply · 2 participants
  • t

    Talha

    04/27/2021, 3:16 PM
    Can I assign tasks to two different agents?
  • t

    Talha

    04/27/2021, 3:17 PM
    I mean, task1 is running on the local agent and task2 is running on the Docker agent
  • t

    Talha

    04/27/2021, 3:17 PM
    and add a condition so that if any task fails, the whole process fails??
    j
    3 replies · 2 participants
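    Agents pick up whole flow runs rather than individual tasks, so the usual shape is to split the work into two flows (one labelled for each agent) and orchestrate them from a parent flow; a sketch with hypothetical flow and project names:
    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun

    # task1 lives in a flow labelled for the local agent, task2 in a flow
    # labelled for the Docker agent; with wait=True the parent reflects the
    # child flow's final state, so a failed child fails the parent.
    run_local = StartFlowRun(flow_name="task1-flow", project_name="my-project", wait=True)
    run_docker = StartFlowRun(flow_name="task2-flow", project_name="my-project", wait=True)

    with Flow("parent") as flow:
        a = run_local()
        b = run_docker(upstream_tasks=[a])  # sequential; drop this kwarg to run both in parallel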
  • c

    Chris Marchetti [Datateer]

    04/27/2021, 3:29 PM
    Hey all! I am having trouble understanding the change from personal access tokens to api keys. What I understand is that they act in the same way. We have multiple projects that are using prefect. We are using it to run our ELT pipeline with docker, meltano, redshift, and dbt. The proper way to use them is to create a service account for each of the projects that will handle the authentication and store the keys in env variables we call 
    PREFECT__CLOUD__AUTH_TOKEN
    . Can we just replace references to the personal access tokens (
    prefect_auth_token
    ) with the new name
    PREFECT__CLOUD__AUTH_TOKEN
    ? Any guidance would be appreciated. Thanks!
    k
    t
    +1
    23 replies · 4 participants
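    For what it's worth, in later Prefect releases the key-based setting is distinct from the old token one; a sketch of wiring a per-project service-account key through the environment (the source variable name is hypothetical, and the exact config name supported by your version is worth checking in the auth docs):
    import os

    # assumption: PREFECT__CLOUD__API_KEY overrides config.cloud.api_key in
    # releases that support API keys, replacing PREFECT__CLOUD__AUTH_TOKEN
    os.environ["PREFECT__CLOUD__API_KEY"] = os.environ["MY_SERVICE_ACCOUNT_KEY"]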
  • f

    flavienbwk

    04/27/2021, 4:39 PM
    Hi, I'm trying to use the Docker storage method. The image gets correctly saved in my registry, but I get the following error in my agent:
    docker.errors.APIError: 400 Client Error for <http+docker://localhost/v1.40/containers/create>: Bad Request ("invalid IP address in add-host: """)
    Could anyone help? Thanks!
    k
    m
    16 replies · 3 participants
  • l

    Linnea Sahlberg

    04/27/2021, 7:15 PM
    Hi! I’m trying to trigger a flow from another flow using
    StartFlowRun
    and running into an issue where the flow cannot be found, even though that flow exists. I’ve attached the stack trace of the error as well as a screenshot of our flows, to show that we do have one named
    final_stats-staging
    . Any help here would be great- thanks!
    j
    6 replies · 2 participants
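    For reference, a sketch of how the lookup is scoped: StartFlowRun resolves the flow by name within the given project, so a missing or mismatched project_name is one way a registered flow can still come back as "not found" (the project name below is a placeholder):
    from prefect.tasks.prefect import StartFlowRun

    kickoff = StartFlowRun(
        flow_name="final_stats-staging",
        project_name="staging",  # placeholder: must be the project the flow is registered in
        wait=True,
    )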
  • s

    Sébastien Arnaud

    04/27/2021, 7:29 PM
    Hi, any other users of MLflow? I am trying to load a model from S3 using MLflow in a task, but it does not seem to be picking up the AWS credentials (passed through Prefect using the PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS env var). Thanks!
    k
    12 replies · 2 participants
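    A sketch of one workaround: PREFECT__CONTEXT__SECRETS__* only populates Prefect's own Secret store, and boto3 (which MLflow uses underneath) does not read it, so the secret can be copied into the standard AWS environment variables inside the task; the key names assume the dict shape Prefect's AWS tasks expect, so adjust to however yours is stored:
    import os
    from prefect import task
    from prefect.client import Secret

    @task
    def load_model(model_uri):
        creds = Secret("AWS_CREDENTIALS").get()
        os.environ["AWS_ACCESS_KEY_ID"] = creds["ACCESS_KEY"]             # assumed key name
        os.environ["AWS_SECRET_ACCESS_KEY"] = creds["SECRET_ACCESS_KEY"]  # assumed key name
        import mlflow.pyfunc
        return mlflow.pyfunc.load_model(model_uri)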
  • n

    Nathan Atkins

    04/27/2021, 9:04 PM
    Can anyone point me to a way to pass a priority to a Dask task from map()? I can work with one priority for all the tasks being mapped, but it would be really cool if I could assign a priority to each of the mapped values.
    k
    2 replies · 2 participants
  • b

    Braun Reyes

    04/27/2021, 9:56 PM
    Does anyone know if there is a way to query cloud api for late flow runs?
    j
    5 replies · 2 participants
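    A sketch of one way to phrase it against the GraphQL API, treating "late" as a run still Scheduled past its scheduled start time (the cutoff timestamp is a placeholder):
    from prefect import Client

    client = Client()
    result = client.graphql(
        """
        query {
          flow_run(
            where: {
              state: { _eq: "Scheduled" }
              scheduled_start_time: { _lt: "2021-04-28T00:00:00" }
            }
          ) {
            id
            name
            scheduled_start_time
          }
        }
        """
    )
    print(result)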
  • j

    Joseph Loss

    04/27/2021, 11:34 PM
    We have this massive script that is super important to our day-to-day operations, and I'm looking to convert it into a Prefect Cloud oriented setup so that I can connect it with a Slack webhook and also have access to logs anywhere/everywhere. But I'm having trouble setting this up correctly, for a few reasons: 1. The tasks in the graph are intertwined, meaning that a lot of these tasks are called within a "task" itself. Example:
    def fnRunTask(parameter1,  parameter2, parameter3 .... parameter10):
        df1, df2 = fnRunSubTask(parameter3, parameter5)
        dfTmp = fnRunSubTask2(df1, parameter1, parameter7, parameter9)
    2. As shown above, each subfunction requires different parameters, and the number of parameters for each function can vary. Essentially this is all the headache of converting a bunch of stuff that was defined under the if __name__ == "__main__": block into a Prefect setup. I've played around with passing in one large dictionary at the start, but wanted to ask here before I continued debugging that (didn't work on tries #1-20 lol). I also wanted to ask about the task interconnectivity, as I was a bit confused about upstream/downstream tasks since some of them run "at the same time" within the task that calls them.
    k
    7 replies · 2 participants
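    A sketch of one way to flatten the nesting: make each sub-function its own task and let the flow, rather than the old wrapper function, wire the inputs and outputs together (bodies and parameter values here are stand-ins):
    from prefect import task, Flow, Parameter

    @task(nout=2)  # nout lets a task return a tuple that unpacks into two results
    def fnRunSubTask(parameter3, parameter5):
        df1, df2 = parameter3, parameter5  # placeholder body
        return df1, df2

    @task
    def fnRunSubTask2(df1, parameter1, parameter7, parameter9):
        return {"df1": df1, "p1": parameter1, "p7": parameter7, "p9": parameter9}  # placeholder

    with Flow("converted") as flow:
        parameter1 = Parameter("parameter1")
        parameter3 = Parameter("parameter3")
        parameter5 = Parameter("parameter5")
        parameter7 = Parameter("parameter7")
        parameter9 = Parameter("parameter9")

        df1, df2 = fnRunSubTask(parameter3, parameter5)
        dfTmp = fnRunSubTask2(df1, parameter1, parameter7, parameter9)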
  • r

    Ranu Goldan

    04/28/2021, 3:11 AM
    Hi everyone, can labels only be used to point flows at a specific agent, or can they be used as general-purpose labels? I tried labelling my flow with e.g. label1, label2, label3, and my agent had label1. The flow won't run unless I remove label2 and label3.
    c
    2 replies · 2 participants
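    For reference, an agent only picks up a flow run when the agent's label set contains every label on the flow, which matches the behaviour described above; a sketch:
    from prefect.run_configs import UniversalRun

    # `flow` is the existing Flow object; an agent started with label1, label2
    # and label3 can run it, while an agent with only label1 cannot.
    flow.run_config = UniversalRun(labels=["label1", "label2", "label3"])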
  • j

    Jacob Blanco

    04/28/2021, 5:59 AM
    Regarding Approval steps in Flows, are there any restrictions on which type of account can Approve the continuation of the flow? Do you need to be a User?
    j
    1 reply · 2 participants
  • j

    Joseph Loss

    04/28/2021, 6:31 AM
    Also I'm banging my head against the wall
    # fnPortfolioOptimization()
    from prefect import Flow, Parameter, Client

    client = Client(api_token=%TOKEN%)
    client.login_to_tenant(tenant_slug='pwcm')
    
    with Flow('test2') as flow:
        lookback_periods = Parameter("lookback_periods", default=[3, 5, 7])
        percent_change = Parameter("percent_change", default = [.05, 0.07, 0.1])
        lookback_days_rsi = Parameter("lookback_days_rsi", default = 20)
        rsi_overbought = Parameter("rsi_overbought", default=0.75)
    
        fnRunAlgo(lookback_periods, percent_change, lookback_days_rsi, rsi_overbought)
    prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
    I've tried a custom service-account token as well as my user API key; neither works when calling flow.register(project_name) or client.register(flow, project_name).
    k
    6 replies · 2 participants
  • k

    Kayvan Shah

    04/28/2021, 6:42 AM
    Does the multiprocessing library work with Prefect flows?
    ✅ 1
    k
    1 reply · 2 participants
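    One idiomatic alternative to calling the multiprocessing module inside tasks is to let the executor parallelise mapped tasks across processes; a sketch:
    from prefect import task, Flow
    from prefect.executors import LocalDaskExecutor

    @task
    def heavy(x):
        return x * x

    with Flow("parallel") as flow:
        heavy.map(list(range(8)))

    # mapped tasks run in separate processes, much like a multiprocessing pool
    flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=4)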
  • j

    Jeremy Tee

    04/28/2021, 7:55 AM
    Hi everybody, when I try to rerun my failed mapped task from Prefect Cloud by clicking the restart button, it gives me this error:
    Unexpected error: TypeError("object of type 'NoneType' has no len()")
    However, when I rerun a fresh one, it works! Is my way of caching wrong? prefect = 0.14.15
    k
    j
    22 replies · 3 participants
  • b

    Bruno Roque

    04/28/2021, 8:28 AM
    Is there a recommended way of exporting
    flow_run
    data (or any other entity) into a data warehouse? We had a poor man's workflow engine that we are replacing with Prefect, but we need to get the
    flow_run
    data into our data warehouse in order to use our reporting tools. We were thinking of doing some sort of ETL that queries Prefect's GraphQL API and loads the results into our data warehouse, but we were wondering if there is a recommended way of achieving this? GraphQL subscriptions do not work yet, right?
    ✅ 1
    k
    2 replies · 2 participants
  • v

    Vladislav Bogucharov

    04/28/2021, 8:45 AM
    Hello everyone! Unfortunately I did not find this in the documentation: is it possible to implement authentication in the Prefect Server UI? Even the simplest kind. I understand that this can be done in Prefect Cloud, but what about the Server version?
    ✅ 1
    k
    1 reply · 2 participants
  • r

    Romain

    04/28/2021, 10:43 AM
    Hey folks, I'm running into this exception: ValueError: Cannot set
    mapped=True
    when running from inside a mapped context. As a dummy example, I'm trying to do something like this:
    from prefect.tasks.core.function import FunctionTask
    from prefect import Flow, apply_map
    
    
    def task_block(z):
        R = FunctionTask(lambda x: range(x))(z)
        M = FunctionTask(lambda x: x + 1).map(R)
        return M
        
    with Flow('nested_map') as flow:
        L = [1, 2, 3]
        apply_map(task_block, L)
    is there a way to do this kind of nested mapping differently?
    ✅ 1
    e
    k
    8 replies · 3 participants
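    One workaround (shown in spirit, not as the thread's confirmed answer) is to build the nested lists with a mapped task and then use flatten so the second map runs over every element, avoiding a map inside apply_map:
    from prefect import Flow, task, flatten

    @task
    def make_range(x):
        return list(range(x))

    @task
    def add_one(x):
        return x + 1

    with Flow("nested_map") as flow:
        ranges = make_range.map([1, 2, 3])      # [[0], [0, 1], [0, 1, 2]]
        results = add_one.map(flatten(ranges))  # maps over every element of every sublist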
  • g

    g.suijker

    04/28/2021, 11:28 AM
    Hi all, unfortunately I couldn't join the Livestream - Data science on Prefect last Friday. Is there a recording somewhere that I can watch?
    ✅ 1
    c
    2 replies · 2 participants
  • r

    Robert Bastian

    04/28/2021, 1:23 PM
    I have a task that is using the trigger “any_failed” as a way to “clean up” if any upstream tasks fail. My expectation is that this task would be “skipped” if the upstream tasks succeed. In actuality the task is marked as failed and my flow failed. I got around this by setting a reference task, but it seems strange to mark a task as having failed when it didn’t run. Is there a way to change this to skip? Thx!
    k
    1 reply · 2 participants
k

Kevin Kho

04/28/2021, 2:09 PM
Hi @Robert Bastian! I tested this, and setting the reference task seems like the right approach. A state handler would be the way to change the state from
TriggerFailed
to
SKIP
👀 1
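A sketch of that state-handler idea, converting the clean-up task's TriggerFailed into Skipped when the upstream tasks all succeeded (task names are placeholders):
from prefect import task
from prefect.engine import state
from prefect.triggers import any_failed

def skip_if_trigger_failed(task, old_state, new_state):
    if isinstance(new_state, state.TriggerFailed):
        # upstream succeeded, so there is nothing to clean up
        return state.Skipped("Upstream tasks succeeded; clean-up not needed.")
    return new_state

@task(trigger=any_failed, state_handlers=[skip_if_trigger_failed])
def clean_up():
    print("cleaning up after an upstream failure")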