prefect-community
  • k

    Kristoffer Sahlin

    10/15/2019, 10:23 AM
    Hi, I have a recursive workflow problem that needs to be parallelized. Is it possible to handle it with prefect? Specifically, I have n batches of input labeled 1,...,n, and a function _merge_consecutive_batches_. Assume that n is a power of 2. The function _merge_consecutive_batches_ takes two consecutive batches, 1 and 2, 3 and 4, etc, and produces merged output for each pair, labeled, say, 1_2, 3_4, … Now I want to apply _merge_consecutive_batches_ recursively until I'm down to a single output batch. For example, in the second iteration, _merge_consecutive_batches_ will take as input the two consecutive batches 1_2 and 3_4 and produce the output batch 1_2_3_4. Naturally, the jobs producing 1_2 and 3_4 have to be completed before starting this job (and they have to be run in parallel for scalability reasons). Is there a solution with prefect for this scenario?
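    A sketch of one way to express this pairwise reduction with Prefect's functional API (not a confirmed recipe; load_batch and the merge body are hypothetical placeholders). Since n is known when the flow is built, the reduction tree can be wired up with a plain Python loop:
    from prefect import Flow, task
    
    
    @task
    def load_batch(i):
        # hypothetical stand-in for reading batch i
        return [i]
    
    
    @task
    def merge_consecutive_batches(left, right):
        # hypothetical stand-in for the real merge logic
        return left + right
    
    
    with Flow("tree-merge") as flow:
        level = [load_batch(i) for i in range(1, 9)]  # n = 8, a power of 2
        # wire the reduction tree at build time; each pass halves the number of batches
        while len(level) > 1:
            level = [
                merge_consecutive_batches(level[i], level[i + 1])
                for i in range(0, len(level), 2)
            ]
    
    # tasks at the same level share no dependencies, so an executor such as
    # DaskExecutor can run every pair in parallel, while each level still
    # waits on the level below it
    # flow.run(executor=DaskExecutor(address="tcp://scheduler:8786"))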
  • j

    Jeff Yun

    10/15/2019, 1:46 PM
    Hi! Given an unrun flow, what's the best way to get the inputs passed into the tasks (like the output of
    task_runner.get_task_inputs(state, upstream_states)
    but without having to run the flow)?
    c
    • 2
    • 14
  • d

    dmo

    10/16/2019, 1:49 PM
    Hello! I'm looking into Prefect and am having a hard time finding a good way to handle shared code. Let's say I have a utility function in my_utility.py:
    def get_answer_to_everything():
        return 42
    And then I define my flow in one_of_many_flows.py:
    import my_utility
    from prefect import Flow, task
    from prefect.engine.executors import DaskExecutor
    
    
    @task
    def run_imported_function():
        return my_utility.get_answer_to_everything()
    
    
    with Flow("Imported function") as flow1:
        run_imported_function()
    flow1.run() # 42
    flow1.run(executor=DaskExecutor(address=IP)) # ModuleNotFoundError: No module named 'my_utility'
    I know this is due to how the serialization works in Python. However, Dask provides a way to upload files to workers through Executor.upload_file: https://stackoverflow.com/questions/39295200/can-i-use-functions-imported-from-py-files-in-dask-distributed However, I have found no way to perform anything similar through the Prefect API. Am I looking in the wrong place or is this use case out of scope?
    c
    m
    • 3
    • 14
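    A possible workaround, sketched under the assumption that the flow connects to an already-running scheduler: distributed's Client.upload_file can ship the local module to the workers before the flow runs (workers that join afterwards will not receive it). Continuing the snippet above:
    from distributed import Client
    from prefect.engine.executors import DaskExecutor
    
    SCHEDULER = "tcp://10.0.0.1:8786"  # hypothetical scheduler address
    
    # push the local module to every currently-connected worker
    client = Client(SCHEDULER)
    client.upload_file("my_utility.py")
    client.close()
    
    # the tasks can now import my_utility when they deserialize on the workers
    flow1.run(executor=DaskExecutor(address=SCHEDULER))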
  • c

    Chris Eberly

    10/16/2019, 5:12 PM
    hello again, one more question, is there a way to “yield” a batch of results to start feeding to a mapping process? the example would be, in your guys’ x-files tutorial, if we got say 10 episode urls at a time, can we start the scraping process asynchronously and then collect the results at the end? apologies if i’m missing something obvious, or if this is not a good use case
    c
    • 2
    • 6
  • i

    itay livni

    10/19/2019, 1:42 AM
    Hi - Is it possible to have a class with x number of methods as tasks?
    c
    • 2
    • 5
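    One pattern that appears to work (a sketch, not an officially documented recipe): wrap an instance's bound methods with task() while the flow is being built, so each method becomes its own task.
    from prefect import Flow, task
    
    
    class Pipeline:
        def __init__(self, factor):
            self.factor = factor
    
        def extract(self):
            return [1, 2, 3]
    
        def transform(self, data):
            return [x * self.factor for x in data]
    
    
    p = Pipeline(factor=10)
    
    with Flow("methods-as-tasks") as flow:
        # task() accepts any callable, including bound methods, so each
        # call below registers a separate task in the flow
        data = task(p.extract)()
        result = task(p.transform)(data)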
  • i

    itay livni

    10/21/2019, 6:47 PM
    Hi - A couple of general questions: (1) What is the use case for an imperative flow vs. functional flow? (2) When is mixing the two appropriate?
    c
    • 2
    • 3
  • j

    John Faucett

    10/21/2019, 7:09 PM
    Hi, is there any more documentation on parameters other than here: https://docs.prefect.io/core/concepts/parameters.html? I'm having a hard time getting them to work, especially when I need multiple.
    c
    • 2
    • 20
  • j

    John Faucett

    10/21/2019, 7:10 PM
    An imperative example would be really helpful.
  • a

    An Hoang

    10/21/2019, 9:47 PM
    I'm trying to run this example: https://docs.prefect.io/core/tutorials/dask-cluster.html with my HPC Dask cluster. I get this error:
    [2019-10-21 21:17:43,141] INFO - prefect.FlowRunner | Beginning Flow run for 'dask-example'
    [2019-10-21 21:17:43,144] INFO - prefect.FlowRunner | Starting flow run.
    [2019-10-21 21:17:43,145] ERROR - prefect.FlowRunner | Unexpected error: ValueError("Unexpected keyword arguments: ['processes', 'silence_logs']")
    Traceback (most recent call last):
      File "/lab/corradin_data/FOR_AN/anaconda3/envs/learn-new-pkgs/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/lab/corradin_data/FOR_AN/anaconda3/envs/learn-new-pkgs/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 393, in get_flow_run_state
        with executor.start():
      File "/lab/corradin_data/FOR_AN/anaconda3/envs/learn-new-pkgs/lib/python3.7/contextlib.py", line 112, in __enter__
        return next(self.gen)
      File "/lab/corradin_data/FOR_AN/anaconda3/envs/learn-new-pkgs/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 74, in start
        self.address, processes=self.local_processes, **self.kwargs
      File "/lab/corradin_data/FOR_AN/anaconda3/envs/learn-new-pkgs/lib/python3.7/site-packages/distributed/client.py", line 649, in __init__
        "Unexpected keyword arguments: {}".format(str(sorted(kwargs)))
    ValueError: Unexpected keyword arguments: ['processes', 'silence_logs']
    [2019-10-21 21:17:43,149] ERROR - prefect.Flow: dask-example | Unexpected error occured in FlowRunner: ValueError("Unexpected keyword arguments: ['processes', 'silence_logs']")
    conda versions:
    # Name                    Version                   Build  Channel
    dask                      2.6.0                      py_0    conda-forge
    dask-core                 2.6.0                      py_0    conda-forge
    dask-glm                  0.2.0                      py_1    conda-forge
    dask-jobqueue             0.6.3                      py_0    conda-forge
    dask-ml                   1.0.0                      py_1    conda-forge
    
    # Name                    Version                   Build  Channel
    prefect                   0.6.6                      py_0    conda-forge
    cluster configuration:
    from dask_jobqueue import LSFCluster
    cluster = LSFCluster(queue='all_corradin',
                             cores= 47,
                             #processes = 2,
                             walltime='5000:00',
                             memory='250GB',
                             local_directory = "/tmp",
                             job_extra=['-o /dev/null', '-e /dev/null'],
                             scheduler_port = 8786,
                             worker_dashboard_address = 8788
                             )
    cluster.scale(17)
    c
    • 2
    • 4
  • a

    Argemiro Neto

    10/22/2019, 12:05 AM
    Hey, team! I have a
    task
    being mapped with the following signature:
    @task(max_retries=3, retry_delay=timedelta(seconds=5))
    def run_sync_loads(config: dict) -> dict:
    The `task`'s results are being sent to the following `task`:
    @task(max_retries=3, retry_delay=timedelta(seconds=5), trigger=triggers.always_run)
    def save_model_config(loads: list, configs: list):
    Both are reference tasks to the
    flow
    . The problem is that one of the mapped tasks is failing but not being retried, and I have no indication at the end of the flow that some tasks failed. To be precise, the logs show that one retry is being called, but the task should fail again. Instead of failing, it even completes. Any ideas on why this is happening?
    c
    • 2
    • 30
  • i

    itay livni

    10/22/2019, 3:46 AM
    Hi there again - I remember reading somewhere in the docs about "hiding" inputs for visualizations. Can't seem to find it. Can you point me to the docs or show an example? Thanks again. Keep up the good work.
    c
    b
    • 3
    • 4
  • m

    Mark McDonald

    10/22/2019, 5:06 PM
    Hi, I'm working on my first flow and trying to wrap my head around task states and the "any failed" trigger. For reference, I've got a very simple flow like this:
    with Flow('test flow') as f:
    	data = get_data()
    	cleansed_data = munge_data(data)
    	send_data = email_data(cleansed_data)
    	alert = send_error_email()
    
    	alert.set_upstream(send_data)
    
    f.set_reference_tasks([send_data])
    I want my
    alert
    task to trigger if any of the tasks above fail. So, in my task definition I have this set:
    @task(trigger = prefect.triggers.any_failed)
    def send_error_email():
    When the flow runs, the first three tasks complete successfully, but the alert task fails because its trigger fails (TRIGGERFAIL signal raised during execution). However, the overall Flow status is a success because I've set the flow's reference task to the
    send_data
    task. Is this the correct way to use the "any_failed" trigger? It seems to me that it might be more natural for the alert task to end up in a "skipped" state if none of the upstream tasks failed.
    c
    e
    • 3
    • 3
  • a

    An Hoang

    10/22/2019, 5:18 PM
    Hi there, is there a way to use Docker with Prefect so that each task/flow runs in a different environment?
    a
    c
    • 3
    • 3
  • m

    Matt Harvey

    10/22/2019, 7:47 PM
    I'm trying to decide on best practices for a complex flow schedule. I have a flow that needs to run at specific times each day, but those times change each day depending on data obtained from an API in the morning. Before I moved this flow to Prefect I did it the old-fashioned way: I just ran the script every minute, and it started with a function call that returned a bool indicating whether it should continue or not. (That function did the complex logic of hitting the API, or using data retrieved earlier, to determine whether it should run at that moment.) In Prefect, should I create a task that does this check and then skips the rest of the flow if it is not supposed to run? Or is there a way I can create a dynamic schedule that I pass into the flow itself? Or is there a better way altogether?
    j
    • 2
    • 6
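    For the "task that does this check and skips the rest" approach, a sketch (should_run_now is a hypothetical stand-in for the morning API call; raising SKIP propagates to downstream tasks because skip_on_upstream_skip defaults to True):
    from prefect import Flow, task
    from prefect.engine import signals
    
    
    @task
    def should_run_now() -> bool:
        # hypothetical check against the morning API that decides today's run times
        return True
    
    
    @task
    def gate(ok: bool):
        if not ok:
            # marks this task Skipped; everything downstream is skipped too
            raise signals.SKIP("outside today's run window")
    
    
    @task
    def do_the_work():
        return "done"
    
    
    with Flow("gated-run") as flow:
        checked = gate(should_run_now())
        do_the_work(upstream_tasks=[checked])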
  • a

    Adam Roderick

    10/22/2019, 8:08 PM
    @Matt Harvey I think you can update schedules in prefect cloud via the API. That way you could get your schedule data in the morning, then update the flow's schedule before ending the job
    j
    m
    • 3
    • 5
  • a

    Adam Roderick

    10/22/2019, 8:08 PM
    https://docs.prefect.io/cloud/concepts/scheduled-flows.html
  • a

    Adam Roderick

    10/22/2019, 8:08 PM
    Unfortunately I haven't found a way to do this in Core
  • i

    itay livni

    10/22/2019, 8:36 PM
    Handle results of if else
  • i

    itay livni

    10/22/2019, 8:37 PM
    How do you retrieve the results from an
    ifelse
    ? Thanks
    c
    • 2
    • 4
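    A sketch of the pattern using Prefect's control-flow tasks (assuming a 0.x release that ships both ifelse and merge): merge returns the result of whichever branch actually ran.
    from prefect import Flow, task
    from prefect.tasks.control_flow import ifelse, merge
    
    
    @task
    def check_condition():
        return True
    
    
    @task
    def true_branch():
        return "ran the true branch"
    
    
    @task
    def false_branch():
        return "ran the false branch"
    
    
    with Flow("conditional") as flow:
        cond = check_condition()
        t = true_branch()
        f = false_branch()
        ifelse(cond, t, f)    # skips whichever branch the condition rules out
        result = merge(t, f)  # the value of the branch that actually ran
    
    state = flow.run()
    print(state.result[result].result)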
  • i

    itay livni

    10/24/2019, 4:25 AM
    Hi there again. I'm running into an issue where I pass two
    pandas dataframes
    between tasks. However, the
    merge
    fails with a
    pandas error
    ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
    Without the
    merge
    command both return valid
    dataframes
    . Any suggestions?
    Untitled
    c
    • 2
    • 11
  • m

    Mark McDonald

    10/24/2019, 8:15 PM
    Hi - I'm looking for some feedback on how we're deploying our flows. I'm trying to dynamically deploy our flows using Docker and a custom deployment script. Within my project, I have a module that contains my flows (each flow in a separate file). Each of these flows has various dependencies that get imported (pandas, scipy, numpy, etc.). We pin our dependencies in a requirements.txt file. My deployment script does the following, in this order:
    1. Grabs all of my flows' dependencies from the requirements.txt file and adds them to a list called
    dependencies
    2. Installs these dependencies on my docker image
    3. Builds the prefect docker storage object: storage = Docker(registry_url=os.getenv('PREFECT_ECR_REGISTRY'), image_name=os.getenv('PREFECT_ECR_IMAGE_NAME'), image_tag=package_version, python_dependencies=dependencies)
    4. Imports each of my flow files and locates the flow objects within them; these flows get added to the storage object
    5. Calls storage.build
    6. Iterates through the flows and calls flow.deploy() on each flow object with build set to False
    As it stands the deployment takes ~5 minutes. Any areas where I might be able to improve this?
    ✔️ 2
    c
    • 2
    • 2
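    For reference, the one-image-many-flows part of such a script might look roughly like this (a sketch; the example flow, dependency list, image tag, and project name are hypothetical stand-ins for values the deployment script collects):
    import os
    
    from prefect import Flow, task
    from prefect.environments.storage import Docker
    
    
    @task
    def hello():
        return "hello"
    
    
    with Flow("example-flow") as example_flow:
        hello()
    
    # in the real script these are collected from the flows module / requirements.txt
    flows = [example_flow]
    dependencies = ["pandas", "numpy"]  # hypothetical pinned packages
    package_version = "1.2.3"           # hypothetical image tag
    
    storage = Docker(
        registry_url=os.getenv("PREFECT_ECR_REGISTRY"),
        image_name=os.getenv("PREFECT_ECR_IMAGE_NAME"),
        image_tag=package_version,
        python_dependencies=dependencies,
    )
    
    for f in flows:
        storage.add_flow(f)
    
    storage = storage.build()  # a single docker build shared by every flow
    
    for f in flows:
        f.storage = storage
        f.deploy(project_name="my-project", build=False)  # reuse the image built above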
  • i

    itay livni

    10/24/2019, 8:36 PM
    How do you add a custom
    filepath
    to visualize? Using the script from the visualize docs I added filename as an arg. But I do not see the write to s3. I am assuming that s3fs is available for writing this file (if that is the right terminology?)
    Untitled
    c
    • 2
    • 8
  • d

    Daniel Veenstra

    10/24/2019, 10:05 PM
    Hey all, I'm working on starting what's essentially a data warehouse at my company, and thinking of using prefect to schedule and orchestrate all the ETL. We're going to have a number of 3rd party data sources to pull data from on various schedules, and then we'll likely want to schedule some transformations after certain combinations of tables are finished loading each day. I'm trying to get the project architecture off on the right foot and have been trying out prefect for a couple days, and I'm wondering how I should think about organizing my Flows. I'm wondering, should I have one flow per data source, or one flow for the whole pipeline? My dilemma is that each data source is going to have its own schedule, which leads me to have one Flow per source, but if I want to trigger transformations based on the completion of table loads, that feels like the flows are going to have dependencies on each other's completions and would be better off as one flow. Thoughts? Any examples out there of similar projects?
    j
    i
    r
    • 4
    • 16
  • e

    Egbert Ypma

    10/25/2019, 3:03 PM
    Hi folks, I'm pretty new to prefect and exploring its possibilities at the moment. I am trying to upload a file to AWS but I am struggling with all the security details. Where can I find an example that uses the S3Upload task as a step in a workflow?
    c
    • 2
    • 2
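    A rough sketch of wiring the S3Upload task into a flow, assuming it accepts a bucket at init and data/key at runtime, and that AWS credentials are supplied via the AWS_CREDENTIALS Prefect secret or the usual boto3 environment variables (the bucket name and key below are hypothetical):
    from prefect import Flow, task
    from prefect.tasks.aws.s3 import S3Upload
    
    upload = S3Upload(bucket="my-example-bucket")  # hypothetical bucket name
    
    
    @task
    def build_payload() -> str:
        return "hello from prefect"
    
    
    with Flow("s3-upload-example") as flow:
        upload(data=build_payload(), key="examples/hello.txt")
    
    # flow.run()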
  • b

    Brian Mesick

    10/25/2019, 5:44 PM
    Hi folks. I'm trying out Prefect and running some pretty basic flows to get a feel for how development works. I'm curious if anyone is actually using the Snowflake task as it seems to have a bug that I would expect to render it unusable.
    c
    j
    m
    • 4
    • 8
  • i

    itay livni

    10/26/2019, 5:53 AM
    Hi - I am trying to check whether a task has run successfully in a flow, and then do something depending on whether the
    task
    succeeded or failed. Something like
    TaskRunner
    state.is_successful()
    but in a
    flow
    .
    j
    c
    • 3
    • 9
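    If inspecting states after the run is enough, one option is the State object returned by flow.run (a sketch; acting on success or failure inside the flow itself points toward triggers instead):
    from prefect import Flow, task
    
    
    @task
    def might_fail():
        return 1
    
    
    @task
    def downstream(x):
        return x + 1
    
    
    with Flow("inspect-states") as flow:
        a = might_fail()
        b = downstream(a)
    
    state = flow.run()
    # flow.run() returns the flow's terminal State; state.result maps each task
    # to its own State, so individual tasks can be inspected afterwards
    print(state.result[a].is_successful())
    print(state.result[b].result)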
  • m

    Matias

    10/26/2019, 9:30 AM
    I’m wondering if and how prefect could be used to transfer large amounts of data between different servers/clouds. Basically, I’d need to move 10-100 gigabyte CSV/JSON files from an SFTP server to ADLS, and later on between other sources and sinks. Moving this amount of data as one gigantic in-memory string between tasks does not seem like a very sound approach, for many reasons. So how would you actually do that?
    j
    j
    +2
    • 5
    • 18
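    One common approach is to pass references between tasks and stream the bytes inside a single task, so the file contents never travel through Prefect's result handling. A sketch, with plain open() standing in for the real SFTP and ADLS clients (e.g. paramiko and the ADLS SDK):
    from prefect import Flow, Parameter, task
    
    CHUNK_SIZE = 16 * 1024 * 1024  # 16 MB
    
    
    @task
    def transfer(src_path: str, dst_path: str) -> str:
        # stream chunk by chunk; only the destination *path* is returned
        # downstream, never the file contents
        with open(src_path, "rb") as src, open(dst_path, "wb") as dst:
            while True:
                chunk = src.read(CHUNK_SIZE)
                if not chunk:
                    break
                dst.write(chunk)
        return dst_path
    
    
    with Flow("sftp-to-adls") as flow:
        src = Parameter("src_path")
        dst = Parameter("dst_path")
        transfer(src, dst)
    
    # flow.run(parameters={"src_path": "in.csv", "dst_path": "out.csv"})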
  • o

    osuarez

    10/29/2019, 9:14 AM
    Hi everyone!! I am totally new to the Prefect tool and I am trying to deploy it inside GCP. The idea is to have Prefect running inside a Compute Engine instance for now (just to start using it). Could anyone give me some clues about the best way to get this working on Compute Engine? I have made some tests deploying a Dask cluster inside CE, but I am not sure this is the best way to set the tool up.
    c
    • 2
    • 1
  • a

    Adam Roderick

    10/29/2019, 11:12 AM
    I have several tasks where I call out to the shell using subprocess. When I
    flow.run()
    locally, I see all the stdout information from these subprocesses in the prefect log output. But I do not see the same information in prefect cloud. Is there a configuration setting or something I can update to ensure I see this output in the cloud logs?
    j
    c
    • 3
    • 3
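    One pattern, assuming a Prefect version that exposes the task's logger via prefect.context: re-emit the subprocess output through that logger so it reaches whatever handlers (local or Cloud) are attached.
    import subprocess
    
    import prefect
    from prefect import task
    
    
    @task
    def run_shell(cmd: str) -> int:
        logger = prefect.context.get("logger")  # assumes the logger is in context
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        # forward the subprocess output through Prefect's logging machinery
        if result.stdout:
            logger.info(result.stdout)
        if result.stderr:
            logger.warning(result.stderr)
        result.check_returncode()
        return result.returncode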
  • a

    Arlo Bryer

    10/29/2019, 3:58 PM
    Hi everyone, new here. I'm considering trying prefect to entirely revamp a set of data pipelines that we're using at Atomico.