prefect-community

    David Ojeda

    05/06/2020, 9:17 AM
Hi, I am refactoring my flow parameters and I encountered a slight problem when I compose two flows that each have a parameter with the same name:
    ValueError: A task with the slug "limit" already exists in this flow.
    I could come up with a plumbing hack like this:
    flow = Flow(name)
    local_flow = build_local_flow()     # a function that returns a flow
    quetzal_flow = build_quetzal_flow() # idem

    # plumbing: both local and quetzal flows have a limit parameter
    limit_parameter = local_flow.get_tasks(name='limit')[0]
    other_parameter = quetzal_flow.get_tasks(name='limit')[0]
    for edge in quetzal_flow.edges:
        if edge.upstream_task == other_parameter:
            edge.upstream_task = limit_parameter
    quetzal_flow.tasks = set(t for t in quetzal_flow.tasks if t != other_parameter)
    flow.update(local_flow)
    flow.update(quetzal_flow)
    ...
    which works, but it seems very hackish and far from elegant. Is there a cleaner alternative to this? (other than renaming the parameter, of course)
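    A cleaner route (a sketch; it assumes the builder functions can be changed to accept the parameter, so the keyword arguments here are hypothetical) is to create a single Parameter instance up front and share it between both sub-flows, so no duplicate slug is ever created:
    from prefect import Flow, Parameter

    limit = Parameter("limit")
    local_flow = build_local_flow(limit=limit)     # both builders reuse the
    quetzal_flow = build_quetzal_flow(limit=limit) # same Parameter instance

    flow = Flow(name)
    flow.update(local_flow)   # update() skips tasks the flow already contains,
    flow.update(quetzal_flow) # so the shared Parameter is only added once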

    John Ramirez

    05/06/2020, 1:12 PM
    Hey everyone - I have reached out a few times over the last few days looking for suggestions about how to run up to 3000 distinct runs in parallel in the most efficient way. I see there is a new Dask Cloud Provider Environment and want to know if this env would be the best way to accomplish this goal.

    Jie Lou

    05/06/2020, 2:24 PM
    Hi everyone. I noticed an issue with scheduling behavior when registering a flow, and wondered if anyone has run into it as well. I used CronClock("00 16 * * *", parameter_defaults=MY_PARAMETER_1) to schedule one flow, and I have another batch of parameters to be scheduled at the same time: CronClock("00 16 * * *", parameter_defaults=MY_PARAMETER_2). I then set flow.schedule = Schedule(clocks=[clock1, clock2]) and registered the flow. In the Cloud UI, I see just one run scheduled instead of two. If I tweak the time a bit, i.e. CronClock("05 16 * * *", parameter_defaults=MY_PARAMETER_2), then two runs are scheduled as expected. It seems that if two clocks fire at the same time, only one is picked. It would be better if multiple runs could be scheduled at the same time.
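    For reference, the offset workaround described above looks like this (a sketch; MY_PARAMETER_1/MY_PARAMETER_2 stand for the dicts of parameter defaults):
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    flow.schedule = Schedule(clocks=[
        CronClock("00 16 * * *", parameter_defaults=MY_PARAMETER_1),
        # offset by 5 minutes so the two clocks don't fire at the same instant
        CronClock("05 16 * * *", parameter_defaults=MY_PARAMETER_2),
    ])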

    Jim Crist-Harif

    05/06/2020, 3:45 PM
    @Joe Schmid (and anyone else with opinions), I've been thinking about the current state of creating a new Environment class per dask-cluster class, and it seems a bit untenable. I've been thinking about making a generic dask environment that takes the cluster-manager class and kwargs and uses those to create a dask cluster. Since dask already has a spec'd interface for this, it seems significantly simpler than maintaining a mirror of each of these in prefect. Something like (class name not decided):
    environment = DaskClusterEnvironment(
      cls=dask_yarn.YarnCluster,
      kwargs={
        "queue": "engineering",
        "environment": "hdfs://..."
      }
    )
    👍 1
    👀 1
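    Roughly, such a generic environment could spin up the cluster through dask's spec'd interface (an illustrative sketch, not actual Prefect source; the adapt bounds are made up):
    from distributed import Client

    cluster = cls(**kwargs)               # e.g. dask_yarn.YarnCluster(queue=..., environment=...)
    cluster.adapt(minimum=1, maximum=10)  # same adaptive API across cluster managers
    client = Client(cluster)              # the flow's tasks then run against this client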

    David Ojeda

    05/06/2020, 4:29 PM
    So what’s the latest word on result_handlers? I have this warning when doing `flow.register`:
    [2020-05-06 18:24:40] WARNING - py.warnings | /home/david/.virtualenvs/iguazu-env/lib/python3.8/site-packages/prefect/client/client.py:576: UserWarning: No result handler was specified on your Flow. Cloud features such as input caching and resuming task runs from failure may not work properly.
    but the flow constructor docstring says:
    - result_handler (ResultHandler, optional, DEPRECATED): the handler to use for
                retrieving and storing state results during execution
    Should I add a no-op result handler to quiet that warning, or just ignore it?
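    If you'd rather silence the warning with a real handler, attaching one at construction time would look like this (a sketch using the built-in local-filesystem handler):
    from prefect import Flow
    from prefect.engine.result_handlers import LocalResultHandler

    # persists task results to the local filesystem, which also enables
    # input caching / resume-from-failure features
    flow = Flow("my-flow", result_handler=LocalResultHandler())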

    Dan DiPasquo

    05/06/2020, 9:10 PM
    We'd like to automate flow registration via CICD. Is there an existing pattern for doing so via CircleCI? What is the most appropriate token type for this type of integration?
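    One minimal shape for such a CI step (a sketch; the module path and project name are hypothetical, and it assumes an API token is exported as PREFECT__CLOUD__AUTH_TOKEN in the CircleCI job):
    # register_flows.py, run by CI after tests pass
    from myproject.flows import flow  # hypothetical import of the flow object

    flow.register(project_name="production")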

    Matthias

    05/06/2020, 10:13 PM
    I’d like to have Prefect server running locally in an existing Docker environment. Is there any way I could spin up the containers created by prefect server start manually? Or is there a way to run prefect server start inside another Docker container? I tried to add the Docker socket as a volume, but that did not work.

    Manuel Mourato

    05/06/2020, 11:04 PM
    Hello guys, I am trying to execute flow.visualize() in the PyCharm IDE, but nothing shows up. The code doesn't raise any error; it executes to the end of the flow with Success, but the visualization doesn't appear.
    f_run=test_flow1.run()
    test_flow1.visualize(flow_state=f_run)
    Anyone had this issue before?
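    One workaround to try (a sketch; it sidesteps the IDE's inline viewer by writing the graph to disk instead):
    f_run = test_flow1.run()
    # with a filename, visualize() saves the rendered graph to a file
    # instead of trying to open a viewer window
    test_flow1.visualize(flow_state=f_run, filename="test_flow1_graph")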

    Chris Vrooman

    05/07/2020, 6:09 AM
    I have a question about executing the same function multiple times within a flow. Is there a recommended way to configure upstream dependencies so that we can ensure that tasks execute in the right order? There is no data dependency for my use case. Was hoping to avoid redefining a function with a different name. Basic Example:
    @task
    def my_function(x, y):
        print(x+y)
    
    with Flow(name="my_flow") as flow:
        # Run 1st
        my_function(1, 2)
        # Run 2nd
        my_function(3, 4)
        # Run 3rd
        my_function(5, 6)
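    For what it's worth, in the functional API each task call accepts an upstream_tasks keyword, so the ordering can be expressed without data dependencies or renamed functions (a sketch):
    with Flow(name="my_flow") as flow:
        first = my_function(1, 2)
        second = my_function(3, 4, upstream_tasks=[first])   # runs after first
        third = my_function(5, 6, upstream_tasks=[second])   # runs after second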

    Ketan Bhikadiya

    05/07/2020, 8:33 AM
    Hi, I have Prefect server and agent running on the system. Even after registering the flow (flow.register()), I do not see any info in the UI (I am not using Prefect Cloud). Also, I do not see a config.toml file in my Prefect home directory. Not sure what the issue is here.

    Manuel Mourato

    05/07/2020, 8:36 AM
    Hello again. I am trying to run some multiprocessing tests with a DaskExecutor. This is how I set up the remote environment for multiprocessing:
    env = RemoteEnvironment(
        executor="prefect.engine.executors.DaskExecutor",
        executor_kwargs={
            "local_processes": True
        }
    )
    The execution DAG is in the attached image. My understanding is that, using multiprocessing, Task 2 and Task 4 would execute at the same time after Task 1, in different processes, because they do not depend on each other. But the behavior I see is them executing sequentially, as if wait() were being called between each process. Is my understanding incorrect?
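    To rule out the environment wrapper, the same executor can be exercised directly in a local run (a sketch):
    from prefect.engine.executors import DaskExecutor

    # spin up a local process-based Dask cluster just for this run
    executor = DaskExecutor(local_processes=True)
    state = flow.run(executor=executor)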

    Zviri

    05/07/2020, 8:47 AM
    hey everyone, I noticed very high memory consumption when using mapped tasks in conjunction with the CloudTaskRunner, but not with the plain TaskRunner (using a Dask deployment). During the "mapping" procedure, the worker doing the actual mapping used more and more memory. That seemed reasonable, since mapping involves copying the mapped task. However, with the CloudTaskRunner memory consumption is much, much higher during this step. To be specific, mapping over a list of only about 8000 elements ate up more than 4 GB of memory on the worker. I did some debugging and found that the same mapped task has a serialized size of 15,200 bytes with the TaskRunner but 122,648 bytes with the CloudTaskRunner. That is almost a 10-fold increase, which makes mapping pretty unusable for me. The increased size ultimately comes from pickling this function: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/task_runner.py#L788 and I think the serialized size of the CloudTaskRunner class is the cause of the difference. Is this behavior known? Or is it worth a bug report? I will stick to the plain TaskRunner for now, which will, unfortunately, prevent me from using the cloud UI, which I really like. It would be great if this could be fixed. I'm using the latest prefect (v0.10.7).

    Adrien Boutreau

    05/07/2020, 9:36 AM
    Hello! Quick question: we are running a job in a Docker image and we would like to automate it with Prefect. Do you know where I can find a tutorial?

    Matthias

    05/07/2020, 1:49 PM
    Hi! How do I programmatically run a registered flow, so that I can see the execution in the UI (local, not cloud)?
    👀 1
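    One way is through the Python client, which talks to the local server when no Cloud token is configured (a sketch; the flow id placeholder must be filled in):
    from prefect import Client

    client = Client()
    client.create_flow_run(flow_id="<flow-id>")  # the run then shows up in the UI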

    Matthias

    05/07/2020, 2:01 PM
    How does the Prefect agent get notified of new flows? I am running the agent in a Docker container and it is active and connected to apollo. I also mapped the .prefect directory as a volume. But the agent does not pick up new flows (when clicking run in the UI). When I run prefect agent start in my main Docker container, the process picks up the flows.
    👀 1

    David Ojeda

    05/07/2020, 2:53 PM
    Hello there, I don’t know exactly how to search for this particular strange problem I have encountered lately: I have a flow that is a simple pattern of generate N elements, map over them, then collect all results and send a slack notification. I execute this flow using a DaskKubernetesEnvironment and a kubernetes agent on a GCP k8s cluster that has auto-scaling (I am not sure if this is relevant). The strange situation is that, sometimes, the flow stays in Running indefinitely because the last 1 to 3 tasks of the map are “still running”. I will try to collect some logs when this happens, but it does not look like the workers are stuck; it seems more like they were killed a bit too soon and could not inform the prefect server (or the dask scheduler?) that they finished… Has anyone encountered this strange situation?

    Matias Godoy

    05/07/2020, 2:55 PM
    Hello! I'm using GraphQL to run flows. The problem is that the API requires the flow_id, which changes every time I register a new version of said flow. Is there a way to run a flow by name instead of by id using GraphQL?
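    One workaround is to resolve the id by name first and then create the run (a sketch against the Hasura-backed GraphQL API; the field names follow the flow table, and ordering by version picks the latest registration):
    from prefect import Client

    client = Client()
    result = client.graphql("""
        query {
          flow(where: {name: {_eq: "my-flow"}},
               order_by: {version: desc}, limit: 1) {
            id
          }
        }
    """)
    flow_id = result.data.flow[0].id
    client.create_flow_run(flow_id=flow_id)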

    Manuel Mourato

    05/07/2020, 4:00 PM
    Hello again! Quick question: I have a flow1 with 3 sequential tasks,
    task_sequence = [load_data, task1, task2]
    test_flow1.chain(*task_sequence)
    which I saved to a file locally and then loaded via test_flow2 = Flow.load(path). Now I want to add a new task3 to this flow, with load_data as an upstream dependency of the new task, like this:
    test_flow2.set_dependencies(
        task=task3,
        upstream_tasks=[load_data])
    But I get the error:
    A task with the slug "Task1" already exists in this flow
    It seems to complain about load_data already being defined in the flow, which it is. But what I want is to say load_data is a dependency of task3. What am I doing wrong?
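    A guess at the issue: after Flow.load the flow contains its own copies of the task objects, so passing the original load_data re-adds a second task with the same slug. Looking the task up inside the loaded flow may avoid that (a sketch; it assumes the task is named "load_data"):
    loaded_load_data = test_flow2.get_tasks(name="load_data")[0]
    test_flow2.set_dependencies(task=task3, upstream_tasks=[loaded_load_data])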

    Manuel Mourato

    05/07/2020, 6:08 PM
    I realize this is a long shot, but maybe someone has had this issue before: I want to save a flow with a custom Task that I created. This custom task implements some logic with Cython, and when I execute the flow.save method, I get:
    rv = reduce(self.proto)
    File "stringsource", line 2, in jnius.JavaClass.__reduce_cython__
    TypeError: no default __reduce__ due to non-trivial __cinit__
    As far as I was able to find, a Cython object cannot be pickled unless specific code is added, like so: https://stackoverflow.com/questions/12646436/pickle-cython-class My goal is to store this flow so that I can run an agent that will execute it when I trigger a run via the UI. If I just register the flow in the UI, but don't save it, I get this error:
    [2020-05-07 17:43:48,846] INFO - agent |     flow = storage.get_flow(storage.flows[flow_data.name])
    [2020-05-07 17:43:48,846] INFO - agent | KeyError: 'Test-Flow55'
    My question is: has someone had an issue like this before? Can I get around this in some way?
    👀 1

    John Ramirez

    05/07/2020, 8:10 PM
    hey everyone - I’m trying to use the new DaskCloudProviderEnvironment and I keep getting this error:
    botocore.exceptions.NoRegionError: You must specify a region.
    distributed.deploy.spec - WARNING - Cluster closed without starting up
    any ideas?
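    botocore resolves the region from the standard AWS environment variables, so one quick check is to set the region before the cluster is created (a sketch; pick your own region):
    import os

    os.environ["AWS_DEFAULT_REGION"] = "us-east-1"  # read by botocore at client creation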

    John Ramirez

    05/07/2020, 8:49 PM
    Working with prefect 0.10.7 and found this error:
    Traceback (most recent call last):
      File "main.py", line 4, in <module>
        from prefect.environments import DaskCloudProviderEnvironment
    ImportError: cannot import name 'DaskCloudProviderEnvironment' from 'prefect.environments' (/Users/johnramirez/projects/client/superset/aws-fargate-poc/venv/lib/python3.7/site-packages/prefect/environments/__init__.py)

    Manuel Mourato

    05/07/2020, 9:50 PM
    Hello (again). I have recreated the Prefect server Docker containers, and with the latest version, registered flows do not appear in the UI. Is there a way to specify the image tag I want to use with prefect server start?

    matta

    05/08/2020, 12:56 AM
    Heya. Data Scientist who occasionally has to do some Data Engineering stuff - just wanted to say that of the tools in its class, Prefect definitely feels the least terrifying!
    💯 1
    :marvin: 3

    matta

    05/08/2020, 12:57 AM
    quick question: What's the best practice for canceling a flow if the data origin has no new entries? Should I raise an Exception?
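    One idiom for this is Prefect's SKIP signal: raising it ends the task as Skipped, and with default settings downstream tasks skip as well (a sketch; the task and its argument are hypothetical):
    from prefect import task
    from prefect.engine.signals import SKIP

    @task
    def check_origin(entries):
        if not entries:
            # downstream tasks skip too, since skip_on_upstream_skip defaults to True
            raise SKIP("no new entries at the origin")
        return entries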

    Zviri

    05/08/2020, 8:03 AM
    Hi there. I really like Prefect's mapping feature and, in general, the approach of easily map/reducing your tasks. But AFAIK you can only do a full map or a full reduce. Do you plan on supporting reducing by key? I understand you can do it "manually" within a single task, but a parallel/distributed version could definitely be more efficient.
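    The "manual" version mentioned above might look like this (a sketch; the task name and input shape are made up):
    from collections import defaultdict
    from prefect import task

    @task
    def group_by_key(pairs):
        # reduce-by-key inside a single task: bucket values under their key
        groups = defaultdict(list)
        for key, value in pairs:
            groups[key].append(value)
        # a list of (key, values) tuples that a downstream task can map over
        return list(groups.items())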

    Simon Basin

    05/08/2020, 1:01 PM
    Hello Prefectors! Need advice on the best way to use Prefect with an existing CI/Release process, where:
    • Application codebase and flow definitions are dockerized and tagged with a version
    • Task runs are effectively "docker run image:version"
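    One way to line this up with Prefect (a sketch; the registry and image names are hypothetical) is Docker storage pinned to the same CI version tag:
    from prefect.environments.storage import Docker

    flow.storage = Docker(
        registry_url="registry.example.com",
        image_name="app-flows",
        image_tag=version,  # reuse the tag produced by the release pipeline
    )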

    John Ramirez

    05/08/2020, 1:10 PM
    Hello everyone! If you use the DaskKubernetesEnvironment with a custom YAML spec, can you still specify min & max workers in the class?

    Adrien Boutreau

    05/08/2020, 3:24 PM
    Hi, I have an issue when I run Prefect Core on an EC2 instance: flows don't appear in the UI. When I use the API to make a call, I get:
    {
    "graphQLErrors": [],
    "networkError": {},
    "message": "Network error: NetworkError when attempting to fetch resource."
    }
    From a python job, <http://localhost:4200/graphql/alpha> returns:
    {'data': {'flow': [{'name': 'Zoom_test'}, {'name': 'Zoom_test'}, {'name': 'Zoom_test'}, {'name': 'Zoom_test'}, {'name': 'Zoom_test'}]}}
    I updated the hosts file with xxx.eu-west-2.compute.amazonaws.com localhost:4200, but without success.

    John Ramirez

    05/08/2020, 4:01 PM
    hey everyone - I am using EKS with auto-scaling and DaskKubernetesEnvironment to run my workflows. There will be times when I need to submit a high volume of flows to the cluster. My expectation is that the auto-scaler will spin up the additional EC2 instances required to handle the temporary volume increase, but I am concerned about the latency (the time it takes to make the additional resources available). Is there a setting to retry the entire flow if it fails? That would reduce the gap.
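    Retries in Prefect 0.10.x are configured per task rather than per flow, so generous task-level retries are one way to absorb the scale-up latency (a sketch; the retry counts are arbitrary):
    from datetime import timedelta
    from prefect import task

    @task(max_retries=5, retry_delay=timedelta(minutes=2))
    def my_task():
        ...  # fails while the auto-scaler provisions, then succeeds on retry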

    Jeremiah

    05/08/2020, 7:59 PM
    I know we have a lot of DBT users here - a PR to create a new task library for DBT was just opened! And I’m taking a total stab here — was it you @Mark McDonald? If so, thanks! https://github.com/PrefectHQ/prefect/pull/2526
    💯 2
    🚀 9
    ⭐ 3

Mark McDonald

05/08/2020, 8:01 PM
yep! At Clearcover, we use prefect to orchestrate our dbt jobs and it works great! I'm happy to contribute
💯 1

Jeremiah

05/08/2020, 8:30 PM
Awesome!