prefect-community

    Samuel Hinton

    02/16/2021, 6:48 PM
    G'day all! Does Prefect Server currently support flow concurrency? Here's the scenario: we have a flow that polls for data every 10 minutes and stores it. Works great. However, we want to backfill the data, and need one flow run per day going back a year (365 flow runs). If I simply use the Prefect client to call `create_flow_run` 365 times, the agent picks them all up at once, the API times out, and everything crashes (see fun image below). On top of this, it would be ideal if I could prioritise flows when concurrency is limited, so that the regular polling doesn't get stuck in an execution queue. (That's not an issue if concurrency is available via agents: I could have a backfill agent and a real-time agent, but I'm unsure whether that's supported.) I can see task concurrency in the docs, but have missed anything on flow concurrency itself. Is this supported?
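One workaround that does not rely on any flow-level concurrency feature is to stagger the backfill: create the 365 runs with scheduled start times spread out in small batches, so the agent only picks up a few at a time. Below is a minimal sketch of the scheduling arithmetic. It assumes the flow takes a date parameter, here called `day`, which is hypothetical. `plan_backfill` is an illustrative helper, not a Prefect function. The commented-out call assumes that the Prefect 1.x `Client.create_flow_run` accepts `parameters` and `scheduled_start_time`; check it against your installed version.

```python
from datetime import date, datetime, timedelta


def plan_backfill(first_day, days, batch_size, batch_gap, start_at):
    """Return (day, scheduled_start) pairs, releasing `batch_size` runs per `batch_gap`."""
    plan = []
    for i in range(days):
        day = first_day + timedelta(days=i)
        # Runs in the same batch share a start time; each batch starts one gap later.
        scheduled = start_at + (i // batch_size) * batch_gap
        plan.append((day, scheduled))
    return plan


if __name__ == "__main__":
    plan = plan_backfill(date(2020, 2, 17), days=365, batch_size=10,
                         batch_gap=timedelta(minutes=15),
                         start_at=datetime(2021, 2, 17, 0, 0))
    for day, when in plan[:3]:
        # Hypothetical usage (assumed Prefect 1.x Client signature; FLOW_ID is a placeholder):
        # client.create_flow_run(flow_id=FLOW_ID,
        #                        parameters={"day": day.isoformat()},
        #                        scheduled_start_time=when)
        print(day.isoformat(), when.isoformat())
```

Choose `batch_gap` to be longer than a batch typically takes. The staggering only spreads out start times. It does not enforce a hard limit if earlier runs overrun.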

    Peyton Runyan

    02/16/2021, 8:21 PM
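    Is there a good way to give a predefined Prefect task an upstream dependency? I would like `sql_task` to wait on the password, but that's not what the graph I'm getting shows.
    import prefect
    from prefect import Flow, Parameter
    from prefect.tasks.secrets import EnvVarSecret
    from prefect.tasks.sql_server import SqlServerFetch
    
    # set_creds, view_sql, get_manual_override_rows and result_formatter
    # are defined elsewhere in the original code.
    sql_task = SqlServerFetch(
            db_name=prefect.config.sql_server.database,
            user=prefect.config.sql_server.user,
            host=prefect.config.sql_server.server,
            query=get_manual_override_rows,
            fetch='many',
            fetch_count=3,
            result=result_formatter,
            name="Database Query"
            # commit: bool = False,
    )
    
    with Flow("config_flow") as f:
    
        mode = Parameter("Mode", default="dev")
    
        password_var = set_creds(mode)
        password_task = EnvVarSecret()
        password = password_task(password_var)
        # Calling the task inside the flow adds a copy with `password` as an upstream edge.
        s = sql_task(password=password)
    
        # Pass the called copy `s`, not the `sql_task` template: passing `sql_task`
        # adds the uncalled task as a separate node with no password edge.
        v = view_sql(s)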

    Suchindra

    02/16/2021, 9:16 PM
    Hi everyone, I am prototyping an ETL flow where I want to read a bunch of files, do some transformation, and write the results in parallel using a `LocalDaskExecutor`. My flow looks something like this:
    list_of_files = Parameter("list_of_files")
    list_of_frames = extract_fn.map(list_of_files)  # This is indeed getting executed in parallel
    How can I pass both the result of the previous mapped function and the input parameters to my transform function? I understand that I could probably refactor `transform_fn` to avoid needing `list_of_files`, but I wanted to know if there is another way of handling this, for example:
    transform_fn.map(list_of_frames, list_of_files)
    As you can tell, I am very new to Prefect. Thanks a lot!
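In Prefect 1.x, passing several mapped arguments to `.map` pairs them element-wise. The first frame goes with the first file, and so on, in the same way Python's built-in `map` walks several iterables in lockstep. As a result, `transform_fn.map(list_of_frames, list_of_files)` should already give each child the frame together with its source file. An argument that should be the same for every child can be wrapped in `prefect.unmapped(...)`. The plain-Python sketch below only illustrates the pairing. `extract_fn` and `transform_fn` are stand-ins for the real tasks, not Prefect code.

```python
def extract_fn(path):
    # Stand-in for the real extract task: pretend each file yields one row.
    return [f"row from {path}"]


def transform_fn(frame, path):
    # Receives one extracted frame *and* the file it came from.
    return {"source": path, "rows": len(frame)}


list_of_files = ["a.csv", "b.csv", "c.csv"]
list_of_frames = list(map(extract_fn, list_of_files))

# Analogue of transform_fn.map(list_of_frames, list_of_files):
# element i of each iterable is passed to the same call.
results = list(map(transform_fn, list_of_frames, list_of_files))
print(results)
```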

    Peyton Runyan

    02/16/2021, 10:58 PM
    Is there a way to define a function at runtime? I have a SQL task that I would like to pass a username and password to at runtime, and I'm struggling to get it working. More in the thread.

    Suchindra

    02/16/2021, 11:24 PM
    I have a bunch of code that uses `logbook` instead of `logging` from the stdlib. How can I configure things so that Prefect captures those logs from within Prefect tasks?

    itay livni

    02/16/2021, 11:30 PM
    Hi! From a pure speed perspective, is there any difference between running a flow recursively and running `StartFlowRun` in a task with a loop signal? My concern is that with `StartFlowRun` the environment (including modules) will be re-initialized on every loop. But maybe that happens with recursion too? In short, is building a recursive flow worth the effort from a speed perspective? (I'm currently running into different issues.) Thanks

    matta

    02/17/2021, 4:11 AM
    Is there a way to force "depth-first execution"? For example, "only process 4 chunks at a time"? I have a flow where I download a bunch of files, upload them to a Snowflake stage, and then delete the folders. Right now it tries to download everything first, and I think that is blowing up the container's storage. Is there a way to make it go "down" before going "out" (thus clearing disk space), or to be generally smarter about disk space? Thanks!
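One way to bound disk use, regardless of how the executor orders mapped tasks, is to process the files in fixed-size groups. Each group is uploaded and its folder deleted before the next group is downloaded, so at most one group is on disk at a time. Below is a stdlib-only sketch of that loop. `download` and `upload_to_stage` are hypothetical placeholders for the real download and Snowflake stage upload steps. In a Prefect flow, the same shape could run inside a single task or as one task per group.

```python
import os
import shutil
import tempfile


def download(url, folder):
    # Hypothetical stand-in: write a small file instead of fetching `url`.
    path = os.path.join(folder, os.path.basename(url))
    with open(path, "w") as fh:
        fh.write("data")
    return path


def upload_to_stage(paths):
    # Hypothetical stand-in for a Snowflake PUT into a stage; returns files "uploaded".
    return len(paths)


def process_in_groups(urls, group_size=4):
    """Download, upload and delete `group_size` files at a time."""
    peak_on_disk = 0
    uploaded = 0
    for start in range(0, len(urls), group_size):
        group = urls[start:start + group_size]
        folder = tempfile.mkdtemp()
        try:
            paths = [download(u, folder) for u in group]
            peak_on_disk = max(peak_on_disk, len(os.listdir(folder)))
            uploaded += upload_to_stage(paths)
        finally:
            # Free the disk space before the next group is fetched.
            shutil.rmtree(folder)
    return uploaded, peak_on_disk


if __name__ == "__main__":
    urls = [f"https://example.com/file_{i}.csv" for i in range(10)]
    print(process_in_groups(urls))
```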

    Sven Teresniak

    02/17/2021, 9:40 AM
    Hmm. When you create a project that already exists, the Prefect CLI just prints a message and returns without an error, so `--skip-if-exists` isn't needed there because that is the default behaviour. But when you create a tenant a second time (same name and slug), you get an exception and exit code 1 from the Prefect CLI. It seems that `create-tenant` needs a `--skip-if-exists` flag and `create project` does not.

    Harshal Rane

    02/17/2021, 9:51 AM
    Hi everyone, I am trying out Prefect's hybrid model. I am using Prefect Cloud (prefect.io), and I created a personal access token (PAT) and used it to generate a runner token for a Docker agent running on a virtual machine in the cloud. When I schedule my flow, the Docker agent accepts the job and pulls the agent image, but the flow fails with this error:
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/root/'")
    Attached are the logs from the Docker agent. Has anyone faced this issue before?

    Michael Hadorn

    02/17/2021, 10:04 AM
    Hi there! Is there a way to use dynamic flows with a Prefect server (Cloud or on-premise)? The registration step is sometimes not useful. By dynamic flow I mean that the tasks are generated at runtime; this isn't possible with mapping because the tasks have dependencies. I already asked here, but nobody answered: https://prefect-community.slack.com/archives/C014Z8DPDSR/p1612986542265800 Thanks a lot for any help!

    Jonathan Wright

    02/17/2021, 10:51 AM
    Hi. I am interested in how Prefect approaches the concept of nested flows or subflows, that is, running a flow inside another. I’ve not found anything in the docs about this. Or is this a design pattern that Prefect discourages? Thank you.

    Milly gupta

    02/17/2021, 1:34 PM
    Hi, I am looking to deploy a Prefect agent on either Kubernetes or Service Fabric. Does anyone know the resource requirements for a Prefect agent?

    Josselin Girault

    02/17/2021, 2:37 PM
    Hi. I'm curious about how Dask and Prefect behave together. If I have a dask.dataframe as the input of my Prefect flow and want to do both narrow and wide operations on the dataframe, how do I go about implementing it? Since all Prefect tasks are submitted as dask.delayed objects, I can't directly use the Dask API on my dask.dataframe inside the tasks, as it's unreliable to make Dask calls inside Dask calls. If I make the narrow operations mapped tasks, how do I write a groupby efficiently? The results of the mapped tasks will be reduced, and the groupby will be done on a single node. Neither the Dask documentation (https://examples.dask.org/applications/prefect-etl.html) nor the Prefect documentation (https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html) addresses this. I haven't found an example of how to use Dask objects inside a Prefect task, or how to implement wide operations in pure Prefect. It seems that by using Dask as its engine, Prefect prevents the use of the Dask API for transformations.

    Matthew Blau

    02/17/2021, 2:40 PM
    Hello all, my boss has asked a question about Prefect that I don't quite know the answer to. We are using Prefect Server, and the question was: can we host Prefect Server on one server but keep our program code, including the flows, on another server? I don't know whether that is something Prefect Server can do. We want a dedicated server for Prefect, with the program code hosted elsewhere in our infrastructure.

    Puja Thacker

    02/17/2021, 3:29 PM
    @Kyle Moon-Wright #prefect-community: I am new to Prefect. Can someone help me understand the difference between a Prefect agent and a Prefect executor?

    Nikul

    02/17/2021, 4:35 PM
    Hi, I'm running a mapped task over ~400 elements on Kubernetes using DaskExecutor + KubeCluster, but I quickly run out of memory. The data I'm using is <5GB and the nodes I'm using have ~60GB of RAM. The job pod (running the Dask scheduler) reaches >40GB memory usage just before the mapped task starts and the node runs out of memory before any of the mapped tasks start. I was wondering if anyone knows what the issue is. Thank you

    Ajith Kumara Beragala Acharige Lal

    02/17/2021, 5:57 PM
    Hi Prefect experts, can someone point me to a simple example that shows the steps to deploy and schedule a flow on a Kubernetes cluster via `Helm`? The Prefect server is also deployed to Kubernetes via Helm, and I now need to try out a flow on that Prefect server. Does anyone have an example or a page?

    Tobias Heintz

    02/17/2021, 8:26 PM
    Hi, we are evaluating Prefect to run our ML pipelines. Currently these run as a series of ECS tasks on Fargate, triggered periodically through CloudWatch. The big contender for the job is Airflow; there our setup would look something like this: the Airflow UI and scheduler run in the cloud (probably also as ECS tasks), with a DAG that triggers the existing pieces of the pipeline in ECS. So far I haven't been able to find similar functionality in Prefect. Here's what I understood: we would run the Prefect UI and server in the cloud, and deploy an ECS agent that talks to the server. This agent would be able to run entire workflows within a dedicated ECS task, but I would rather trigger existing tasks. I would love it if you could point me to documentation that explains the architecture, and maybe covers our use case as well. Thanks!

    Fina Silva-Santisteban

    02/18/2021, 1:27 AM
    Hi Prefect community! I’m using Prefect Cloud and successfully deploy flow runs using the ECS Fargate agent. To trigger a flow run, I first need to be running a `prefect ecs agent` in a terminal, and then use the Prefect dashboard to trigger the run. I’d like to build a standalone app: a custom GUI that lets a user input flow parameters and trigger a flow run. I imagine I’ll have to create a Prefect ECS agent using the Prefect API instead of running it in a separate terminal, but I’m not sure how to go about that. Please let me know of any deployment recipes that fit my use case! 🙏
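The agent does not have to be started by the app. It can be a separate long-running process, for example a service on a VM or in ECS, that polls for scheduled runs. The GUI then only needs to create flow runs through the API. Below is a minimal sketch that builds, but does not send, such a request using the standard library. It rests on several assumptions: the Prefect Cloud GraphQL endpoint is `https://api.prefect.io`, an API token is sent as a bearer token, and the `create_flow_run` mutation with a `create_flow_run_input` argument is the one the Prefect 1.x Python `Client.create_flow_run` uses. The version group id and parameter names are placeholders. Verify the schema in the Cloud UI's interactive API before relying on it.

```python
import json
import urllib.request

API_URL = "https://api.prefect.io"  # assumption: Prefect Cloud GraphQL endpoint

CREATE_FLOW_RUN = """
mutation($input: create_flow_run_input!) {
  create_flow_run(input: $input) { id }
}
"""


def build_create_flow_run_request(token, version_group_id, parameters):
    """Return a POST request asking the API to schedule one flow run."""
    payload = {
        "query": CREATE_FLOW_RUN,
        "variables": {
            "input": {
                "version_group_id": version_group_id,
                "parameters": parameters,  # values typed into the GUI
            }
        },
    }
    return urllib.request.Request(
        API_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Sending it (not done here) would look something like:
# req = build_create_flow_run_request(TOKEN, VERSION_GROUP_ID, {"my_param": 1})
# with urllib.request.urlopen(req) as resp:
#     flow_run_id = json.load(resp)["data"]["create_flow_run"]["id"]
```

The running ECS agent would then pick up the new run exactly as it does for runs triggered from the dashboard.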

    Puja Thacker

    02/18/2021, 7:45 AM
    #prefect-community: Hello everyone. I have a quick question about Prefect Cloud: how does Prefect Cloud isolate one client's content from another's? With Prefect Cloud we will be accessing the open Prefect API, so I am trying to understand at what level it isolates one client from another. For example, Informatica Cloud maintains "Organizations" for each client and each environment, and Tableau maintains "Sites" for each client for content isolation. In a similar way, how does Prefect Cloud isolate clients from each other?

    Puja Thacker

    02/18/2021, 7:46 AM
    Also, if we need different environments, like Dev, Preprod and Prod, how is that implemented at the Prefect Cloud level?

    Julian

    02/18/2021, 10:31 AM
    Hey, I wonder if it is possible to name future scheduled flow runs from a provided flow-run name template and the scheduled time. At the moment, we schedule most of our flows using a `CronSchedule`. The Prefect scheduler then schedules future flow runs with funny names such as `crazy_monkey`, but these don't provide any actual value. Since meaningful names are quite nice from a UI perspective, we rename these flow runs at runtime, e.g. to `{flow_name}_{system_time}` or `{flow_name}_{parameters}`. However, if a flow run fails before the rename task executes, it won't get this "nice" name and keeps the original (meaningless) one. Also, renaming inside the flow overwrites flow-run names that were provided when starting a flow run manually with a name, e.g. from the UI.
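One way to make the name independent of when the rename step happens to run is to derive it from the run's scheduled start time rather than the system clock, and to skip the rename when a name was chosen by hand. The plain-Python sketch below shows that naming rule. `render_run_name` and the `name_was_set_manually` flag are hypothetical. In a Prefect 1.x flow, the scheduled time is available as `prefect.context.scheduled_start_time`, and there is a `RenameFlowRun` task. This sketch leaves open how to detect that a name was supplied manually.

```python
from datetime import datetime


def render_run_name(flow_name, scheduled_start, current_name,
                    name_was_set_manually=False,
                    template="{flow_name}_{scheduled:%Y-%m-%dT%H:%M}"):
    """Build a readable flow-run name from the *scheduled* time, not the wall clock."""
    if name_was_set_manually:
        # Respect names typed in the UI or passed when the run was created.
        return current_name
    # datetime supports strftime codes directly in str.format specs.
    return template.format(flow_name=flow_name, scheduled=scheduled_start)


print(render_run_name("etl", datetime(2021, 2, 18, 10, 30), "crazy_monkey"))
```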

    Milly gupta

    02/18/2021, 11:47 AM
    Hi all, does the Prefect agent persist all task results somewhere?

    Siva Kumar

    02/18/2021, 2:57 PM
    #prefect-community: How do I return DataFrame objects or other serialized objects from a task function? When I try to return the object, I get a pickling exception saying that (cloud)pickle cannot serialize the object.
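The error message above is only paraphrased, so its exact cause is unclear. One common cause is that results which cross process boundaries, for example under a Dask executor or when persisted as a result, are serialized with cloudpickle. Objects that hold open connections, locks or file handles usually cannot be serialized, whereas plain data such as a DataFrame's values generally can. The small stdlib check below uses `pickle` as a stand-in for cloudpickle and can help find which returned value is the problem. `find_unpicklable` is a hypothetical helper.

```python
import pickle
import threading


def find_unpicklable(obj):
    """Return the keys of a dict whose values fail to pickle (empty list if all succeed)."""
    bad = []
    for key, value in obj.items():
        try:
            pickle.dumps(value)
        except Exception:  # TypeError, pickle.PicklingError, etc.
            bad.append(key)
    return bad


result = {
    "rows": [{"id": 1, "value": 3.5}],  # plain data: picklable
    "lock": threading.Lock(),           # OS-level handle: not picklable
}
print(find_unpicklable(result))
```

If the offending value is a connection or client object, returning only the data (and re-creating the client inside the downstream task) usually avoids the problem.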

    Joël Luijmes

    02/18/2021, 2:59 PM
    Hey, I’m looking for something similar to what was asked here (batching mapped tasks, or nested mapping). Has anyone got something like this working? The example provided by Michael does something different from what I want. What I’m after is:
    1. Retrieve some dynamic list (e.g. a query from a database).
    2. Batch the result set into batches of ~20 items to process in parallel.
    3. For each item in the batch, run a branch of chained tasks.
    4. Wait for the batch to complete, then repeat for the next batch until exhausted.
    Basically, I want to write something like:
    tasks = dynamic_list_of_tasks()
    windowed_tasks = fixed_window(tasks, window_size=5)
    
    def process_item(item):
        x = task_1(item)
        task_2(x)
    
    def process_window(window):
        apply_map(process_item, window)
    
    apply_map(process_window, windowed_tasks)
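Outside of Prefect's mapping, the behaviour described in steps 1 to 4 can be sketched with the standard library. The list is split into windows, each item's chain runs in parallel within a window, and the whole window finishes before the next one starts. `fixed_window`, `task_1` and `task_2` are plain-function stand-ins that mirror the names in the pseudocode; this is not a Prefect API. Inside Prefect 1.x, this shape could live in a single task or in one child flow run per window.

```python
from concurrent.futures import ThreadPoolExecutor


def fixed_window(items, window_size):
    """Split `items` into consecutive lists of at most `window_size` elements."""
    return [items[i:i + window_size] for i in range(0, len(items), window_size)]


def task_1(item):
    return item * 10


def task_2(x):
    return x + 1


def process_item(item):
    # One branch of chained steps per item.
    return task_2(task_1(item))


def process_windows(items, window_size=5, max_workers=5):
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for window in fixed_window(items, window_size):
            # pool.map runs the window's items in parallel; list() blocks
            # until the whole window is done before the next one is submitted.
            results.extend(list(pool.map(process_item, window)))
    return results


if __name__ == "__main__":
    print(process_windows(list(range(12))))
```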

    Jean-Marc LE FEVRE

    02/18/2021, 3:04 PM
    Hello guys!

    Jean-Marc LE FEVRE

    02/18/2021, 3:05 PM
    We are trying to use Prefect with gevent, and we are having trouble with it. It seems that the gevent monkey patch is applied too late in the flow run. Does anyone have advice for this case?
    Karolína Bzdušek

    02/18/2021, 5:43 PM
    Please help, I've been frustrated by this for too long and I'm still chasing my tail 🙈 I want to use GitHub storage, but I still can't make it work, although I am doing nothing fancy:
    from prefect.storage import GitHub
    
    flow.storage = GitHub(repo="cividi/gemeinde_workflows", path="/workflows/residential_density.py", access_token_secret="GITHUB_ACCESS_TOKEN")
    `GITHUB_ACCESS_TOKEN` is stored as a secret in Cloud. However, when I run the flow through the UI, I still get this and I'm not sure why. Note: the repository is private and owned by our team, and I am using a token generated under my account. What am I missing? Thanks for your help in advance.

    Craig Wright

    02/18/2021, 8:08 PM
    Hi all! We would love feedback on the `FivetranSyncTask`. If anyone is even vaguely interested in more deterministic Fivetran scheduling, please reach out! https://prefect-community.slack.com/archives/CL09KTZPX/p1613678832100000