prefect-community
  • i

    itay livni

    05/24/2021, 11:54 PM
    Hi - I have a flow that runs under a minute and is part of another flow. I was thinking of wrapping the flow in an aws lambda to speed up the overall processing of the larger flow. Any thoughts?
    k
    • 2
    • 6
  • f

    Felipe Saldana

    05/25/2021, 1:47 AM
    Hello, I am hoping for some guidance here. After a mapped task runs in parallel I need to run a different task sequentially a dynamic number of times with Parameters. Is this possible? The below is in the flow context ... with Flow
    post_runner.set_upstream(all_pushes_mapped_results)
        post_runner.bind(mapped_run_name,
            mapped_gpudb_user,
            mapped_gpudb_pass,
            mapped_gpudb_host,
            mapped_collection_name)
    post_runner is a task itself so that I can actually get access to the parameters. Internally the runner task loops and creates a dynamic number of tasks (if I run the same algorithm below directly in the flow context I don't have access to the parameters)
    class RenameTaskRunner(Task):
        def __init__(self, *args, **kwargs):
    
            super().__init__(*args, **kwargs)
    
            # other constructor logic
    
        def run(self, run_names, gpudb_user, gpudb_pass, gpudb_host, collections_list):
    
            logger.info("Start Rename Runner")
    
            all_tasks = []
            iterations = len(run_names)
    
            all_tasks.append(RenameTask(name="push1_post"))
            all_tasks[0].bind(gpudb_user, gpudb_pass, gpudb_host[0], collections_list)
    
            for i in range(1, iterations):
                all_tasks.append(RenameTask(name=f"push{i + 1}_post"))
                all_tasks[i].set_upstream(all_tasks[i - 1])
                all_tasks[i].bind(gpudb_user, gpudb_pass, gpudb_host[i], collections_list)
    
            logger.info("Finish Rename Runner")
    
            return all_tasks
    Below is the error. Is it possible to have RenameTaskRunner register the task with the given outer flow context? Can I pass a reference in to the constructor or some other idea?
    [2021-05-25 01:31:50+0000] ERROR - prefect.TaskRunner | Unexpected error: ValueError("Could not infer an active Flow context while creating edge to <Task: push1_post>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `RenameTask(...).run(...)`")
    Traceback (most recent call last):
    k
    • 2
    • 33
  • s

    Sam

    05/25/2021, 5:33 AM
    Hello everyone. Prefect looks wonderful, but I am unable to get past the “https://cloud.prefect.io/welcome/name-team” screen on first login. Support appears to be offline, so I was wondering if anyone in the community had an idea for bypassing this screen?
    ✅ 1
    n
    • 2
    • 2
  • j

    juumel_team

    05/25/2021, 7:36 AM
    Hi everyone. We are a small data company based in Europe. We have been using Prefect for months without going to Cloud. We built all our automation flows on Prefect and it's an amazing tool, thanks a lot for your work. We decided to move some flows to the Prefect Cloud backend and were surprised to see the total duration increase by a factor of two compared to our local Prefect Server backend. The flow on which we ran the tests is one of the longest in our stack (191 tasks, 26 parameters). Keeping the same agent, executor, computer, and parameters, we went from 65s to 135s. Each task takes on average about 100% longer (even the parameter collection). We understand that there is some time cost linked to the exchange of state information and metadata, but could you, dear fellow community, please help us with the following questions?
    1. Is the time gap between runs on the Prefect Cloud backend and the Prefect Server backend proportional, i.e. the longer the flow, the longer the time gap?
    2. Are there any good practices to reduce this time gap?
    3. Would upgrading to the Standard or Enterprise offer make things faster?
    Thanks a lot :) Extra information: our internet connection at the office is 127 Mbps download, 7.8 Mbps upload.
    k
    • 2
    • 14
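As a rough reading of the figures quoted in this message, the sketch below works out the overall slowdown and the extra time per task. Spreading the overhead evenly across the 191 tasks is an assumption made only for illustration; the message does not say how the extra time is distributed or how much of the flow ran in parallel.

```python
# Figures quoted in the message above.
local_seconds = 65    # total duration against the local Prefect Server backend
cloud_seconds = 135   # total duration against the Prefect Cloud backend
task_count = 191      # number of tasks in the flow

extra_seconds = cloud_seconds - local_seconds   # added wall-clock time
slowdown = cloud_seconds / local_seconds        # overall ratio between backends

# Assumption: the added time is spread evenly across all tasks.
extra_per_task = extra_seconds / task_count

print(f"extra time: {extra_seconds} s")                    # extra time: 70 s
print(f"slowdown: {slowdown:.2f}x")                        # slowdown: 2.08x
print(f"extra per task: {extra_per_task * 1000:.0f} ms")   # extra per task: 366 ms
```

If the overhead really is a roughly fixed cost per state update, a figure of this size per task would suggest the gap grows with the number of tasks rather than with how long each task runs, but that is a hypothesis to check, not something the numbers alone establish.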
  • f

    Fabrice Toussaint

    05/25/2021, 12:19 PM
    Hi, is there a way to safely delete Prefect logs from Postgres without messing up scheduled flows? The tables log, task_run and task_run_state can grow rapidly in size, so I want to delete rows from them every X days, but only for the ones that have already run, not the scheduled ones.
    k
    • 2
    • 19
  • m

    Marc Lipoff

    05/25/2021, 2:41 PM
    Are there any examples of running the
    prefect build/register
    commands in CircleCI, specifically done in parallel? I have dozens of flows to register via CircleCI, and I'd like to do them in parallel to minimize the time.
    k
    • 2
    • 2
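One way to approach the question above, independent of the CI system, is to launch the registration commands concurrently from a small script. The sketch below is only an illustration: `build_register_command` and `run_all` are hypothetical helper names, and the command shape is assumed to match the `prefect register --project ... -p ...` form quoted elsewhere in this channel. Whether registering several flows at once is safe for a given backend is not confirmed here.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def build_register_command(flow_path, project):
    """Build the CLI call for one flow file.

    Assumption: the CLI accepts `prefect register --project <name> -p <path>`.
    """
    return ["prefect", "register", "--project", project, "-p", flow_path]


def run_all(commands, max_workers=4):
    """Run each command in its own subprocess, several at a time.

    Returns a list of (command, return_code) pairs in input order.
    """
    def run_one(command):
        completed = subprocess.run(command, capture_output=True, text=True)
        return command, completed.returncode

    # Threads are enough here: each worker only waits on a child process.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run_one, commands))


# Example usage (paths and project name are hypothetical):
#   commands = [build_register_command(p, "my proj")
#               for p in ["flows/a.py", "flows/b.py"]]
#   failed = [cmd for cmd, code in run_all(commands) if code != 0]
#   sys.exit(1 if failed else 0)
```

Returning the exit codes rather than raising on the first failure lets the CI job report every flow that failed to register in a single run.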
  • p

    Pedro Henrique

    05/25/2021, 3:19 PM
    Hi everyone, I'm trying to configure user access in Prefect. Is it possible to configure this when using Prefect outside of Cloud?
    k
    • 2
    • 4
  • p

    Pedro Henrique

    05/25/2021, 3:20 PM
    when I open the home page the members button is not enabled
  • c

    Chris L.

    05/25/2021, 5:10 PM
    Hi Prefect community! Does anybody have any examples of
    big_future = client.scatter(big_dask_dataframe)
    and passing
    future = client.submit(func, big_future)
    as an output from one task to be used as an input in another task? I found this UserWarning at the bottom of the "prefect-etl" article in the dask docs (https://examples.dask.org/applications/prefect-etl.html) as well. Was wondering if anybody has encountered this issue as well? And whether there's a solution to this. Thank you in advance!
    m
    • 2
    • 4
  • j

    Joseph Hughes

    05/25/2021, 5:41 PM
    Greetings! Currently Learning about Prefect at Microsoft Build! Excited to be here!
    :marvin: 1
    👋 3
    m
    k
    • 3
    • 3
  • c

    Chris DeNardo

    05/25/2021, 6:03 PM
    Hi Prefect community! Just found out about you via MS Build!
    🚀 4
    👋 4
    z
    k
    • 3
    • 2
  • h

    Hemanand Ramasamy

    05/25/2021, 6:09 PM
    Here from MachDatum from MSBuild
    🎉 3
    👋 3
    k
    • 2
    • 1
  • t

    Tim Enders

    05/25/2021, 7:34 PM
    Does anyone have, or know of, a GitLab CI/CD image that is set up for Prefect?
    k
    • 2
    • 3
  • s

    Saksham Dixit

    05/25/2021, 7:35 PM
    Hi Guys, I am trying to use Github Storage, but run into the following error:
    Failed to load and execute Flow's environment: BadCredentialsException(401
    Any advice how to proceed to solve this error?
    k
    • 2
    • 20
  • s

    Sam Cox

    05/25/2021, 8:40 PM
    Hi, right now I am storing individual flows in gitlab, and occasionally I get the error:
    Failed to load and execute Flow's environment
    followed by a GitLab MaxRetryError, probably because we are reading from gitlab too many times in a given time period. Using gitlab as storage is important for some of our current projects, but is there a way to retry a flow when the Flow's environment fails to load?
    k
    • 2
    • 4
  • p

    Pedro Henrique

    05/25/2021, 8:59 PM
    Does anyone know why running "prefect register --project "my proj" -p /home/user/app_flows/" in a .sh file (to auto-register my flows when my VM starts up) doesn't work, while running the same line in the terminal does? The error raised is prefect.utilities.exceptions.AuthorizationError: [{'path': ['project'], 'message': 'AuthenticationError: Forbidden', 'extensions': {'code': 'UNAUTHENTICATED'}}]
    k
    m
    • 3
    • 2
  • a

    Alex Furrier

    05/25/2021, 9:20 PM
    I'm running into some issues with the built in Snowflake task. Is there documentation on how to use it? Specifically I'm running into the following issue: I'm using the task SnowflakeQuery to execute a query. I'm doing that like so:
    from prefect.tasks.snowflake.snowflake import SnowflakeQuery
    
    sf_task = SnowflakeQuery(query=SNOWFLAKE_QUERY,
                   user=SNOWFLAKE_USER,
                   password=SNOWFLAKE_PASSWORD,
                   account=SNOWFLAKE_ACCOUNT,
                   warehouse=SNOWFLAKE_ACCOUNT,
                   database=SNOWFLAKE_DATABASE,
                   role=SNOWFLAKE_ROLE,
                   schema=SNOWFLAKE_SCHEMA)
    ret = sf_task.run()
    Going off the source code, I would expect that to return the query data in an iterable. This is based on the code within
    SnowflakeQuery
    that creates a cursor, connects, executes the query and then returns the
    .fetchall()
    method of an executed Snowflake cursor (basing my understanding of that from this). What is actually returned by
    sf_task.run()
    is the Snowflake cursor which appears to have already executed the query (it's of type
    snowflake.connector.cursor.SnowflakeCursor
    with a closed connection state). However, there is metadata in
    ret.description
    which is a list of tuples. I've tried this with a few different queries that should return data and it's the same result. Any idea what's going on? I may be doing something obviously wrong but not seeing it. I'm able to get the data using the Snowflake connector and executing myself:
    import snowflake.connector
    import pandas as pd
    
    #create connection
    conn=snowflake.connector.connect(
                   user=SNOWFLAKE_USER,
                   password=SNOWFLAKE_PASSWORD,
                   account=SNOWFLAKE_ACCOUNT,
                   warehouse=SNOWFLAKE_ACCOUNT,
                   database=SNOWFLAKE_DATABASE,
                   role=SNOWFLAKE_ROLE,
                   schema=SNOWFLAKE_SCHEMA)
    
                    
    #create cursor
    curs=conn.cursor()
    
    #execute SQL statement
    cur = curs.execute(SNOWFLAKE_QUERY)
    
    #load it to df
    df = pd.DataFrame.from_records(iter(cur), columns=[x[0] for x in cur.description])
    df
    k
    • 2
    • 2
  • k

    kevin

    05/25/2021, 10:18 PM
    Can the interactive GQL API query flow runs that are older than two weeks?
    k
    • 2
    • 5
  • f

    Felipe Saldana

    05/26/2021, 1:09 AM
    I have created a single flow that I want to run against different environments at the same time. For example, I have dev, preprod, and prod environments. Based on the given environment I can pass in the parameters that have the correct values (usernames, passwords, etc.). How do I do this with only one flow ... how do I register that flow with different parameters? Is this possible?
    k
    m
    • 3
    • 10
  • n

    Noah Holm

    05/26/2021, 7:55 AM
    How do I remove AgentConfigs created in the Automation section in Cloud? I have a hard time figuring out which of my configs correspond to which config ID, because there are extra configs created called “Unnamed config” which gets selected after saving.
    k
    • 2
    • 9
  • a

    Alex Souvannakhot

    05/26/2021, 8:33 AM
    Hello all!  I am currently investigating whether Prefect would fit our use case. We have many machine learning tasks that are chained together in various flows. The flows are triggered when a user uploads a file on the web client and run in the background. We are considering using Prefect to schedule and manage these flows. Currently, we have many workers for each of the machine learning tasks. There is a service that schedules the jobs and processes the outputs of the jobs via a message queue. However, there is no definition of a flow and the scheduling logic is scattered throughout the code for this service. This is where we think Prefect would be useful for us. From what I gathered, Prefect is mainly used for ETL workflows. In our case, we want to use Prefect to serve results from our machine learning models. Would Prefect be suitable for our use case? I am quite new to the MLOps field so any help or suggestions would be welcome!
    e
    • 2
    • 2
  • b

    Bruno Murino

    05/26/2021, 11:59 AM
    Hi everyone — I’m trying to use the prefect S3List task but I’m struggling with the AWS authentication, since it appears the S3List task doesn’t accept boto_kwargs nor a boto_session, and I need to specify different profiles for my task. Has anyone faced this problem before?
    k
    • 2
    • 4
  • d

    Dotan Asselmann

    05/26/2021, 12:20 PM
    hey everyone. i’m suddenly starting to get this error on my flows:
    AttributeError: 'Flow' object has no attribute 'terminal_state_handler'
    any idea what could be the reason? i’m running on prefect self hosted server
    k
    • 2
    • 2
  • b

    Bruno Murino

    05/26/2021, 1:48 PM
    Hi everyone, I have a task that is mapped over a list, and I have a subsequent task that must only run after all the branches of the previous task map are done; however, the tasks are not data dependent. I’m trying to specify “upstream_tasks” but it’s giving me a weird error, as it appears it is trying to run the task?
    k
    • 2
    • 3
  • b

    Bruno Murino

    05/26/2021, 2:26 PM
    Is there any way to get the full traceback with a state handler?
    k
    • 2
    • 1
  • m

    Mark McDonald

    05/26/2021, 2:27 PM
    Hi - we have a flow that occasionally gets stuck in a running state until we manually cancel it. We tried using the automations feature, but we encountered this situation last night and it didn't work
    k
    m
    • 3
    • 26
  • k

    Kevin Kho

    05/26/2021, 2:33 PM
    Join my webinar today with LabelStudio!
    :upvote: 1
    👀 1
  • r

    Robert Bastian

    05/26/2021, 2:41 PM
    Hello Prefect! Can somebody provide some guidance on the best way to test failure paths in Prefect flows? Can I inject state into a flow during setup in someway to ensure the failure behavior is working as expected? Thanks in advance!
    k
    • 2
    • 2
  • l

    Lukáš Polák

    05/26/2021, 2:54 PM
    Hi guys. I have a question regarding management of Prefect flow schedules via the Prefect API. We decided to use Prefect to handle uploading user data into our DB. We support multiple different ways to upload the data (each of them is a separate Prefect flow). In our system, we allow a user to schedule a periodic job that downloads user data from an external source and uploads it into our database. The user should be able to specify a different run periodicity for different datasets (and modify this scheduling interval). Since schedules are stored inside a JSONB column attached to the flow group without a specific schedule_id, it is very hard to update a specific schedule (unless you use one flow parameter as a unique identifier, which feels a bit hacky). Removing a schedule from the flow group is also slightly uncomfortable (the Prefect UI simply sends the whole schedule array, without the deleted schedule, back to GraphQL). At the moment, we are exploring the possibility of registering Prefect flows per dataset (using the dataset ID in the flow name) so we can reliably control the schedule. This should give us better control over the schedule, but we also get a more granular view of the flows (which we don't really need or want). Is there a better way of approaching this particular problem? Any suggestions are welcome.
    k
    • 2
    • 6
  • b

    Bruno Murino

    05/26/2021, 4:02 PM
    Hi everyone, I’m trying to add stuff to the prefect.context inside a task, to be used in other tasks, but it seems like the context is reset between tasks?
    m
    • 2
    • 1
m

Michael Adkins

05/26/2021, 4:05 PM
Hey @Bruno Murino -- we actually don't intend for the context to be used like this. We'd recommend you make the tasks dependent on each other and pass data explicitly. Alternatively, we are releasing a key-value store for Cloud very soon that would accomplish this.
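
To illustrate the first suggestion in this reply, here is a minimal sketch in plain Python (deliberately not the Prefect API). The function names and values are hypothetical; the point is only that the downstream step receives what it needs as an argument instead of reading it from shared, mutable context.

```python
def fetch_settings():
    """Upstream step: produce the values that later steps need."""
    return {"run_date": "2021-05-26", "bucket": "example-bucket"}


def build_report_name(settings):
    """Downstream step: receives the values explicitly as an argument."""
    return f"{settings['bucket']}/report-{settings['run_date']}.csv"


# The return value of the first step is passed straight into the second,
# which also makes the ordering between the two steps explicit.
settings = fetch_settings()
report_name = build_report_name(settings)
print(report_name)  # example-bucket/report-2021-05-26.csv
```

In a flow, the same shape would presumably have the second task take the first task's result as an input, so the dependency and the data travel together.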