prefect-community
  • a

    Arnaud Legendre

    11/14/2019, 1:29 PM
    Hello everyone, I am new to prefect too, and my question is about running multiple flows in parallel. Let's say I have a sequence of tasks to achieve every day, upon the arrival of a new batch of data. I suppose I have to declare this sequence of tasks in a Flow object (with today's date as a variable), right? Now let's say that I want to fast-forward this flow for several days in the past: what is the best option? - I was thinking of creating a new flow, implementing a call to multiple tasks mapped to the set of dates I want to replay, each of which running my 'daily' flow. Is it a valid strategy? Will the scheduler (supposedly based on dask.distributed) manage all the work and resources properly? - Is it better to rely on the command-line interface of a main script, and run this script multiple times, as suggested here (https://prefect-community.slack.com/archives/CL09KU1K7/p1567716836049700)? Many thanks for the advice!
    c
    • 2
    • 4
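    A minimal sketch of the map-over-dates idea from the question, with hypothetical extract/load tasks standing in for the real daily work; the backfill is simply the same flow run with a list of past dates:
    from prefect import Flow, Parameter, task

    @task
    def extract(day):
        # placeholder: load the batch that arrived on `day`
        return {"day": day, "rows": []}

    @task
    def load(batch):
        # placeholder: persist the processed batch
        print("loaded batch for {}".format(batch["day"]))

    with Flow("daily-batch") as flow:
        days = Parameter("days")      # the dates to (re)play
        batches = extract.map(days)   # one mapped task run per date
        load.map(batches)

    # normal daily run
    flow.run(days=["2019-11-14"])
    # backfill: the same flow, fanned out over past dates by map
    flow.run(days=["2019-11-11", "2019-11-12", "2019-11-13"])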
  • i

    itay livni

    11/14/2019, 8:57 PM
    Hi there - I have a function called
    main
    it runs a
    flow
    . If I add a
    @task
    decorator to
    main
    is that an acceptable way to run a flow from a flow. or do you see any issues? ..
    c
    j
    c
    • 4
    • 17
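    A rough sketch of the pattern the question describes - a @task whose body calls flow.run() on a child flow; whether this is advisable is exactly the open question, and build_child_flow is a hypothetical helper:
    from prefect import Flow, task

    def build_child_flow():
        # hypothetical child flow
        @task
        def step():
            print("child flow step")
        with Flow("child") as child:
            step()
        return child

    @task
    def main():
        # calling flow.run() inside a task runs the child flow
        # synchronously within the parent task's run
        child = build_child_flow()
        state = child.run()
        return state.is_successful()

    with Flow("parent") as parent:
        main()

    parent.run()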
  • p

    Phoebe Bright

    11/14/2019, 9:23 PM
    Apologies for asking what is somewhere in the docs and I know I saw it somewhere but can't find it again (is there some reason the community is not using stackoverflow?). How can I get the value that was returned from a task? I'm passing a dict that gets added to by each task, and at the end I want to update my django model with the final dict. I could create another task that runs some sql but it would be really nice to just assign the values and save. For anyone familiar with django, this is my code - feedback very welcome!
    '''
    def run_workflow(request=None):
        for item in Dataset.objects.pending():
            # only handling these types
            if not (item.batch_mode == "PASS" or item.batch_type in ("IMG", "EDT")):
                raise ValueError("Unknown batch type %s" % item.batch_type)

            with Flow("GasCloud processing batch %s" % item.batchid[:4]) as flow:
                batchid = Parameter("batchid")
                # get details of batch
                batch_details = get_batch_details(batchid)
                # download the zip file to local drive
                downloaded = get_from_datastore(batch_details)
                # unzip
                unzipped = unzip(downloaded)
                if item.batch_mode == settings.BATCH_MODE_PASS:
                    run_details = add_run(unzipped, "PASS")
                    run_result = complete_run(run_details)
                elif item.batch_type == "IMG":
                    run_details = add_run(unzipped, "IMG")
                    processing = process_image(run_details)
                    run_result = complete_run(processing)
                elif item.batch_type == "EDT":
                    write_initial_readings_to_db(unzipped)
                    run_details = add_run(unzipped, "EDT")
                    # binary to csv
                    csvs = binary2csv(run_details)
                    processing = apply_temperature_compensation(csvs)
                    run_result = complete_run(processing)
                # mark as done
                processed_dataset = processed_done(run_result)
                # rezip
                rezipped = rezip(processed_dataset)
                # push to datastore
                back_in_datastore = put_to_datastore(rezipped)
                # done
                final = alldone(back_in_datastore)
                item.batch_details = final  # <----- this doesn't work of course!
                item.save()

            flow.run(batchid=item.batchid)
            # flow.visualize()

        if request:
            return HttpResponse("Done")
    '''
    c
    j
    • 3
    • 14
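    A minimal sketch of how the returned value could be pulled out of the flow run state after flow.run() and then saved outside the flow block; alldone here is a stand-in for the final task in the question's code:
    from prefect import Flow, task

    @task
    def alldone(details):
        # stand-in for the final task in the flow above
        return {"status": "done", "batchid": details}

    with Flow("example") as flow:
        final = alldone("abc1")

    state = flow.run()
    # the value a task returned is available on the final flow state
    final_value = state.result[final].result
    print(final_value)   # {'status': 'done', 'batchid': 'abc1'}
    # e.g. item.batch_details = final_value; item.save()  (outside the flow block)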
  • k

    Kyle Foreman (Convoy)

    11/14/2019, 11:27 PM
    any suggested design patterns for creating libraries of reusable flows? I.e. if I want to wrap up
    A -> B -> C
    so that user 1 can create
    A -> B -> C -> X1 -> Y1
    and user 2 can create
    A -> B -> C -> Q2
    by simply importing and building off an existing
    A -> B -> C
    flow?
    c
    j
    • 3
    • 8
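    One possible pattern, sketched with hypothetical tasks: a factory function builds the shared A -> B -> C segment and returns the flow together with its terminal task, so each user can re-enter the flow context and build on top of it:
    from prefect import Flow, task

    @task
    def a():
        return "a"

    @task
    def b(x):
        return x + "b"

    @task
    def c(x):
        return x + "c"

    def base_flow(name):
        # hypothetical factory for the shared A -> B -> C segment; returns the
        # flow and its terminal task so callers can build on top of it
        with Flow(name) as flow:
            tail = c(b(a()))
        return flow, tail

    @task
    def x1(x):
        return x + "-x1"

    @task
    def y1(x):
        return x + "-y1"

    # user 1 imports base_flow and extends it with X1 -> Y1
    flow1, tail = base_flow("user-1")
    with flow1:
        y1(x1(tail))

    flow1.run()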
  • w

    Walter Gillett

    11/15/2019, 4:59 AM
    Hi - we are building bioinformatics pipelines related to infectious disease. Prefect looks interesting. I am wondering about task grouping (a.k.a. nesting or sub-dags). Each step in our pipeline reads inputs from GCS and writes outputs to GCS. Without task grouping, this will get messy. For example, suppose we have steps 1, 2, and 3, each of which reads one GCS input and writes a GCS output. That yields 9 tasks (3 GCS download, 3 compute, and 3 upload), but we would like to group them into pipeline steps because that’s the essential unit of work. Is there a way to model this in Prefect?
    c
    m
    • 3
    • 8
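    One way to approximate task grouping, sketched with a hypothetical PipelineStep Task subclass that wraps download, compute, and upload into a single flow node, trading per-step visibility inside the group for one node per pipeline step:
    from prefect import Flow, Task

    class PipelineStep(Task):
        # hypothetical wrapper: one flow task per logical pipeline step;
        # the GCS download/compute/upload all happen inside a single task run
        def __init__(self, compute, **kwargs):
            self.compute = compute
            super().__init__(**kwargs)

        def run(self, input_uri, output_uri):
            data = self.download(input_uri)      # e.g. GCS download
            result = self.compute(data)
            self.upload(result, output_uri)      # e.g. GCS upload
            return output_uri

        def download(self, uri):
            return "contents of " + uri          # placeholder

        def upload(self, data, uri):
            print("wrote {} bytes to {}".format(len(data), uri))

    step1 = PipelineStep(compute=str.upper, name="step-1")
    step2 = PipelineStep(compute=str.lower, name="step-2")

    with Flow("bio-pipeline") as flow:
        out1 = step1("gs://bucket/in1", "gs://bucket/out1")
        step2(out1, "gs://bucket/out2")

    flow.run()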
  • t

    Tobias Schmidt

    11/18/2019, 9:47 AM
    I have a flow a la
    A -> B -> C
    . I want task A to be skippable via a command line argument to the Python script that implements that flow. What's the best way to do this? Can I mark A as skipped, set
    skip_on_upstream_skip
    on B and C (or just B?) to False and then run the flow as usual? Or is there a more idiomatic way of doing this?
    e
    j
    • 3
    • 2
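    An alternative sketch to toggling skip_on_upstream_skip: drive the choice with a Parameter set from the command line and let ifelse skip either A or a fallback, with merge feeding whichever ran into B. All task names here are hypothetical:
    import argparse

    from prefect import Flow, Parameter, task
    from prefect.tasks.control_flow.conditional import ifelse, merge

    @task
    def not_skipping(skip_a):
        return not skip_a

    @task
    def a():
        return "made by A"

    @task
    def fallback():
        return "default input for B"

    @task
    def b(x):
        return x + " -> B"

    @task
    def c(x):
        print(x + " -> C")

    with Flow("skippable-A") as flow:
        skip_a = Parameter("skip_a", default=False)
        run_a = not_skipping(skip_a)
        a_result = a()
        alt = fallback()
        ifelse(run_a, a_result, alt)        # skips one of the two branches
        b_result = b(merge(a_result, alt))  # merge passes on whichever branch ran
        c(b_result)

    if __name__ == "__main__":
        parser = argparse.ArgumentParser()
        parser.add_argument("--skip-a", action="store_true")
        args = parser.parse_args()
        flow.run(skip_a=args.skip_a)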
  • c

    Chris O'Brien

    11/18/2019, 11:12 PM
    Hi All, I could be going about this wrong, but I think I want the capability of running a flow from a flow. My use case is similar to something that's been discussed previously, but where the final set of tasks in a flow are a re-usable flow. Eg.
    X1 -> X2 -> A -> B -> C
    and
    Y1 -> A -> B -> C
    . The idea is that
    A -> B -> C
    are the methods for loading data into the database, so they take the transformed data from upstream and only run if they were successful. Does this make any sense or is there a more Prefect way to attack this?
    d
    j
    • 3
    • 35
  • a

    Adam Roderick

    11/19/2019, 11:41 PM
    Prefect team, thank you for providing Environment Labels so that we can direct different flows to different agents. They are working great!
    🚀 7
    :marvin: 5
  • t

    Thomas Adams

    11/20/2019, 6:06 AM
    I am new but super excited to see what is possible w/ this!
    :marvin: 3
    n
    • 2
    • 2
  • a

    Aliza Rayman

    11/20/2019, 8:54 AM
    Similar to what @Kyle Foreman (Convoy) asked in https://prefect-community.slack.com/archives/CL09KU1K7/p1573774071310600, I'm wondering about the best way to turn task groups into re-usable parts. My problem has many flows which use almost the same tasks for
    A->B->C
    to pull and preprocess data, then some in-between processing which varies by flow, then
    G-H-I
    to aggregate/handle the result. It seems like
    update
    isn't the ideal use case for this. Right now I just have the tasks in separate modules and import and run them all (so it's a bit messy).
    z
    • 2
    • 4
  • r

    Ron Van Buskirk

    11/20/2019, 3:43 PM
    Prefect is such an elegant framework -- it's amazing! I've created small pipelines but would appreciate any suggestions on designing a larger one. It builds 150+ Postgres tables in 20-30 hours, there are multiple layers of tables, and tables in one layer depend on one or more tables from the previous layer. I'd like to create a Flow where the DAG specifies the table dependencies, then builds a table only if (1) it doesn't exist or (2) it is older than its parent table(s). I thought of different solutions:
    * having separate tasks for building each table (450+ tasks!):
     Check_table1 -> ifelse -> Build_table1 |-> Check_table2 -> ifelse -> Build_table2 ...
                                            |-> Check_table3 -> ifelse -> Build_table3 ...
                                            |-> Check_table4 -> ifelse -> Build_table4 ...

    * subclassing the Postgres execute task to create a check-and-build task:
     Check_and_build_table1 |-> Check_and_build_table2 ...
                            |-> Check_and_build_table3 ...
                            |-> Check_and_build_table4 ...

    * having a small number of tasks (check timestamp and existence, conditional, build) and using the map function to iterate the building of the required tables:
     Check_table1 -> ifelse -> Build_table1 |-> Check_table.map(x) -> ifelse -> Build_table(x) ...
                                            |-> ...
                                            |-> ...
    Still really new to Prefect... are any of these any good? Are there any other best practices I should consider?
    👀 1
    z
    • 2
    • 2
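    A small sketch of the second option (a check-and-build Task subclass), with the database checks and SQL replaced by placeholders; dependencies between layers are expressed by passing the parents' build timestamps downstream:
    from prefect import Flow, Task

    class CheckAndBuild(Task):
        # hypothetical "option 2" task: rebuild a table only when it is
        # missing or older than its parents (placeholder checks, no real DB)
        def __init__(self, table, **kwargs):
            self.table = table
            super().__init__(name="build_" + table, **kwargs)

        def needs_rebuild(self, parent_timestamps):
            # placeholder: query catalog/metadata for existence and age
            return True

        def run(self, parent_timestamps=None):
            if self.needs_rebuild(parent_timestamps or []):
                print("building " + self.table)  # placeholder for the SQL execute
            return "2019-11-20T00:00:00"         # pretend build timestamp

    layer1 = CheckAndBuild("table1")
    layer2a = CheckAndBuild("table2")
    layer2b = CheckAndBuild("table3")

    with Flow("warehouse-build") as flow:
        t1 = layer1()
        layer2a(parent_timestamps=[t1])  # DAG edges encode the table dependencies
        layer2b(parent_timestamps=[t1])

    flow.run()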
  • a

    Agostino Calamia

    11/20/2019, 4:32 PM
    Hello everyone, I am new and just started to use prefect. I already have my first question: is it possible to give a variable (created in task number 1) to another task (task number 2)? Up until now I have not found a solution for passing variables from task to task.
    a
    n
    • 3
    • 7
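    A minimal sketch: a task's return value becomes the argument of the next task simply by passing the call result inside the Flow block (hypothetical task names):
    from prefect import Flow, task

    @task
    def make_value():
        return 42                # task number 1 returns the "variable"

    @task
    def use_value(value):
        print(value + 1)         # task number 2 receives it as an argument

    with Flow("pass-data") as flow:
        v = make_value()
        use_value(v)             # passing the result wires the data dependency

    flow.run()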
  • c

    Chris O'Brien

    11/21/2019, 2:09 AM
    Hi all, what's the best way to implement a conditional on a Task Signal? That is, if a task throws an unhandled exception (so it is FAIL), how do we pick one branch or the other?
    @task
    def condition():
      if previous_task.Signal == 'FAIL':
        return False
      else:
        return True
    
    with Flow('test') as test:
        output = task_1() #Fails with Exception
        switch(condition, {True: yay_flow, False: boo_flow})
    Or am I approaching this wrong?
    c
    • 2
    • 6
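    One possible approach, sketched with triggers instead of inspecting signals inside a task: make both branches downstream of the task that may fail and let all_successful / any_failed decide which branch runs (task names are hypothetical):
    from prefect import Flow, task
    from prefect.triggers import all_successful, any_failed

    @task
    def task_1():
        raise ValueError("boom")   # fails with an unhandled exception

    @task(trigger=all_successful)
    def yay():
        print("upstream succeeded")

    @task(trigger=any_failed)
    def boo():
        print("upstream failed")

    with Flow("branch-on-failure") as flow:
        result = task_1()
        yay(upstream_tasks=[result])   # runs only if task_1 succeeded
        boo(upstream_tasks=[result])   # runs only if task_1 failed

    state = flow.run()   # the flow state still reflects the upstream failure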
  • s

    Steve Vandervalk

    11/21/2019, 2:25 AM
    Hi all, if I run a Flow through a normal IDE debugger but keep seeing the values of task outputs assigned to variables as <Task: name> types, am I returning the value of tasks wrongly or using the debugger wrong?
    c
    m
    • 3
    • 26
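    A small sketch of why the debugger shows <Task: ...>: inside the Flow block a call only builds the graph, so the variable holds a Task object; the actual value only exists in the state returned by flow.run():
    from prefect import Flow, task

    @task
    def get_number():
        return 7

    with Flow("debug") as flow:
        n = get_number()   # n is a Task: the flow block only builds the graph

    state = flow.run()                 # values exist only after the run
    print(state.result[n].result)      # 7 - this is what a debugger can inspect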
  • a

    Aliza Rayman

    11/21/2019, 8:13 AM
    What is the best way to run a bunch of mapped tasks and aggregate the errors (with custom messages) to a final result? Right now my code looks something like:
    with Flow('aggregate alerts') as flow:
        ids = Parameter("ids")
        response1 = task1.map(ids)  # May return {"error": "error message"} or other data
        response2 = task2.map(response1)  # May return {"error": "error message"} or other data
        ...
    result_file = taskX(responseX-1)  # Aggregates all of the error messages with corresponding ids into 1 file
    j
    • 2
    • 4
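    A minimal sketch of one way to do the final aggregation: an unmapped reduce task receives the whole list of mapped results and collects the error entries (the error format follows the question; everything else is hypothetical):
    from prefect import Flow, Parameter, task

    @task
    def task1(i):
        # may return an error marker or data, as in the question
        if i % 2:
            return {"error": "error message for id {}".format(i)}
        return {"id": i, "value": i * 10}

    @task
    def aggregate(results):
        # unmapped reduce step: receives the whole list of mapped results
        errors = [r["error"] for r in results if "error" in r]
        print("\n".join(errors) or "no errors")
        return errors

    with Flow("aggregate alerts") as flow:
        ids = Parameter("ids")
        responses = task1.map(ids)
        aggregate(responses)   # collects every error into one final result

    flow.run(ids=[1, 2, 3, 4])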
  • t

    Thomas Adams

    11/21/2019, 5:15 PM
    Are there any really simple tutorials out there for Prefect? I saw the ones on the website, but I am still confused. I have a script that runs locally on my machine and am currently using AWS Lambda to run it. However, I want to explore the pros and cons of Prefect! Maybe my life will be much easier 🙂
    j
    • 2
    • 4
  • c

    Chris O'Brien

    11/21/2019, 11:58 PM
    Is the expected behaviour on a merge of two skipped branches that it still executes? How would I have the Merge respect the two upstream skips? Or is this not possible?
    from prefect import task, Flow
    from prefect.triggers import any_failed, some_failed
    from prefect.tasks.control_flow.conditional import ifelse, merge, switch
    
    @task
    def three_outcomes():
        return "dead_branch"
    
    @task
    def fail_branch():
        print("i fail")
    
    @task
    def pass_branch():
        print("i pass")
    
    @task
    def dead_branch():
        print("im dead")
    
    @task
    def do_final_thing():
        print("final")
    
    with Flow("example") as flow:
        switch(three_outcomes, dict(dead_branch=dead_branch, pass_branch=pass_branch, fail_branch=fail_branch))
        do_final_thing.set_upstream(merge(pass_branch, fail_branch))
    
    flow_state = flow.run()
    flow.visualize(flow_state=flow_state)
    👀 1
    n
    • 2
    • 2
  • t

    Thomas Adams

    11/22/2019, 1:25 AM
    Would anyone be willing to show me an ETL example with basic auth? Does the script automatically live on a server somewhere so that it can run automatically?
    c
    n
    • 3
    • 10
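    A minimal sketch of an ETL flow that uses HTTP basic auth via requests; the URL and credentials are hypothetical. As the closing comment notes, Prefect Core itself does not host the script - it runs wherever you run it:
    import requests
    from prefect import Flow, task

    @task
    def extract(url, user, password):
        # basic-auth GET against a hypothetical API
        resp = requests.get(url, auth=(user, password))
        resp.raise_for_status()
        return resp.json()

    @task
    def transform(records):
        return [r for r in records if r.get("active")]

    @task
    def load(records):
        print("would write {} records".format(len(records)))  # placeholder sink

    with Flow("etl-basic-auth") as flow:
        data = extract("https://example.com/api/items", "user", "s3cret")
        load(transform(data))

    # Prefect Core does not host this script for you: it runs wherever you run
    # it (laptop, cron, a server, or an agent polling Prefect Cloud)
    flow.run()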
  • g

    Gaurav Goel

    11/22/2019, 10:30 AM
    Hello team, I am planning to start using prefect for workflow management, but I would like to understand: is there a REST API available for the Prefect Core module?
    j
    j
    +2
    • 5
    • 30
  • a

    agonen

    11/24/2019, 7:33 PM
    Hi Folks, I would first like to say that I'm very impressed with the project and the extensive documentation and information on the channel. Good work. I have a few questions that I hope will clarify a few things for me. 1. Where does Prefect Core keep the state of long-running tasks, and what happens if the instance running Prefect Core fails? 2. If I have a long-running task, for example a boto call to start an EMR cluster and then submit a step to it (for example, do ETL and write to S3), and I then want to add another step based on the outcome of the first step once the cluster is ready, does that mean I need to write a downstream task that keeps making API calls to monitor the step result? Or should I run Prefect Core inside the PySpark job? 3. Does Prefect have something similar to an Airflow sensor, like GoogleCloudStoragePrefixSensor?
    c
    • 2
    • 5
  • k

    Kamil Okáč

    11/25/2019, 9:14 PM
    Hi guys, I'm having trouble using LocalDaskExecutor with simple mapping. The following code fails:
    from prefect import Flow, task
    from prefect.engine.executors.dask import LocalDaskExecutor
    
    @task
    def add_ten(x):
            return x + 10
    
    if __name__ == '__main__':
        with Flow('simple map') as flow:
            mapped_result = add_ten.map([1, 2, 3])
            executor = LocalDaskExecutor(scheduler='processes', num_workers=3)
            flow.run(executor=executor)
    The error is
    AssertionError: daemonic processes are not allowed to have children
    Is there something wrong with this code or am I missing something else?
    c
    b
    • 3
    • 8
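    Not a definitive fix, but one workaround sketch: the threaded scheduler spawns no child processes, so the daemonic-process restriction never comes up (whether threads are acceptable depends on the workload):
    from prefect import Flow, task
    from prefect.engine.executors.dask import LocalDaskExecutor

    @task
    def add_ten(x):
        return x + 10

    if __name__ == '__main__':
        with Flow('simple map') as flow:
            mapped_result = add_ten.map([1, 2, 3])

        # threads instead of processes sidestep the daemonic-process limit
        executor = LocalDaskExecutor(scheduler='threads', num_workers=3)
        state = flow.run(executor=executor)
        print(state.result[mapped_result].result)   # [11, 12, 13]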
  • k

    Kamil Okáč

    11/26/2019, 9:49 AM
    Is it wrong to pass a list as a task argument? In the following example the order of the list items is not preserved.
    from prefect import Flow, task
    
    
    @task
    def arrtest(arr):
        return ''.join(arr)
    
    
    with Flow('ArrayTest') as flow:
        r = arrtest(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k'])
    
    print(flow.run().result[r].result)
    Expected result: abcdefghijk Actual result: ajkbcdefghi
    j
    a
    +3
    • 6
    • 15
  • s

    simon thelin

    11/26/2019, 6:13 PM
    Hello people, this might be a stupid question. I am currently customising a docker-compose setup where I want to do a small demo with prefect locally. I am following the tutorials and right now I am trying to understand the
    agent
    . To spin it up locally just to try it, I ran:
    pipenv run prefect agent start
    But I get that no token is provided. According to the
    DOC
    it seems like
    prefect.config.cloud.auth_token
    should provide this to me automatically? I don't find the documentation on how to spin it up with the
    UI
    very intuitive, and if somebody can explain how to set the
    TOKEN
    locally I would appreciate it.
    d
    • 2
    • 23
  • j

    Jon Wolski

    11/27/2019, 12:17 AM
    Is it feasible to write tasks in languages other than Python? I have a Java team looking to replace their Java-based workflow components. The design ideals of Prefect (e.g. negative engineering) are appealing, but the team wants to stay in Java.
    j
    • 2
    • 1
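    Prefect tasks themselves are written in Python, but one common compromise, sketched here, keeps the orchestration in Python and shells out to the JVM with ShellTask; the jar path and arguments are made up:
    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    run_java_step = ShellTask(name="java-step")

    with Flow("java-workflow") as flow:
        # the JVM does the real work; Prefect only handles scheduling,
        # dependencies, retries and state
        run_java_step(command="java -jar /opt/jobs/transform.jar --date 2019-11-27")

    flow.run()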
  • m

    Matias

    11/27/2019, 11:57 AM
    Hi, are there any examples, best practices or guides anywhere on how to write tests for your tasks and pipelines? The docs have a section on testing, but as far as I can see that's mostly about testing Prefect itself, not the things you implement with Prefect. Also, there are tests within the prefect core project, but that is not the best source for getting started and understanding why things are done in a certain way.
    j
    • 2
    • 3
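    A minimal sketch of two common approaches, assuming a trivial add_ten task: call task.run() directly to unit-test the business logic, and run the whole flow to assert on the resulting states:
    import unittest

    from prefect import Flow, task

    @task
    def add_ten(x):
        return x + 10

    class AddTenTests(unittest.TestCase):
        def test_task_logic_directly(self):
            # the task's business logic can be exercised without any flow
            self.assertEqual(add_ten.run(1), 11)

        def test_flow_wiring(self):
            with Flow("test") as flow:
                result = add_ten(1)
            state = flow.run()
            self.assertTrue(state.is_successful())
            self.assertEqual(state.result[result].result, 11)

    if __name__ == "__main__":
        unittest.main()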
  • a

    Attila

    11/27/2019, 1:51 PM
    Hi Everyone, I have a seemingly simple question but unfortunately I couldn't find an obvious answer (or I just couldn't see the forest for the trees): when I pass an unmapped object to a mapped task in a prefect flow, will the parallel executed tasks receive a copy of the object, or will the parallel tasks receive a reference to the same object? I would assume the former but just want to be sure.
    c
    • 2
    • 2
  • l

    laura

    11/28/2019, 3:37 PM
    Hi, I would really appreciate some help with map... I need a branching workflow so that a parent -> many children tasks -> each child has many grandchildren. The following code does not work how I want it to, as the grandchild executor is only called 2 times (as there are 2 children) and a list of grandchild_id's is given to the GrandchildExecutor rather than a single id. I would like the GrandchildExecutor to be called 4 times, which means the mapping has to be done differently. Any ideas????
    from prefect import task, Task, Flow, Parameter
    
    class ParentExecutor(Task):
        def run(self, parent_id):
            # Do query on a database to return child id's
            return ['child1', 'child2']
    
    class ChildExecutor(Task):
        def run(self, parent_id):
            # Do query on a database to return child id's
            return ['grandchild1', 'grandchild2']
    
    class GrandchildExecutor(Task):
        def run(self, parent_id):
            # grab grandchild from db and do processing with it...
            pass
    
    with Flow("Test branching") as test_flow:
        parent_id = Parameter('parent_id')
        parent_exec = ParentExecutor()
        child_ids = parent_exec(parent_id)
        child_exec = ChildExecutor()
        grandchild_ids = child_exec.map(child_ids)
        grandchild_exec = GrandchildExecutor()
        grandchild_exec.map(grandchild_ids)
    
    state = test_flow.run(parent_id='parent_id123')
    j
    • 2
    • 6
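    One way to get four grandchild runs, sketched with @task functions in place of the Task subclasses: insert an unmapped "flatten" task between the two mapping levels so the second map fans out over individual grandchild ids:
    from prefect import Flow, Parameter, task

    @task
    def get_children(parent_id):
        return ['child1', 'child2']

    @task
    def get_grandchildren(child_id):
        return [child_id + '-g1', child_id + '-g2']

    @task
    def flatten(list_of_lists):
        # unmapped reduce step: collapse the per-child lists into one flat list
        return [item for sublist in list_of_lists for item in sublist]

    @task
    def process_grandchild(grandchild_id):
        print("processing " + grandchild_id)

    with Flow("Test branching") as test_flow:
        parent_id = Parameter('parent_id')
        child_ids = get_children(parent_id)
        grandchild_lists = get_grandchildren.map(child_ids)
        grandchild_ids = flatten(grandchild_lists)   # collects all mapped results
        process_grandchild.map(grandchild_ids)       # four runs, one per grandchild

    test_flow.run(parent_id='parent_id123')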
  • a

    alvin goh

    11/29/2019, 12:50 AM
    Hi, I've noticed that when a mapped task in a flow is skipped or failed, flow.visualize() fails with an error about the task not having mapped_states. Is this a design decision to not generate visualisation for failed mapped tasks?
    c
    j
    • 3
    • 6
  • c

    Christian Eik

    11/29/2019, 11:49 AM
    Hey, not sure if this is the right place for this question, but: I installed prefect into a venv and am trying to run a script importing from the snowflake tasks. It gives me the error below. Importing other tasks works, and when I run
    pip install prefect[snowflake]
    it tells me everything is already installed.
    c
    • 2
    • 25
  • c

    Christian Eik

    11/29/2019, 11:49 AM
    from prefect import Flow, task
    from prefect.tasks.docker import CreateContainer
    from prefect.tasks.snowflake import SnowflakeQuery