prefect-community

    Alister Lee

    06/04/2020, 12:17 AM
    (selecting prefect) Hiya, I have a general question - thanks for considering it. I'm trying to make the case that the orchestration capability for our very large (500 source systems) data lake shouldn't just rush to bake in Airflow. I think we should have an event bus and hang Airflow off the back of the bus, so that we can evolve and swap/add tools (to Prefect or others) over the life of the platform. Are you aware of limitations in Airflow, or strengths in Prefect, that I could use as examples to warn my team as we try many invocations/second of a smallish number of dynamic DAGs? Thanks!

    Avi A

    06/04/2020, 9:18 AM
    Using Task `timeout`: I have a task where I've set the `timeout` argument. This causes all tasks of this type to not report their completion! I see on the agent itself that their execution is done, but there's no call to the state handler to change the state to complete, or to write the task result, as normally happens with these tasks. Here's the gist of my task definition; when I remove the line setting `kwargs['timeout']`, everything works:
    from datetime import timedelta
    from prefect import Task

    DEFAULT_MAX_RETRIES = 3  # our project-wide defaults (defined elsewhere)
    DEFAULT_RETRY_DELAY = 5  # minutes

    TIMEOUT = 10  # minutes

    class MyTask(Task):
        def __init__(
                self,
                param1=None,
                param2=None,
                cache: bool = False,
                **kwargs
        ):
            self.cache = cache
            if 'max_retries' not in kwargs and 'retry_delay' not in kwargs:
                kwargs['max_retries'] = DEFAULT_MAX_RETRIES
                kwargs['retry_delay'] = timedelta(minutes=DEFAULT_RETRY_DELAY)
            kwargs['timeout'] = TIMEOUT * 60  # seconds; if I remove this line, everything works great
            super().__init__(**kwargs)

        def run(self, param1=None, param2=None):
            # Code!
            pass
    I fear it's a bug in Prefect, but I'm not sure. Would love to get some input before I open an issue.
    👀 1

    Michał Junczyk

    06/04/2020, 2:25 PM
    Hi everyone, my name is Michael. I'm happy to be joining the Prefect user community. I'd like to report a small bug in the tutorial. Is this the proper place to do so? Best regards
    👀 1

    Darragh

    06/04/2020, 2:31 PM
    Back again with more noob questions! I've [hopefully] successfully configured my Fargate Agent, to the point where the flow gets picked up and I get the following log output:
    [2020-06-04 11:01:26,972] INFO - agent | Deploying flow run 05ec3609-4457-4085-8cf3-d01a6d13ccb1
    I can see that it's created the TaskDefinition, but after that it just sits there and never runs the Task, so I've no idea what's going on, and the UI and logs never show any update. Am I missing something? Some magic words to make the agent actually run the task, as opposed to just registering it? Being able to turn on debug logging would also be a massive bonus 🙂 All I'm getting from the agent is INFO, and I've added
    [logging] level="DEBUG"
    to the config.toml, and added
    export PREFECT__LOGGING__LEVEL="DEBUG"
    but nothing's happening.
    UPDATE: In a fit of desperation I went onto my EC2 box and edited the Fargate agent code to make all DEBUG statements INFO, and lo and behold, more log output. BUT!!! As far as the `deploy_flow` function is concerned, everything succeeded. And I can see the task definition has been created, but nothing else ever happens in the logs or the UI. So I'm slightly wiser, but not by a lot. 🙂 Even digging into `deploy_and_update_flow_run` in the root agent.py file isn't giving me much; it seems like everything is succeeding, so I'm at a loss as to why nothing is coming up in the UI to show failed, passed, stalled…
    UPDATE 2: OK, found something - I took the definition that Prefect created and ran it manually, and it gets a `CannotStartContainerError`: failed to initialize logging, AccessDenied to create the CloudWatch log group. Fixing. I'll be back 😂 Does the Fargate task need access to call back to the agent/GraphQL for updates? Finally found some logs, and it seems like the flow could be trying to make a callback to localhost:4200 - is that right? (I realised this was a stupid question as I wrote it.)
    UPDATE 3: Turns out it does. From the Fargate Agent docs ("Core server" section):
    In order to use this agent with Prefect Core's server, the server's GraphQL API endpoint must be accessible.
    But there's nothing to say how to configure it? I've overridden the cloud.api endpoint in the config.toml, and I can see the update coming out in the logs, but the task is still trying to call back to localhost - presumably because the API endpoint override needs to be injected into the task? As in, into the Docker container running the flow?
    👀 1
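    A minimal sketch of one way to do that injection, assuming the agent CLI's `--env` flag (which forwards environment variables into each flow-run container) and a placeholder server address:

    # point every flow-run container at the Core server's GraphQL API
    # (http://10.0.0.5:4200 is a placeholder for wherever your server lives)
    prefect agent start fargate --env PREFECT__CLOUD__API=http://10.0.0.5:4200

    Passing it this way matters because the flow executes in its own container, which never sees the agent's config.toml.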

    Dan DiPasquo

    06/04/2020, 4:36 PM
    What is the expected behavior if one were to provide a `scheduled_start_time` to Client.create_flow_run() that had already passed / was in the past? https://docs.prefect.io/api/latest/client/client.html#client-2

    Scott Zelenka

    06/04/2020, 6:04 PM
    Is there a better way to dynamically configure which `PrefectSecret` to read from? Our use case is that we have different key/value pairs depending on the lifecycle the Flow is being executed in, so we split that up into two different Secrets and use a Parameter to pick which lifecycle it should attempt to process:
    from prefect import Flow, Parameter
    from prefect.tasks.secrets import PrefectSecret

    with Flow("my-flow") as flow:
        lifecycle = Parameter("lifecycle", default='dev')
        secrets = PrefectSecret(f"SECRET_NAME_{lifecycle.run().upper()}")
    That seems hacky, though, to have to call `.run()` on the Parameter. Wondering if anyone has found a cleaner approach.

    Scott Zelenka

    06/04/2020, 7:22 PM
    Is there an easy way to alter the `imagePullPolicy` when launching a Job through the Kubernetes Agent? Since the image of the Flow to run will always have a tag for the specific version, this value could be changed from `Always` to `IfNotPresent`. This would decrease the startup time for Flows that get executed frequently, since there's a good chance the image will already exist on the K8s node where it's being executed.
    👀 1
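    One hedged sketch of a workaround, assuming the 0.11-era `KubernetesJobEnvironment`, which accepts a custom job spec so the rendered Job can carry a different pull policy:

    from prefect.environments import KubernetesJobEnvironment

    # job_spec.yaml is a copy of the default job template in which the
    # flow container's imagePullPolicy is set to IfNotPresent
    flow.environment = KubernetesJobEnvironment(job_spec_file="job_spec.yaml")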

    Skip Breidbach

    06/04/2020, 11:23 PM
    Hi there, another person evaluating Prefect here (it's going very well, btw). I saw https://github.com/PrefectHQ/prefect/pull/2646, which is great since it directly addresses one of my use cases as well. I'm curious to know whether that would also work in conjunction with the `LOOP` construct. The scenario I have in mind is that I have one task that generates a series of chunks of data. Each chunk would ideally start being processed by a different downstream task before the generator task completes. I don't know in advance how many chunks it will create, so I don't think I can `map` the task. (It's also possible that I'm just thinking about the problem the wrong way.)

    philip

    06/05/2020, 1:58 AM
    How can I use Prefect Server to access Google Cloud?

    Dan DiPasquo

    06/05/2020, 2:15 AM
    Trying to upgrade from 0.10.7 to 0.11 -- worked through a few issues, but I've been banging my head for a while trying to figure out what I'm doing wrong here: flow runs with the newly registered flow version fail immediately with Failed to load and execute Flow's environment: AttributeError("'KubernetesJobEnvironment' object has no attribute 'unique_job_name'"). On closer inspection, the Cloud UI still shows Prefect Core Version: 0.10.7 for the flow. I'm using the new Docker base image FROM prefecthq/prefect:0.11.5-python3.8 and updated
    python_dependencies=[
                     "kubernetes",
                     "docker",
    -                "prefect[gcp]",
    -                "prefect[kubernetes]",
    +                "prefect[gcp]~=0.11",
    +                "prefect[kubernetes]~=0.11",
    (this may be redundant?). I can see in the output from the register/build process:
    Step 1/36 : FROM prefecthq/prefect:0.11.5-python3.8
    ...
    Step 30/36 : RUN pip show prefect || pip install git+https://github.com/PrefectHQ/prefect.git@0.10.7#egg=prefect[kubernetes]
     ---> Running in e3773eba79d4
    Name: prefect
    Version: 0.11.5
    ...
    Additionally, I have pulled and run the Docker image for the registered flow, and inside it:
    root@b3fc72e3e101:/# python
    Python 3.8.3 (default, May 20 2020, 20:38:46) 
    [GCC 8.3.0] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import prefect
    >>> prefect.__version__
    '0.11.5'
    I've now even deleted the flow and all past versions and registered again, and I'm still seeing Prefect Core Version: 0.10.7 in the Cloud UI - what am I missing / doing wrong?
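    One hedged guess at where to look: the Prefect Core Version recorded for a flow comes from the installation that calls flow.register(), not from the image the flow runs in - so if the registering environment is still on 0.10.7, the flow (and its serialized KubernetesJobEnvironment) would register as 0.10.7 even though the container has 0.11.5. A quick check, run wherever registration happens:

    # the version recorded at registration comes from this installation
    python -c "import prefect; print(prefect.__version__)"
    pip install --upgrade "prefect[gcp,kubernetes]==0.11.5"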

    An Hoang

    06/05/2020, 3:12 PM
    Hi, has anyone created a “Flow as a service” system with Prefect? We don't plan to charge anything; it's just a collaboration. We want to provide the code for the flows, and the end user just provides 1) the data, 2) the IP of a Dask cluster the end user created/paid for independently of us, and 3) result endpoints - and we run the flow. We want the user to provide the compute backend so we don't have to deal with compute-resource billing, and we don't want to see their proprietary data. How does one go about doing that, at a high level? Are there any issues, especially security issues, to be aware of?

    An Hoang

    06/05/2020, 3:16 PM
    Also, what is the spectrum of deliverables with this? I was thinking these, from least involved to most involved:
    1. Code as a service:
    • Option 1: Using a third party like Saturn Cloud. We bring the code; User Company only needs to pay for compute power
    • Option 2: Prefect Cloud; the user configures their backend
    2. Platform as a service:
    • We host something on the cloud using a customized open-source Prefect Server. User Company brings the data and configures the backend
    3. SaaS:
    • Option 1: Making an AMI/container that is easy to spin up and plug into the resources
    • Option 2: A complete customized app deployable within User Company's tools and cluster
    Am I correct? Did I miss anything?

    Matthias

    06/05/2020, 4:12 PM
    Hi! I am registering a flow like this:
    from datetime import datetime, timedelta
    from prefect.schedules import IntervalSchedule

    schedule = IntervalSchedule(
        start_date=datetime.utcnow() + timedelta(seconds=1),
        interval=timedelta(hours=1),
    )

    flow.schedule = schedule
    flow.register()
    and it works, but after 10 intervals it simply stops. Where am I going wrong?

    Marwan Sarieddine

    06/05/2020, 8:08 PM
    Hi everyone - I am trying to understand what `Result.validate` does, and where exactly it is being called (I can't seem to find where it's invoked by the runner?). What does it mean for a task to have an invalid Result? (Any explanation would be much appreciated - thanks)

    Luis Muniz

    06/06/2020, 11:26 AM
    Hi, newb here trying to find a replacement for Airflow. What's the recommended infrastructure for deploying Prefect Core in production? Looking for minimum CPU/memory. The documentation only mentions a simple pip installation - no real discussion of sizing, external dependencies like databases, etc. Is it really that simple? Then why prefect.io cloud?

    Luis Muniz

    06/06/2020, 11:28 AM
    Airflow seemed so complicated to deploy properly for a decent production environment that I dare not hope
    :upvote: 1

    Kai Weber

    06/06/2020, 11:34 AM
    Hi everybody, I have two questions: 1. My Prefect Docker app is called "temp" and all subcontainers are subsequently temp_xxxxx. Is there a way to control the name (instead of having it assigned by chance)? 2. I need to have all of my flows, tasks, etc. on a local drive. Can I mount that local directory the way I do for other Docker containers, with -v xxx:NAME OF PREFECT-SOURCECODE? Thanks in advance 🙂 Kai

    Alex Joseph

    06/06/2020, 6:18 PM
    Hi everyone - We're a Data Science team trying to slowly move towards data engineering. We've been working mostly in R, and it has served us well, but we would like to try using Prefect for workflow orchestration. We have a lot of code in R with all the domain logic/tests, so rewriting everything in Python is not a viable option. Can we use Prefect for orchestration in this case? I've tried running the R code as a subprocess, and it works to an extent, but it seems very hacky. Is there a standard way to do this?
    💪 2
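    For the record, a sketch of one less hacky route, assuming Prefect's built-in `ShellTask` and a hypothetical etl.R entry point - the R code stays in R, and Prefect only orchestrates the calls:

    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    # runs a shell command and fails the task run on a non-zero exit code
    run_r = ShellTask(name="r-etl")

    with Flow("r-pipeline") as flow:
        etl = run_r(command="Rscript etl.R --input data.csv")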

    Kevin Weiler

    06/07/2020, 8:28 PM
    hi there - I was wondering if there is any support for inter-flow dependencies - as in “run flow B when flow A finishes” - I’m still just getting started, so I might be thinking about this incorrectly - thanks!
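    A sketch of the "flow of flows" pattern this usually maps to, assuming the `FlowRunTask` from `prefect.tasks.prefect` (it starts an already-registered flow through the API; the `wait` flag, if your version supports it, blocks until the child run finishes):

    from prefect import Flow
    from prefect.tasks.prefect import FlowRunTask

    # each task kicks off a registered flow by name
    run_a = FlowRunTask(flow_name="flow-a", wait=True)
    run_b = FlowRunTask(flow_name="flow-b", wait=True)

    with Flow("orchestrator") as parent:
        a = run_a()
        b = run_b(upstream_tasks=[a])  # B only starts once A's run is done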

    Kevin Weiler

    06/07/2020, 8:36 PM
    @Chris White - shamelessly bugging you because I know you're there 🙂 - I noticed a PR and commit removing Nomad support. I think the initial support was mostly just a stub, so I get it. Would you be open to PRs for a Nomad agent?

    Matt Segal

    06/08/2020, 4:49 AM
    Hello, I was wondering what the recommended setup is for running the Prefect Core server unattended on a virtual machine. The `prefect server` CLI does not seem to support a daemonized mode. Is there some standard way to use the docker-compose config to run the core server, using something like Docker Swarm?

    Paul

    06/08/2020, 5:16 AM
    Hi, I am looking into getting started with Prefect Cloud. Trying to authenticate my machine with `prefect cloud login --token $TOKEN` results in
    [WinError 10061] No connection could be made because the target machine actively refused it'))
    More precisely, other prefect auth commands yield the same result. Did I miss anything in the docs in order to establish a connection to Prefect Cloud?
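    One hedged guess, since "connection refused" often means the CLI is pointed at a local Core server that isn't running rather than at Cloud - 0.11 added a backend switch worth checking before logging in:

    # make sure the CLI targets Prefect Cloud, not a local server
    prefect backend cloud
    prefect auth login --token $TOKEN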

    Matthias

    06/08/2020, 10:05 AM
    Hi, is it possible to add a schedule with a default inactive state? I would like to set the Schedule automatically to inactive when a specific env variable is True.

    Rafal

    06/08/2020, 12:45 PM
    Hi, simple question: has anyone tried running the Prefect open source components in production? What were the challenges? Really appreciate any help here

    Matthias

    06/08/2020, 1:28 PM
    Hello again, I am running Core/Server at 0.11.5 and have checkpointing set to False in my backend.toml file. Still, when the flows run, they create a lot of data in the results folder, which is also not removed automatically. Where else do I have to turn something off so that these files are not created, or at least get removed afterwards? I suspect that this behaviour is also the reason why my DaskWorker holds on to a lot of memory after a flow run and then breaks at some point, as it does not free up memory at all.

    Dan Ball

    06/08/2020, 2:23 PM
    Hi all – I’m just getting started kicking the tires but I’m loving the API so far. Newbie question: What’s the recommended way to handle port conflicts if I’m running a local Postgres server on 5432 on my dev machine? Point Prefect at my local Postgres instance instead of the dockerized service? Use a different forwarded port on the docker service? Something else?
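    If the forwarded-port route appeals, a minimal sketch assuming the port flags that `prefect server start` exposed around this version:

    # run the dockerized Postgres on a different host port, leaving the
    # local 5432 instance untouched
    prefect server start --postgres-port 5433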

    Zach

    06/08/2020, 4:42 PM
    Is there a way to configure automatic flow retries? I am not aware of one, but just wanted to make sure
    👀 2

    Ron Van Buskirk

    06/08/2020, 6:46 PM
    Hello everyone -- I was trying to find out why a flow I was running paused for over a day, and I would welcome any troubleshooting suggestions 🙂 I had started a long-running flow with Prefect Core running from an Anaconda prompt on Windows 10. It runs on an AWS VM and basically calls stored procedures on a Postgres RDS database. I kicked it off processing Task A on Thursday and saw that it had reached Task B by Friday. It completed Task B on Saturday at 5:22 am and then just hung afterwards. When I looked on Monday AM, I saw that the backend had finished Task B, but nothing was happening in the terminal. So I went to cancel the flow with a Ctrl-C. But as soon as I did that, it started running Task C - after sitting idle for over a day! Here's a snippet from the flow output:
    ...
    [2020-06-05 15:50:12,454] INFO - prefect.TaskRunner | Task 'task a': Starting task run...
    [2020-06-05 17:15:04,576] INFO - prefect.TaskRunner | Task 'task a': finished task run for task with final state: 'Success'
    [2020-06-05 17:15:04,576] INFO - prefect.TaskRunner | Task 'task b': Starting task run...
    [2020-06-06 05:22:27,199] INFO - prefect.TaskRunner | Task 'task b': finished task run for task with final state: 'Success'
    [2020-06-08 14:23:12,015] INFO - prefect.TaskRunner | Task 'task c': Starting task run...
    Is there anything that I could have accidentally entered in the terminal that would have put it into some type of 'wait' state? Or any other suggestions about things I could check?

    Avi A

    06/08/2020, 7:10 PM
    Hey, what would you say is the “prefect” way to implement a simple groupBy (or some other reduce operation)? Suppose I have data from several sources (for example, partitions) and would like to perform a groupBy+count on the data. With Dask, I'd simply load it into a Dask dataframe and do the operation on the dataframe. With Prefect, the only example shows how to do a simple sum, and you basically implement the map-reduce at a low level instead of specifying it logically. So how would you recommend doing this?
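    For concreteness, a minimal sketch of that low-level map-reduce pattern applied to a groupBy+count, with a hypothetical partition-loading stub:

    from collections import Counter
    from prefect import Flow, task

    @task
    def load_partition(partition: str) -> list:
        # stand-in for reading one partition from your source
        return [{"key": partition}, {"key": "shared"}]

    @task
    def count_keys(records: list) -> Counter:
        return Counter(r["key"] for r in records)

    @task
    def merge_counts(counts: list) -> Counter:
        # the reduce step: receives the list of mapped results
        return sum(counts, Counter())

    with Flow("group-by-count") as flow:
        records = load_partition.map(["p1", "p2", "p3"])
        counts = count_keys.map(records)
        total = merge_counts(counts)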

    Avi A

    06/08/2020, 9:08 PM
    Another question! I notice that some of the tasks on my flow are running multiple times, even though they have finished successfully (so they finish successfully 5 times). What could cause such a behavior?

Chris White

06/08/2020, 9:10 PM
This is almost always caused by Dask rescheduling work; we already have a “version locking” feature in Prefect Cloud that prevents your tasks from running twice while still preserving a clean user experience, and we hope to make it more widely available in the very near future

Avi A

06/08/2020, 9:13 PM
thanks - you mention it runs twice, but I see it running 5 times. Have you seen such behavior too?

Chris White

06/08/2020, 9:13 PM
are you using the `LocalDaskExecutor` by any chance?

Avi A

06/08/2020, 9:30 PM
yes I am
but is it really different from setting up a local “cluster” and using `DaskExecutor`?

Chris White

06/08/2020, 9:34 PM
yea, that executor is known to rerun tasks, but I'll admit I've never heard of 5 reruns! And yea, it's a totally different scheduler

Avi A

06/08/2020, 9:35 PM
so do you recommend I stop being lazy and move to `DaskExecutor`?

Chris White

06/08/2020, 9:39 PM
Yea I think that is the simplest solution to your rerun issue
:prefect: 1
🚀 1

Avi A

06/21/2020, 11:08 AM
@Chris White following up here. I am using `DaskExecutor` now. The scheduler and the workers themselves are still local, but I run them as separate processes on the system.
I'm still seeing successful tasks running several times for no reason.
Moreover, it seems it doesn't schedule that many tasks concurrently (the scheduler has 4*8 threads available), and the flow hangs after running a few tasks (there are 3*250 mapped tasks to run, and it only ran 65 of them, and hasn't scheduled the rest yet).
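For context, the setup described is roughly this - a sketch assuming the 0.11-era executor import path and a placeholder scheduler address:

from prefect.engine.executors import DaskExecutor

# dask-scheduler and dask-worker run as separate local processes;
# the flow connects to the scheduler instead of spawning its own cluster
flow.run(executor=DaskExecutor(address="tcp://127.0.0.1:8786"))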