prefect-community
  • c

    Christian Eik

    11/29/2019, 11:49 AM
    importing `CreateContainer` works fine
  • c

    Christian Eik

    11/29/2019, 11:50 AM
  • i

    itay livni

    11/29/2019, 6:31 PM
    Hi - I have a `Flow` with an `ifelse`; the `False` branch is a `pass`.
    calced_def_df = calced_def_df.copy()
    Is that the right way to implement this logic? I am getting a
    UserWarning: You are making a copy of a task that has dependencies on or to other tasks in the active flow context. The copy will not retain those dependencies.
    "You are making a copy of a task that has dependencies on or to other tasks "
    j
    c
    • 3
    • 16
  • a

    alvin goh

    12/01/2019, 9:14 AM
    Hi all, anyone thoroughly used the flow.update() function? Seems like GetItem tasks do not get 'transferred' over to the new flow correctly...
    from prefect import Flow, Parameter
    
    with Flow("math") as f:
        x = Parameter("x")
        d = x['d']
        a = d + 1
    
    flow_state = f.run(x={'d': 7})
    print(flow_state) # success
    # create new flow and copy old flow
    new_flow = Flow(name = "Test")
    new_flow.update(f)
    flow_state2 = new_flow.run(x={'d': 7})
    print(flow_state2) # fails
    Stacktrace:
    ERROR - prefect.TaskRunner | Unexpected error: TypeError("run() missing 1 required positional argument: 'key'",)
    Traceback (most recent call last):
      File "/opt/conda/envs/pipeline/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/opt/conda/envs/pipeline/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 869, in get_task_run_state
        self.task.run, timeout=self.task.timeout, **raw_inputs
      File "/opt/conda/envs/pipeline/lib/python3.6/site-packages/prefect/utilities/executors.py", line 85, in timeout_handler
        return fn(*args, **kwargs)
    TypeError: run() missing 1 required positional argument: 'key'
    c
    • 2
    • 4
  • d

    David Ojeda

    12/02/2019, 11:52 AM
    Hello there! I was surprised to find that task instances are reused when executed; that is, the `.run` method is called on the same instance with different input values. In other words, if I change a member variable inside `.run`, the change is visible to the next `run` call. Perhaps a minimal example can explain the situation a bit better:
    from prefect import task, Task, Flow
    
    
    @task
    def generate_numbers():
        return list(range(10))
    
    
    class MyTask(Task):
    
        def __init__(self, *, value=None, **kwargs):
            super().__init__(**kwargs)
            self.value = None
    
        def run(self, *, number):
            print(f'Hello I am {hex(id(self))} number is {number}')
            if self.value is None:
                self.value = number
            else:
                print(f'What? Who set this value={self.value}?')
            return number + 1
    
    
    instance = MyTask()
    
    with Flow("My First Flow") as flow:
        n = generate_numbers()
        n1 = instance.map(number=n)
    
    flow.run()
    This outputs:
    [2019-12-02 11:46:43,162] INFO - prefect.FlowRunner | Beginning Flow run for 'My First Flow'
    [2019-12-02 11:46:43,162] INFO - prefect.FlowRunner | Starting flow run.
    [2019-12-02 11:46:43,166] INFO - prefect.TaskRunner | Task 'generate_numbers': Starting task run...
    [2019-12-02 11:46:43,166] INFO - prefect.TaskRunner | Task 'generate_numbers': finished task run for task with final state: 'Success'
    [2019-12-02 11:46:43,167] INFO - prefect.TaskRunner | Task 'MyTask': Starting task run...
    [2019-12-02 11:46:43,168] INFO - prefect.TaskRunner | Task 'MyTask[8]': Starting task run...
    Hello I am 0x11ddaef50 number is 8
    [2019-12-02 11:46:43,169] INFO - prefect.TaskRunner | Task 'MyTask[8]': finished task run for task with final state: 'Success'
    [2019-12-02 11:46:43,169] INFO - prefect.TaskRunner | Task 'MyTask[1]': Starting task run...
    Hello I am 0x11ddaef50 number is 1
    What? Who set this value=8?
    ...
    I assume that this is specific to the local executor. In general, I don’t think this is a problem, but I was wondering whether there is any documentation or warning in the docs that I may have missed?
    c
    • 2
    • 3
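    The behaviour described in this message is consistent with ordinary Python object semantics: if one object's run method is called once per mapped input, attribute writes persist between calls. The following is a minimal plain-Python sketch, not Prefect code; `StatefulTask` and `run_mapped` are hypothetical names used only for illustration. It shows the effect and one possible way around it, copying the instance for each call:

```python
import copy


class StatefulTask:
    """Hypothetical stand-in for a task class that mutates itself in run()."""

    def __init__(self):
        self.value = None

    def run(self, number):
        # Remember the first number seen; report whether state was already set.
        already_set = self.value is not None
        if not already_set:
            self.value = number
        return number + 1, already_set


def run_mapped(task, numbers, copy_per_call=False):
    """Call task.run once per number, optionally on a fresh copy each time."""
    results = []
    for n in numbers:
        target = copy.copy(task) if copy_per_call else task
        results.append(target.run(n))
    return results


shared = run_mapped(StatefulTask(), [8, 1, 5])
isolated = run_mapped(StatefulTask(), [8, 1, 5], copy_per_call=True)

print(shared)    # state leaks: later calls see value already set
print(isolated)  # each call starts from a clean copy
```

    Keeping run() free of writes to self generally sidesteps the question, whichever executor ends up calling it.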
  • l

    Luke Orland

    12/02/2019, 4:15 PM
    Are there advantages to executing workflows on a Dask Cluster on Kubernetes vs. on Fargate or other execution environments?
    j
    • 2
    • 5
  • p

    PtitPoulpe

    12/02/2019, 4:29 PM
    hello
    👋 2
  • p

    PtitPoulpe

    12/02/2019, 4:30 PM
    I wonder whether there is any Flow validation based on Task type annotations?
    j
    • 2
    • 1
  • i

    itay livni

    12/02/2019, 8:23 PM
    Hi - Is there a way to get the result of a specific merge task after running a flow? Something similar to this:
    flow_state.result[ifelse_return].result
    n
    j
    • 3
    • 26
  • a

    alvin goh

    12/03/2019, 6:50 AM
    Hi - I found something strange... It seems that when a task feeding into a mapped task fails or trigger-fails, the downstream task still tries to map over the result of the failed upstream task, and Prefect errors out with 'TRIGGERFAIL' object does not support indexing.
    from prefect import Flow, Parameter, task
    from prefect.tasks.core.constants import Constant
    from prefect.engine.signals import FAIL
    @task
    def trythis(x, fail=False):
        if fail:
            raise FAIL()
        return [k+1 for k in x]
        
    @task
    def tryprint(x):
        print(x)
        
    with Flow("math") as f:
        x = Parameter("x")
        y = trythis(x, fail=True)
        y1 = trythis(y, fail=False)
        z = tryprint.map(y1)
        
    flow_state = f.run(x=[1,2,3,4,5,6,7])
    print(flow_state.result[z].result)
    Output:
    [2019-12-03 06:50:35,756] INFO - prefect.FlowRunner | Beginning Flow run for 'math'
    [2019-12-03 06:50:35,758] INFO - prefect.FlowRunner | Starting flow run.
    [2019-12-03 06:50:35,765] INFO - prefect.TaskRunner | Task 'x': Starting task run...
    [2019-12-03 06:50:35,769] INFO - prefect.TaskRunner | Task 'x': finished task run for task with final state: 'Success'
    [2019-12-03 06:50:35,775] INFO - prefect.TaskRunner | Task 'trythis': Starting task run...
    [2019-12-03 06:50:35,780] INFO - prefect.TaskRunner | Task 'trythis': finished task run for task with final state: 'Failed'
    [2019-12-03 06:50:35,786] INFO - prefect.TaskRunner | Task 'trythis': Starting task run...
    [2019-12-03 06:50:35,789] INFO - prefect.TaskRunner | Task 'trythis': finished task run for task with final state: 'TriggerFailed'
    [2019-12-03 06:50:35,795] INFO - prefect.TaskRunner | Task 'tryprint': Starting task run...
    [2019-12-03 06:50:35,796] ERROR - prefect.TaskRunner | Task 'tryprint': unexpected error while running task: TypeError("'TRIGGERFAIL' object does not support indexing",)
    Traceback (most recent call last):
      File "/opt/conda/envs/pipeline/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 259, in run
        executor=executor,
      File "/opt/conda/envs/pipeline/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 719, in run_mapped_task
        upstream_state.result[i],
    TypeError: 'TRIGGERFAIL' object does not support indexing
    [2019-12-03 06:50:35,800] INFO - prefect.TaskRunner | Task 'tryprint': finished task run for task with final state: 'Failed'
    [2019-12-03 06:50:35,802] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
    'TRIGGERFAIL' object does not support indexing
    Seems like the mapped task should also raise TriggerFailed if the upstream task failed. Maybe it's not so simple, since only a subset of the upstream tasks may have failed... What is the design philosophy on this?
    j
    • 2
    • 4
  • d

    Dan Egan

    12/03/2019, 8:31 PM
    https://stackoverflow.com/questions/59164875/combine-a-parameterized-flow-with-a-schedule-that-varies-by-parameter
    👍 1
    z
    j
    • 3
    • 8
  • i

    itay livni

    12/04/2019, 1:58 PM
    Hi - Attached is a `flow` with `mapping`. The results of the `map` are read into another function, `munge_dfs`. That `edge` is not defined in the visualization; it is represented as some id. Is this something `prefect` or I have control over? It makes reading the visualization confusing, and even more so with multiple `maps` in a `flow`.
    c
    • 2
    • 4
  • i

    itay livni

    12/04/2019, 1:59 PM
  • a

    alexandre kempf

    12/04/2019, 4:13 PM
    Hello everyone, First, thank you all for this great software 🙂 It is really nice to use 🙂 I want to split the "result" of a task into 2 parts.
    @task
    def return_2_values(x, y):
        return x, y
    
    with Flow("lolilol") as flow:
        a, b = return_2_values(x=2, y=4)
    But I can't, because tasks are not iterable 😕 The only trick I found is to return a dictionary from my task and then use GetItem. Unfortunately, with this method, I need to know what is inside "return_values" at the level of the Flow in order to get the correct keys. Is there a way to do it without prior information about the task? Like a normal Python function 🙂
    n
    j
    • 3
    • 7
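    One way to see why unpacking fails while indexing can still work: inside the flow block the task has not run yet, so its result is only a placeholder. The sketch below is a toy illustration in plain Python, not Prefect's implementation; the `Deferred` class and its `resolve` method are hypothetical. Whether Prefect task results accept integer positions the same way they accept dictionary keys is an assumption to verify against the installed version.

```python
class Deferred:
    """Hypothetical placeholder for a value that is only known at run time."""

    def __init__(self, compute):
        self._compute = compute  # zero-argument callable producing the value

    def __getitem__(self, key):
        # Indexing does not need the value yet: it returns another placeholder.
        return Deferred(lambda: self._compute()[key])

    def __iter__(self):
        # Unpacking needs the number of items, which is unknown before running.
        raise TypeError("Deferred values cannot be unpacked; index them instead")

    def resolve(self):
        return self._compute()


def return_2_values(x, y):
    return Deferred(lambda: (x, y))


pair = return_2_values(x=2, y=4)
a, b = pair[0], pair[1]          # works: positional indexing, no key names needed

try:
    a2, b2 = pair                # fails: iteration is not supported
except TypeError as exc:
    unpack_error = str(exc)

print(a.resolve(), b.resolve())
```

    Indexing by position avoids having to know dictionary keys at the flow level, although the number of returned values still has to be known when the flow is built.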
  • a

    Alex Post

    12/04/2019, 6:18 PM
    Hi - I'm having an issue with setting a parameter according to the documentation found here: https://docs.prefect.io/core/concepts/parameters.html My flow looks like this:
    with Flow('Kafka Data Quality Test') as flow:
        full_run = Parameter('full_run', default=False)
    
    flow.run(parameters=dict(full_run=True))
    and I keep getting this error:
    Traceback (most recent call last):
      File "src/app.py", line 243, in <module>
        flow.run(parameters=dict(full_run=True))
      File "/Users/apost/ccde/ccde-kafka-data-quality/venv/lib/python3.7/site-packages/prefect/core/flow.py", line 991, in run
        fmt_params
    ValueError: Flow.run received the following unexpected parameters: full_run
    Has anyone seen this before?
    j
    • 2
    • 13
  • d

    Dylan

    12/04/2019, 9:54 PM
    Hey Prefect Community! For those of you using `LOOP` signals in your Flows, would you be up to sharing what you use them for? We’re considering a change to the behavior and we’re interested in your thoughts /thread
    :upvote: 2
    e
    m
    • 3
    • 3
  • n

    Nat Busa

    12/05/2019, 4:54 AM
    Hi, I hope this is the right place to post it ... I understand your opencore philosophy, but I am having a hard time choosing between prefect and two other projects: dagster and metaflow ... Also just using dask distributed does not sound bad ... Any thoughts on this?
    c
    d
    m
    • 4
    • 5
  • a

    ARun

    12/05/2019, 6:42 PM
    Hey guys, I've got a basic question. I just found out about Prefect and started messing around with it today, following along with the docs. I've tried running it locally and with the Dask executor, but I think I'm missing something. I've made a simple job to move a file from a database to an SFTP server. However, when I run the file, I get this message. If I Ctrl-C out of it, it crashes the program. Do I have to run it under nohup, or is there a way to execute the script in the background so that it just runs and follows the schedule? If there is any documentation on this, that would be a great help. Thanks!
    [2019-12-05 18:38:18,226] INFO - prefect.Flow: sf -> sftp | Waiting for next scheduled run at 2019-12-05T21:00:00+00:00
    j
    • 2
    • 9
  • l

    Luke Orland

    12/06/2019, 4:51 PM
    I am developing a workflow, and I'm having trouble writing tests that mock out the `run()` method of tasks in the workflow.
    c
    m
    • 3
    • 15
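    The message does not say what went wrong, so the following is only a general sketch of one common approach using the standard library's unittest.mock. `FetchRows`, `CountRows` and `workflow` are hypothetical plain-Python stand-ins, not Prefect classes; whether a flow runner sees a patched method in the same way (for example if tasks are copied or serialized first) is not confirmed here.

```python
from unittest import mock


class FetchRows:
    """Hypothetical task-like class whose run() would normally hit a database."""

    def run(self):
        raise RuntimeError("no database available in tests")


class CountRows:
    """Hypothetical downstream step that consumes the upstream result."""

    def run(self, rows):
        return len(rows)


def workflow():
    rows = FetchRows().run()
    return CountRows().run(rows)


# Patch run on the class so every instance created inside workflow() is affected.
with mock.patch.object(FetchRows, "run", return_value=[1, 2, 3]) as fake_run:
    total = workflow()

print(total)
```

    Patching the class attribute rather than a single instance matters when the code under test creates its own instances; the original method is restored when the with block exits.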
  • k

    Kevin Hill

    12/06/2019, 6:05 PM
    I'm speaking in hypotheticals here, as I haven't dug through the whole framework (beyond the easy first example), but I wanted to ask a few questions:
    1. If I ran a hypothetical reinforcement learning training pipeline through Prefect, could it handle millions of interactions over the course of a training period?
    2. Is it possible to push information from the end of the flow to the beginning if a condition is met?
    3. If so, how fast would that be? Can I use Dask to split certain tasks up between machines and concurrently process more tasks?
    4. Would that use the `LOOP` API?
    d
    • 2
    • 12
  • n

    Nat Busa

    12/06/2019, 6:08 PM
    Hi community: Anybody working on a papermill task?
    :upvote: 3
  • k

    Kevin Hill

    12/06/2019, 8:25 PM
    Is it possible to put a task inside of another task?
    c
    • 2
    • 1
  • k

    Kevin Hill

    12/06/2019, 8:26 PM
    You know, I'll try checking myself tomorrow
  • k

    Kevin Hill

    12/06/2019, 8:44 PM
    So, I saw one of your blog articles about loops. Could you go through a usecase like I'm asking for in another one? It's pretty simple. You need to send events through a pipeline, you'd identify what data you'd need to process on by ID, then you'd log the result in another step, then check for resending a task with that given id to the beginning of the pipeline
  • k

    Kevin Hill

    12/06/2019, 8:45 PM
    IRL, I'd expect that loop wouldn't be there, but for training online machine learning algorithms it would be
  • a

    alvin goh

    12/07/2019, 3:25 AM
    What's the difference between caching with all_parameters and all_inputs?
    c
    • 2
    • 2
  • a

    alvin goh

    12/08/2019, 7:40 AM
    So here's a use case that I'm having trouble with... I have a Flask server running with waitress, and I want to run a Prefect flow whenever I receive a request. At the same time, I want to enable caching in the Prefect flow, which does not work if I run waitress on more than one thread (it seems the cache is local to the thread's memory): requests only hit the cache if they go to the same worker thread. Is there a way to let the workers on different threads share the same cache? I'm not entirely sure what happens when waitress spawns threads...
    j
    j
    • 3
    • 2
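    The symptom described (cache hits only when a request lands on the same worker thread) is what a per-thread cache would produce. Assuming that is the cause, which this thread does not confirm, the plain-Python sketch below contrasts thread-local storage with a single lock-guarded dictionary. All names (`get_local`, `get_shared`, `expensive`) are hypothetical and nothing here uses Prefect or waitress.

```python
import threading

local_store = threading.local()   # each thread sees its own attributes
shared_cache = {}                 # one dict visible to every thread
shared_lock = threading.Lock()
calls = {"local": 0, "shared": 0}


def expensive(key):
    return key * 2


def get_local(key):
    cache = getattr(local_store, "cache", None)
    if cache is None:
        cache = local_store.cache = {}
    if key not in cache:
        with shared_lock:         # only protects the counter
            calls["local"] += 1
        cache[key] = expensive(key)
    return cache[key]


def get_shared(key):
    with shared_lock:             # guard both lookup and insert
        if key not in shared_cache:
            calls["shared"] += 1
            shared_cache[key] = expensive(key)
        return shared_cache[key]


def worker():
    get_local(21)
    get_shared(21)


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(calls)
```

    With four threads, the thread-local variant recomputes once per thread while the shared variant computes once in total. A cache that must survive across processes would need external storage instead.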
  • d

    DiffyBron

    12/08/2019, 1:52 PM
    how do you do flow.run(value=5) and only show the output instead of all the flow states?
    j
    j
    c
    • 4
    • 22
  • d

    DiffyBron

    12/09/2019, 5:18 AM
    Hello, I'm trying to run a shell command, and it seems that it only outputs one line when I run 'ls -lat'. Is that expected behavior? (I've taken bits and pieces from here: https://github.com/PrefectHQ/prefect/issues/1013)
    from prefect import Flow, task
    from prefect.tasks.shell import ShellTask
    
    my_task = ShellTask(helper_script="cd /etc")
    
    @task
    def print_output(output):
        print(output)
    
    with Flow("My shell") as flow:
        result = my_task(command='ls -lat')
        print_output(result)
    
    if __name__ == '__main__':
        out = flow.run()
    c
    • 2
    • 2
  • n

    Nat Busa

    12/09/2019, 7:01 AM
    Chaining tasks with no input: how to do that?
    • 1
    • 3
Managed that! In imperative mode, use flow.chain(...).
Inside a flow context, use the following snippet:
import prefect

def chain(*tasks):
    flow = prefect.context.get("flow", None)
    flow.chain(*tasks)