prefect-community

    Adam Shamlian

    05/20/2021, 3:18 PM
    Hi all. I have a set of inputs that I am going to map to a series of tasks that dynamically touch databases. Consequently, I don't want to directly pass database connections (to avoid any nonsense in a distributed environment). If I have:
    @task
    def generate_inputs_from_params(args):
        #....
    
    @task
    def create_db_conn(args):
        # ....
    
    @task 
    def do_db_work(args):
        # ....
    
    @task
    def do_some_other_work(args):
        # ...
    
    with Flow("example") as f:
        # Parameter tasks
        conn_inputs, db_work_inputs, other_work_inputs = generate_inputs_from_params(args) # from param tasks
        conn_map = create_db_conn.map(conn_inputs)
        res_map = do_db_work.map(conn_map, db_work_inputs)
        res2_map = do_some_other_work(res_map, other_work_inputs)
        # some reduce func if necessary
    I have two questions about this:
    1. Is that flow constructed properly? I'm ultimately after something like:
    inputs = generate_inputs_from_params(args) # from param tasks
    for (conn_input, db_work_input, other_work_input) in inputs:
        conn = create_db_conn(conn_input)
        res = do_db_work(conn, db_work_input)
        res2 = do_some_other_work(res, other_work_input)
    2. When mapping over credentials dynamically, would I inject `Secret`s into the `conn_inputs`, or would I resolve the proper `Secret` "within the for loop" (i.e. extending the map chain to include an additional layer that resolves `Secret`s)? My understanding of the docs is that if I do the former, the secret data would be exposed in `conn_inputs`, which in a distributed environment means that plaintext credentials could be making network hops, or in any environment would be persisted as part of `Result` instances. I'd like to make sure I'm understanding this correctly.
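A minimal plain-Python sketch of the per-element chain, assuming Prefect 1.x mapping semantics, where mapped arguments are paired up element by element (like the built-in `zip`) rather than combined as a cross product. Only a credential *name* travels with each element, and its value is looked up inside the per-element step. `os.environ` is a stand-in for a real secret store, and every function and variable name here is hypothetical.

```python
import os
from typing import Dict, List


def create_db_conn(secret_name: str) -> Dict[str, object]:
    # Resolve the credential inside the per-element step, so only the
    # secret's *name* is part of the mapped inputs or the returned data.
    # os.environ is a stand-in for a real secret store (assumption).
    password = os.environ.get(secret_name, "")
    return {"secret_name": secret_name, "authenticated": bool(password)}


def do_db_work(conn: Dict[str, object], query: str) -> str:
    # Hypothetical database step; a real one would use the connection.
    return f"{query} via {conn['secret_name']}"


def do_some_other_work(res: str, extra: str) -> str:
    # Hypothetical follow-up step on one element's result.
    return f"{res} + {extra}"


def run_chain(conn_inputs: List[str], db_work_inputs: List[str],
              other_work_inputs: List[str]) -> List[str]:
    # Element i of each list is processed together, mirroring how mapped
    # arguments are paired up; zip silently stops at the shortest list.
    results = []
    for conn_input, db_input, other_input in zip(
        conn_inputs, db_work_inputs, other_work_inputs
    ):
        conn = create_db_conn(conn_input)
        res = do_db_work(conn, db_input)
        results.append(do_some_other_work(res, other_input))
    return results


if __name__ == "__main__":
    print(run_chain(["DB_A_PASSWORD", "DB_B_PASSWORD"], ["q1", "q2"], ["x", "y"]))
```

In a Prefect 1.x flow, each of the three steps would be a `.map(...)` call. A step called without `.map`, as in `do_some_other_work(res_map, other_work_inputs)`, runs once and receives the whole lists instead of one element at a time.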

    Jocelyn Boullier

    05/20/2021, 3:43 PM
    Is there a way to have a task take several tasks as arguments (in a list), have it run even if one of the arguments fails, and have the list passed as a parameter contain the exception(s) plus the results? I'm asking because I'd like to send a Slack report based on the results of several tasks: either I add a handler to each task, but then I get three Slack messages, or I add one at the Flow level, but then I don't have access (or at least don't know how to get access) to the state and results of each task.
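For reference, Prefect 1.x has a `prefect.triggers.all_finished` trigger that lets a downstream task run once all of its upstream tasks have finished, whether they succeeded or failed. The plain-Python sketch below only illustrates the reporting half of the idea: keep each task's result or exception in one list and turn it into a single summary message. The task names and message format are hypothetical, and no Slack API is called.

```python
from typing import Callable, List, Tuple

# Each outcome pairs a task name with either its result or the exception it raised.
Outcome = Tuple[str, object]


def collect_outcomes(tasks: List[Tuple[str, Callable[[], object]]]) -> List[Outcome]:
    # Run every callable even if an earlier one raised, keeping either
    # its return value or the exception object.
    outcomes: List[Outcome] = []
    for name, fn in tasks:
        try:
            outcomes.append((name, fn()))
        except Exception as exc:
            outcomes.append((name, exc))
    return outcomes


def build_report(outcomes: List[Outcome]) -> str:
    # One line per task, so the whole run fits in a single message.
    lines = []
    for name, value in outcomes:
        if isinstance(value, BaseException):
            lines.append(f"FAILED {name}: {type(value).__name__}: {value}")
        else:
            lines.append(f"OK {name}: {value}")
    return "\n".join(lines)


def failing_task() -> object:
    raise ValueError("bad input")


report = build_report(collect_outcomes([("load", lambda: 42), ("clean", failing_task)]))
print(report)
```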

    Joe Schmid

    05/20/2021, 5:23 PM
    Hi Prefect community, for anyone attending this week's Dask Summit online conference, I'll be giving one of the keynote presentations tomorrow morning at 9am ET along with my colleague @Jie Lou on our use of Prefect & Dask for machine learning in healthcare. Prefect (and Dask) have become key parts of our technology and have delivered great results for our business. Paid tickets are required to attend live tomorrow, but I think the talks will be posted to the Dask YouTube channel for free access after the conference: https://summit.dask.org/schedule/presentation/62/dask-prefect-for-healthcare-machine-learning-on-aws/
    :upvote: 1
    👍 3

    Zach Schumacher

    05/20/2021, 7:08 PM
    Is there any way to pass a secret into an environment variable in Prefect Cloud? E.g. I know I can set env variables using a template or via the KubernetesRun object, but both approaches display the values in plain text in the UI.

    Marwan Sarieddine

    05/20/2021, 9:28 PM
    Hi folks, this is a question about passing a custom context to a Prefect flow run, more specifically a custom context containing a nested dictionary.

    Sean Harkins

    05/20/2021, 10:56 PM
    I’m experiencing an issue with unpickling when a flow is registered from inside a conda environment. Registering the flow directly from a `virtualenv` that contains the necessary dependencies works, and the flow runs as expected. But when I register the flow from a conda environment, I receive this unpickling error when running.

    Damien Ramunno-Johnson

    05/20/2021, 11:04 PM
    I have a quick question: I was thinking about using preemptible machines. If I were using checkpointing and caching the results, I guess it would just retry the flow?

    Jeremy Phelps

    05/21/2021, 1:36 AM
    Is there a way to get the agent to only run flows from a particular project?

    Peter Roelants

    05/21/2021, 9:57 AM
    Hi Prefect, running a flow from another flow can be done through `StartFlowRun`. However, this requires the flow to be registered before calling `StartFlowRun.run()`, which in turn requires Prefect Cloud or Server. Is there an alternative for using dependent flows when no Prefect Cloud or Server is available (for example, when running a unit test of the full flow and its dependent flows)?

    Mark McDonald

    05/21/2021, 1:01 PM
    Hi - I ran into a problem recently with the Task Concurrency Limiting feature. I have a task tag that is supposed to throttle the number of tasks that can interact with a particular database. In the past, this tag was only applied to a task that is mapped in a single flow. Yesterday, I tried applying this same tag to a separate mapped task in a different flow that interacts with the same database. At first, it worked as I expected. However, I then saw that any task with this tag applied to it would just sit in a "Queued" state and never move into "Running". I made sure that no other tasks using this tag were running, but the behavior remained the same. I ended up having to delete the tag to get the tasks to run. My questions are:
    1. Is it appropriate to have a single task tag shared between different flows' tasks that have a common resource dependency limitation like the one I've described?
    2. Any idea why all these tasks were getting stuck in a "Queued" state when there were no actively running tasks with that tag?
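As a rough mental model only (a toy, not Prefect's actual implementation), a tag-based limit behaves like a counter shared by every task run that carries the tag, whichever flow it belongs to: a run starts only while a slot is free, and the slot stays taken until that run is recorded as finished. The sketch, with a hypothetical `warehouse-db` tag, shows a single tag throttling two flows together, and how a slot that is never released would leave later runs queued.

```python
from typing import Dict, Set


class TagConcurrencyModel:
    """Toy model of a concurrency limit shared by every run carrying a tag."""

    def __init__(self, limits: Dict[str, int]) -> None:
        self.limits = limits
        # IDs of runs currently holding a slot, per tag (from any flow).
        self.running: Dict[str, Set[str]] = {tag: set() for tag in limits}

    def try_start(self, run_id: str, tag: str) -> str:
        # A run may start only while fewer than `limit` runs hold a slot.
        if len(self.running[tag]) < self.limits[tag]:
            self.running[tag].add(run_id)
            return "Running"
        return "Queued"

    def finish(self, run_id: str, tag: str) -> None:
        # Only a recorded finish frees the slot; a run that dies without
        # reporting it keeps the slot occupied.
        self.running[tag].discard(run_id)


model = TagConcurrencyModel({"warehouse-db": 2})
print(model.try_start("flow-a-run-1", "warehouse-db"))  # Running
print(model.try_start("flow-b-run-1", "warehouse-db"))  # Running
print(model.try_start("flow-b-run-2", "warehouse-db"))  # Queued
model.finish("flow-a-run-1", "warehouse-db")
print(model.try_start("flow-b-run-2", "warehouse-db"))  # Running
```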

    Richard Hughes

    05/21/2021, 1:19 PM
    Hi, I am looking for something that can watch for long-running flows and send alerts when a flow has been running for longer than, let's say, an hour. Does anyone have a good reference for something similar, or any ideas for this kind of issue? Thanks in advance!
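One common pattern is a small checker that runs on a schedule (for example as its own flow): fetch the flow runs that are currently in a Running state, flag any that started more than an hour ago, and send an alert for those. The sketch below covers only the flagging step and assumes the runs were already fetched into plain dicts with hypothetical `id`, `state`, and `start_time` keys; the API query and the alert call are left out.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Mapping


def find_long_running(runs: Iterable[Mapping], now: datetime,
                      threshold: timedelta = timedelta(hours=1)) -> List[str]:
    # Flag runs that are still Running and started more than `threshold` ago.
    # Each run is a dict with hypothetical keys "id", "state", "start_time".
    return [
        run["id"]
        for run in runs
        if run["state"] == "Running" and now - run["start_time"] > threshold
    ]


now = datetime(2021, 5, 21, 14, 0, tzinfo=timezone.utc)
sample_runs = [  # made-up records standing in for an API query result
    {"id": "run-a", "state": "Running", "start_time": now - timedelta(minutes=90)},
    {"id": "run-b", "state": "Running", "start_time": now - timedelta(minutes=20)},
    {"id": "run-c", "state": "Success", "start_time": now - timedelta(hours=4)},
]
print(find_long_running(sample_runs, now))  # ['run-a']
```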

    Adam

    05/21/2021, 2:54 PM
    Does anyone have an example of how to use KubernetesSecret? We are using Bitbucket for storage, and the password is stored in Kubernetes as “bb-flows-pull”. Everything works if we hardcode the secret into the script, but this isn’t an option for us. I’m a complete beginner at this, so apologies if this is something simple.

    Bruno Murino

    05/21/2021, 3:10 PM
    Hi everyone, I’m doing a PoC of using Prefect at my company and I’m running an ECS Agent as an ECS Service. I’m wondering how many resources the ECS agent needs, in terms of vCPU and GB of memory. I assume this can be quite minimal.

    Josiah Berkebile

    05/21/2021, 3:29 PM
    Is there a migration guide somewhere for moving from Airflow to Prefect?

    Josiah Berkebile

    05/21/2021, 3:30 PM
    Second question: How's the static typing/MyPy support in Prefect?

    Jason Prado

    05/21/2021, 4:21 PM
    Is there a way to `prefect execute flow-run` an already-completed run? My use case is that I’m building a custom container to execute flows stored in script storage on my Kubernetes agent, but when the flow runs it can’t find Python dependencies (`import pandas` fails). I’d like to repro it locally, but when I try to run the flow I see `Flow run has already finished.` Other advice for debugging would be welcome; I’ve run a shell in the container and I do see my dependencies, FWIW.
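One possible explanation when a package is visible from a shell in the container but `import pandas` fails during the flow run is that the two use different Python interpreters or environments. A small stdlib-only check (the helper name is hypothetical), run once from the shell and once from inside a task with the output logged, can rule this in or out.

```python
import importlib.util
import sys
from typing import Dict, Iterable


def check_imports(modules: Iterable[str]) -> Dict[str, bool]:
    # find_spec returns None when the running interpreter cannot locate
    # a top-level module on its sys.path.
    return {name: importlib.util.find_spec(name) is not None for name in modules}


if __name__ == "__main__":
    # Compare this output between the container shell and the flow run.
    print("interpreter:", sys.executable)
    print("sys.path:", sys.path)
    print(check_imports(["pandas"]))
```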

    Andrew Nichol

    05/21/2021, 4:47 PM
    Howdy, I'd like to have an action in my tasks/flows happen upon cancellation from the Prefect UI. Is this doable?

    David Elliott

    05/21/2021, 5:39 PM
    Hey all! I'm really struggling to register a very large flow (~1200 tasks) to Cloud: I'm hitting an `API_ERROR` on `flow.register()`. I had this issue intermittently when I last worked on this project (late March), related to this post, but back then it would register after 1 or 2 retries. Now I can't register at all: I've tried 6 times and still can't register the flow. I'd be very grateful for any advice! Prefect version 0.14.14, Docker storage pushing to ECR, full error message in thread!
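While the root cause is being tracked down, the intermittent failures can at least be retried automatically. Below is a generic sketch with exponential backoff; the `flow.register(project_name=...)` call in the usage comment is only an example, and retrying does nothing about why a flow of this size fails to register.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(fn: Callable[[], T], attempts: int = 5,
                       base_delay: float = 2.0,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    # Call fn until it succeeds, waiting base_delay, 2 * base_delay,
    # 4 * base_delay, ... seconds between attempts; re-raise the last
    # error once all attempts are used up.
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for i in range(attempts):
        try:
            return fn()
        except Exception:
            if i == attempts - 1:
                raise
            sleep(base_delay * 2 ** i)
    raise AssertionError("unreachable")


# Example usage (Prefect 1.x; the project name is a placeholder):
# retry_with_backoff(lambda: flow.register(project_name="my-project"))
```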

    Philip MacMenamin

    05/21/2021, 5:49 PM
    Issue with the new version of register. Previously (and currently, although I'm getting warnings) I was able to register flows with this:
    prefect register flow --file flow.py --project proj_name
    This works without issue; however, the recommendation is to use the new format:
    prefect register --project my-project -p myflows/
    k
    m
    • 3
    • 13
  • s

    Sébastien Arnaud

    05/21/2021, 5:56 PM
    Is there a way to pass an existing Dask client to Prefect (instead of passing a `DaskExecutor`, which launches a cluster on its own)? I'm asking in particular so that I can run Coiled cluster commands to launch a cluster, register some worker plugins, and trigger a scale, and then pass it to Prefect for execution.

    ash

    05/21/2021, 8:20 PM
    Greetings everyone, I am facing an issue while connecting my GitLab storage for flow registration. I tried the steps mentioned in the docs, but I guess I am missing something. Can anyone please share simple steps for resolving this?

    gotham

    05/21/2021, 8:35 PM
    Hello Prefectians 🙂, I'm just starting out with Prefect. At our organisation we are trying to use Prefect with Elasticsearch, and I just wanted to know if anyone has experience with this that could guide me.

    Andrew Nichol

    05/22/2021, 3:02 AM
    Do y'all know how to propagate cancelled states from the flow to its tasks? It seems like the task just keeps running when I cancel a flow... Retrying the flow also doesn't work until I explicitly set the state of the active task to "cancelled" (even after having cancelled the flow).

    Giovanni Giacco

    05/22/2021, 8:17 AM
    Hello everyone, I’m using an ECS Agent and S3 storage and I’d like to know how I can specify the docker image in the Run Config (ECSRun) at runtime. I tried to get the image to use from an environment variable but it does not work. I need that because I’d like to use different docker images in staging and production environments. Any suggestion?

    Jeremy Phelps

    05/22/2021, 10:05 PM
    Is there a way to get the logs from a single element of a mapped task?

    ale

    05/24/2021, 10:20 AM
    Hey folks, we’re experiencing an intermittent issue with the Fargate Agent in EC2 mode. We are on `prefect==0.13.13`. From time to time, tasks fail to start and we get the following error:
    `f6edaa77-6dc6-4b85-953a-25ccdb4fb366 finished in state <Failed: "list index out of range">`
    where `f6edaa77-6dc6-4b85-953a-25ccdb4fb366` is the task run id. We can’t find any meaningful logs on ECS, but I suspect this error is caused by the container failing to start. Any suggestions? Thanks!

    Zack Novak

    05/24/2021, 4:46 PM
    Hey everybody. My name is Zack Novak, and I’m a data engineer at Second Front Systems. We are building a platform as a service to support bringing commercial technology into the Department of Defense and other government agencies. We want to use Prefect, but can’t as it is not in the Department of Defense’s hardened container repository called Ironbank. Has Prefect gone through this Ironbank onboarding previously / would this be something Prefect is interested in? Currently the only two DoD approved workflow solutions are Airflow and Argo, and Prefect is the preferred solution over those two.

    Joseph Loss

    05/24/2021, 7:43 PM
    Hey guys, curious if someone can help me understand task mapping / flattening for an iterated for loop. This loop calculates tracking error for each account in several strategies, and as such takes quite some time. I know task mapping would be of use here, but can't quite seem to get it right from the docs.
    strategies = ['strat1', 'strat2', 'strat3']
    
    for strat in strategies:
        strat = [strat]
        
        # this gets the list of applicable accounts
        lstAccnts = GetRiskModelAccnts(strategy = strat)
    
        # this gets the tracking error risk on an account level (each account has between 1 and 3 strategies)
        dfAccntsTEc, dfAccntsTE, dfAccntsMoments = RiskWrapper(
                                                               lstAccnt = lstAccnts,
                                                               benchmark = 'SPY',
                                                               lstAllowStrats = strat,
                                                               AggregationLevel = ['accnt']
                                                   )
    
        # this gets the tracking error risk on an account-strategy level 
        dfAccntsTEc, dfAccntsTE, dfAccntsMoments = RiskWrapper(
                                                               lstAccnt = lstAccnts,
                                                               benchmark = 'SPY',
                                                               lstAllowStrats = strat,
                                                               AggregationLevel = ['accnt', 'strategy']
                                                   )
    You'll notice the aggregation level is also something that is iterated, and there is a third aggregation level, ['accnt', 'strategy', 'strategyLevel'], that isn't shown above. I know mapping could be of use here as well, but I'm not sure of the correct syntax to implement it.
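Prefect 1.x mapping pairs mapped arguments element by element rather than forming every combination, so one way to cover *both* strategies and aggregation levels is to build the combinations first and map over that single list, keeping constants such as the benchmark fixed (in Prefect 1.x, with `prefect.unmapped`). The plain-Python sketch below only shows the shape of this; `build_combos` and `risk_for_combo` are hypothetical, the latter standing in for the `GetRiskModelAccnts` + `RiskWrapper` body of the loop.

```python
from itertools import product
from typing import Dict, List, Tuple

strategies = ["strat1", "strat2", "strat3"]
aggregation_levels = [
    ["accnt"],
    ["accnt", "strategy"],
    ["accnt", "strategy", "strategyLevel"],
]

Combo = Tuple[str, List[str]]


def build_combos(strats: List[str], levels: List[List[str]]) -> List[Combo]:
    # Every (strategy, aggregation level) pair; each pair is independent work,
    # so each one can become a single element of the mapped input.
    return list(product(strats, levels))


def risk_for_combo(combo: Combo, benchmark: str) -> Dict[str, object]:
    # Hypothetical stand-in for the loop body: get the strategy's accounts
    # with GetRiskModelAccnts, then call RiskWrapper at this level.
    strat, level = combo
    return {"strategy": strat, "level": level, "benchmark": benchmark}


combos = build_combos(strategies, aggregation_levels)
results = [risk_for_combo(c, benchmark="SPY") for c in combos]
print(len(results))  # 9
```

Inside a Prefect 1.x flow, with both functions decorated as tasks, the list comprehension would become roughly `risk_for_combo.map(combos, benchmark=unmapped("SPY"))`. Keeping one result per combination also avoids the overwrite in the loop above, where the account-strategy call reassigns the same three variables as the account-level call.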

    Adam Shamlian

    05/24/2021, 9:46 PM
    Are there any prohibitions/caveats associated with a non-task function in a flow, particularly with respect to how the framework might serialize the function and flow? As an example:
    def my_non_task_util_func(some_args):
        return util_results
    
    @task
    def task_a(a_args):
        return a_result
    
    @task
    def task_b(b_args):
        return b_result
    
    with Flow('test') as f:
        util = my_non_task_util_func(some_args)
        util_2 = my_non_task_util_func(some_other_args)
        a = task_a(util, other_a_args)
        b = task_b(util, util_2, a)
    Should I be concerned about how Prefect treats this? For further context, this util func is a factory for a collection of related `prefect.tasks.core.constants.Constant`s. Thus far, it has been easier to return these from a vanilla func rather than from a task that returns a collection of tasks.

    joshua mclellan

    05/24/2021, 10:42 PM
    Hey, I was wondering what kind of solutions people have found for event-driven flow execution. I saw there was a PIN for this issue, but I wanted to know how people have been solving it without built-in functionality (or if there is some functionality I've overlooked).

Kevin Kho

05/24/2021, 10:57 PM
Hey @joshua mclellan, so it’s a matter of using the API to start a flow run. Here is an example blog: https://medium.com/the-prefect-blog/event-driven-workflows-with-aws-lambda-2ef9d8cc8f1a
So you would connect this to, let’s say, Lambda. Lambda would listen for events and then trigger the Prefect flow when it detects one.
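A minimal sketch of the triggering side, assuming the Prefect 1.x GraphQL API and the `create_flow_run` mutation used by the Prefect 1.x Python client: the Lambda handler sends one POST request that starts a run of an already-registered flow. The endpoint, flow ID, and token are placeholders, the handler follows AWS Lambda's usual `lambda_handler(event, context)` convention, and only the standard library is used.

```python
import json
import urllib.request
from typing import Optional

# Placeholder endpoint (assumption): a local Prefect Server's GraphQL API.
# For Prefect Cloud (1.x) this would be the Cloud API URL plus an API token.
API_URL = "http://localhost:4200/graphql"

CREATE_FLOW_RUN = """
mutation($input: create_flow_run_input!) {
  create_flow_run(input: $input) { id }
}
"""


def build_request(flow_id: str, parameters: Optional[dict] = None,
                  token: Optional[str] = None,
                  api_url: str = API_URL) -> urllib.request.Request:
    # Package the create_flow_run mutation as a JSON POST request.
    run_input: dict = {"flow_id": flow_id}
    if parameters:
        run_input["parameters"] = parameters  # must match the flow's Parameters
    body = json.dumps({"query": CREATE_FLOW_RUN,
                       "variables": {"input": run_input}}).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    return urllib.request.Request(api_url, data=body, headers=headers, method="POST")


def lambda_handler(event, context):
    # Hypothetical AWS Lambda entry point: whatever event triggers the Lambda
    # (e.g. an S3 upload) starts one run of an already-registered flow.
    req = build_request(flow_id="YOUR-FLOW-ID")  # placeholder flow ID
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())
```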

joshua mclellan

05/24/2021, 11:03 PM
Would I have access to that even if I used the Core server as opposed to Prefect Cloud?

Kevin Kho

05/24/2021, 11:03 PM
Yes, you would. The API would be your Server API.
🔥 1

joshua mclellan

05/24/2021, 11:05 PM
I assumed that would be the case but figured I'd double-check.

Kevin Kho

05/24/2021, 11:57 PM
By the way, in case you don’t know, Cloud gives 10,000 free task runs every month, which is a lot to get started with, and it’s a lot easier than Server.