Powered by Linen
prefect-community
  • a

    alex

    08/18/2020, 6:22 PM
    Hello, how can I reuse a flow? I have a function that takes in a list of items and builds a flow using those items (I don't currently use Prefect parameters). I want to deploy 2 versions of that flow (they will differ, since the item lists are different). I do change the name each time, but it seems like both are being treated as the same flow and replace each other.
    8 replies · 2 participants
  • t

    Trever Mock

    08/18/2020, 7:27 PM
    Hey all, happy Tuesday! I'm trying to import additional generated Python files and use them as part of my Prefect workflow. Everything works fine locally, but when I push the flow up to a Docker repo and try to run it using a Kubernetes agent, I get the following error from cloudpickle:
    ModuleNotFoundError: No module named 'test_pb2'
    The Kubernetes agent spins up a job in our cluster, and this error comes from that job. It works fine if the flow doesn't reference any other custom modules.
    Here's what I've tried so far, but it all results in the same error (note that healthcheck.py passes just fine in all these cases when the flow is pushed to Docker storage):
    1. Setting PYTHONPATH to point to test_pb2 and the other files. If I pull the container, PYTHONPATH is correct; however, when the Kubernetes agent spins up a job using that container, it no longer appears to be set. Is it getting overridden somewhere?
    2. Turning the generated files into a package installable with pip and installing it as part of the Dockerfile that creates the Docker storage. If I pull the container, the package appears to be installed successfully. Still the same error when run by the Kubernetes agent, though.
    Any ideas on other things I could try? Is there a preferred way to include other Python files as part of a Prefect flow?
    Any ideas/help is greatly appreciated!!
    3 replies · 2 participants
  • r

    Richard Hughes

    08/18/2020, 7:46 PM
    Hi All,
  • r

    Richard Hughes

    08/18/2020, 7:47 PM
    Is there a way to make the Prefect logs in the UI display milliseconds, or does Prefect even log at that granularity?
    6 replies · 2 participants
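The Prefect-side answer is in the thread replies, which aren't preserved here. As a plain-Python sketch, assuming Prefect's loggers sit on the stdlib `logging` module (the `prefect.TaskRunner` logger names later on this page suggest so), the stdlib formatter can show milliseconds by appending `%(msecs)03d` after `%(asctime)s`. The format string mimics the `[time] LEVEL - name | message` layout of the log lines further down; `make_ms_logger` is a hypothetical helper, not a Prefect API.

```python
import logging

# Mirrors the "[time] LEVEL - name | message" layout seen in Prefect logs,
# with "%(msecs)03d" appended so each timestamp carries milliseconds.
MS_FORMAT = "[%(asctime)s.%(msecs)03d] %(levelname)s - %(name)s | %(message)s"
MS_DATEFMT = "%Y-%m-%d %H:%M:%S"


def make_ms_logger(name, stream=None):
    """Hypothetical helper: a logger whose timestamps include milliseconds."""
    handler = logging.StreamHandler(stream)  # None means sys.stderr
    handler.setFormatter(logging.Formatter(MS_FORMAT, datefmt=MS_DATEFMT))
    logger = logging.getLogger(name)
    logger.handlers = [handler]   # replace, don't stack, handlers
    logger.setLevel(logging.INFO)
    logger.propagate = False      # avoid a second copy via the root logger
    return logger


if __name__ == "__main__":
    # Prints a line shaped like:
    # [YYYY-MM-DD HH:MM:SS.mmm] INFO - demo | Task 'load': finished
    make_ms_logger("demo").info("Task 'load': finished")
```

Whether Prefect exposes its own log format and date format as configuration settings in 0.13.x is not confirmed here; check the configuration docs for your version.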
  • j

    josh

    08/18/2020, 8:30 PM
    Hey team, recently some users have been asking us to announce Core releases here on a regular cadence, so we are going to start doing that again! Just released 0.13.3, and here are a few notable changes:
    🌲 More control over log levels
    📦 Pass any arguments to Docker tasks
    🔑 Fixed caching when no cache_key is present
    A big thank you to our contributors who helped out with this release! Full changelog:
    Untitled
    🙌 4
    🙏 7
    🚀 11
    :party-parrot: 1
  • b

    bral

    08/18/2020, 9:03 PM
    Is there a right way to check executions on a Dask cluster? My flow:
    from prefect import task, Flow, case, Task
    from random import randint
    from prefect.schedules import IntervalSchedule
    from datetime import timedelta
    from prefect.engine.executors import DaskExecutor
    from prefect.environments import LocalEnvironment
    
    
    class Preprocess:
        def __init__(self, number):
            print(number)
            self.number = number
            self.data = None
    
        def read_file(self):
            self.data = self.number
    
        def preprocess_file(self):
            self.data = self.data * 2
    
        def save_file(self):
            self.data = self.data / 3
    
    
    @task
    def is_running():
        return False
    
    @task
    def get_files():
        return [i for i in range(0, randint(0, 15))]
    
    @task
    def _print(lst):
        print(lst)
        print(len(lst))
    
    @task
    def etl(file):
        prep = Preprocess(file)
        prep.read_file()
        prep.preprocess_file()
        prep.save_file()
    
    
    executor = DaskExecutor(address="tcp://localhost:8786")
    schedule = IntervalSchedule(interval=timedelta(seconds=5))
    local = LocalEnvironment(executor=executor)
    
    with Flow("process", schedule=schedule, environment=local) as flow:
        condition = is_running()
        with case(condition, False) as cond:
            files = get_files()
            _print(files)
            etl.map(files)
    
    flow.run()
    Following the tutorial (https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html), I started a scheduler and 2 workers, then looked at the Bokeh web UI, but no tasks show up there.
    2 replies · 2 participants
  • w

    Will Goldstein

    08/18/2020, 9:03 PM
    Hi all! Is this an okay place to ask getting-started questions?
    💯 2
    2 replies · 2 participants
  • w

    Will Goldstein

    08/18/2020, 9:07 PM
    I'm trying to get started with the Kubernetes agent locally (with just `prefect server start` for the scheduler). It keeps firing up pods that try to connect to localhost:4200 for the GraphQL endpoint, and they fail because they can't reach `localhost` from inside a pod. Is there a way to specify the endpoint the pods should connect to (e.g. `kubernetes.docker.internal` instead)?
    2 replies · 2 participants
  • w

    Will Goldstein

    08/18/2020, 9:08 PM
    Tried reading through the docs and the source but I didn't see anything, so I figured I must be trying to do something non-standard
  • c

    Cody Kaiser

    08/18/2020, 9:09 PM
    Hi all. Using Prefect Cloud, I've set up a user with the read-only role, but they can't see any projects. Doesn't read-only allow them to see everything in Cloud? If not, how do I apply those permissions?
    6 replies · 2 participants
  • c

    Chris Marchetti [Datateer]

    08/18/2020, 9:55 PM
    I am having an issue where I created an account using Google and was then sent an invitation to my organization's (Datateer) Prefect account. I followed the invitation, and the messages say that I have joined the group, but I can't see any of the organization's materials. Thanks for the help!
    9 replies · 2 participants
  • d

    dennis

    08/19/2020, 3:11 AM
    Hi all, does Prefect provide a feature like Airflow's `depends_on_past=True` when running flows on a schedule? Thanks for the help!!
    2 replies · 2 participants
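The Prefect-side answer lives in the thread replies, which aren't preserved here. For reference, Airflow documents `depends_on_past=True` as: a task instance runs only if the same task's instance in the previous scheduled run succeeded or was skipped. A minimal plain-Python sketch of that check, using a hypothetical run-history shape (a list of per-run dicts mapping task name to final state, oldest first), not any Prefect or Airflow API:

```python
from typing import Dict, List

# Hypothetical history shape: one dict per earlier scheduled run, oldest
# first, mapping task name -> final state string.
RunHistory = List[Dict[str, str]]

# States that let the next scheduled instance of the task proceed.
OK_STATES = {"success", "skipped"}


def depends_on_past_allows(history: RunHistory, task_name: str) -> bool:
    """Return True if task_name may run now under depends_on_past semantics."""
    # Look back for the most recent earlier run that included this task.
    for run in reversed(history):
        if task_name in run:
            return run[task_name] in OK_STATES
    # No earlier instance of this task (e.g. the very first run): allowed.
    return True
```

A scheduler loop would call this before starting each task and skip (or hold) the task when it returns False.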
  • k

    Klemen Strojan

    08/19/2020, 7:46 AM
    Hi all - we’re using GitHub flow storage, and from time to time our flows fail with:
    Failed to load and execute Flow's environment: ConnectionError(MaxRetryError("HTTPSConnectionPool(host='api.github.com', port=443): Max retries exceeded with url: /repos/<ORG>/<REPO> (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7ff5caf88668>: Failed to establish a new connection: [Errno -2] Name or service not known',))",),)
    What are potential causes for this error? When there is downtime on GitHub, we see this instead:
    Failed to load and execute Flow's environment: GithubException(500, None)
    What part of the source code should I explore to understand this better?
    2 replies · 2 participants
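`[Errno -2] Name or service not known` is the resolver error (`socket.gaierror`, EAI_NONAME on Linux) raised when a hostname, here api.github.com, can't be looked up. So this failure happens before any HTTP request reaches GitHub, unlike the `GithubException(500, None)` case, where GitHub did answer. If the DNS blips are transient, one generic mitigation is to retry the fetch with backoff. The sketch below is plain Python, not Prefect's storage code; `fetch` stands in for whatever call loads the flow, and the attempt count and delays are arbitrary assumptions.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fetch: Callable[[], T],
    attempts: int = 4,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (OSError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fetch(), retrying on transient errors with exponential backoff.

    OSError covers socket.gaierror (DNS failures), the builtin
    ConnectionError, and requests' ConnectionError (an IOError subclass).
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts - 1):
        try:
            return fetch()
        except retry_on:
            sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ... by default
    return fetch()  # last attempt: let any error propagate
```

Passing `sleep` in makes the helper testable without real waiting; errors outside `retry_on` propagate immediately.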
  • x

    x062Wyhdolq

    08/19/2020, 8:33 AM
    Hi everyone! I have a problem with event driven/streaming flow.
    4 replies · 2 participants
  • n

    Nuno Silva

    08/19/2020, 2:54 PM
    Hi. I'd really appreciate some help connecting to a remote k8s cluster running in Azure. I can execute flows in the cluster when using `flow.run(...)`, but when I `flow.register(...)` and then run the flow from the UI, the Dask workers in k8s log errors saying they cannot connect to `localhost:4200`. That makes sense, since localhost:4200 has to be replaced by the IP where the Prefect server is running. How do I do that? (https://github.com/PrefectHQ/prefect/issues/3185)
    62 replies · 3 participants
  • m

    Michael Ludwig

    08/19/2020, 3:26 PM
    We use Prefect's logging, either `self.logger` from `Task` or a logger obtained via `from prefect.utilities.logging import get_logger`. We get some weird double logging with different formats. I am not sure if it is a pure Prefect issue or just something we are doing wrong on our end. Has anyone else seen something like this and found a solution?
    [2020-08-19 15:21:50] INFO - prefect.TaskRunner | Task 'load': finished task run for task with final state: 'Success'
    Task 'load': finished task run for task with final state: 'Success'
    [2020-08-19 15:21:51] INFO - prefect.TaskRunner | Task 'extract': finished task run for task with final state: 'Success'
    Task 'extract': finished task run for task with final state: 'Success'
    [2020-08-19 15:21:51] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    Flow run SUCCESS: all reference tasks succeeded
    [2020-08-19 15:21:52] INFO - prefect.orchestrator.utils | Flow state transition: <Running: "Running flow."> -> <Success: "All reference tasks succeeded.">
    Flow state transition: <Running: "Running flow."> -> <Success: "All reference tasks succeeded.">
    3 replies · 1 participant
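The thread's answer isn't preserved here, but a common cause of this exact pattern in Python's stdlib `logging` (a formatted line followed by the same message unformatted) is that the record is emitted by the logger's own handler and then propagates to a root-logger handler that has no formatter. The sketch below reproduces that with stdlib only; it is an assumption that the extra root handler comes from some additional `logging` setup in the user's code, not something confirmed in the thread. `emit_once` is a hypothetical helper.

```python
import io
import logging
from typing import List


def emit_once(propagate: bool) -> List[str]:
    """Log one message and return every line the handlers wrote."""
    buf = io.StringIO()

    # Root handler with no formatter: writes only the bare message.
    root = logging.getLogger()
    root_handler = logging.StreamHandler(buf)
    root.addHandler(root_handler)

    # Named logger with its own, formatted handler.
    logger = logging.getLogger("demo.TaskRunner")
    logger.setLevel(logging.INFO)
    own_handler = logging.StreamHandler(buf)
    own_handler.setFormatter(
        logging.Formatter("%(levelname)s - %(name)s | %(message)s")
    )
    logger.addHandler(own_handler)
    logger.propagate = propagate

    try:
        logger.info("Task 'load': finished")
    finally:
        # Clean up so repeated calls don't stack handlers.
        logger.removeHandler(own_handler)
        root.removeHandler(root_handler)
    return buf.getvalue().splitlines()
```

`emit_once(True)` yields the formatted line plus the bare message, matching the pairs above; `emit_once(False)` yields only the formatted line. So either disable propagation on the logger that already has a handler, or remove the extra root handler.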
  • j

    Jonas Hanfland

    08/19/2020, 4:01 PM
    Hey guys, how would you change `BigQueryTask()` arguments at runtime? Basically, I would like to change the `table_dest` argument based on an environment variable, so that I can save my query result in a different place depending on whether I run the flow locally or elsewhere. Thanks very much in advance!
    2 replies · 2 participants
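One thing to watch, as an assumption about how the flow is deployed (it isn't stated in the question): anything read directly inside the `with Flow(...)` block is evaluated when the flow is built and registered, so an environment variable read there captures the build machine's value. Reading it inside a task defers the lookup to run time. The helper below is a plain-Python sketch of that lookup; the variable name `FLOW_ENV` and the table names are hypothetical placeholders.

```python
import os
from typing import Mapping, Optional

# Hypothetical mapping from deployment stage to BigQuery destination table.
DEST_TABLES = {
    "local": "scratch_query_results",
    "prod": "query_results",
}


def pick_table_dest(env: Optional[Mapping[str, str]] = None) -> str:
    """Choose the destination table from FLOW_ENV at call time."""
    env = os.environ if env is None else env
    stage = env.get("FLOW_ENV", "local")  # default to the scratch table
    if stage not in DEST_TABLES:
        raise ValueError(
            f"unknown FLOW_ENV {stage!r}; expected one of {sorted(DEST_TABLES)}"
        )
    return DEST_TABLES[stage]
```

Wrapped in a task, its result could then be passed to the BigQuery task when it is called in the flow, assuming `table_dest` is accepted at call time as well as in the constructor (check the task library docs for your Prefect version).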
  • c

    Charles Leung

    08/19/2020, 4:24 PM
    Hey team, does anyone else have the issue of not being able to restart failed tasks? It seems like it should work, since it asks for a confirmation click, but the button is often grayed out.
    9 replies · 2 participants
  • b

    Brian Mesick

    08/19/2020, 5:14 PM
    Hey all. Does anyone know offhand why a flow wouldn’t get picked up after it’s been submitted? The previous version of it got picked up. I’ve deleted the flow, bumped the version tag, rebuilt, and still nothing. The run gets killed by Lazarus 3 times and errors out. The agent is running other flows fine.
    45 replies · 4 participants
  • w

    Will Goldstein

    08/19/2020, 5:46 PM
    Hi, is there any equivalent in prefect to Airflow's automatic picking up of new dags? Or do all flows need to be explicitly registered?
    2 replies · 2 participants
  • w

    Will Goldstein

    08/19/2020, 6:09 PM
    Sorry for the battery of questions. The KubernetesJobEnvironment overwrites the image I provide in a job spec with a flow-specific image. How can I specify that a flow should run with a specific image?
    1 reply · 1 participant
  • b

    bral

    08/19/2020, 8:43 PM
    good evening! Are there any plans to include a role-based system in the server release? or will it only stay in the cloud?
    1 reply · 2 participants
  • m

    Michael Reeves

    08/19/2020, 10:16 PM
    Question: if I have a task I want to perform over all the lines in a file, but I want each line's task run to depend on the previous line's (I don't want to continue iterating if a previous line fails), what's the best Prefect way to do this? Right now I'm doing:
    with Flow('file') as flow: 
        name = Parameter('name')    
        lines = read_file(name=name)
        for l in lines:
            do_line(l)
        finish()
    I know there's a Prefect `map` functionality, but I didn't see an easy way to make each line's task depend on the previous line's task in an iterative manner.
    30 replies · 3 participants
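One simple alternative to mapping, sketched under the assumption that the lines can be handled inside a single task: loop over them in order and stop at the first failure, so no later line runs once an earlier one fails. This is plain Python, not a Prefect construct; `do_line` stands for the user's per-line function and `process_in_order` is a hypothetical name. The trade-off is that the per-line steps no longer appear as separate tasks, so there are no per-line retries or states.

```python
from typing import Callable, Iterable, List, Optional, Tuple


def process_in_order(
    lines: Iterable[str], do_line: Callable[[str], None]
) -> Tuple[List[str], Optional[Exception]]:
    """Run do_line on each line in order, stopping at the first failure.

    Returns the lines that completed and the exception that stopped the
    loop (None if every line succeeded).
    """
    done: List[str] = []
    for line in lines:
        try:
            do_line(line)
        except Exception as exc:  # stop iterating; later lines never run
            return done, exc
        done.append(line)
    return done, None
```

Inside a task, re-raising the returned exception would mark that task as failed, so a downstream `finish()` would not run by default.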
  • m

    Max Lei

    08/19/2020, 10:57 PM
    Hi All, I have a flow that I wrote, and I started a prefect service using:
    prefect backend server &
    prefect agent start &
    prefect server start &
    Then I registered my flow using:
    flow.register(project_name="ML")
    Finally, I went to the web UI and clicked Quick Run, but the run has been stuck there for a while now. Is there any way to debug why this is happening?
    7 replies · 2 participants
  • r

    raman

    08/19/2020, 11:12 PM
    Hello, can anyone suggest a community-maintained method to deploy Prefect on multiple Windows servers, with one database shared by these deployments?
  • m

    Maxwell Dylla

    08/20/2020, 12:09 AM
    Does anyone else in the community use type hints to track data dependencies in a Flow? Creating tasks with the `@task` decorator works well for me, but the class-based API confuses my IDE. For example, my IDE thinks the result of calling a class-based task has type `Task`:
    from prefect import Flow, Task, task
    
    
    @task
    def func_task() -> str:
        return ""
    
    
    class ClassTask(Task):
        def run(self) -> str:
            return ""
    
    class_task = ClassTask()
    
    
    with Flow("test_type_hints") as flow:
        func_result: str = func_task()
        class_result: str = class_task()  # Expected type 'str', got 'Task' instead 
    flow.run()
    5 replies · 2 participants
  • m

    Max Lei

    08/20/2020, 4:09 AM
    When I use `flow.run()` everything works fine, but when I use the Prefect server, it complains that AWS environment variables are not set, for example:
    KeyError: 'No access key found. Please set the environment variable AWS_ACCESS_KEY_ID.'
    Is this because the runs are executed inside a Docker container?
    2 replies · 2 participants
  • a

    Alfie

    08/20/2020, 7:08 AM
    Hi all, it seems the local agent executes a flow run in a separate process. I’d like to do some initialization at the very beginning of the flow run, ideally right after the process is created. Is there any way to achieve that?
    13 replies · 2 participants
  • a

    Alfie

    08/20/2020, 7:08 AM
    Thanks
  • m

    Matias Godoy

    08/20/2020, 12:22 PM
    Hi guys! I found a weird behaviour with the agents: We have a flow that has been working perfectly for a while now. The problem is that every now and then we have a run in which every task (in the same flow run) is executed twice. We have only two agents running in an EC2 instance. I started looking today and I found that sometimes both of the agents pick the same flow run, and both execute it! [more info in the comments so I don't pollute the main thread]
    👀 2
    14 replies · 5 participants
m

Matias Godoy

08/20/2020, 12:22 PM
Both agents are running under `supervisord` in the same EC2 instance, so I have a GUI that lets me easily see their logs. Here's what I found:
Agent 1:
[2020-08-19 06:39:07,450] INFO - agent | Found 1 flow run(s) to submit for execution.
[2020-08-19 06:39:07,582] INFO - agent | Deploying flow run 8d061e63-8b9e-40a6-a1b8-103cecedac01
Agent 2:
[2020-08-19 06:39:07,672] INFO - agent | Found 1 flow run(s) to submit for execution.
[2020-08-19 06:39:07,805] INFO - agent | Deploying flow run 8d061e63-8b9e-40a6-a1b8-103cecedac01
As you can see, both agents found the pending run and both picked it up with a difference of a few milliseconds.
Also in the cloud UI, you can clearly see what's happening in the logs:
The strange part is that this only happens every now and then. The agents work perfectly most of the times.
Is there something I'm doing wrong? Has this been reported before? Let me know if you need me to provide more information.
j

Jeremiah

08/20/2020, 1:49 PM
Hi @Matias Godoy! We actually experienced a similar issue with one of our test workflows yesterday morning and identified a race condition with multiple agents. We pushed a fix for that issue yesterday afternoon, so if it’s the same issue, I hope you won’t see this behavior anymore. Of course, if you do see anything unexpected, please let us know!
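The reply doesn't describe how the Cloud fix works, so the following is only a generic illustration of this class of race: if each agent checks "is this run still Scheduled?" and then marks it submitted as two separate steps, two agents polling a few milliseconds apart can both pass the check. Making check-and-update one atomic step lets exactly one agent win. `RunRegistry`, `race`, and the state strings are hypothetical in-memory stand-ins, not Prefect's API.

```python
import threading
from typing import Dict, List


class RunRegistry:
    """Toy in-memory store of flow-run states (hypothetical)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._states: Dict[str, str] = {}

    def add(self, run_id: str) -> None:
        with self._lock:
            self._states[run_id] = "Scheduled"

    def claim(self, run_id: str, agent: str) -> bool:
        """Atomically move a run from Scheduled to Submitted; True if we won."""
        with self._lock:  # check and update happen as one step
            if self._states.get(run_id) != "Scheduled":
                return False
            self._states[run_id] = f"Submitted by {agent}"
            return True


def race(registry: RunRegistry, run_id: str, agents: List[str]) -> List[str]:
    """Let every agent try to claim the same run concurrently; return winners."""
    results: Dict[str, bool] = {}
    barrier = threading.Barrier(len(agents))  # start all claims together

    def attempt(agent: str) -> None:
        barrier.wait()
        results[agent] = registry.claim(run_id, agent)

    threads = [threading.Thread(target=attempt, args=(a,)) for a in agents]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return [a for a, won in results.items() if won]
```

Without the lock, both threads could read `Scheduled` before either writes, which is the double deploy visible in the two agent logs above.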
m

Matias Godoy

08/20/2020, 1:50 PM
Nice! This fix is for the agents or for the Cloud? I'm asking so I know if I have to take any action 🙂
👍 1
c

Chris White

08/20/2020, 4:10 PM
It’s for Cloud - no action required on your part 👍
👍 2
j

Jeremiah

08/20/2020, 4:16 PM
Thanks Chris - Apologies I missed your follow-up @Matias Godoy
m

Matias Godoy

08/20/2020, 4:27 PM
Excellent. Thanks a lot!
e

Eldho Suresh

11/16/2020, 3:49 AM
@Narasimhan Ramaswamy
n

Narasimhan Ramaswamy

11/16/2020, 4:02 AM
@Jeremiah - just an extension to this problem: our Prefect jobs are being submitted twice. We have just one agent running, but randomly, within a single flow run, all tasks are run twice. The flow uses DaskKubernetesEnvironment. Our agent is hosted in AKS, and we manage flows with Prefect Cloud. We noticed that these randomly failing flow runs were rescheduled by Lazarus.
Can you please help here?
j

Jeremiah

11/16/2020, 4:14 PM
@Narasimhan Ramaswamy it sounds like you are experiencing a different issue than the one in this thread, since your symptom is repeated tasks, not runs (apart from normal Lazarus rescheduling). Sometimes seeing repeated tasks in combination with Dask means that your Dask worker ran out of memory and died, and when it spun back up the Dask scheduler caused it to re-run all work. If you need further assistance I recommend starting a new thread here so other folks will be able to see it (this thread is 3 months old) or a GitHub discussion to maximize visibility!