data-tricks-and-tips
  • k

    Kevin Kho

    04/26/2022, 6:15 PM
    set the channel description: Discussion place for all things data - ask about SQL queries, Python libraries, databases, visualization and more.
  • a

    Anna Geller

    04/26/2022, 6:31 PM
    Welcome, data people! If you want to chat about various trends in the data industry or share some tricks to work with data more efficiently, this is the best place to start a conversation! 👋
    🙌 2
  • a

    Anna Geller

    04/26/2022, 11:26 PM
    Let's kick this off! First topic: BI! There are many blog posts these days claiming that BI is dead. Do you agree? Coincidentally with us creating this channel today, Maxime Beauchemin (considered by many the "father" of data engineering) gave one of the best data talks I've seen on the topic. As a person who is quite obsessed with naming things correctly (without using BS marketing terms), I love how Maxime disambiguates many terms used in the data industry, including:
    • why BI is a loaded term -- "data consumption layer" is much more appropriate these days
    • why headless BI is a complete BS term introduced by marketing folks. Maxime claims that while managing assets as code may have its merits, the industry needs a more pragmatic approach, with tooling that consolidates the everything-as-code world and the GUI world by building more user-friendly interfaces that bridge both bidirectionally
    • why Gartner's BI magic quadrant is also pretty much BS, especially these days when the Modern Data Stack allows you to pick and choose your stack
    • why the semantic layer doesn't make much sense these days and why it has failed to deliver on the promise of self-service -- instead, a more sensible approach is shifting it toward the transformation layer
    • many trends in the data "space", including data apps -- bringing visualizations out of the BI layer into the tools we use every day
    I highly recommend watching this talk!
    👀 1
    🙌 3
    💪🏽 1
    m
    4 replies · 2 participants
  • m

    Mansoor Hassan

    05/06/2022, 11:31 AM
    👋 Hello, team!
    👋 4
    a
    k
    2 replies · 3 participants
  • m

    Marcin Grzybowski

    05/30/2022, 8:35 AM
    Hello, I don't know if this is the best place for a beginner's question. I've tried to use the SnowflakeQuery task and I got
    NameError: name 'err' is not defined
    when I run this line:
    from prefect.tasks.snowflake import SnowflakeQuery
    It leads me to this file:
    /src/prefect/tasks/snowflake/__init__.py
    and this is the code from that __init__.py that fails for me:
    try:
        from prefect.tasks.snowflake.snowflake import (
            SnowflakeQuery,
            SnowflakeQueriesFromFile,
        )
    except ImportError:
        raise ImportError(
            'Using `prefect.tasks.snowflake` requires Prefect to be installed with the "snowflake" extra.'
        ) from err
    What am I doing wrong? Thank you
    ✅ 1
    a
    5 replies · 2 participants
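    For context: the traceback above points at a small bug in that __init__.py -- the `from err` clause references a name that is never bound. A sketch of what the except clause presumably intended, plus the install command that resolves the underlying ImportError:
    # What the guard presumably meant: bind the exception so `from err`
    # has something to chain from.
    try:
        from prefect.tasks.snowflake.snowflake import (
            SnowflakeQuery,
            SnowflakeQueriesFromFile,
        )
    except ImportError as err:
        raise ImportError(
            'Using `prefect.tasks.snowflake` requires Prefect to be installed with the "snowflake" extra.'
        ) from err
    # On the user's side, installing the extra resolves the original error:
    #   pip install "prefect[snowflake]"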
  • y

    Yossi

    06/29/2022, 10:08 AM
    Is there a way to fork/trigger a new Flow from the middle? For example, assume I have some successful Flow named "baseline": is it possible to fork new runs out of it and add new tasks/configurations?
    ✅ 1
    b
    a
    3 replies · 3 participants
  • g

    Guilherme Bordignon

    06/29/2022, 10:26 PM
    hey everyone, I have this task that gets a list of every object for each model passed in and returns a tuple with those lists:
    @async_task()
    async def get_all_objects_from_models(
        models: List
    ):
        """
        Get all objects from each model from the @param models.
        """
        return await asyncio.gather(*[model.get_all_objects() for model in models])
    to be used like this:
    all_objects_model_1, all_objects_model_2, all_objects_model_3 = get_all_objects_from_models([Model1, Model2, Model3])
    The problem is that I have to pass a nout in the decorator, or a Tuple return-type annotation with the number of items inside (Tuple[List, List, List]), for this to work; otherwise I get this error: 'Task is not iterable. If your task returns multiple results, pass nout to the task decorator/constructor, or provide a Tuple return-type annotation to your task.' But I would like to make this function generic, passing and returning as many models as I want. Is there a way to make it work?
    ✅ 1
    k
    a
    +1
    3 replies · 4 participants
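    One workaround (a sketch, assuming Prefect 1-style tasks and the custom async_task decorator from the question; pick is a hypothetical helper): skip tuple unpacking entirely, return a single list, and index into it downstream, so no static nout or Tuple[...] hint is needed.
    from typing import List
    from prefect import task

    @task
    def pick(results: List, i: int):
        # Hypothetical helper: select one model's object list from the
        # gathered results inside the flow, avoiding tuple unpacking.
        return results[i]

    # Usage inside the flow, with the gathering task unchanged:
    all_objects = get_all_objects_from_models([Model1, Model2, Model3])
    all_objects_model_1 = pick(all_objects, 0)
    all_objects_model_2 = pick(all_objects, 1)
    all_objects_model_3 = pick(all_objects, 2)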
  • k

    Kevin Focke

    07/02/2022, 3:13 PM
    Hello everyone, I'm having some issues with the orion.db growing too large. Context: I'm using Prefect Orion 2.0b6 with a persistent local SQLite database, set via the environment variable export PREFECT_ORION_DATABASE_CONNECTION_URL="sqlite+aiosqlite:////home/sqlite_database/orion.db". The culprit for the growing size seems to be that I use large Pandas DataFrames and pass them through several flows. Is there any way to reduce the size of the orion.db? Can the output of a nested flow be deleted upon completion of the parent flow? Thanks for your help!
    a
    1 reply · 2 participants
  • d

    David Bender

    07/04/2022, 2:46 PM
    Hey. Having an issue with a simple deployment: when I attempt to run "deployment create" I encounter a ModuleNotFoundError for any local imports, even though the flow runs successfully when executed locally. Is this a bug, or a property of the DeploymentSpec that I need to work around?
    ✅ 1
    a
    s
    +2
    11 replies · 5 participants
  • j

    Jarrod Clark

    07/08/2022, 3:04 PM
    Greetings. I am a bit of a Prefect noob, and I am having what seems on the surface to be a simple problem, but I just can't figure it out. I have a flow that makes a SQL query, creates a pandas dataframe, then uploads to an Elasticsearch index. When I use flow.run() in JupyterLab it runs as expected, and it registers with no errors, but when I run it from the dashboard I receive the following error: FileNotFoundError: [Errno 2] No such file or directory: 'all_cases.sql'. Anyone have any suggestions?
    k
    4 replies · 2 participants
  • j

    Jaco

    07/13/2022, 4:01 PM
    Hello everyone! I'm testing Prefect to help with my MLOps workflow. I need help understanding the following: when I do .result(), for example:
    @flow
    def main():
        train, test, unique_movie_titles, unique_user_ids = getData().result()
    I keep getting type errors like:
    tensorflow.python.framework.errors_impl.InvalidArgumentError: Cannot convert a Tensor of dtype variant to a NumPy array
    Am I limited in what types my functions can return in order to use .result()?
    ✅ 1
    a
    1 reply · 2 participants
  • j

    J

    07/14/2022, 6:53 PM
    Is it only Python that can be used with Prefect?
    ✅ 1
    a
    3 replies · 2 participants
  • f

    Falk

    07/15/2022, 10:04 AM
    Hey everyone, I have a bit of an issue with one of my flows. I'm trying to read a YAML file containing configuration like this in my `flow.py`:
    with open(f"{os.getcwd()}{file_name}.yaml", "r") as file:
        return yaml.safe_load(file)
    Both flow.py and file.yaml are in the same directory. I also tried setting the path without cwd, like this:
    with open(f"{file_name}.yaml", "r") as file:
        return yaml.safe_load(file)
    but also no luck. I always get
    FileNotFoundError: [Errno 2] No such file or directory: '/file.yaml'
    Any ideas what is causing this? Do I have to specify files other than .py files for the agent somehow?
    d
    2 replies · 2 participants
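    A likely cause (a sketch, not verified against this exact setup): the agent runs the flow from a different working directory, and f"{os.getcwd()}{file_name}.yaml" is also missing a path separator between directory and file name, which is consistent with the '/file.yaml' in the error. Resolving the path relative to the flow file itself sidesteps both issues:
    from pathlib import Path

    import yaml

    # Resolve relative to flow.py's own location, not the agent's cwd.
    # `file_name` is the same variable as in the question.
    config_path = Path(__file__).parent / f"{file_name}.yaml"
    with open(config_path, "r") as file:
        config = yaml.safe_load(file)
    Note this only helps if the .yaml file is actually shipped alongside the .py file to wherever the agent executes it; otherwise it has to be baked into the image or storage location.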
  • r

    rami ben shalom

    07/20/2022, 12:00 PM
    Hey everyone, I'm having a bit of a problem. I'm using the Python logging library to export txt files with the run output of my function. Since I'm using a cron scheduler, I need to run the .py file via Windows Task Scheduler so it is always running. Because it always runs, the txt log file keeps accumulating output, since the flow never stops (I want one txt file to represent one flow run). I looked for quite a while for a way to export the Prefect flow output to a separate txt file for each run, but I couldn't find a solution in the documentation. Would very much appreciate the help! Using Prefect 1.0, locally on Windows Server 2016 (using Prefect Server is not an option currently).
    a
    1 reply · 2 participants
  • i

    ishan

    07/22/2022, 9:49 AM
    Hi - I have a question about orchestrating a DAG of tasks with the 2.0 API. Specifically, I am trying to write something like DBT, where I define table views dependent on other views. When materializing a view, it first materializes its dependencies, and then runs the code for itself. The dependencies for each view are available statically (i.e. in the type signature etc.). • When the dependency graph is a tree, execution is easy enough. I try to run the flow for view E; E evaluates its dependencies C and D, which recursively evaluate A and B before running themselves. ◦
    A -> C, B -> D, (C, D) -> E
    • When the dependency graph is a DAG, I am not sure how to approach it. E will try to evaluate B and C, but both could concurrently try to evaluate A, while we only want A to be executed once. ◦
    A -> B, A -> C, (B, C) -> E
    A solution to this is topologically sorting the full graph and executing views as soon as their parent dependencies have run. I wrote that code manually, but I'm curious whether the Prefect API can handle it for me (see the sketch after this message).
    ✅ 1
    👀 1
    a
    a
    +1
    7 replies · 4 participants
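    One way the Prefect 2 API can handle the diamond (a sketch; DEPS and materialize below are hypothetical stand-ins for the real views): submit each view exactly once, memoize its future, and pass upstream futures as task inputs. A future's underlying task runs only once no matter how many downstream tasks consume it, so A is never evaluated twice.
    from prefect import flow, task

    @task
    def materialize(name: str, upstream_results: list):
        # Placeholder for the real materialization logic.
        return f"materialized view {name}"

    # Static dependency graph from the question: A -> B, A -> C, (B, C) -> E
    DEPS = {"A": [], "B": ["A"], "C": ["A"], "E": ["B", "C"]}

    @flow
    def build(target: str):
        futures = {}

        def submit(name):
            if name not in futures:  # memoize: one submission per view
                parents = [submit(dep) for dep in DEPS[name]]
                futures[name] = materialize.submit(name, parents)
            return futures[name]

        return submit(target).result()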
  • a

    Alex

    07/25/2022, 9:58 PM
    Hello! I have a question regarding caching in Prefect 2.0. I found that caching with task_input_hash only works for a task if it is called from the same flow. If I call it from a different flow, but with the same parameters, then caching does not work. Is this intended behaviour?
    a
    1 reply · 2 participants
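    For reference, a sketch of taking explicit control of the key (inputs_only_key is a hypothetical helper, not a Prefect built-in): a custom cache_key_fn that depends only on the task's arguments makes cache hits flow-independent by construction.
    from datetime import timedelta

    from prefect import task

    def inputs_only_key(context, arguments):
        # Ignore the calling flow entirely; key on the arguments alone.
        return str(sorted(arguments.items()))

    @task(cache_key_fn=inputs_only_key, cache_expiration=timedelta(hours=1))
    def expensive(x: int) -> int:
        return x * 2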
  • j

    John Kang

    07/26/2022, 3:40 PM
    Hi all, I'm trying to test a flow that imports a module from another file. When I run the flow locally it works fine, but when I try to deploy it to the cloud (on 2.0b12) I receive an error that the module does not exist. Any tips on how to import modules from another file (in the same folder or another folder)?
    ✅ 1
    b
    a
    +2
    15 replies · 5 participants
  • s

    Stefan

    07/27/2022, 10:37 PM
    Total beginner question: when you start the 'prefect orion' server, what do you use to make sure it restarts if it crashes, stays running after the terminal is closed, starts on boot, etc.?
    👀 1
    ✅ 1
    k
    c
    2 replies · 3 participants
  • m

    Marty Ko

    07/29/2022, 10:18 AM
    Hi, my first time setting up Prefect and I seem to have failed at the first step:
    ✅ 1
  • m

    Marty Ko

    07/29/2022, 10:18 AM
    ValueError: the greenlet library is required to use this function. dlopen(/opt/homebrew/lib/python3.9/site-packages/greenlet/_greenlet.cpython-39-darwin.so, 0x0002): tried: '/opt/homebrew/lib/python3.9/site-packages/greenlet/_greenlet.cpython-39-darwin.so' (mach-o file, but is an incompatible architecture (have 'x86_64', need 'arm64e'))
    ✅ 1
  • m

    Marty Ko

    07/29/2022, 10:18 AM
    I'm using an M1 machine. Is that an issue?
    ✅ 1
    a
    1 reply · 2 participants
  • a

    Andreas Nigg

    08/02/2022, 7:37 AM
    Hey there, I have a general question about best practices for data transformations, not related to Prefect itself. I hope this channel is appropriate. We use Prefect to coordinate ingestion of data into our warehouse (BigQuery). From there, we use dbt to transform it as we need. One of our data imports is rather large (let's say 100GB in total, to make it easy). We use Airbyte to ingest an additional 1GB daily. This daily ingest also creates a lot of duplicates (the 100GB table already contains some of the rows that are inserted with the daily load) - this is due to the underlying data structure, and there's not much we can do about it. How would you go about deduplicating this data? I would like to avoid reading the full 100GB daily just for deduplication. Any ideas? Thanks in advance 😄 🚀
    r
    a
    6 replies · 3 participants
  • e

    eddy davies

    08/02/2022, 10:54 AM
    The docs show docstrings being used as flow descriptions, but I cannot get that working. Thoughts?
    👀 2
    ✅ 1
    b
    k
    +1
    16 replies · 4 participants
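    For reference, a minimal sketch of the documented behaviour in Prefect 2: with no explicit description passed to the decorator, the flow's docstring is used as its description.
    from prefect import flow

    @flow
    def my_flow():
        """This docstring should show up as the flow's description in the UI."""
        return 1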
  • n

    Nikita Kodenko

    08/04/2022, 6:19 AM
    Hi, I am experimenting with Prefect 1.2+ in a k8s cluster and I wonder how to pull data from other places in this scenario: 1. Prefect Server inside the k8s cluster 2. Flows started via KubernetesRun with GitLab storage. Before the flow starts, I need to execute several shell commands which pull data from several sources. What's the best way to do it? I guess it is possible to define a Task which executes shell commands and make it the first task in the flow, but are there any other options? For example, modifying the job template for KubernetesRun?
    👍 1
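    A sketch of the first option mentioned (the script path is hypothetical): Prefect 1 ships a ShellTask that can run the pull commands as the first task, with everything else downstream of it.
    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    pull_data = ShellTask(name="pull-data")

    with Flow("k8s-flow") as flow:
        # Hypothetical script that pulls data from the various sources.
        pulled = pull_data(command="sh /scripts/pull_sources.sh")
        # Downstream tasks would declare this as an upstream dependency:
        # process_data(upstream_tasks=[pulled])
    Modifying the KubernetesRun job template (e.g. adding an init container) works too, but keeping the pull inside the flow makes it visible and retryable like any other task.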
  • m

    Marty Ko

    08/08/2022, 8:27 AM
    Hi, new to Prefect. I'm curious whether V2 is actually production-ready? At least from the UI I can tell that a lot of features are missing, like creating a flow run, etc. Since this is my first time setting it up, is it better to start off with V1?
    ✅ 1
    a
    c
    +1
    13 replies · 4 participants
  • b

    Bigya Man Pradhan

    08/08/2022, 2:46 PM
    Hi everyone, I am using the CLI command for Prefect [2.0.1]
    prefect deployment build --help
    which has the line:
    --manifest-only Generate the manifest file only.
    But the YAML file is also being generated when the build command is run with the --manifest-only flag. Additionally, adding the --output flag and setting an output name results in only the YAML file being renamed, not the manifest file. Is this intended behaviour?
    ✅ 1
    a
    3 replies · 2 participants
  • a

    Adam Eury

    08/10/2022, 1:01 PM
    Hi everyone, I have a question about the intended use of the --manifest-only flag. My understanding is that it skips the step of uploading the flow file to the configured storage location. Is it intended to support the use case of having the flow file added to a Docker image rather than remote storage like GCS or S3?
    ✅ 1
    a
    7 replies · 2 participants
  • c

    Chris L.

    08/10/2022, 1:24 PM
    Hello there, question about Prefect 2.0 and Dask. In Prefect 1.0, any Dask delayed computation within a Prefect task is "picked up" by the same Dask cluster that executes the Prefect flow (with DaskExecutor). However, in Prefect 2.0, I don't think this is the case. Using the dask.distributed worker_client context manager doesn't seem to work either and raises a "no workers found" error, despite the fact that I can access the Dask dashboard on localhost showing Prefect 2.0 tasks. Below is a small reproducible example. You can see in the screenshot that none of the inc, double, and add delayed function calls are picked up in the dashboard. Wondering if there is some generic way for me to access the Dask task runner's cluster within tasks in Prefect 2.0?
    import dask
    import time
    
    from prefect import flow, task
    from prefect_dask import DaskTaskRunner
    
    
    @task
    def parallel_sum(data):
        def inc(x):
            return x + 1
    
        def double(x):
            return x * 2
    
        def add(x, y):
            time.sleep(5)
            return x + y
    
        output = []
        for x in data:
            a = dask.delayed(inc)(x)
            b = dask.delayed(double)(x)
            c = dask.delayed(add)(a, b)
            output.append(c)
    
        total = dask.delayed(sum)(output).compute()
        time.sleep(30)
        return total
    
    
    @flow(task_runner=DaskTaskRunner())
    def run_parallel_sum(data):
        parallel_sum.submit(data)
    
    
    if __name__ == "__main__":
        run_parallel_sum(list(range(1000)))
    z
    4 replies · 2 participants
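    A note on why the delayed calls don't show up (a sketch; get_dask_client ships in later prefect-dask releases and was not available at the time of this thread): a plain .compute() inside a task falls back to Dask's default local scheduler in the worker process, so the delayed work never reaches the flow's cluster. Handing the computation to a client connected to that cluster routes it there.
    import dask
    from prefect import flow, task
    from prefect_dask import DaskTaskRunner, get_dask_client

    @task
    def parallel_sum(data):
        # The context manager yields a distributed client connected to
        # the cluster backing the flow's DaskTaskRunner.
        with get_dask_client() as client:
            delayed = [dask.delayed(lambda x: x + 1)(x) for x in data]
            total = dask.delayed(sum)(delayed)
            return client.compute(total).result()

    @flow(task_runner=DaskTaskRunner())
    def run_parallel_sum(data):
        return parallel_sum.submit(data)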
  • a

    Angel Acosta

    08/11/2022, 3:41 PM
    Hello, I am new to Prefect. I scheduled a flow on Prefect Cloud. The flow should end up generating an Excel report and saving it locally, but I never see it. The flow runs successfully, and the timing looks right for generating the file. What I did find was an encoded blob file in my .prefect/storage folder... is that how my file gets saved? When I run the flow locally on Orion, it outputs the report where I specify.
    ✅ 1
    k
    6 replies · 2 participants
  • y

    Yusuf

    08/15/2022, 7:46 PM
    Hello all! I was wondering if there is a way to do something similar to Airflow sensors? I need a task to kick off every time a new file of a certain format lands in Azure Blob Storage. I think there might be a way for me to hook the storage account up to a queue and notification service and subscribe to an Event Grid, etc. But I was wondering if there is a simpler way to do this via Prefect?
    ✅ 1
    a
    7 replies · 2 participants
a

Anna Geller

08/15/2022, 8:08 PM
This goes into detail about that for 1.0: https://discourse.prefect.io/t/is-there-an-equivalent-to-sensors-in-prefect-how-do-i-trigger-event-driven-workflows/76
For 2.0 we are working on something really cool that will address the problem much more nicely than sensors.
y

Yusuf

08/15/2022, 8:14 PM
Ah awesome, thanks. I'm actually starting from scratch, so I was going to begin with 2.0 from the get-go. Is there a branch or issue etc. I can track for that feature on GitHub? Or do you have an estimated timeline for a beta?
a

Anna Geller

08/15/2022, 10:41 PM
No ETA, but I believe Event Grid is, for now, the easiest way to do it. I will ask my colleague who is familiar with Azure whether we could build a recipe showing how to approach that use case: firing a workflow any time a new file lands in Azure Blob Storage.
y

Yusuf

08/15/2022, 11:10 PM
Ah, that would be awesome if you could - I'd really appreciate it, because without it I have to go read all the docs for Event Grid, etc.
🙌 1
a

Anna Geller

08/15/2022, 11:18 PM
You can follow #show-us-what-you-got - this is usually where we post recipes and blog posts.
Also see discourse.prefect.io.