prefect-community
  • a

    Akash

    09/10/2019, 1:58 PM
    Another question, which may be a rather naive one: what happens if the execution environment where a Prefect flow is deployed gets deleted? For example, an EC2 instance gets terminated for some reason. I'm guessing that, because of the lack of a persistence layer, the flow is lost.
    c
    • 2
    • 1
  • j

    Jerry Thomas

    09/10/2019, 2:25 PM
    I need to process streaming data from Kafka and then send the results back to an output topic. I was able to achieve this with the approach below. I wanted to check whether this is the right way, or if there is a better way to process streaming data:
    import json

    import prefect.engine.signals
    from prefect import Flow, task
    from kafka import KafkaConsumer, KafkaProducer


    @task
    def get_stream_data():
        consumer = KafkaConsumer("my-topic",
                                 bootstrap_servers=['localhost:9092'],
                                 value_deserializer=lambda m: json.loads(m.decode('ascii')),
                                 consumer_timeout_ms=1000)

        messages = [message.value for message in consumer]
        if len(messages) == 0:
            raise prefect.engine.signals.SKIP("No data received")
        return messages


    @task
    def process_itemwise(data):
        data["result"] = 1
        return data


    @task
    def publish_result(data):
        # a producer takes a value_serializer to encode outgoing messages
        producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                                 value_serializer=lambda m: json.dumps(m).encode('ascii'))
        producer.send("results", data)
        producer.flush()


    with Flow("custom") as flow:
        data = get_stream_data()
        result = process_itemwise.map(data)
        publish_result.map(result)

    while True:
        flow.run()
    c
    • 2
    • 11
  • a

    Aakarsh Nadella

    09/10/2019, 4:19 PM
    Does Prefect Core support hyperparameter optimization libraries such as Tune?
    c
    • 2
    • 4
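    Since Prefect tasks are ordinary Python, one option is to call the library from inside a task. A minimal sketch, assuming Ray Tune's tune.run API and a hypothetical trainable function:
    from prefect import task, Flow
    from ray import tune


    def trainable(config):
        ...  # train a model with config["lr"] and report a metric to Tune


    @task
    def tune_hyperparameters():
        # run the search inside a single Prefect task
        return tune.run(trainable, config={"lr": tune.grid_search([0.01, 0.1])})


    with Flow("tuning") as flow:
        results = tune_hyperparameters()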
  • j

    Joe Schmid

    09/10/2019, 8:19 PM
    Question on using an external Dask cluster (DaskExecutor) running in adaptive mode -- we notice that running a Flow that maps over elements (say 100 elements) only seems to use the number of Dask workers that existed at the time the flow run started. We would have expected the Dask cluster to begin scaling up workers (since it's in adaptive mode) and starting to run more Prefect tasks in parallel. I'm not sure we've really characterized this behavior well, but we've observed it enough that it seems to be a pattern. (We have a notebook that - using the k8s APIs - plots the number of requested workers, running workers, & nodes and could show some of what we're seeing if it would help.)
    j
    b
    +2
    • 5
    • 27
  • i

    Ike

    09/11/2019, 4:35 PM
    What is the best practice for decorating tasks? I have modules where I decorated functions with @task, then import them in main.py for the Flow. But I'm having problems using Sphinx to generate docs: no error messages, but the HTML output does not contain the docstrings.
    c
    m
    j
    • 4
    • 14
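    One workaround to sketch (not an official recommendation): @task replaces each function with a Task instance, which Sphinx's autodoc may silently skip, so keep the documented plain functions and wrap them separately.
    from prefect import task


    def clean_data(df):
        """Drop invalid rows.  (autodoc sees this plain, documented function.)"""
        return df.dropna()


    # import the wrapped version in main.py when building the Flow
    clean_data_task = task(clean_data)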
  • j

    Joe Schmid

    09/12/2019, 6:32 PM
    Somebody set me straight on something silly I must be doing wrong. I have a really simple Flow that looks like this:
    with Flow("ScaleDownDaskOffHours", environment=env, schedule=schedule) as flow:
        config = get_dask_scheduler_config()
        new_config = reset_config(config)
        save_results = save_new_config(new_config)
        save_results.set_upstream(None, flow)
        ifelse(new_config != config, save_results, None)
    I'm getting the warning
    PrefectWarning: One of the tasks passed to the switch condition has upstream dependencies: <Task: save_new_config>. Those upstream tasks could run even if the switch condition fails, which might cause unexpected results.
    I thought the
    save_results.set_upstream(None, flow)
    line would take care of that but clearly not. What's the right way to address that?
    c
    j
    • 3
    • 10
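    One genuine issue in the snippet: new_config != config is evaluated at build time on the Task objects themselves, not at run time on their results. A sketch of moving the comparison into a task, reusing the tasks from the snippet above (the upstream-dependency warning may still appear, since save_new_config legitimately consumes upstream data):
    from prefect import task, Flow
    from prefect.tasks.control_flow import ifelse


    @task
    def config_changed(old, new):
        # comparing task *results* has to happen inside a task at run time
        return old != new


    with Flow("ScaleDownDaskOffHours") as flow:
        config = get_dask_scheduler_config()
        new_config = reset_config(config)
        ifelse(config_changed(config, new_config), save_new_config(new_config), None)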
  • d

    David Norrish

    09/13/2019, 1:14 AM
    Apologies for a naive question! I have a flow that is roughly doing:
    filepaths = get_filepath(directory)            # return a list of file paths
    outpaths = get_output_paths(filepaths)         # return a corresponding list of paths to write to after cleaning
    dataframes = read_file.map(filepaths)          # read files into memory in parallel
    cleaned_dfs = clean_dataframe.map(dataframes)  # clean dataframes in parallel
    write_dataframe.map(cleaned_dfs, outpaths)     # write dataframes to file in parallel
    I have 16 cores and ~400 files and am using the DaskExecutor. During execution, ALL files are read into memory before any of the cleaning child tasks start, then all cleaning tasks are performed before any writing to file commences. This means memory is being flooded, as each file is ~2GB. It's kind of like a breadth-first approach, whereas I'd like it to run depth-first: read 16 files into memory at once, clean them and write them to file; then a core can free the memory and read in the next file. Is there a way to achieve this, possibly with triggers? The only workaround I've been able to think up is to write a single big task that does the reading/cleaning/writing steps (see the sketch below) - but I feel like that's kind of not the point of Prefect. Huge thanks in advance if anyone can help!
    j
    c
    • 3
    • 7
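    A sketch of that single-task workaround, assuming plain-function (non-task) versions of the read/clean/write helpers, so each file's memory can be released as soon as its branch finishes:
    from prefect import task, Flow


    @task
    def process_file(filepath, outpath):
        # read, clean, and write one file end-to-end inside a single task
        df = read_file(filepath)
        df = clean_dataframe(df)
        write_dataframe(df, outpath)


    with Flow("etl") as flow:
        filepaths = get_filepath(directory)
        outpaths = get_output_paths(filepaths)
        process_file.map(filepaths, outpaths)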
  • m

    Matt Harvey

    09/13/2019, 6:34 PM
    Hello! Quick question. The Schedules docs say:
    Prefect does not support sub-minute schedules.
    Is this a hard limit? I need to run a Prefect Flow once per second for a minute (and then stop). 🙏
    c
    j
    • 3
    • 6
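    One workaround is to drive flow.run() from an outer loop instead of a Schedule. A minimal sketch of a once-per-second run for one minute, assuming each run finishes in under a second:
    import time

    for _ in range(60):
        start = time.time()
        flow.run()
        # sleep off whatever remains of this second before the next run
        time.sleep(max(0.0, 1.0 - (time.time() - start)))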
  • b

    Brett Naul

    09/13/2019, 7:32 PM
    when using docker storage and/or remote executors, what’s considered the best practice for shipping code around? does it make sense to rely on cloudpickle to send any required non-third party code via the flow itself, or should everything run from the same base image that has the needed internal functions?
    c
    • 2
    • 6
  • m

    Matt Harvey

    09/14/2019, 3:11 PM
    Good morning! Apologies if this is answered in the docs; I can't seem to find it. Prefect always seems to run in UTC, so the CronClock needs to be set in UTC. Not a big deal, but is it possible to override the timezone? 🕐
    c
    m
    • 3
    • 5
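    A sketch of one approach, assuming the 0.6-era clocks accept a timezone-aware pendulum start_date (cron times are then interpreted relative to that zone):
    import pendulum
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    # hypothetical example: 9am every day, New York time
    schedule = Schedule(clocks=[CronClock(
        "0 9 * * *",
        start_date=pendulum.datetime(2019, 9, 1, tz="America/New_York"),
    )])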
  • s

    Stewart Webb

    09/15/2019, 12:24 PM
    Hi all, I'm trying to figure out the best practice in Prefect when a task needs to send a job to a third-party appliance that cannot scale, so I must avoid sending it too many concurrent jobs. All I can find in relation to this is the Queued state I could put a given task in. I assume, then, that I'd be looking at building my own resource-queuing system (using Redis or the like) to queue these jobs.
    c
    • 2
    • 3
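    If every flow run shares a single local process, a plain semaphore can cap concurrent calls - a sketch under that assumption only (it does not hold across Dask worker processes, where an external queue such as Redis would indeed be needed):
    import threading

    from prefect import task

    # allow at most 2 in-flight jobs on the appliance within this process
    _appliance_slots = threading.BoundedSemaphore(2)


    @task
    def call_appliance(job):
        with _appliance_slots:
            return send_to_appliance(job)  # hypothetical appliance client call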
  • a

    Aakarsh Nadella

    09/16/2019, 9:13 PM
    I am trying to run a Prefect flow on a Dask cluster using Kubernetes and AWS. I was able to import prefect and also print its version, but when I execute the flow it says ModuleNotFoundError: No module named 'prefect'. I have installed all the dependencies needed by the Dask worker and Dask Jupyter pods using a YAML file.
    c
    j
    • 3
    • 19
  • a

    Argemiro Neto

    09/18/2019, 4:28 PM
    Hello, everyone! I'm new to Prefect and to Python as well. Correct me if I'm wrong: tasks inside a flow run in sequence, and if a task fails, the downstream ones will not be executed. In my tests, though, the execution order is not deterministic. With the code below I expect the following output:
    ==== START ====
    === starting the flow
    == generate called
    = status: [1, 2, 3]
    == task runner called with 1
    == task runner called with 2
    == task runner called with 3
    ==== END ====
    Code:
    from datetime import timedelta

    from prefect import task, Flow


    @task
    def generate_task_list() -> list:
        print("== generate called")
        return [1, 2, 3]
    
    
    @task(max_retries=3, retry_delay=timedelta(seconds=0))
    def task_runner(x: int) -> None:
        print('== task runner called with {}'.format(x))
    
    
    @task
    def print_status(text: any) -> None:
        print('= status: {}'.format(text))
    
    
    with Flow("random-mapping") as f:
        print_status('=== starting the flow')
        values = generate_task_list()
        print_status(values)
        final = task_runner.map(x=values)
    
    print('==== START ====')
    f.run()
    print('==== END ====')
    By running this multiple times I get different results but never what I expected. What am I missing? Thank you!
    c
    m
    • 3
    • 9
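    For reference: Prefect only orders tasks by their data dependencies, and the two print_status calls have none between them, so their order is unspecified. A sketch of making the ordering explicit with set_upstream (inside the flow context):
    with Flow("random-mapping") as f:
        start = print_status('=== starting the flow')
        values = generate_task_list()
        values.set_upstream(start)      # banner prints before generation
        status = print_status(values)   # already ordered via the data dependency
        final = task_runner.map(x=values)
        final.set_upstream(status)      # status line prints before the mapped runs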
  • b

    Brett Naul

    09/18/2019, 9:00 PM
    is there a simple way to use a secret in a custom task? so far I’ve been using a RemoteEnvironment and my existing cluster already loads GOOGLE_APPLICATION_CREDENTIALS from a k8s secret; for a DaskKubernetesEnvironment I guess I need to pull it from a prefect Secret instead. do I just need to add os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = Secret(...).get() everywhere or is there an easier way?
    👀 1
    j
    j
    m
    • 4
    • 14
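    A sketch of the direct approach from the message, centralized in one task so the variable is set wherever the task executes (assuming Prefect's client-side Secret):
    import os

    from prefect import task
    from prefect.client import Secret


    @task
    def load_google_credentials():
        # resolve the Prefect Secret once and export it for Google client libraries
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = Secret("GOOGLE_APPLICATION_CREDENTIALS").get()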
  • a

    Argemiro Neto

    09/19/2019, 12:04 AM
    Is it possible to run tasks inside tasks, and flows inside tasks? I was able to call a flow inside a task, but I got a ValueError: Could not infer an active Flow context error every time I tried to send parameters to the flows from the tasks. I got the same error when calling a task within a task.
    c
    • 2
    • 7
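    For context, a sketch of why the error appears and a common workaround: calling a task like a function uses the functional API, which requires an active Flow context; inside another task, the wrapped logic can be invoked directly via .run() instead.
    from prefect import task


    @task
    def helper(x):
        return x + 1


    @task
    def outer(x):
        # helper(x) here would raise "Could not infer an active Flow context";
        # .run() executes the underlying function directly, outside the DAG
        return helper.run(x)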
  • a

    Alex Lopes

    09/19/2019, 1:32 PM
    Hi guys, good morning. 🙂 I'm starting to use Prefect and have a quick newbie question: in what kind of scenario would I use the Task class instead of the task decorator? Just trying to understand it better.
    ⬆️ 1
    👋 1
    z
    e
    j
    • 4
    • 6
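    A sketch of the usual rule of thumb: the decorator suits quick one-off functions, while subclassing Task suits reusable tasks that carry configuration (the example names here are illustrative):
    from prefect import task, Task, Flow


    @task
    def add_one(x):
        return x + 1


    class AddN(Task):
        """A reusable task configured at construction time."""

        def __init__(self, n, **kwargs):
            self.n = n
            super().__init__(**kwargs)

        def run(self, x):
            return x + self.n


    add_five = AddN(5, name="add-five")

    with Flow("example") as flow:
        result = add_five(add_one(1))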
  • j

    Jerry Thomas

    09/20/2019, 5:06 AM
    I would like to try running my prefect flow on a dask-kubernetes cluster. I saw that it is possible but I am not able to grasp how. Can you share an example of running a prefect flow on a dask-kubernetes cluster? Should I create a custom worker (pod/docker) for the cluster with the appropriate libraries? How does my prefect flow end up executing on the dask-worker if the worker pod/docker does not contain my code?
    z
    j
    • 3
    • 4
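    A sketch of one approach, assuming dask-kubernetes' KubeCluster and a worker image that already has prefect and the flow's third-party dependencies installed (task code itself is shipped to workers serialized with cloudpickle, but shared modules must exist in the image):
    from dask_kubernetes import KubeCluster
    from prefect.engine.executors import DaskExecutor

    # worker-spec.yaml is a hypothetical pod spec pointing at that worker image
    cluster = KubeCluster.from_yaml('worker-spec.yaml')
    cluster.scale(5)

    executor = DaskExecutor(address=cluster.scheduler_address)
    flow.run(executor=executor)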
  • c

    Christopher Stokes

    09/20/2019, 9:13 PM
    Hello folks. I feel like I'm missing something pretty basic. I have the following code that runs the CreateRidAlert task twice and throws warnings:
    create_alert = CreateAlert()
    create_rid_alert = CreateRidAlert()
    
    with Flow('Alert Flow') as flow:
        alert_json = Parameter(name="alert_json", required=True)
        alert = create_alert(alert_json)
        rid_alert = create_rid_alert(alert)
        ifelse(has_rid(alert), rid_alert, None)
        rid = extract_rid(rid_alert)
    I'd like to remove the rid_alert = create_rid_alert(alert) line, but then I don't know how to wire up the ifelse line with a result to pass to extract_rid for data flow. This may not be a clear question.
    c
    j
    • 3
    • 10
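    A sketch of the merge pattern, which may help here: merge passes along the result of whichever branch was not skipped, giving downstream tasks a single task to consume (assuming 0.6-era control flow; whether it removes the duplicate-run warning is untested).
    from prefect import Flow, Parameter
    from prefect.tasks.control_flow import ifelse, merge

    with Flow('Alert Flow') as flow:
        alert_json = Parameter(name="alert_json", required=True)
        alert = create_alert(alert_json)
        rid_alert = create_rid_alert(alert)
        ifelse(has_rid(alert), rid_alert, None)
        # merge yields rid_alert's result when the branch ran, None otherwise
        rid = extract_rid(merge(rid_alert))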
  • m

    Markus Binsteiner

    09/21/2019, 8:17 AM
    Quick question about scheduling: is there a way to gracefully cancel a running (infinite) flow that uses a schedule? Also, I need to manage several of them in my application, so I plan to write some sort of management class for runs - but maybe something like this already exists? Does anybody know?
    j
    • 2
    • 3
  • a

    Aiden Price

    09/21/2019, 11:32 PM
    Hi folks, what is the best way to reference and mutate a variable between `flow.run()`s? Should it be a Parameter? My actual use case is to have a dict which is a copy of one of my database tables, which the incoming data needs to refer to to find its foreign key each time. If I find a new name that I don’t have in my dict I’ll need to update the table in the database and mutate my dictionary, then reference the mutated version in subsequent `flow.run()`s. I’m only new to Prefect but I have to say I love your work, thank you!
    👋 2
    j
    c
    • 3
    • 12
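    A sketch of the Parameter-based approach, where the driver loop owns the dict, passes the latest copy into each run, and folds any new entries back in afterwards (the task logic here is an illustrative placeholder):
    from prefect import task, Flow, Parameter


    @task
    def find_new_names(lookup):
        # compare incoming data against the known foreign-key dict and
        # return any entries missing from it (placeholder logic)
        return {}


    with Flow("fk-sync") as flow:
        lookup = Parameter("lookup")
        new_entries = find_new_names(lookup)

    fk_table = {}  # local copy of the database table
    for _ in range(3):
        state = flow.run(parameters=dict(lookup=fk_table))
        fk_table.update(state.result[new_entries].result)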
  • j

    Jason

    09/23/2019, 6:01 PM
    Hi, are there ways to invoke a non-Python step in Prefect?
    j
    • 2
    • 2
  • j

    Jason

    09/23/2019, 6:01 PM
    Would the Docker task be the one to use?
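    The Docker tasks are one option; for arbitrary command-line steps there was also ShellTask. A minimal sketch, assuming prefect.tasks.shell.ShellTask and a hypothetical script:
    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    shell = ShellTask()

    with Flow("non-python-step") as flow:
        # runs any command-line program as a task and captures its output
        out = shell(command="./run_legacy_job.sh --input data.csv")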
  • g

    Gary Liao

    09/25/2019, 1:47 AM
    It seems that Prefect doesn't support multiple return values - or am I missing some Python syntax?
    @task
    def hello(x):
        a = 1
        b = 2
        return a, b

    with Flow('test flow') as flow:
        a, b = hello(1)  # <---- 'NoneType' object is not callable
        hello(a)
        hello(b)
    j
    e
    b
    • 4
    • 16
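    At this point a task produced a single result, so tuple unpacking inside a Flow block fails. A sketch of a workaround, assuming Task supports indexing (each subscript builds a GetItem task that extracts its element at run time):
    with Flow('test flow') as flow:
        result = hello(1)
        a = result[0]  # GetItem task: extracts element 0 of the tuple
        b = result[1]  # GetItem task: extracts element 1 of the tuple
        hello(a)
        hello(b)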
  • d

    David Ojeda

    09/25/2019, 7:01 PM
    Hi everyone, quick question: I am building my docs with Sphinx and wanted to refer to the Prefect docs with intersphinx. This is relatively easy, as long as there is a public objects.inv, but I cannot find it for Prefect… Is this supported?
    c
    • 2
    • 6
  • g

    Gary Liao

    09/26/2019, 3:23 AM
    A Flow's parameters need to be passed to at least one task's run() function, or an error will be raised, right?
    from prefect import task, Flow, Parameter
    from prefect.core.task import Task

    class print_plus_one(Task):
        def __init__(self, x):
            super().__init__()
            self.x = x

        def run(self):
            print(self.x + 1)

    with Flow('Parameterized Flow') as flow:
        x = Parameter('x')
        print_plus_one(x)()

    flow.run(parameters=dict(x=1))    # prints 2
    flow.run(parameters=dict(x=100))  # prints 101

    ValueError: Flow.run received the following unexpected parameters: x
    c
    • 2
    • 8
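    For reference, a sketch of the run()-argument pattern: a Parameter needs to flow into a task at the flow level, typically as an argument to run(), for Flow.run to recognize it.
    from prefect import Flow, Parameter
    from prefect.core.task import Task


    class PrintPlusOne(Task):
        def run(self, x):
            print(x + 1)


    with Flow('Parameterized Flow') as flow:
        x = Parameter('x')
        PrintPlusOne()(x)

    flow.run(parameters=dict(x=1))    # prints 2
    flow.run(parameters=dict(x=100))  # prints 101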
  • j

    Joe Schmid

    09/26/2019, 5:58 PM
    This isn't 100% Prefect related but thought it would be relevant for some users. We have a need for different types of Dask workers, e.g. high memory, GPU, etc. This would work really well with Prefect's support for task tagging to specify Dask worker resources: https://docs.prefect.io/api/unreleased/engine/executors.html#daskexecutor We use dask-kubernetes (on AWS EKS) which currently supports just a single k8s pod spec for all Dask workers. I'm about to start on a potential PR in dask-kubernetes that would allow for different worker types, but only in non-adaptive mode. Before I dive in, I thought I'd ask if anyone knows of existing work in this area of creating and scaling different Dask worker types, especially on k8s. I see this old issue: https://github.com/dask/distributed/issues/2118 but haven't found much else. (I've also asked on the Dask users gitter channel.)
    :dask: 3
    :marvin: 2
    j
    • 2
    • 2
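    For context, a sketch of the tag convention from the linked docs, which the DaskExecutor translates into Dask worker-resource annotations:
    from prefect import task


    @task(tags=["dask-resource:GPU=1"])
    def train_on_gpu(data):
        ...  # scheduled only on Dask workers started with --resources "GPU=1"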
  • a

    Aiden Price

    09/27/2019, 5:04 AM
    Quick question: can you task.map() over a Pandas DataFrame, so you could get each row as a Series for each task call? Or does it only work with generic iterables? Thank you.
    c
    • 2
    • 3
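    Mapping iterates over whatever the upstream task returns, and iterating a DataFrame yields column labels rather than rows, so a sketch of one approach is to explode the frame into a list of Series first:
    import pandas as pd
    from prefect import task, Flow


    @task
    def to_rows(df: pd.DataFrame) -> list:
        # convert to one pd.Series per row so map() sees an ordinary list
        return [row for _, row in df.iterrows()]


    @task
    def handle_row(row: pd.Series):
        ...


    with Flow("df-map") as flow:
        rows = to_rows(load_frame())  # load_frame is a hypothetical upstream task
        handle_row.map(rows)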
  • t

    Tobias Schmidt

    09/27/2019, 12:45 PM
    Can't quite figure this out from the docs: is there a way to limit the number of tasks in a map that are run in parallel?
    j
    z
    • 3
    • 4
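    One workaround is to bound the executor rather than the map itself - a sketch using an explicitly sized local Dask cluster:
    from dask.distributed import LocalCluster
    from prefect.engine.executors import DaskExecutor

    # at most 4 mapped tasks run at once: 4 workers x 1 thread each
    cluster = LocalCluster(n_workers=4, threads_per_worker=1)
    executor = DaskExecutor(address=cluster.scheduler_address)
    flow.run(executor=executor)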
  • c

    Christopher Stokes

    09/27/2019, 8:32 PM
    Seeing something a bit odd - unsure if I'm using the library wrong. When using
    with raise_on_exception():
        state = flow.run(parameters)
    I get raised exceptions on conditional misses - like switch branches not matching. The
    raise signals.SKIP('Provided value "{}" did not match "{}"'.format(value, self.value))
    call ends up firing and stopping execution.
  • c

    Christopher Stokes

    09/27/2019, 8:34 PM
    this is on the most recent release