prefect-community
  • d

    Dmitry Dorofeev

    08/31/2019, 9:20 PM
    Hi all, assuming I have a config file with an unknown number of jobs defined. Each job should run in parallel. I can easily define a prefect Flow for each job, but how can I fire all Flows from one python process? What if I want to fire each Flow periodically with a cron scheduler? Currently I fork() a separate python process for each Flow; is that right?
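
    A minimal sketch of one way to do this without raw fork(), assuming the 0.6-era API: give each flow its own CronSchedule and host each blocking flow.run() call in its own multiprocessing.Process. The config dict and build_flow helper below are hypothetical stand-ins for the config file.

    from multiprocessing import Process

    from prefect import Flow, task
    from prefect.schedules import CronSchedule

    @task
    def job(name):
        print(f"running {name}")

    def build_flow(name, cron):
        # hypothetical helper: one independently scheduled flow per config entry
        with Flow(name, schedule=CronSchedule(cron)) as flow:
            job(name)
        return flow

    if __name__ == "__main__":
        # stand-in for the real config file
        config = {"job-a": "*/5 * * * *", "job-b": "0 * * * *"}
        # flow.run() blocks and honors the schedule, so each flow gets a process
        procs = [Process(target=build_flow(n, c).run) for n, c in config.items()]
        for p in procs:
            p.start()
        for p in procs:
            p.join()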
  • j

    Jonah Benton

    09/01/2019, 4:19 PM
    Hi folks, dumb questions I'm having a hard time finding answers to: What happens in a Dask cluster if/when your dask-scheduler process dies? Do tasks continue to run to completion on workers while nothing new gets scheduled? Can you just restart the scheduler and work picks up? Is workflow state lost if the scheduler dies; if not, where does the scheduler keep it? Happy to be pointed to documentation on these questions, I just don't see lower-level operational details discussed in the Dask site docs.
  • f

    Feliks Krawczyk

    09/02/2019, 5:18 AM
    Just a quick question: with Prefect, I assume you are developing the code in such a way that things like this don’t happen? https://issues.apache.org/jira/browse/AIRFLOW-5240
  • f

    Feliks Krawczyk

    09/02/2019, 5:36 AM
    I mean #python, and external open source projects being bad at updating things.. but wow so frustrating when you run into these types of issues 😆
  • m

    Mikhail Akimov

    09/03/2019, 12:18 PM
    Where do the task_state_handlers get called? In the main flow (scheduler) process, or on the workers (dask, subprocess, etc.)? I'm passing an object to task_state_handlers through a closure; with the sequential/threads LocalDaskExecutor they work OK, but with processes it's clear that the passed object isn't the same instance for all task runs. I was under the impression that the scheduler catches state changes and calls all the callbacks, while the executor is only responsible for, well, the execution of work.
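
    For what it's worth, in legacy Prefect the TaskRunner invokes task state handlers, and with a process-based executor the TaskRunner itself runs in the worker process, which would explain why a closed-over object is a different instance per process. A minimal sketch that makes the calling process visible (handler signature as in 0.6):

    import os

    from prefect import Flow, task

    def report(task_obj, old_state, new_state):
        # the pid shows which process actually invoked the handler
        print(f"[pid {os.getpid()}] {task_obj.name}: {old_state} -> {new_state}")
        return new_state

    @task(state_handlers=[report])
    def work():
        return 1

    with Flow("handler-demo") as flow:
        work()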
  • e

    emre

    09/04/2019, 11:51 AM
    I have a task that maps over a list of `table_names` and loads some files into an s3 bucket, under prefix=`table_name`. I wanted to add a `state_handler` that deletes any leftover files from the failed task, should my task go into a `Failed` state. The problem is that the files I want to delete are identifiable by a task input, and I have failed to find a way to access input parameters from the state_handler callback. I thought `task.inputs()` would get me what I wanted, but that had only type information. Any suggestions?
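
    One workaround that side-steps the state_handler question entirely: do the cleanup inside the mapped task itself, where table_name is in scope, then re-raise so the task still ends Failed. upload_files and delete_prefix are hypothetical helpers:

    from prefect import task

    def upload_files(table_name):
        ...  # hypothetical: writes files under s3://bucket/<table_name>/

    def delete_prefix(table_name):
        ...  # hypothetical: removes any partial files under that prefix

    @task
    def load_table(table_name):
        try:
            upload_files(table_name)
        except Exception:
            # table_name is in scope here, unlike in a state_handler
            delete_prefix(table_name)
            raise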
  • j

    Jan Therhaag

    09/04/2019, 3:07 PM
    Hi - I have a question regarding the usage of Dask collections in prefect flows. Basically what I'm trying to accomplish is:
    - reading a bunch of parquet files from disk repeatedly (say once a day)
    - combining them into a Dask dataframe and doing several transformations
    - writing out the dataframe to a Kartothek dataset (basically also a collection of parquet files with metadata, if you don't know the package)
  • j

    Jan Therhaag

    09/04/2019, 3:10 PM
    Basically I wonder what happens when I create a Dask dataframe within a task - can I still take advantage of parallelization over partitions of the collection when transforming it?
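
    A sketch of the pattern in question, assuming the flow runs on a DaskExecutor attached to a distributed cluster: dask.distributed's worker_client lets collection work created inside a task fan back out to the same cluster's workers.

    import dask.dataframe as dd
    from distributed import worker_client
    from prefect import task

    @task
    def transform(path_in, path_out):
        # paths are illustrative; partition-level work runs on the cluster's workers
        with worker_client():
            df = dd.read_parquet(path_in)
            df = df[df["value"] > 0]  # stand-in transformation
            df.to_parquet(path_out)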
  • a

    Aakarsh Nadella

    09/05/2019, 8:53 PM
    Hello Everyone, I have 2 questions. 1. Can we have more than one DAG running concurrently in the same environment? 2. Can we have different instances of the same DAG executing concurrently, with each instance having different parameters?
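
    On question 2, a sketch of firing the same parametrized flow concurrently with different parameters; threads are used here for brevity, though separate processes are the conservative choice if context handling turns out not to be thread-safe in your version:

    from concurrent.futures import ThreadPoolExecutor

    from prefect import Flow, Parameter, task

    @task
    def greet(name):
        print(f"hello {name}")

    with Flow("parametrized") as flow:
        greet(Parameter("name"))

    # two concurrent runs of the same flow, each with its own parameter values
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(flow.run, parameters={"name": n}) for n in ("alice", "bob")]
        states = [f.result() for f in futures]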
  • m

    Mikhail Akimov

    09/06/2019, 8:42 PM
    Hi! I'm trying to run my flow from another file, importing it to inject custom state handlers/do other stuff:
    >>> from example_flow import flow
    >>> from prefect.engine.executors import DaskExecutor
    >>> flow.run()  # this works
    [2019-09-06 20:38:53,240] INFO - prefect.FlowRunner | Beginning Flow run for 'example_flow'
    [2019-09-06 20:38:53,241] INFO - prefect.FlowRunner | Starting flow run.
    [2019-09-06 20:38:53,243] INFO - prefect.TaskRunner | Task 'Task1': Starting task run...
    .........
    >>> flow.run(executor=DaskExecutor(address="tcp://127.0.0.1:8786"))  # this doesn't
    [2019-09-06 20:39:52,658] INFO - prefect.FlowRunner | Beginning Flow run for 'example_flow'
    [2019-09-06 20:39:52,659] INFO - prefect.FlowRunner | Starting flow run.
    [2019-09-06 20:39:52,758] ERROR - prefect.FlowRunner | Unexpected error: ModuleNotFoundError("No module named 'example_flow'",)
  • m

    Mikhail Akimov

    09/06/2019, 8:45 PM
    example_flow.py, just in case
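
    The usual cause of this pattern is that the Dask workers unpickle task payloads that reference the example_flow module, which isn't importable in the worker processes. A hedged sketch of the common fix: make the module importable on every worker (same working directory or PYTHONPATH) and guard the run. run_flow.py is a hypothetical entry point:

    # run_flow.py: example_flow must be importable (same working directory or on
    # PYTHONPATH) on every Dask worker, since unpickling re-imports the module
    from prefect.engine.executors import DaskExecutor

    from example_flow import flow

    if __name__ == "__main__":
        flow.run(executor=DaskExecutor(address="tcp://127.0.0.1:8786"))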
  • o

    oliland

    09/07/2019, 1:44 PM
    I'm confused by logging. It seems like Prefect configures its own logging at import time (ugh) and that it's configured to only log warning messages from tasks. How do I change this to ensure all INFO messages are displayed when running locally?
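
    Two knobs that commonly control this in 0.6-era Prefect, with the caveat that exact config keys can differ by version: the PREFECT__LOGGING__LEVEL environment variable (read when prefect is imported) and the standard logging API on Prefect's logger.

    import os

    # logging is configured when prefect is imported, so set the level first
    os.environ["PREFECT__LOGGING__LEVEL"] = "INFO"

    import logging

    from prefect.utilities.logging import get_logger

    # or adjust after import via the standard logging machinery
    get_logger().setLevel(logging.INFO)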
  • o

    oliland

    09/07/2019, 2:30 PM
    Apologies for starting off with a complaint - I should introduce myself. I'm evaluating Prefect for building workflows for transit data processing. I really like it so far and was able to get something up and running really quickly with the tutorials. I really like the direction of the project so far - it's what Airflow should have been! Highly modular with a vastly higher quality, python native codebase. This is the most promising ETL tool I've evaluated so far so I'm committing some time to building out a small scale prototype. Thanks for your help so far 🙏
    😄 2
    :marvin: 4
  • o

    oliland

    09/07/2019, 2:46 PM
    One feature I'm interested in understanding better is how to manage data lineage / files downloaded from the internet. Let's say I have a versioned bucket on S3, and I want tasks to run if 1) the version hasn't been run before or 2) downstream tasks haven't succeeded for that data version before. I wonder if this is achievable in prefect (I understand it's evolved from Airflow which relies on execution dates for managing state)
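
    A sketch of one way to approximate version-aware runs, assuming a hypothetical record of already-processed versions: raise SKIP for anything seen before, and Prefect's default triggers will then skip downstream tasks as well.

    from prefect import task
    from prefect.engine.signals import SKIP

    SEEN = {"v1", "v2"}  # hypothetical stand-in for a persisted record of versions

    @task
    def maybe_process(version):
        if version in SEEN:
            raise SKIP(f"version {version} already processed")
        ...  # real work for a new version goes here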
  • k

    KJ

    09/07/2019, 3:51 PM
    @Jeremiah I am building a fintech ETL and have built some parts out with Prefect and it’s working great! 😃 Do you have any suggestions or tips? What is crucial versus nice to have.
    🎉 2
  • a

    Akash

    09/08/2019, 7:43 AM
    I had a thought about this PIN: https://docs.prefect.io/guide/PINs/PIN-09-CLI.html The proposed CLI is of the form `prefect <VERB> <OBJECT>`. Would it not make more sense for it to be of the form `prefect <OBJECT> <VERB>`? This is based on my experience with the gcloud and aws cli tools. Usually, the user is sure of the object they want to interact with but not necessarily the verb to be used.
  • f

    Feliks Krawczyk

    09/09/2019, 4:07 AM
    Hey, sorry, this may be a total n00b question, but I’m coming from a heavy Airflow background and trying to understand the correct Prefect way of doing things. I’m digging around the docs but I haven’t found the simple example that I want. I want to create a simple flow: 1. Send a ping (start) 2. Try something 3.a) If step 2 is successful, send ping (complete) b) Else send failed ping. Now in Airflow land I would instantiate 4 operators,
    Send Ping -> Try Something -> Send Ping (Success)
                                -> Send Ping (Failed)
    with 3a and 3b having either `trigger_rule="all_success"` or `trigger_rule="one_failed"`. What is the correct way to do this with Prefect? I see references to handlers but I’m not quite sure what the right way to do what I want is.
  • f

    Feliks Krawczyk

    09/09/2019, 4:09 AM
    Looking at Prefect I think I want to be doing something like:
    with Flow('dummy prefect job') as flow:
        SendStatusTask('running')
        DoSomethingTask()
        SendStatusTask('finished')
        SendStatusTask('failed')
    But I don’t exactly know the correct way of doing the triggers in Prefect land?
  • f

    Feliks Krawczyk

    09/09/2019, 4:11 AM
    Or is this even the correct way I should be designing a flow?
  • f

    Feliks Krawczyk

    09/09/2019, 4:12 AM
    I definitely don’t want to get trapped in trying to create Prefect flows the Airflow way
  • c

    Chris White

    09/09/2019, 4:12 AM
    Hi Feliks; Prefect also has a concept of triggers that will do exactly what you want: https://docs.prefect.io/guide/core_concepts/execution.html#triggers
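
    Applied to the example above, a minimal sketch using those triggers with the 0.6-era functional API (print stands in for the real pings):

    from prefect import Flow, task
    from prefect.triggers import all_successful, any_failed

    @task
    def send_ping(status):
        print(f"ping: {status}")

    @task
    def try_something():
        ...

    @task(trigger=all_successful)
    def ping_success():
        print("ping: finished")

    @task(trigger=any_failed)
    def ping_failed():
        print("ping: failed")

    with Flow("dummy prefect job") as flow:
        start = send_ping("running")
        work = try_something(upstream_tasks=[start])
        ping_success(upstream_tasks=[work])
        ping_failed(upstream_tasks=[work])

    # if try_something succeeds, ping_failed ends TriggerFailed; reference tasks
    # keep the run's final state tied to the actual work instead
    flow.set_reference_tasks([work])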
  • f

    Feliks Krawczyk

    09/09/2019, 4:13 AM
    Ahhh… so I searched for “Triggers” and it gave me: https://docs.prefect.io/api/0.5.4/triggers.html#functions
  • c

    Chris White

    09/09/2019, 4:14 AM
    Yea, we are still working on improving our search index but those are a list of available triggers for tasks; the most up to date list is available here: https://docs.prefect.io/api/unreleased/triggers.html
  • f

    Feliks Krawczyk

    09/09/2019, 4:15 AM
    Ah thank you so much. Sorry, I hate asking these types of questions when you’ve got comprehensive documentation. Definitely went through most of the docs but clearly didn’t associate Execution with what I wanted… Thank you so much!
  • c

    Chris White

    09/09/2019, 4:15 AM
    No worries at all - anytime!
  • f

    Feliks Krawczyk

    09/09/2019, 5:53 AM
    Does anyone know of anyone creating a Databricks Task for Prefect?
  • d

    Dmitriy

    09/09/2019, 4:46 PM
    Has anyone seen a `ValueError('Could not deserialize key data.')` when trying to create a BQ table with the `CreateBigQueryTable` task? I've got my credentials in a `GOOGLE_APPLICATION_CREDENTIALS` environment variable as a JSON string and my `~/.prefect/config.toml` as:
    [cloud]
    use_local_secrets = true
    [context.secrets]
    GOOGLE_APPLICATION_CREDENTIALS = "$GOOGLE_APPLICATION_CREDENTIALS"
    I'm not passing anything for `credentials` to the task itself (since it already defaults to `GOOGLE_APPLICATION_CREDENTIALS`). I'm using Python 3.7.4 and `prefect==0.6.1`. Below is the exact output.
    [2019-09-09 16:42:23,995] INFO - prefect.TaskRunner | Task 'Create Table': Starting task run...
    [2019-09-09 16:42:23,996] INFO - prefect.TaskRunner | Unexpected error: ValueError('Could not deserialize key data.')
    [2019-09-09 16:42:23,996] INFO - prefect.TaskRunner | Task 'Create Table': finished task run for task with final state: 'Failed'
    If anyone has any suggestions it's much appreciated.
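
    "Could not deserialize key data" typically comes from the crypto layer rejecting the private key, often because the \n escapes in the JSON were mangled on the way through the environment. A Prefect-independent check, assuming the env var holds the raw JSON text:

    import json
    import os

    from google.oauth2 import service_account

    # if this also fails, the key material itself is mangled (commonly the \n
    # escapes inside "private_key"), not anything Prefect-specific
    info = json.loads(os.environ["GOOGLE_APPLICATION_CREDENTIALS"])
    creds = service_account.Credentials.from_service_account_info(info)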
  • r

    Ryan Abernathey

    09/09/2019, 6:26 PM
    Thanks @Chris White for stopping by the Pangeo ML Working Group meeting today. I’ve got a couple of follow-up questions. Let me know if any of these should be escalated to GitHub issues. The main question is about `ResultHandler` objects. In Pangeo, our I/O stack is something like Google Cloud Storage <- GCSFS <- Zarr <- Xarray. I would like a Prefect task to write data to GCS. The normal way I would do this (without Prefect) is:
    ds = ...  # create xarray Dataset
    gcfs_w_token = gcsfs.GCSFileSystem(project='pangeo-181919', token=token)
    gcsmap = gcsfs.GCSMap(path, gcs=gcfs_w_token)
    ds.to_zarr(gcsmap)
    Obviously I can do that from within a Prefect task, but it kind of seems like I should be using a `ResultHandler`. Can you point me to any examples of custom handlers? (Bonus points if they show how to use secure credentials.) Thanks again for an awesome tool.
    😁 2
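
    A hedged sketch of a custom handler against the 0.6-era interface (subclass ResultHandler, implement write to persist a result and return its location, and read to load it back); the gcsfs/zarr plumbing mirrors the snippet above and all names are illustrative:

    import uuid

    import gcsfs
    import xarray as xr
    from prefect.engine.result_handlers import ResultHandler

    class ZarrGCSResultHandler(ResultHandler):
        """Illustrative handler: persists xarray Datasets as zarr on GCS."""

        def __init__(self, project, token, prefix):
            self.project = project
            self.token = token
            self.prefix = prefix
            super().__init__()

        def _fs(self):
            return gcsfs.GCSFileSystem(project=self.project, token=self.token)

        def write(self, ds):
            # the returned location is what Prefect stores in the task's state
            path = f"{self.prefix}/{uuid.uuid4().hex}.zarr"
            ds.to_zarr(gcsfs.GCSMap(path, gcs=self._fs()))
            return path

        def read(self, path):
            return xr.open_zarr(gcsfs.GCSMap(path, gcs=self._fs()))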
  • r

    Ryan Abernathey

    09/09/2019, 7:42 PM
    I’m trying to figure out how to set GOOGLE_APPLICATION_CREDENTIALS via the Secrets API. I read the docs (https://docs.prefect.io/cloud/cloud_concepts/secrets.html#setting-a-secret), but only found examples of “simple” secrets (i.e. KEY = VALUE). The Google Cloud service account credentials are a .json file with many fields. How should I set the secret in this case?
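
    One approach that works with local secrets, sketched under the assumption that the secret value is the raw text of the service-account .json file: store it as a string and parse on retrieval (whether the GCP tasks accept a parsed dict directly may depend on the version):

    import json

    from prefect.client import Secret

    # assuming the secret value is the raw text of the service-account .json file
    creds_dict = json.loads(Secret("GOOGLE_APPLICATION_CREDENTIALS").get())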
  • a

    Akash

    09/10/2019, 12:46 PM
    Does Prefect core provide any metrics out-of-the-box? Reference: https://airflow.apache.org/metrics.html If not, is it on the roadmap?
  • c

    Chris White

    09/10/2019, 3:40 PM
    Hi Akash, this is the sort of thing that Cloud does provide out of the box, but storage / tracking of this information requires a persistence layer of some kind.