Powered by Linen
prefect-community
    Abuzar Shaikh

    08/11/2021, 8:01 AM
    Hello everyone, I want to map a task inside another task, but whenever I try, I get the following error: `ValueError: Could not infer an active Flow context while creating edge to <Task: map_fn>. This often means you called a task outside a with Flow(...) block. If you're trying to run this task outside of a Flow context, you need to call map_fn.run(...)`
    Below is a reproduction of what I want to do:
    from prefect import Flow, task
    
    numbers = [1, 2, 3]
    
    @task
    def map_fn(x):
        return x + 1
    
    @task
    def reduce_fn():
        res = map_fn.map(numbers)
        print(res)
    
        return res + [1]
    
    with Flow('flow1') as flow1:
        print(reduce_fn())
    
    flow1.run()
    Any suggestions or workarounds would be appreciated.

    Michael Law

    08/11/2021, 8:05 AM
    Hey guys, I'm seeing some pretty substantial delays in my jobs: my agent appears to be having SSL connection issues going from AKS (image `prefect:python37-latest`) to Prefect Cloud, with my flows stored in Azure Storage. I've seen in previous comments that this could be due to rate limiting? `urllib3.exceptions.ReadTimeoutError: HTTPSConnectionPool(host='10.0.0.1', port=443): Read timed out. (read timeout=None)` Details in thread.

    Andreas Eisenbarth

    08/11/2021, 9:42 AM
    Hello, we always use checkpointing and want to enable it in Python by default, to avoid potentially error-prone extra steps for correct environment setup. The following approaches do not seem to work (before creating a flow):
    prefect.config.flows.checkpointing = True
    or
    os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "True"
    Instead, we need to ensure that no `prefect` import statement (direct or indirect) occurs before setting this environment variable. It seems impractical that Prefect reads such configuration in top-level code (executed on import) rather than in functions. Are there any better solutions?

    Didier Marin

    08/11/2021, 10:26 AM
    Hello, I'm looking for a way to pause a task if a certain condition is met, and resume it after the user confirms. Here is a minimal example:
    from prefect import task, Flow, Parameter
    from prefect.engine.signals import PAUSE
    
    @task
    def dummy_task(x):
        confirmed = ???
        if x > 100 and not confirmed:
            raise PAUSE("Are you sure ?")
        return x + 1
    
    with Flow("dummy") as flow:
        x = Parameter('x', default=1)
        dummy_task(x)
    Is there a clean way to get the previous states of a task, maybe from `prefect.context`, so that I know the confirmation happened? Or do I have to add some branching to handle this?

    Fabrice Toussaint

    08/11/2021, 11:57 AM
    Hi, why does using the result (a tuple) of a mapped function as input to another mapped function throw `ValueError("Cycle found; flows must be acyclic!")`? Or, to rephrase the question: why is it cyclic at all? Minimal example:
    from prefect import Flow, apply_map, task
    
    
    @task()
    def do_something_one(item):
        return item + 1
    
    
    @task()
    def do_something_two(item, item_one):
        return item + item_one
    
    
    @task()
    def do_something_three(item_combination):
        return item_combination[0] + item_combination[1]
    
    
    # mapping function one
    def do_something_with_one_two(item):
        result_one = do_something_one(item)
        result_two = do_something_two(item, result_one)
        return (result_one, result_two)
    
    
    # mapping function two
    def do_something_with_three(item):
        return do_something_three(item)
    
    
    with Flow("test flow") as flow:
        l = [1, 2, 3]
        results = apply_map(do_something_with_one_two, l)
        results_results = apply_map(do_something_with_three, results)
    
    flow.run()
    The visualization can be seen below.

    Billy McMonagle

    08/11/2021, 3:19 PM
    Hi there, I have generated a new API key and am trying to use it in a Kubernetes Agent, but I'm getting an unexpected error.

    Nivi Mukka

    08/11/2021, 3:19 PM
    Hi Team, I have Prefect Cloud set up with a GKE cluster, using Dask as the executor. Can I use `multiprocessing` inside my task code, and how is that expected to behave with Prefect + Dask? The code inside the task looks like this:
    from tqdm import tqdm
    import multiprocessing
    
    pool = multiprocessing.Pool(processes=16)
    func_output = list(tqdm(pool.imap(some_func, some_functions_input)))
    I am getting this error:
    AssertionError: daemonic processes are not allowed to have children
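A commonly suggested workaround, sketched here under the assumption that `some_func` is I/O-bound or releases the GIL: Dask typically starts its worker processes as daemonic processes, and Python does not let daemonic processes start children, so `multiprocessing.Pool` fails inside a task while a thread pool does not. `some_func` and the inputs below are hypothetical stand-ins for the names in the question, and `tqdm` is dropped to keep the sketch dependency-free.

```python
from concurrent.futures import ThreadPoolExecutor


def some_func(x):
    # Hypothetical stand-in for the real per-item function.
    return x * 2


def run_in_threads(func, inputs, max_workers=16):
    """Apply func to every input using threads instead of child processes.

    Starting threads is allowed inside a daemonic worker process, whereas
    multiprocessing.Pool would try to spawn children and raise AssertionError.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in input order, like Pool.imap.
        return list(pool.map(func, inputs))


if __name__ == "__main__":
    print(run_in_threads(some_func, range(5)))
```

For CPU-bound pure-Python work, threads will not give a speed-up; the alternatives would be mapping the work as Prefect tasks so Dask distributes it, or (to my knowledge) disabling daemonic workers via Dask's `distributed.worker.daemon` setting.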

    YD

    08/11/2021, 5:17 PM
    Tasks do not run in parallel. I might not understand parallel execution well. I have two tasks that do not depend on each other; when I trigger the flow manually, one task runs and the other stays pending (looking at the flow run schematic in the UI). The code looks like this:
    from datetime import timedelta
    from prefect import task, Flow
    from prefect.schedules import CronSchedule
    from time import sleep
    
    
    @task(max_retries=1, retry_delay=timedelta(minutes=10), timeout=2000)
    def tast_1():
        sleep(10)
        print('Do task 1')
        return True
    
    
    @task(max_retries=1, retry_delay=timedelta(minutes=10), timeout=2000)
    def tast_2():
        sleep(10)
        print('Do task 2')
        return True
    
    
    def main():
        schedule = CronSchedule("0 15 * * *")
    
        with Flow("parallel tasks", schedule=schedule) as flow:
            r1 = tast_1()
            r2 = tast_2()
    
        flow.register(project_name="parallel tasks")
    
    
    if __name__ == "__main__":
        main()

    Kyle McChesney

    08/11/2021, 8:12 PM
    High-level question: is it possible to use/reference parameters within a state handler? The use case is as follows:
    • I have a “job” record in a database/application that is external to Prefect.
    • The execution of this external “job” can be represented/encapsulated partially by a flow run.
    • I’d like to use state handlers to update the job status in the external system.
    • To do so, I need to grab the external job ID from a parameter (since it is an input to the flow).
    Reasons for not putting this in a task:
    • There may be a number of different flows that use this, but the update logic will be the same.
    • The external job ID is an optional input, so I want to avoid coupling it with the flow code as much as possible.
    • I want to apply these “updates” at different parts of the flow (i.e., specific tasks), and for different flows these will be different tasks.

    Gustavo de Paula

    08/11/2021, 8:51 PM
    Hi guys! I'm trying to run a flow with a custom flow class that requires `python >= 3.8`. When I run it with the local agent, it works fine, but when I run it with the Kubernetes agent, using an image that extends `prefecthq/prefect:latest-python3.8`, I get a Python version mismatch error saying that it is trying to run the flow with Python 3.7.10. Is there a way I can change this Python version?

    Nadav

    08/11/2021, 10:26 PM
    Hi, I am trying to write a flow that uses boto3 to interact with AWS resources. I am installing Prefect with the `aws` extra in the Docker image, but the flow fails in the tasks that create AWS clients with: `botocore.exceptions.DataNotFoundError: Unable to load data for: endpoints`. Any suggestions?

    Open-Issue Prefect Team 1

    08/11/2021, 10:44 PM
    I've got a newbie question that I haven't been able to figure out from the documentation. I'd prefer to use the Cloud backend. Since the code for flows isn't stored on the Cloud backend, are there approaches one can use to avoid having to run a machine 24/7? (At this point I'm anticipating only simple flows that run once a day.) The closest I've seen is this article, which talks about using Docker, AWS ECR, AWS Batch, and CloudWatch to avoid a dedicated machine. However, I'm not sure whether the author was using Prefect Cloud or Server. Plus, if one sets up CloudWatch to fire off events to AWS Batch, then I don't see the benefit of using Cloud anymore.

    YD

    08/11/2021, 11:16 PM
    Question about registering flows: if user A registers a flow with a specific name, can user B register a flow with the same name in the same project? Will it overwrite the first flow?

    William Grim

    08/11/2021, 11:56 PM
    Hey there, I’ve got a question for which I’m sure there’s a solution. I’ve got a flow that is scheduled to run daily. Its job is to run other flows that have been added to Prefect over time (e.g., our system lets people register instances of flows). However, when this top-level flow gets registered, Prefect builds a graph that only knows about the lower-level flows that had been registered previously and doesn’t pick up later changes. So I tried creating a task called “run_dymamic” that could kick off other tasks, but nothing happens. Is there a way to create a flow that can “change” after it’s been registered? Basically, when the flow gets executed, I want that run to modify its graph based on data it finds in a database, for example. I’m trying my best not to be abstract.

    Ivan Indjic

    08/12/2021, 8:48 AM
    Hi guys, I have a question about organizing my Prefect tasks. I have two Python files: "main.py", where I defined the flow and a few tasks, and "mytasks.py", in which I defined just one task. Both are in my GitLab repository, at the same root level. I import the task from mytasks.py in main.py. However, when I try to run that flow on a Docker agent, it gives me an error saying that there is no module named "mytasks". Can you help me with this?

    Newskooler

    08/12/2021, 9:24 AM
    Hi 👋 I am struggling to make my Prefect flow run depth-first rather than breadth-first. My flow is set up like so:
    from prefect.executors import LocalDaskExecutor
    from prefect import Flow
    
    with Flow(name=name, executor=LocalDaskExecutor(scheduler="threads", num_workers=1)) as flow:
        ...
    Do I need to use another executor, or where can I learn more about this?

    Nishtha Varshney

    08/12/2021, 10:50 AM
    Hey, is there any built-in Prefect function to extract/pull incremental data from an API?
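Independent of any Prefect built-in, one common hand-rolled pattern for incremental API pulls is a high-watermark: remember the largest `updated_at` seen so far and, on the next run, request only records newer than it. The sketch below assumes a hypothetical `fetch_page(since, page)` API client returning records with ISO-8601 `updated_at` strings, and keeps the watermark in a plain dict where a real flow would persist it (database table, file, etc.).

```python
from typing import Callable, Dict, List, Optional

Record = Dict[str, str]


def pull_incremental(
    fetch_page: Callable[[Optional[str], int], List[Record]],
    state: Dict[str, Optional[str]],
) -> List[Record]:
    """Fetch only records newer than the stored watermark, then advance it.

    fetch_page(since, page) is a hypothetical API client: it returns one page
    of records with updated_at > since (all records if since is None) and an
    empty list once there are no more pages.
    """
    since = state.get("watermark")
    new_records: List[Record] = []
    page = 0
    while True:
        batch = fetch_page(since, page)
        if not batch:
            break
        new_records.extend(batch)
        page += 1
    if new_records:
        # ISO-8601 strings in one consistent format compare correctly as text.
        state["watermark"] = max(r["updated_at"] for r in new_records)
    return new_records
```

Running it twice against an unchanged source returns everything the first time and an empty list the second time, since the watermark has moved past every record.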

    Samuel Kohlleffel

    08/12/2021, 12:38 PM
    I'm having trouble using parameters when running my flow from the CLI. It works correctly when I run the flow by calling `flow.run(parameters=dict(rebuild="true"))`, but when I try to run `python -m python.module --param rebuild=true`, the parameter is not recognized and the flow runs with the default value. What am I doing wrong here?
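If I understand the Prefect 1.x CLI correctly, `--param` is an option of Prefect's own `prefect run` command; `python -m` simply executes the module, so nothing reads that flag unless the module parses `sys.argv` itself. A minimal sketch, assuming the module defines `flow` at import time and that string values are acceptable (the question already passes the string `"true"`): collect repeated `--param key=value` options with `argparse` and hand the dict to `flow.run`. The helper name `parse_params` is hypothetical.

```python
import argparse
from typing import Dict, List, Optional


def parse_params(argv: Optional[List[str]] = None) -> Dict[str, str]:
    """Turn repeated `--param key=value` options into a parameters dict."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--param",
        action="append",
        default=[],
        metavar="KEY=VALUE",
        help="flow parameter; may be given more than once",
    )
    args = parser.parse_args(argv)
    params: Dict[str, str] = {}
    for item in args.param:
        # Split on the first "=" only, so values may contain "=".
        key, sep, value = item.partition("=")
        if not sep:
            parser.error(f"expected KEY=VALUE, got {item!r}")
        params[key] = value
    return params


if __name__ == "__main__":
    params = parse_params()
    print(params)
    # In the real module, with `flow` defined above (assumed):
    # flow.run(parameters=params)
```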

    marios

    08/12/2021, 2:05 PM
    Hi, hi! I am trying to find out whether there is a way to generate a timeline-like diagram of the flows I have registered on Prefect, something that shows "Flow A is triggered by Flow B, which is scheduled at 12:00", similar to the schematic I get at the flow level for tasks.

    Italo Barros

    08/12/2021, 3:06 PM
    Hello everyone! Just a newbie question here: how do I create edges (one task can only execute if the previous one is successful) between tasks that don't pass "results" between them? For example, suppose I have a program that creates some folders, logs in to an API, and collects some results: Create Folder 1 > Create Folder 2 > Login to API > Collect Results. The three initial tasks don't return results, but I want them to run as a chain.

    Nicholas Chammas

    08/12/2021, 3:10 PM
    I don’t have a question. I just want to say that @Kevin Kho is a trooper for keeping up with all these questions and helping people be productive with Prefect. Thank you.

    Ruslan Aliev

    08/12/2021, 3:23 PM
    Hi, everybody! I'm trying to find the right pipeline for my computer vision scenarios, which consist of several tasks (detection, segmentation, tracking, some_business_tasks, etc.). Each task has a lot of parameters and its own environment (Docker image), and depends on the result of the previous task. Task results are stored in a NoSQL DB. I have 5 virtual machines: 2 with GPUs and 3 with CPUs only. I want to run some tasks (detection, segmentation) only on the VMs with GPUs. How can I organize a Prefect flow for this pipeline?

    hunter

    08/12/2021, 4:32 PM
    Hi all! I see a feature for logging stdout to the cloud console, but is there a similar feature for stderr? I’m trying to integrate Prefect into a codebase with a lot of existing C++ utilities that use glog for logging, and I’m having trouble getting the logs for these utils to show up in the console.
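One possible approach, sketched under assumptions: run the C++ utility as a subprocess, capture its stderr, and re-emit each line through a Python logger. glog writes to stderr when `--logtostderr` (or the `GLOG_logtostderr=1` environment variable) is set; otherwise it mainly writes to log files. Inside a Prefect 1.x task, the logger would be `prefect.context.get("logger")`; this sketch accepts any `logging.Logger` so it runs without Prefect, and the function name is hypothetical.

```python
import logging
import subprocess
import sys
from typing import List, Tuple


def run_forwarding_stderr(cmd: List[str], logger: logging.Logger) -> Tuple[int, List[str]]:
    """Run cmd, send each stderr line to logger.info, return (exit code, lines)."""
    proc = subprocess.run(cmd, capture_output=True, text=True)
    lines = proc.stderr.splitlines()
    for line in lines:
        logger.info(line)
    return proc.returncode, lines


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    code, _ = run_forwarding_stderr(
        [sys.executable, "-c", "import sys; print('I0812 hello from stderr', file=sys.stderr)"],
        logging.getLogger("cpp-utils"),
    )
    print("exit code:", code)
```

This buffers output until the process exits; for long-running utilities, `subprocess.Popen` with line-by-line reads of `proc.stderr` would stream logs as they arrive. It also does not cover C++ code loaded in-process through bindings, whose writes to file descriptor 2 bypass Python's `sys.stderr`.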

    Kyle McChesney

    08/12/2021, 4:43 PM
    I am trying to set some custom environment variables for the code running in my flows. This is Python code in a custom module that’s installed into my Docker container and imported by my flow code. To that end, I’d like to avoid making it dependent on Prefect to access those environment variables (e.g., via Prefect config). I have added a script to `/etc/profile.d/` that exports the necessary env vars, but it does not seem to be loaded by default, likely due to ENTRYPOINT/CMD issues (no login shell?). Wondering if anyone has found a way to do this. I am aware that I could configure the agent to pass env vars in, OR update the run config for the flow with them, but for a number of reasons I am hoping to avoid this. The goal is to get env vars set somehow during the Docker build that are loaded through a normal “source”-type process on container start.

    Joe Schmid

    08/12/2021, 5:30 PM
    We're running into intermittent failures running flows for ML hyperparameter tuning on a small/medium-sized Dask cluster (12 workers) on AWS. The stack trace (I'll post it in a thread reply in a sec) shows an exception down in Dask/pickle, but unfortunately it is intermittent. Has anyone run into something similar? (This is on Dask version 2021.07.1 and a slightly older Prefect version: 0.13.19.)

    Charles Liu

    08/12/2021, 6:44 PM
    Is there a way to run a task quickly in-line during the flow orchestration block? I'm currently using .run() to attempt to execute a PrefectSecret because it contains a dict that I need info from, and the secrets don't seem iterable on their own.

    Anh Nguyen

    08/12/2021, 8:02 PM
    Hello everybody, I'm using Prefect for ETL, but I don't know which strategy to follow yet for: 1/ How many records were extracted and loaded? 2/ How do I detect data that needs to be baselined (new or updated rows in the source table)? Thanks all
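For both questions, one common approach (a sketch, not a Prefect feature) is to key each source row by its primary key plus a hash of its content, compare against the hashes stored from the previous load, and report counts along the way. The `id` key, the row shape, and the in-memory `previous_hashes` dict are assumptions; in practice the hashes would live in the target database or a small state table.

```python
import hashlib
import json
from typing import Dict, List, Tuple

Row = Dict[str, object]


def row_hash(row: Row) -> str:
    """Stable content hash: JSON with sorted keys, then SHA-256."""
    payload = json.dumps(row, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def classify_rows(
    source_rows: List[Row], previous_hashes: Dict[str, str], key: str = "id"
) -> Tuple[List[Row], List[Row], Dict[str, int], Dict[str, str]]:
    """Split source rows into new and updated ones, with simple counts.

    Returns (new_rows, updated_rows, counts, current_hashes). previous_hashes
    is not modified; persist current_hashes only after the load succeeds.
    """
    current_hashes = dict(previous_hashes)
    new_rows: List[Row] = []
    updated_rows: List[Row] = []
    for row in source_rows:
        row_key = str(row[key])
        digest = row_hash(row)
        old = previous_hashes.get(row_key)
        if old is None:
            new_rows.append(row)
        elif old != digest:
            updated_rows.append(row)
        current_hashes[row_key] = digest
    counts = {
        "extracted": len(source_rows),
        "new": len(new_rows),
        "updated": len(updated_rows),
        "unchanged": len(source_rows) - len(new_rows) - len(updated_rows),
    }
    return new_rows, updated_rows, counts, current_hashes
```

Only `new_rows` and `updated_rows` need to be loaded, and `counts` can be logged from the task to answer the "how many records" question.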

    Nivi Mukka

    08/12/2021, 9:12 PM
    Hi Team, what are the disadvantages of disabling the Lazarus service? https://docs.prefect.io/orchestration/concepts/services.html#lazarus

    Danny Vilela

    08/12/2021, 10:15 PM
    Hi! If we want to have a parameter within a flow like `snapshot_date = Parameter(name="snapshot_date", default=dt.datetime.today())` and pass it into another (class-based) task, how should we do that? Should we pass it at task initialization (e.g., `my_task = MyTask(snapshot_date=snapshot_date)`) or at task call (i.e., `my_task(snapshot_date=snapshot_date)`)? This assumes that `MyTask` uses `snapshot_date` within its `run` method.
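Two points, as I understand Prefect 1.x. First, a `Parameter` is itself a task whose value only exists at run time, so it belongs in the task call (`my_task(snapshot_date=snapshot_date)`), where it becomes an upstream edge; passing it to `MyTask(...)` would store the `Parameter` object, not its value. Second, `default=dt.datetime.today()` is evaluated once, when the flow is built (for a registered flow, at registration), so later runs would keep reusing that date. A common workaround is a `None` default resolved inside `run`; the plain-Python sketch below shows only that resolution step, with `resolve_snapshot_date` as a hypothetical helper name.

```python
import datetime as dt
from typing import Optional


def resolve_snapshot_date(snapshot_date: Optional[dt.date] = None) -> dt.date:
    """Use the supplied date, else compute today's date at call (run) time.

    Because today() is called here rather than in a default expression,
    each run gets the current date instead of the date the flow was built.
    """
    if snapshot_date is None:
        return dt.date.today()
    return snapshot_date


if __name__ == "__main__":
    print(resolve_snapshot_date())                     # today's date, on every call
    print(resolve_snapshot_date(dt.date(2021, 8, 1)))  # explicit override
```

Inside `MyTask.run`, the first line could then be `snapshot_date = resolve_snapshot_date(snapshot_date)`, with the flow declaring `Parameter(name="snapshot_date", default=None)`.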

    Aric Huang

    08/12/2021, 10:28 PM
    I want to implement Slack notifications on flow state changes that can notify/@-mention a specific Slack user, who can differ between flow runs. I see documentation for using the `slack_notifier` (https://docs.prefect.io/core/advanced_tutorials/slack-notifications.html#slack-notifications) or for creating your own state handler to attach to a flow (https://docs.prefect.io/core/concepts/notifications.html#state-handlers), but I don't see a straightforward way to change the Slack message based on some runtime value other than `old_state` and `new_state`. Is there a recommended way to do this? I think some way for a state handler to read a Parameter value would work for me, but I'm not sure if that's possible.

Kevin Kho

08/12/2021, 10:31 PM
Hey @Aric Huang, I don’t think you can do it with that notifier. Instead, use the SlackTask to customize the message. I suggest putting it in your state handler and then calling `SlackTask(message).run()`; this will be the easiest way to fire off a message.

Aric Huang

08/12/2021, 10:35 PM
@Kevin Kho OK, would that state handler look something like this:
from prefect.tasks.notifications import SlackTask  # Prefect 1.x import path

def post_to_slack(task, old_state, new_state):
    msg = "Task {0} is now in state {1}".format(task, new_state)
    SlackTask(msg).run()
    return new_state
And then this state handler would be attached to a flow or task?

Kevin Kho

08/12/2021, 10:36 PM
Yes exactly

Aric Huang

08/12/2021, 10:37 PM
What I'm not seeing is how I could vary the message at runtime, for example if I want the person running the flow to specify who to notify.
For example, I'd like to do something like this:
def post_to_slack(task, old_state, new_state):
    username = ...  # <some Parameter value>
    msg = "@{0}: Task {1} finished in state {2}".format(username, task, new_state)
    SlackTask(msg).run()
    return new_state

Kevin Kho

08/12/2021, 10:39 PM
You can use the Parameters in the state handler through the Prefect context. Try logging `prefect.context.get("parameters")`. I think it’s a dictionary of all the values.

Aric Huang

08/12/2021, 10:39 PM
Ahh, gotcha. That sounds like it will work; I'll give it a try.
Thank you!
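Putting the thread together, the handler might look roughly like the sketch below. Assumptions: Prefect 1.x, a flow Parameter named `notify_user` (hypothetical) that holds a Slack member ID, and a webhook configured the way `SlackTask` expects (as far as I know, it reads a `SLACK_WEBHOOK_URL` secret by default). Slack generally only pings someone for the `<@MEMBER_ID>` form, not a bare `@username`. The Prefect imports sit inside the handler so that `build_message` can be tried without Prefect installed.

```python
from typing import Mapping, Optional


def build_message(parameters: Optional[Mapping[str, object]], task_name: str, state_name: str) -> str:
    """Compose the Slack text, mentioning the user given by the hypothetical
    `notify_user` Parameter when one was supplied."""
    user = (parameters or {}).get("notify_user")
    prefix = f"<@{user}> " if user else ""
    return f"{prefix}Task {task_name} finished in state {state_name}"


def post_to_slack(task, old_state, new_state):
    """Prefect 1.x state handler: post once the task reaches a finished state."""
    import prefect  # deferred: only needed when the handler actually runs
    from prefect.tasks.notifications import SlackTask

    if new_state.is_finished():
        params = prefect.context.get("parameters", {})
        msg = build_message(params, task.name, type(new_state).__name__)
        SlackTask(message=msg).run()
    return new_state
```

It would be attached as before, e.g. `@task(state_handlers=[post_to_slack])`, with the person running the flow supplying `notify_user` at run time.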