prefect-community

    Chris Hart

    04/06/2020, 8:15 PM
    Looking to use Prefect with dask-ml on the default DaskExecutor... how might I get the sklearn bits to use Dask as the joblib backend, as referenced here: https://ml.dask.org/joblib.html?
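    A minimal sketch of the pattern from the dask-ml joblib docs, adapted to run inside a Prefect task. The scheduler address, data, and estimator below are placeholders, and it assumes a dask.distributed Client is reachable from within the task (if the task itself already runs on the DaskExecutor's cluster, dask.distributed.worker_client() may be the more appropriate way to get one):

    import joblib
    from dask.distributed import Client
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import GridSearchCV
    from prefect import task

    @task
    def fit_model(X, y):
        # hypothetical scheduler address: point this at the same cluster the
        # DaskExecutor uses, or obtain a client via dask.distributed.worker_client()
        client = Client("tcp://dask-scheduler:8786")
        search = GridSearchCV(LogisticRegression(), {"C": [0.1, 1.0, 10.0]})
        # inside this block, sklearn's internal joblib calls are dispatched to Dask
        with joblib.parallel_backend("dask"):
            search.fit(X, y)
        return search.best_estimator_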

    Richard

    04/06/2020, 8:52 PM
    First time on here with an utterly Prefect-naive question. I have an overwrought, buggy 'workflow' framework (called gendarme, never released publicly) that I developed to assist my computational biology research -- I started it about 15 years ago without knowledge of any workflow tools other than Make. One of my needs was (and is) to fully define the output of every task in terms of the specification values that are used by the task itself, and the specifications of all upstream tasks. By specifications, I mean parameters or variables that fully define, along with the code, the output -- i.e. not all parameters to a function are defining specifications (of course one might have intentional or unintentional stochasticity, which creates caveats to the notion of being fully defined). A single task tends to be a long-running, computationally expensive, often memory-intensive, perhaps parallelized, fairly complex procedure, often defined by multiple classes. I might have several dozen or more of these tasks to approach a meaningful goal, but the final goal has usually been a moving 'goalpost' as yet another layer of analysis seems like a good idea at the time. A full DAG may well be years in the building as analyses accumulate -- though by that point tasks (nodes, which I call cells) have likely been swapped out. Unlike Make (and as far as I can tell, Luigi and similar workflows), the gendarme DAG is discovered at runtime rather than predefined. That was convenient, but also, because the DAG is defined over instantiated tasks defined by their parameters, I never found much point in predefining the DAG externally. I am at a major decision point of either doing a complete overhaul of gendarme or abandoning it and adopting some other workflow tool and modifying it to my needs. I think those needs at this point distill to the ability to store intermediate results (they may have been final results until a downstream task was written that requires them) with the specification metadata that fully defines them, so that they can be retrieved when needed and not rebuilt unnecessarily. Time completed might be one of many specs, but typically a task might have a half dozen critical specifications that would need to match the specification state of the calling downstream task in order for the existing output to be congruent and useful. Part of the challenge has been that the downstream task knows its own specs, but it doesn't know the subset of the specs that define its upstream dependencies. One can't simply query the entire spec space, as the DAG may have hundreds of parameters, and only those used in the requested subgraph are relevant to defining the outputs. Gendarme passes spec space both upstream to search for existing output, or build anew if it doesn't exist, and then back downstream to the calling task to capture the full description of each output, and then stores it as metadata in an external DB for fast querying. I will need to do something like that again, both for efficiency and for the scientific-integrity position of fully knowing what one's results represent (and part of this is the ability to truly isolate training-data influences on ever-receding final test sets). From a high-level perspective, each task (gendarme calls them cells) calls other cells when required. If there is a breadth-first part of the DAG, one can explicitly run those cells concurrently for efficiency. Otherwise the graph runs as it is discovered, depth first. Sorry for the long-winded request.
    If that made any sense, my question to you is -- should I pursue Prefect for these needs? Clearly I have a fairly extreme use case on the side of complex and slow. My datasets have been big historically, but not by today's standards -- it is the complexity and the need for incremental building of fully defined outputs that appear to be more unusual. Prefect looks inspired regardless, and eventually I'll want to learn more, but I would rather not get side-tracked at this moment studying the guts if there is a clear logical incompatibility.
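    Not an answer from the Prefect team, but a rough sketch of the closest built-in analogue: Prefect can cache a task's output and only reuse it when the inputs that defined it match, which is in the spirit of "don't rebuild an output whose defining specs haven't changed". The names, cache duration, and placeholder function below are illustrative only:

    import datetime
    from prefect import task, Flow, Parameter
    from prefect.engine.cache_validators import all_inputs

    # reuse a cached output only while it is fresh AND every input ("spec") matches
    @task(cache_for=datetime.timedelta(days=7), cache_validator=all_inputs)
    def expensive_cell(spec_a, spec_b):
        return run_long_computation(spec_a, spec_b)  # placeholder for the real work

    with Flow("gendarme-like") as flow:
        spec_a = Parameter("spec_a")
        spec_b = Parameter("spec_b")
        result = expensive_cell(spec_a, spec_b)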

    Brad

    04/06/2020, 9:12 PM
    Hi team - is it possible to schedule a flow with parameters? I'm getting some "Flows with required parameters can not be scheduled automatically" errors when trying to create one with parameter_defaults

    Brad

    04/06/2020, 9:13 PM
    I have a flow I’d like to run on a cron schedule, with different parameters on each run
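    For reference, a sketch of how per-run parameters are meant to work with clock-level parameter_defaults (this assumes a recent 0.10.x release where clocks accept parameter_defaults; the cron strings and parameter name are made up):

    from prefect import task, Flow, Parameter
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    # two daily runs, each supplying a different value for the "region" parameter
    schedule = Schedule(clocks=[
        CronClock("0 6 * * *", parameter_defaults={"region": "us"}),
        CronClock("0 18 * * *", parameter_defaults={"region": "eu"}),
    ])

    @task
    def report(region):
        print(region)

    with Flow("daily-report", schedule=schedule) as flow:
        # giving the Parameter its own default avoids the
        # "Flows with required parameters can not be scheduled automatically" error
        region = Parameter("region", default="us")
        report(region)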

    Ziyao Wei

    04/06/2020, 9:56 PM
    Potentially dumb Q - is there a way to not update the flow version when doing a flow.register()? So that if multiple identical flows (either auto-detected through hashing the flow object(?) or using a client-provided ID) are submitted through different agents, all of the agents can run the same flow

    Ben Fogelson

    04/06/2020, 10:36 PM
    I’ve seen subflows mentioned in a few different places around here. I know they aren’t supported in a first-class way yet, and was wondering if they are on the roadmap and, if so, whether there is an expected release date. It would help us decide how much effort we need to put into workarounds for the short term

    Jackson Maxfield Brown

    04/06/2020, 11:55 PM
    Looking for potential insight that others may have. We are running a Prefect Core workflow using the DaskExecutor, which in turn connects to a SLURMCluster from dask_jobqueue. There are about 14,000 items to process in a mapped task and each result in that map returns None, so no worries on memory management there. However, the workflow runs until roughly 2,200 tasks are completed, then errors and restarts the whole workflow. The errors that I see on the main workflow / scheduler thread all follow something like:
    distributed.core - ERROR - 'process_fov_row-e7df27e7-ca99-433d-b1e1-c4a3f6b2a683-61d201c4-4edb-4243-9e76-4b90c3aa7835-1474'
    Traceback (most recent call last):
      File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/core.py", line 411, in handle_comm
        result = await result
      File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/scheduler.py", line 1758, in add_worker
        await self.handle_worker(comm=comm, worker=address)
      File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/scheduler.py", line 2714, in handle_worker
        await self.handle_stream(comm=comm, extra={"worker": worker})
      File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/core.py", line 472, in handle_stream
        handler(**merge(extra, msg))
      File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/scheduler.py", line 2675, in handle_long_running
        ts = self.tasks[key]
    KeyError: 'process_fov_row-e7df27e7-ca99-433d-b1e1-c4a3f6b2a683-61d201c4-4edb-4243-9e76-4b90c3aa7835-1474'
    Any ideas what could possibly be causing this issue? (This seems like a dask question and not so much a prefect one, but thought someone here may know)

    Argenis Leon

    04/07/2020, 3:58 AM
    Hi, I am trying to run prefect server on Google Cloud Platform but I am getting:
    (rapids_013) argenisleon@rapids:~$ prefect server start
    Pulling postgres  ... done
    Pulling hasura    ... done
    Pulling graphql   ... done
    Pulling scheduler ... done
    Pulling apollo    ... done
    Pulling ui        ... done
    Creating network "prefect-server" with the default driver
    Creating cli_postgres_1 ... done
    Creating cli_hasura_1   ... done
    Creating cli_graphql_1  ... done
    Creating cli_scheduler_1 ... done
    Creating cli_apollo_1    ... done
    Creating cli_ui_1        ... 
    Creating cli_ui_1        ... error
    
    ERROR: for cli_ui_1  Cannot start service ui: driver failed programming external connectivity on endpoint cli_ui_1 (ab05e1d530a187151aa2ca7d03d0cdc964c4ed96f6ddb41ca0173850d2719ae8): Error starting userland proxy: listen tcp 0.0.0.0:8080: bind: address already in use
    
    ERROR: for ui  Cannot start service ui: driver failed programming external connectivity on endpoint cli_ui_1 (ab05e1d530a187151aa2ca7d03d0cdc964c4ed96f6ddb41ca0173850d2719ae8): Error starting userland proxy: listen tcp 0.0.0.0:8080: bind: address already in use
    ERROR: Encountered errors while bringing up the project.
    Any help?

    Ogaday

    04/07/2020, 10:43 AM
    I'm having some trouble with Prefect Context. According to the docs, I can use context for time-sensitive tasks and backfills (https://docs.prefect.io/core/faq.html#does-prefect-support-backfills). However, I've had a number of problems with this approach. I can't supply a datetime to the context of a flow which is run on a schedule, as that datetime is never recalculated after the initialisation of the flow run. And I can't seem to overwrite the default time values in the prefect context as described here: https://docs.prefect.io/core/concepts/execution.html#context Finally, a weird thing is that the date values (such as prefect.context.today) seem to be strings, while the datetime values (such as prefect.context.date) are datetime objects. Am I doing anything wrong? I'm on Prefect 0.9.8. In the meantime, I'm going to look at using parameters instead of context.
    my_flow.py
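    For what it's worth, a sketch of the "parameters instead of context" fallback mentioned above: the date parameter defaults to None and a task resolves it against the run's scheduled start time from context, so backfills can pass an explicit date (names are illustrative):

    import pendulum
    import prefect
    from prefect import task, Flow, Parameter

    @task
    def resolve_run_date(run_date):
        # an explicit parameter wins (useful for backfills); otherwise fall back
        # to the scheduled start time Prefect places in context for each run
        return run_date or prefect.context.get("scheduled_start_time", pendulum.now("utc"))

    @task
    def process(day):
        print("processing {}".format(day))

    with Flow("backfillable") as flow:
        run_date = Parameter("run_date", default=None)
        process(resolve_run_date(run_date))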

    Brett Naul

    04/07/2020, 9:09 PM
    think I know the answer to this (no?) but is there a way to use values like today from https://docs.prefect.io/api/latest/utilities/context.html as parameter defaults? right now I have a separate task that wraps each date param

    Manuel Aristarán

    04/07/2020, 9:09 PM
    Is there any best practice to obtain a Flow ID by the Flow name with prefect.client.Client? My use case is triggering a Flow from an app.
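    One possible approach (not necessarily a blessed one) is to query the GraphQL API for the latest version of the flow by name, then create a run for it; the flow name and parameters below are placeholders, and the query shape assumes the standard Server/Cloud schema:

    from prefect.client import Client

    client = Client()

    # look up the most recent version of the flow by name (hypothetical flow name)
    result = client.graphql(
        """
        query {
          flow(where: {name: {_eq: "my-flow"}}, order_by: {version: desc}, limit: 1) {
            id
          }
        }
        """
    )
    flow_id = result.data.flow[0].id

    # kick off a run, optionally with parameters
    client.create_flow_run(flow_id=flow_id, parameters={"region": "us"})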

    Brad

    04/07/2020, 11:13 PM
    Hi team - I am trying to run a local agent from a virtualenv alongside the new server, but when I try to trigger a flow from the UI, my agent reports
    [2020-04-07 22:52:35,213] ERROR - agent | Error while deploying flow: FileNotFoundError(2, "No such file or directory: 'prefect'")

    Brad

    04/07/2020, 11:18 PM
    I’ve also raised this issue - https://github.com/PrefectHQ/prefect/issues/2296 - if anyone would like to comment on whether this is the expected usage of parameter_defaults?
    👀 1

    Ben Fogelson

    04/08/2020, 4:35 PM
    Apologies if this is in the docs somewhere and I just couldn’t find it. Are there hooks for task input and output validation?
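    Not sure whether anything first-class exists for this, but one workaround sketch is a state handler that inspects the task's result when it finishes; the dict check below is just an example validation:

    from prefect import task
    from prefect.engine.state import Success

    def check_output(task, old_state, new_state):
        # called on every state change; only inspect the result once the task succeeds
        if isinstance(new_state, Success) and not isinstance(new_state.result, dict):
            raise ValueError("{} returned {}, expected a dict".format(task.name, type(new_state.result)))
        return new_state

    @task(state_handlers=[check_output])
    def produce_record():
        return {"id": 1}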

    Arsenii

    04/09/2020, 2:54 AM
    Hey all... A very basic problem -- but I can't find where I can change my password in the Cloud UI?? Thanks!

    Jayakar

    04/09/2020, 8:37 AM
    Hello, I got interested in Prefect as I was investigating Python-based orchestration engines. First off, can we use MySQL as the underlying DB instead of Postgres? Our use case is that we want to define a flow with some tasks, and that flow's trigger should be event-driven (say, a message in a queue). We can have potentially hundreds of thousands of those triggers in a day, and the tasks are short tasks. The individual tasks involve calling some API, but we get the response asynchronously. So the next task can only proceed after the previous task was successful, but we need another trigger to inform the flow that the previous task was successful. I wonder if Prefect can be used to solve this particular use case? Also, we'll use a self-hosted environment, if that makes any difference.

    Toby Coleman

    04/09/2020, 11:01 AM
    Hi all! Firstly, I just wanted to introduce myself to the group - I'm a data scientist specialising in renewable energy and IoT data. I've just come across Prefect after having used Airflow on a few projects, so I'm looking forward to learning more and trying it out. Also a request: I'm currently researching different ways people are building data pipelines for their projects. What kind of challenges/solutions have you come across, and what are the best ways of using tools like Prefect as part of larger projects? I'd really appreciate a 10-15 min Skype/Zoom chat with anyone with experience in this area, and will happily share findings with the group afterwards. Do please get in touch if you are able to spare a few moments. 😁 Thanks!
    👋 4

    David Ojeda

    04/09/2020, 1:50 PM
    Hello again, moving forward on my quest to upgrade to the newest prefect version and use the newly open-sourced ui… One question whose answer eludes me completely: when registering a flow, how can I set the default context for it? A bit of background: we have many flows with tasks that use context variables to share common parameters, for example, a URL to some external API. What I would like is to schedule the run of a flow with a particular value for this context variable, but I can’t find where I can set it. The closest thing I could find was setting different parameters (not context values) when setting the schedule with an IntervalClock. Another alternative could be to change my context variables to task parameters, but that would require a lot of changes.

    shazard

    04/09/2020, 2:28 PM
    Hello there! I am an engineer working for a small French company looking to improve - actually, create - our data science workflows. I have been looking at Prefect for a couple of days now and it looks really good, especially with the open-sourced Core server. One feature we really need, and which I have not been able to read about in the documentation/GitHub issues, is running flows (or even tasks) on GPU instances such as EC2 ones. As Fargate does not support GPU instances at this time, I guess the Fargate Task Environment is not good enough. As a newbie it is entirely possible that I am misunderstanding how Agents and Environments work, but I was envisioning creating a custom Environment that requests instances from EC2 as a setup step and runs the flow on them. Is that something that would be feasible? Or maybe there is an easier way to do it? Anyway, thank you for your work on this project!

    Chris Hart

    04/09/2020, 5:26 PM
    I've got a task that outputs a trained ML model, which has already been serialized due to running on the DaskExecutor... is there a way to load the already-cloudpickled object for a following persistence task to just save it directly rather than re-serializing? (Or I may be misunderstanding, and the only serialized thing floating around is the whole task itself, which would mean that I need to serialize the model again, but that's ok.)

    Viv Ian

    04/09/2020, 11:18 PM
    Hi - I'm very new to prefect (previously and currently using Airflow) and engineering, so I apologize if these seem like silly questions. Wondering:
    • What is the ideal structure? For example, airflow wants a separate project dedicated to just airflow stuff (dags, operators, hooks, plugins, etc.) independent from the actual app. Ideally, I would like to host all prefect stuff on a separate EC2 from the app. Which leads to the next question…
    • My app is in Django; as a result, I would need to import models to use the Django ORM. The app is a private project in GitHub running on an EC2. Currently, with Airflow, I need to import the entire app into the dags folder in order to access the models. Is it possible to pip install a GitHub repo into the prefect stuff and import into each model as needed (without running django.setup())?
    • Is there a way to manually trigger a Flow? For example, I design a flow to do some calculations, and I only want it to run if a user clicks a button. Is there an API endpoint I can use to fire it off? THANKS!
    👋 4
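    On the last question: a sketch of firing off a run of an already-registered flow through the GraphQL API, e.g. from a Django view. The endpoint and flow ID below are placeholders (for Prefect Cloud the endpoint would be the Cloud API with an auth token header), and prefect.client.Client's create_flow_run wraps the same mutation:

    import requests

    PREFECT_GRAPHQL = "http://localhost:4200/graphql"  # placeholder Server endpoint

    mutation = """
    mutation {
      create_flow_run(input: { flow_id: "00000000-0000-0000-0000-000000000000" }) {
        id
      }
    }
    """

    # trigger one run of the registered flow; an agent will pick it up
    response = requests.post(PREFECT_GRAPHQL, json={"query": mutation})
    print(response.json())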

    shazard

    04/10/2020, 7:26 AM
    Hello there! Would there be any reason why a flow does not generate all the mapped tasks it should?
    from prefect import task, Flow

    @task
    def dummy_task(param):
        pass


    with Flow("test") as flow:
        dummy_task.map(list(range(100)))

    flow.run()
    flow.register()
    This code runs correctly locally, but gets stuck on the 64th task when run manually from the Core Server UI. Since it always gets stuck on the same task, I suppose this has to do with some resource or process limit, but I can't seem to pinpoint exactly what it could be. Any ideas?

    Jie Lou

    04/10/2020, 3:20 PM
    Hi All. I have a question about retries on mapped tasks. A little background: we use prefect cloud with Dask workers to run multiple tasks in parallel. The chances are some of the workers could die when the flow is running, and we hope retries can help recover the task run. Here is a simple flow with a cal function:
    import datetime
    import time

    from prefect import task, Flow, Parameter

    @task(log_stdout=True, max_retries=1, retry_delay=datetime.timedelta(seconds=10))
    def cal(x):
        print("starting sleep {} seconds".format(x))
        time.sleep(x)
        return x

    with Flow("test", result_handler=s3_result_handler) as flow:  # s3_result_handler defined elsewhere
        time = Parameter("time", default=[60])
        results = cal.map(time)
    While the cal task was running, I manually killed the worker and kept observing the logs in the Cloud UI, and found this:
    starting sleep 60 seconds .    #then I killed the worker...
    Task 'cal[0]': Starting task run...
    Task 'cal[0]': task is already running.
    Task 'cal[0]': finished task run for task with final state: 'Running'
    1 mapped tasks submitted for execution.
    Task 'cal': Handling state change from Mapped to Mapped
    Task 'cal': task has been mapped; ending run.
    Task 'cal': finished task run for task with final state: 'Mapped'
    Flow run RUNNING: terminal tasks are incomplete.
    Marked "Failed" by a Zombie Killer process.
    It seems that the state of the task does not change from Running after I killed the worker, and that's why the flow was finally tagged as a zombie. However, if cal is a regular task without mapping, the retry mechanism works as expected. I just tweaked the flow above a little bit:
    @task(log_stdout=True, max_retries=1, retry_delay=datetime.timedelta(seconds=10))
    def cal(x):
        print("starting sleep {} seconds".format(x[0]))
        time.sleep(x[0])
        return x[0]
    
    with Flow("test", result_handler=s3_result_handler) as flow:
        time = Parameter("time", default=[60])
        results = cal(time)
    Again, when the task was running, I killed the worker, and this time retries worked. Here are the logs:
    starting sleep 60 seconds #then I killed the worker...
    Task 'time': Starting task run... 
    Task 'time': Handling state change from Pending to Running
    Task 'time': Calling task.run() method...
    Task 'time': Handling state change from Running to Success
    Task 'time': finished task run for task with final state: 'Success'
    Task 'cal': Starting task run...
    Task 'cal': Handling state change from Pending to Running
    Task 'cal': Calling task.run() method...
    starting sleep 60 seconds
    and then the flow finished successfully. I would expect mapped tasks and regular tasks to handle this in a consistent way, and I'm not sure why the mapped task did not survive a killed worker. Sorry for this long message; any thoughts are welcome. Thanks!!

    Jacob (he/him)

    04/10/2020, 4:28 PM
    Hello everyone 👋 I’m a data engineer tasked with choosing our new orchestration/monitoring tools and am interested in using Prefect with Redshift. Is this a design pattern that other Redshifters are using?
    1. api call to gather source data
    2. schema test
    3. write data to s3
    4. copy command to redshift temp table (using psycopg2 connection)
    5. append/insert/merge to staging table
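    For what it's worth, that sequence maps fairly naturally onto a single Prefect flow, one task per step; the task bodies below are placeholders and the bucket name is made up:

    from prefect import task, Flow

    @task
    def fetch_from_api():
        ...  # 1. call the source API and return records

    @task
    def test_schema(records):
        ...  # 2. fail here if the schema drifted
        return records

    @task
    def write_to_s3(records):
        ...  # 3. write the records out, return the S3 key
        return "s3://my-bucket/extract.csv"

    @task
    def copy_to_temp_table(s3_key):
        ...  # 4. COPY into a Redshift temp table over a psycopg2 connection

    @task
    def merge_into_staging(copied):
        ...  # 5. INSERT/MERGE from the temp table into the staging table

    with Flow("redshift-load") as flow:
        records = test_schema(fetch_from_api())
        s3_key = write_to_s3(records)
        merge_into_staging(copy_to_temp_table(s3_key))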

    John Ramirez

    04/10/2020, 8:30 PM
    Hey, is there a way to run the same flow in parallel with different parameters?
    👍 1
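    A sketch of one way to do this against Cloud/Server: create several runs of the same registered flow, each with its own parameters, and let the agent pick them up concurrently (the flow ID and parameter values are placeholders):

    from prefect.client import Client

    client = Client()
    flow_id = "00000000-0000-0000-0000-000000000000"  # the registered flow's ID

    # each call creates an independent flow run with its own parameters;
    # the runs execute in parallel as agents/executors pick them up
    for region in ["us", "eu", "apac"]:
        client.create_flow_run(flow_id=flow_id, parameters={"region": region})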

    Mitchell Bregman

    04/10/2020, 10:28 PM
    Hey there! I was able to set up the Prefect local UI just fine on an EC2... However, when I register the flow, it does not show up in the UI and "Upcoming Runs" gets stuck loading... I was wondering if anyone else is experiencing this? It seems like the flow UUID gets picked up just fine on the backend local server and when I flow.register, but nothing seems to be showing up

    Leo Meyerovich (Graphistry)

    04/11/2020, 12:03 AM
    Is there a healthcheck route we can use for an agent / executor docker service? I saw a Feb 5 PR around some storage healthchecks, but I'm not seeing docs for instrumented monitoring here. Ideally something curl-able, like:
    healthcheck:
          test: ["CMD-SHELL", "curl -sSf http://prefect/health | jq .code | grep 200 || exit 1"]
    👍 1

    Leo Meyerovich (Graphistry)

    04/11/2020, 12:50 AM
    I guess we can do something hackish like: docker host-level metrics as usual; and for prefect executor health, use unique per-executor labels, schedule healthcheck jobs on them, and have the docker healthcheck check for the output of those... quite hackish.

    Ziyao Wei

    04/11/2020, 3:17 AM
    Is there an env var to change the server the UI points to?

    Ziyao Wei

    04/11/2020, 3:18 AM
    Tried PREFECT_API_URL but doesn’t seem to work (seems obvious in retrospect)

Chris White

04/11/2020, 3:21 AM
Hi @Ziyao Wei - setting PREFECT__CLOUD__API should work!
oh sorry, you’re asking about the UI

Ziyao Wei

04/11/2020, 3:22 AM
Yep - I’m trying to run the server + UI on a server, but the UI is trying to query localhost:4200

Chris White

04/11/2020, 3:22 AM
check out the discussion on this github issue for how to update where the UI points to: https://github.com/PrefectHQ/prefect/issues/2237#issuecomment-609917317

Ziyao Wei

04/11/2020, 3:23 AM
so sed is still the method for now?
(Thanks! Way better than scratching my head for hours)
👍 1

Chris White

04/11/2020, 3:24 AM
Sed will work, but the true way is to rebuild the site with the value that you want; the docker image that server uses by default is a pre-built site, which means the value is hard-coded unless you rebuild
👍 1

Ziyao Wei

04/11/2020, 3:28 AM
Cool - so if we rebuild we can specify the server URL in an env var?

Chris White

04/11/2020, 3:30 AM
Not quite - you need to update the .env file / references that Scott mentions in that issue; most JavaScript applications don’t respond to your local environment, unfortunately
👍 1

Ziyao Wei

04/11/2020, 4:53 AM
Thanks for the help - that worked!
💯 1