prefect-community

    Asif Imran

    10/16/2020, 2:48 AM
    👋 Hello there! First week of :prefect:. Loving it so far! I ran into a small hitch (most likely an operator error). I use the DaskKubernetesEnvironment with a custom spec:
    flow.environment = DaskKubernetesEnvironment(
        worker_spec_file="worker_spec.yaml",
    )
    Somehow my flow got into a funk where I can no longer update the spec. Even if I (re)register the flow and push it to cloud and run it, a quick
    k describe pod
    on my cluster reveals the very spec that I am trying to get rid of. So far I have tried deleting the flow and starting from scratch. I am pretty sure I don't want to have to rename the flow every time I bump the version.
    👀 1
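    A minimal sketch of the update path being attempted above, assuming the environment is re-attached before every registration so the new worker spec is baked into the new flow version (the project name is a placeholder):
    from prefect.environments import DaskKubernetesEnvironment

    # re-attach the environment with the updated spec before registering
    flow.environment = DaskKubernetesEnvironment(
        worker_spec_file="worker_spec.yaml",
    )
    flow.register(project_name="my-project")  # placeholder project name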

    Prathamesh

    10/16/2020, 4:22 AM
    Hello all, I'm trying to build a script for Prefect orchestration wherein my functions are defined in different scripts and imported as modules, all in Python. Is this possible? I've tried the following but I'm getting an error as below.
    Function_A script:
    def func_a():
        ...  # do something
    Function_B script:
    def func_b():
        ...  # do something
    Prefect script:
    from Function_A import func_a
    from Function_B import func_b

    @task
    def executing_func_a():
        func_a()

    @task
    def executing_func_b():
        func_b()

    .....flow register.....
    The flow registers successfully, but when executing it I received an error: ModuleNotFoundError: No module named 'func_a'. Please advise.
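    One common cause, sketched below: Prefect's default pickle-based storage serializes the flow object but not the modules it imports, so Function_A and Function_B must also be importable in the environment that executes the flow. A minimal sketch under that assumption (the flow name is a placeholder):
    import sys
    from pathlib import Path

    # make the sibling scripts importable at run time as well as registration time
    sys.path.append(str(Path(__file__).parent))

    from prefect import task, Flow
    from Function_A import func_a
    from Function_B import func_b

    @task
    def executing_func_a():
        func_a()

    @task
    def executing_func_b():
        func_b()

    with Flow("modular-flow") as flow:  # placeholder flow name
        executing_func_a()
        executing_func_b()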

    Robin

    10/16/2020, 10:22 AM
    Hey all 🙂 We are playing with the flow concurrency limits and added two labels with the following limits: k8s = 3 and customer = 1. So we thought that, with a Prefect agent that has the labels k8s and customer, we should be able to run several flows at the same time (e.g. up to 3 k8s-labeled flows) but always only 1 customer-labeled flow. However, we were not able to run a ["customer", "k8s"] flow and a k8s flow at the same time. 😮 We then added another k8s agent (without the customer label) and now the flows can indeed run at the same time, 1 flow on each agent. Is this intended behavior? For us that does not feel right, because in that case one needs to update infrastructure whenever adding flow concurrency limits, right? 🤔
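    For context, a minimal sketch of the labeling being described, assuming labels are attached through the flow's environment (the pattern in use at the time):
    from prefect.environments import LocalEnvironment

    # this flow carries both labels, so only agents with both labels can run it
    flow.environment = LocalEnvironment(labels=["k8s", "customer"])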

    Alexander

    10/16/2020, 12:11 PM
    So, to continue with my thoughts on flow registration (https://prefect-community.slack.com/archives/CL09KU1K7/p1602793993098800): I received several requests to share my current setup (cc @Billy McMonagle, @Dylan). This is mainly a POC of Prefect as a workflow orchestration engine.
    1. I have a Vagrant VM where I install docker and docker-compose.
    2. I took https://github.com/PrefectHQ/prefect/blob/master/src/prefect/cli/docker-compose.yml and modified it a bit. I also pushed all the variables defined in this docker-compose file into an .env file. BTW, I was wondering why prefect server start does not let you detach? It has so much control over how the whole Prefect server is deployed, yet you can't use it anywhere other than a local console. I also found that there was no actual documentation on all the services of Prefect server and how to connect them to each other; I was only able to understand it by reverse-engineering the docker-compose file and the prefect server CLI source code 😔 Other than that there is no really fancy stuff here, just a regular compose file with some variables defined.
    3. There is a big security hole in the whole setup, called apollo. The UI (read: your web browser) needs access to the apollo endpoint. There is no auth in either the UI or apollo, which means you have to put the whole service behind a VPN, at least.
    4. Then I have this nifty little Dockerfile which defines the flow execution and registration environment. Nothing fancy either, except that you have to somehow forward an apollo endpoint address. It also uses the same image as the docker agent for its base.
    5. On every commit I run this code:
    docker build -t hs-databricks:build_runtime -f flows/Dockerfile --build-arg APOLLO_PUBLIC_URL=$APOLLO_PUBLIC_URL .
    # Register all flows
    docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -e APOLLO_PUBLIC_URL=$APOLLO_PUBLIC_URL hs-databricks:build_runtime \
     python /src/hs-databricks/flows/registry.py
    6. And there is a central flow registration script, which scans the ./flows folder for Python files, extracts flows from them, builds docker storage, and registers the flows. It has no comments, but I wrote it just yesterday ☺️
    My plan, since this is all running in docker, is to try to deploy the Prefect server + agent + flow storage on AWS Fargate, so I don't need to maintain any EC2 at all. Hope this helps someone 👍
    Attachments: docker-compose.yml, Dockerfile, registry.py
    👏 2
    🙌 4
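    A minimal sketch of what such a registration script might look like, assuming one or more Flow objects per file and storage configured elsewhere (module loading simplified; the project name is a placeholder):
    import importlib.util
    from pathlib import Path
    from prefect import Flow

    # scan ./flows for python files, import each one, and register any Flow found
    for path in Path("flows").glob("*.py"):
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        for obj in vars(module).values():
            if isinstance(obj, Flow):
                obj.register(project_name="default")  # placeholder project name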

    psimakis

    10/16/2020, 3:18 PM
    Hello all, is there any way to dynamically provide the webhook_secret of a SlackTask using a PrefectSecret? I have tried to provide the webhook_secret at instantiation, but I get a TypeError. Given this flow:
    with Flow(name='flow name') as flow:
        slack_custom_webhook_url = PrefectSecret('SLACK_E_SPIDER_STATISTICS')
        spider_name = Parameter('spider_name', default='spider a')
        with case(spider_name, 'spider a'):
            SlackTask(webhook_secret=slack_custom_webhook_url)(message='Test')
    Check out the full trace-back in the thread.
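    A sketch of one likely fix, assuming SlackTask expects webhook_secret to be the name of the secret (a string) rather than a PrefectSecret task, so that the task resolves the secret itself at runtime:
    from prefect import Flow, Parameter, case
    from prefect.tasks.notifications import SlackTask

    with Flow(name='flow name') as flow:
        spider_name = Parameter('spider_name', default='spider a')
        with case(spider_name, 'spider a'):
            # pass the secret's name, not a PrefectSecret task
            SlackTask(webhook_secret='SLACK_E_SPIDER_STATISTICS')(message='Test')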

    Aaron Y

    10/16/2020, 7:30 PM
    I'm trying to use the DaskExecutor to process a JSON list of records, and am getting this error.

    Krzysztof Nawara

    10/17/2020, 12:45 PM
    Hi! During my continued experiments with caching I discovered something surprising.
    • I have a mapped task with two inputs - one is unmapped, the other one returns a list over which I map
    • all those tasks have caching enabled and use LocalResults
    • a dummy cache_validator prints the current inputs to the task and also the cached inputs extracted from the cached state
    • when executed locally it works fine - all cached_inputs are correctly populated
    • when executed using prefect-server only one input is populated - for the other one the input is present in the dictionary, but the value is None
    To keep this post brief I'm going to put all of the details in the thread.

    Alberto de Santos

    10/17/2020, 2:00 PM
    Hi all! I am getting a bit confused and need clarification from this wise community.

    Alberto de Santos

    10/17/2020, 2:02 PM
    I am using a map for a set of elements (let's call them COUNTRY_ID); then, for specific countries, the task failed (and it should fail). However, I would like to catch that error somehow and deal with it properly using Prefect. I have read all the docs regarding Handlers, Results and the like, but I am still struggling to understand the best option to, let's say in my own words, do a try/except in Prefect.
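    A minimal sketch of one common pattern, assuming a downstream task with an all_finished trigger, where failed mapped children show up as exception instances in the downstream inputs (flow name and country_ids are placeholders):
    from prefect import task, Flow
    from prefect.triggers import all_finished

    @task
    def process(country_id):
        ...  # may raise for specific countries

    @task(trigger=all_finished)
    def handle(results):
        # failed children arrive as exception instances; handle them here
        return [r for r in results if not isinstance(r, BaseException)]

    with Flow("countries") as flow:
        handle(process.map(country_ids))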

    Nicolas Bigaouette

    10/17/2020, 6:21 PM
    Hi all! I'm exploring Prefect for a specific use case but I am not sure if it can perform what I need. Maybe someone can tell me if it's possible? I have a web backend that sometimes needs to perform some command sequences. A Flow seems to support this. But I have a specific requirement where one (or more) commands are actually RPCs. Those RPCs are custom; the remote command to be executed is added to a queue and the remote worker polls that queue. After having executed a command, the remote worker will POST its result to the backend. There are multiple instances of the backend running (for redundancy and scaling), so the result will probably not be received by the process that created the RPC call. This creates a discontinuity in the code: the task driving the workflow has to stop when enqueuing its remote command, and another instance will pick up the work at a later time. I can probably model my sequences with Prefect just fine, except I am not sure how to handle the discontinuity described above. I thought I could raise a "pause" signal in the task that initiated the custom RPC; then the backend instance receiving the POST could send a "continue" signal to the paused task. Is that even possible? Can such a workflow be modeled by Prefect? Thanks!!
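    A minimal sketch of the pause idea, assuming Prefect's PAUSE signal: the task raises it after enqueuing the RPC, and a separate process later resumes the paused task run via the API (enqueue_rpc is a placeholder for the custom RPC machinery):
    from prefect import task
    from prefect.engine.signals import PAUSE

    @task
    def call_remote(command):
        enqueue_rpc(command)  # placeholder for the custom RPC enqueue
        # pause until another backend instance receives the POST and resumes the run
        raise PAUSE("waiting for remote worker result")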

    Bruce Haggerty

    10/17/2020, 9:16 PM
    Hi, any suggestions on how to debug a "TypeError: can't pickle generator objects" error when attempting to create a new flow? I don't actually think I'm using any generator objects in my code.
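    A sketch of one way to narrow this down, assuming `flow` is the flow being created and that the error comes from serializing it the same way registration does:
    import cloudpickle

    # try serializing the flow directly; the traceback usually points at the
    # object (often a closed-over generator or client) that can't be pickled
    try:
        cloudpickle.dumps(flow)
    except TypeError as exc:
        print(exc)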

    Leo Meyerovich (Graphistry)

    10/18/2020, 12:19 AM
    We're trying to figure out how to dispatch several concurrent interval flows, but the docs bottom out on this. Any pointers?
    1. Original: for p in configs: interval_flow.run(parameters=p) <= was not concurrent
    2. Docs: the flow docs say to manually call FlowRunner: for p in configs: FlowRunner(flow=interval_flow).run() <= unclear if the intervals run concurrently, and unclear how to pass different p to different flows
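    A minimal sketch of one local approach, assuming thread-level concurrency of flow.run is acceptable and each run should get its own parameters (how this interacts with interval schedules is untested here):
    from concurrent.futures import ThreadPoolExecutor

    # drive one flow run per parameter set, concurrently
    with ThreadPoolExecutor(max_workers=len(configs)) as pool:
        futures = [pool.submit(interval_flow.run, parameters=p) for p in configs]
        states = [f.result() for f in futures]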

    Felix V

    10/18/2020, 8:48 PM
    Hello all! so how do you uninstall airflow? 😀
    😄 8

    Jasono

    10/18/2020, 11:40 PM
    Hi, if I have two tasks in a flow that have no interdependency, how do I make them run concurrently? For instance, this seems to cause task2 to wait until task1 finishes, although task2 isn't dependent on task1.
    with Flow('') as flow:
        r = task1()
        r2 = task2()
        task3(r)

    flow.run()
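    The usual answer, sketched under the assumption that the flow above runs with the default synchronous executor: independent tasks only run in parallel when the flow uses a Dask-based executor, e.g.:
    from prefect.engine.executors import LocalDaskExecutor

    # independent tasks can now run in parallel on local Dask threads
    flow.run(executor=LocalDaskExecutor(scheduler="threads"))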

    Lior

    10/19/2020, 9:11 AM
    Hey, what's the status regarding event-driven / streaming flows? https://docs.prefect.io/core/PINs/PIN-08-Listener-Flows.html

    Magda Stożek

    10/19/2020, 9:11 AM
    Hello 👋 Is it possible to somehow get the URL of a currently running flow? I'd like to have a task which sends a Slack notification that includes a link to the current flow run. I can't seem to find anything like that in the docs, but I thought I'd ask, maybe I'm missing something.
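    A sketch of one way to build such a link, assuming the flow-run id is available in prefect.context during a backend run; the base URL and path are placeholders for your Cloud/Server UI:
    import prefect
    from prefect import task

    @task
    def build_run_link():
        flow_run_id = prefect.context.get("flow_run_id")
        # URL pattern is an assumption; adjust to your UI deployment
        return f"https://cloud.prefect.io/flow-run/{flow_run_id}"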

    Alberto de Santos

    10/19/2020, 10:00 AM
    Hi all

    Alberto de Santos

    10/19/2020, 10:02 AM
    Given a map (with the trigger parameter set to any_successful), how could I deal with those tasks with the TRIGGERFAIL result? Could I make something like if TRIGGERFAIL: then ...?
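    One option, sketched with FilterTask, assuming children that ended in TriggerFailed arrive downstream as exception values that can be filtered out:
    from prefect.tasks.control_flow import FilterTask

    # keep only the mapped results that are not exceptions
    filter_ok = FilterTask(filter_func=lambda r: not isinstance(r, BaseException))
    good_results = filter_ok(mapped_results)  # mapped_results is a placeholder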

    Avi A

    10/19/2020, 11:50 AM
    Hey, I’m sure this was already discussed here before, but I couldn’t find the right conversation when searching for it… What’s the best way to create a flow that triggers a series of other flows, one after the other, while waiting for each to finish before running the next?
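    A minimal sketch of the usual orchestrator pattern, assuming FlowRunTask with wait=True so each child flow finishes before the next starts (flow and project names are placeholders):
    from prefect import Flow
    from prefect.tasks.prefect import FlowRunTask

    run_a = FlowRunTask(flow_name="flow-a", project_name="proj", wait=True)
    run_b = FlowRunTask(flow_name="flow-b", project_name="proj", wait=True)

    with Flow("orchestrator") as parent_flow:
        first = run_a()
        run_b(upstream_tasks=[first])  # starts only after flow-a finishes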

    ale

    10/19/2020, 11:57 AM
    Hi folks, today I’m facing a strange issue. I’m using AWSSecretsManager in a flow which runs on a Prefect Fargate Agent. The flow works fine if it runs with launch_type="FARGATE". If the flow runs with launch_type="EC2", then I get the following error:
    botocore.exceptions.NoRegionError: You must specify a region.
    It seems that the flow cannot pick up the region from the provided task/execution role… Any suggestions?
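    A sketch of one workaround, assuming the root cause is boto3 failing to infer a region on the EC2 launch type; making the region explicit sidesteps the lookup (the region name is a placeholder):
    import os

    # set before the task runs so boto3 doesn't need to infer the region
    os.environ["AWS_DEFAULT_REGION"] = "eu-west-1"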

    Adam

    10/19/2020, 12:21 PM
    Non-prefect question: does anyone know a bit of regex and can help me with something very simple?

    emre

    10/19/2020, 1:45 PM
    Hi everyone. Although task mapping with DFE (depth-first execution) feels AMAZING, I have a need to use the old way (BFE, breadth-first execution) for my ELT pipeline. Is there any way to enforce BFE, either for the entire flow, or preferably at some mapping levels, to get the best of both worlds? My use case is simply financial: my Snowflake load operations take 1-2 minutes if run back to back, but DFE distributes the load operations over my 1-hour runtime, thus keeping my Snowflake virtual warehouse up and increasing compute cost.

    Ralph Willgoss

    10/19/2020, 2:28 PM
    Hi, is there a trick in Prefect to get a map of a map to parallelise correctly? We've created a flow and it appears as though a map within a map isn't working as expected. I can produce an example, but I just wanted to know if this is a common problem.

    Ralph Willgoss

    10/19/2020, 3:04 PM
    Also, I'm using the LocalDaskExecutor with scheduler=processes, as I believe that's recommended. I'm also specifying the number of workers, but will each worker spin up more than one thread?
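    For reference, a sketch of that executor configuration; with scheduler="processes", num_workers sets the number of worker processes, and each process generally runs one task at a time (the worker count here is arbitrary):
    from prefect.engine.executors import LocalDaskExecutor

    executor = LocalDaskExecutor(scheduler="processes", num_workers=8)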

    Newskooler

    10/19/2020, 3:26 PM
    Hi 👋 I get a strange error in one of my tasks when I try to register a flow:
    ValidationError: Value is not JSON-compatible
    Does anyone know why? I could not trace the source of the issue :/
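    A sketch of one quick check, assuming the common cause: something serialized at registration time, often a Parameter default, is not JSON-serializable:
    import json

    # raises TypeError on the offending value if it isn't JSON-compatible
    json.dumps(my_parameter_default)  # my_parameter_default is a placeholder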

    tarikki

    10/19/2020, 3:34 PM
    Hi! Is there a way to start the Prefect server with prefect server start in the background? Alternatively, is there a docker-compose.yml file that can be used to start the local setup in the background? Cheers! 😊
    🙌 1
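    A sketch of the workaround discussed elsewhere in this channel, assuming you run the compose file from the Prefect repo directly so it can be detached (the file path and env handling are assumptions):
    # start the local server stack in the background instead of `prefect server start`
    docker-compose -f docker-compose.yml up -d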

    Jasono

    10/19/2020, 4:08 PM
    Hi, I’m getting a Context object has no attribute 'today' error when I try to access prefect.context.today in the Flow block. According to the documentation, today is one of those variables always available during task runs. What am I missing?
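    A minimal sketch of the usual fix: context values like today are populated at run time, so read them inside a task rather than at flow-definition time (the flow name is a placeholder):
    import prefect
    from prefect import task, Flow

    @task
    def use_today():
        # resolved while the task runs, when the context is populated
        return prefect.context.today

    with Flow("context-demo") as flow:
        use_today()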

    Robin

    10/19/2020, 4:15 PM
    Hey all 🙂 We got the following error when trying to map over a list of dictionaries. First just to clarify: mapping over a list of dictionaries should be fine, right? Second, what could be the reason for the message?
    ✔️ 1

    Krzysztof Nawara

    10/19/2020, 5:37 PM
    It seems that weekend posts usually go under the radar, so reposting once again :)

    Bob Colner

    10/19/2020, 6:37 PM
    For prefect mapping, is it possible to revert back to the old 'breadth first' behavior?

Chris White

10/19/2020, 6:42 PM
Hi Bob, while there isn’t a first-class API for toggling this behavior, you can enforce breadth first by adding a “dummy” task at each level of your mapped pipeline:
level_one = task_one.map(some_list)
one_wait = Task(name="Level 1 Block")(upstream_tasks=[level_one])

level_two = task_two.map(some_other_list, upstream_tasks=[one_wait])
Task(name="Level 2 Block")(upstream_tasks=[level_two])
etc.
Coincidentally, someone else asked for this only a few hours ago: https://prefect-community.slack.com/archives/CL09KU1K7/p1603115121236400

Bob Colner

10/19/2020, 6:45 PM
Cool, thanks Chris. It will be great to expose this as a parameter option in the future.
👍 1

Mike Marinaccio

10/29/2020, 8:53 PM
An even simpler approach seems to be working for me. Am I overlooking something?
level_one = task_one.map(some_list)

level_two = task_two.map(some_other_list, upstream_tasks=[level_one])

Chris White

10/29/2020, 8:56 PM
Ah that’s great! The middle task is unnecessary - you can set the dependency directly, nice
🎉 1