# data-tricks-and-tips
  • d

    Darren Liu

    01/25/2023, 3:05 AM
    Hi prefectionists, first of all great work on a promising product! I am looking for a solution recommendation for this use case. Suppose there are events that start at unknown times. Once it is detected that an event has started, a flow is looped with some persistent state until it is detected that the event has ended. There can be multiple events happening at once, so there can be multiple loops, but only as many as there are events. Each event must have exactly one loop, because there are no safeguards against multiple loops working on the same event with the same state and inserting identical data. Normally, I would have a repeating event-monitoring job that detects the start of events and publishes a job with initial state into a work queue, plus job consumers that carry out the job and re-publish it into the queue with the latest state once completed, so the next iteration can begin. The same job can detect the end of the event and complete without publishing back into the queue. What would be the ideal setup using Prefect? Thanks in advance for reading the long rant!
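The queue pattern described in this message can be sketched in plain Python, independent of any Prefect API. This is only an illustration under assumptions: the names `Job`, `run_loops`, and `count_to_three` are hypothetical, the integer `state` stands in for whatever persistent state each loop carries, and the `active` set plays the role of the "only one loop per event" guard.

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable, Deque, Dict, Iterable, Set, Tuple


@dataclass
class Job:
    event_id: str
    state: int  # hypothetical persistent state carried between iterations


def run_loops(
    detected: Iterable[str],
    step: Callable[[Job], Tuple[int, bool]],
) -> Dict[str, int]:
    """Run one loop per event until `step` reports that the event has ended."""
    queue: Deque[Job] = deque()
    active: Set[str] = set()  # guards against two loops for the same event
    final: Dict[str, int] = {}

    # Monitoring job: publish each newly detected event exactly once.
    for event_id in detected:
        if event_id not in active:
            active.add(event_id)
            queue.append(Job(event_id, state=0))

    # Consumer: run one iteration, then re-publish with the latest state.
    while queue:
        job = queue.popleft()
        new_state, ended = step(job)
        if ended:
            active.discard(job.event_id)
            final[job.event_id] = new_state
        else:
            queue.append(Job(job.event_id, new_state))
    return final


def count_to_three(job: Job) -> Tuple[int, bool]:
    # Toy step: the event "ends" once its state reaches 3.
    state = job.state + 1
    return state, state >= 3
```

Calling `run_loops(["a", "b", "a"], count_to_three)` starts a single loop for `a` even though it was detected twice. In a real deployment the deduplication would need to live somewhere shared between workers rather than in an in-process set.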
    p
    • 2
    • 6
  • i

    Ishan Anilbhai Koradiya

    01/31/2023, 5:33 AM
    Hi community, I just want to understand: is there a way I can interact with the Prefect APIs (not the Cloud version) to handle multi-tenancy? Can I create workspaces in the open source version using the Prefect APIs?
  • y

    YSF

    02/01/2023, 7:27 AM
    Is there a workaround to do `prefect cloud login` without verifying SSL certs? I'm getting an SSL "unable to find local issuer cert" error. I can resolve it with IT; it'll just take a week and a bit to go through the whole process. I'm just prototyping some stuff. I checked here: https://discourse.prefect.io/t/how-to-disable-the-ssl-verification-when-setting-up-a-pr[…]verify-failed-unable-to-get-local-issuer-certificate/597/2 but the solutions didn't seem to work. I'm using Windows, btw.
  • y

    YSF

    02/01/2023, 8:13 AM
    Also, if I do put in a ticket with IT, what domain do I need to whitelist? *.prefect.cloud?
    c
    • 2
    • 1
  • h

    Haotian Li

    02/08/2023, 9:09 AM
    Hi Prefect Team, first, thank you for the wonderful job you have done creating this platform! I have a question about heterogeneous flow runs: 1. Is there any way for a user to create a flow that runs on different types of hardware in the queue? I need to create a flow that runs partially on a CPU Kubernetes queue and partially on GPU-enabled nodes. There doesn't seem to be a way to do this; each flow can only run entirely on CPU or entirely on GPU. 2. If not, what's the best practice for doing this? Should I break the flow into separate subflows, deploy them separately, and run each step by hand? Or can I trigger this automatically by having a task run another deployed flow in a different queue? 3. Is there any plan to add a per-task tagging system for agents, so that when an agent pulls from a queue it only pulls tasks with a certain tag?
    t
    r
    • 3
    • 13
  • e

    Evan Curtin

    02/08/2023, 5:14 PM
    Is this a supported pattern in Prefect 2? Task A generates some output, remote storage is used for persistence, and Task B retrieves the output from storage. I want to be able to: (1) control the filename in Azure blob store (right now Prefect gives it some random nonsense name); (2) skip Task A if the file already exists.
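The "skip Task A if the file already exists" idea can be sketched without any Prefect or Azure calls. This is a minimal illustration under assumptions: a local directory stands in for the blob store, and `produce_if_missing` and `consume` are hypothetical helper names, not part of any library.

```python
import tempfile
from pathlib import Path
from typing import Callable


def produce_if_missing(target: Path, producer: Callable[[], bytes]) -> bool:
    """Write producer() to `target` unless it already exists.

    Returns True if the producer ran, False if it was skipped.
    """
    if target.exists():
        return False  # "Task A" is skipped; the stored output is reused
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(producer())
    return True


def consume(target: Path) -> bytes:
    """'Task B': read the persisted output back from storage."""
    return target.read_bytes()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        out = Path(tmp) / "results" / "task_a_output.bin"  # caller-chosen name
        print(produce_if_missing(out, lambda: b"payload"))  # first call writes
        print(produce_if_missing(out, lambda: b"other"))    # second call skips
        print(consume(out))
```

The caller chooses the filename up front, which is what makes the existence check possible. How to map this onto Prefect's own result persistence is not covered by this sketch.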
  • s

    Stephen

    02/09/2023, 1:45 AM
    Hi - it's been touted that Prefect could be used to run arbitrary workflows in languages other than Python (via Docker or some other means). Maybe it's a rare use case, but I haven't seen an example of this. Are there any toy examples around?
    👍 1
    a
    l
    • 3
    • 2
  • j

    Jacob Bedard

    02/13/2023, 10:27 PM
    I'm just upgrading to Prefect 2.0 and I'm finding the secrets don't work when I try to get() them. There's a post in the community forum, but it's not resolved. Has anyone run into this problem, `AttributeError: 'coroutine' object has no attribute 'get'`, and resolved it?
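In general, this error message means `.get()` was called on a coroutine object that was never awaited. Whether that is the cause here is an assumption, since the thread's resolution is not shown. The sketch below reproduces the message in plain Python; `FakeSecret` and `load_secret` are hypothetical stand-ins, not Prefect classes or functions.

```python
import asyncio


class FakeSecret:
    """Stand-in for a secret block; not a Prefect class."""

    def __init__(self, value: str) -> None:
        self._value = value

    def get(self) -> str:
        return self._value


async def load_secret(name: str) -> FakeSecret:
    # Hypothetical async loader that returns the block once awaited.
    return FakeSecret(f"value-of-{name}")


async def main() -> str:
    pending = load_secret("api-key")  # a coroutine object, not a FakeSecret
    try:
        pending.get()  # raises the AttributeError quoted in the message
    except AttributeError as exc:
        print(exc)
    finally:
        pending.close()  # avoid a "never awaited" warning

    secret = await load_secret("api-key")  # awaiting yields the real object
    return secret.get()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If the loader in the real code is async, awaiting it before calling `.get()` would be the first thing to try.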
    ✅ 1
    j
    • 2
    • 3
  • r

    Ravi

    02/21/2023, 11:49 AM
    Hi all, I am curious if there is a method in prefect 2 to allow for manual reading of persisted results files. I see that PersistedResult object has a .get() method, however from what I gather this is meant to be used behind the scenes. If I can use it manually, could someone show an example?
  • k

    Kelvin DeCosta

    02/22/2023, 9:38 AM
    Hey everyone! Just wanted to share something I found interesting. When submitting many `async` tasks, either via `.submit` or `.map`, I've found that using a `SequentialTaskRunner` causes the flow to run much faster than using `ConcurrentTaskRunner`. I'd like to know more about this behavior. Any feedback is appreciated!
    p
    j
    • 3
    • 5
  • a

    Alex Shea

    03/07/2023, 9:11 PM
    Hello all, I have been trying to find out whether Prefect 2 can use different containers for different tasks. At Datateer we use Kubernetes as our deployment model. There was a post made on the discord asking the same question. I also found ideas on the discord for Prefect 1 that led me to the Kubernetes job task. This led me to the prefect-kubernetes package, which provides support for Prefect 2. I also know that there is the native ability to run sub-flows in a task runner through Dask. My question is whether there is an accepted standard for managing tasks that one would want to run in an isolated container, to better manage dependency conflicts and to design runtimes for the tasks that are optimized for the needed resources. The latter would help with reducing runtime costs.
    e
    • 2
    • 1
  • y

    YSF

    03/09/2023, 6:25 PM
    Hi all, I think I'm doing something simple and mistaken. I'm using Python 3.9.6 on Windows 10, and Prefect version 2.8.4, I have the simplest code in a file called dev.py from the 1st tutorial example from the site. I'm using a Python venv too
    from prefect import flow
    
    @flow
    def my_favorite_function():
        print("What is your favorite number?")
        return 42
    
    print(my_favorite_function())
    And when I run `python dev.py` it gives me an error: `RuntimeError: Cannot create flow run. Failed to reach API at http://127.0.0.1:4200/api/`
    I tried changing the code to:
    from prefect import flow
    
    @flow
    def my_favorite_function():
        print("What is your favorite number?")
        return 42
    
    if __name__ == "__main__":
        print(my_favorite_function())
    And it still gives me the same error. Am I missing something? I'm sure I've run this code before when trying to learn
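One possible reading of this error, not confirmed in the thread, is that the active settings point at a local API server that is not running. A quick way to check whether anything is listening at that address, using only the standard library, might look like the sketch below. `api_reachable` is a hypothetical helper name; the URL is the one from the error message.

```python
import urllib.error
import urllib.request


def api_reachable(url: str, timeout: float = 2.0) -> bool:
    """Return True if anything answers HTTP at `url`, False otherwise."""
    # An empty ProxyHandler bypasses any system proxy for this local check.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        with opener.open(url, timeout=timeout):
            return True
    except urllib.error.HTTPError:
        return True   # the server answered, even if with an error status
    except OSError:
        return False  # connection refused, timeout, DNS failure, ...


if __name__ == "__main__":
    # URL taken from the error message above.
    print(api_reachable("http://127.0.0.1:4200/api/"))
```

If this prints `False`, nothing is answering at that address, so the fix would lie in the server or the configured API URL rather than in the flow code.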
    k
    j
    • 3
    • 3
  • v

    Vincenzo

    03/14/2023, 4:36 PM
    [THEORETICAL QUESTION] Hi everyone, new to DBT / Prefect and Data Engineering. I am currently building my first DE capstone project where I am pulling data from an API and pushing it to BQ. Orchestration happens with Prefect. I want to use DBT to partition and prepare the data in BQ for Looker. I am having trouble wrapping my head around how I can make my code reproducible for others on GitHub, as I am using DBT Cloud. I found a repo from Anna, but it does not fully answer my question on how to make the code reproducible (especially in terms of DBT credentials). What would be the best practices here? Would I need to explain to them how to set up DBT Cloud as well, or is there a better way?
    ✅ 1
    s
    • 2
    • 4
  • v

    Vincenzo

    03/19/2023, 3:05 PM
    [Beginners questions] [VM coordination with Prefect Cloud] [Work pools] [Agents] Hey there, I successfully deployed a Prefect flow and it perfectly pulls data from an API and uploads it to GCS and then to BigQuery. But when it comes to scheduling, my understanding is lacking. I use a VM from Google and host my code on GitHub. When I am running the flow from my VM's CLI: 1. I start the default work pool via `prefect agent start -q 'default'`. 2. I call my flow with `python my_flow_file.py`. I would now like my flow to run independently without me needing to do (1) and (2). I scheduled my flow to run every Saturday, but when I checked today, it was in status `late` and was waiting for an agent to pick up the run. How would I start an agent when I am not in VS Code, starting it via the CLI? I assumed it would be started via Prefect Cloud or my VM that was running.
    ✅ 1
    r
    r
    j
    • 4
    • 9
  • v

    Vincenzo

    03/21/2023, 9:05 AM
    [DBT] [Prefect] Thanks for the relentless support everyone. I am amazed by Prefect and really want to dive deeper into it! I am trying to orchestrate my dbt job with Prefect. Following these instructions I was able to set up the credentials block. Unfortunately the job is not executed and it throws no error 🧵 - has anybody with a similar setup encountered this issue as well?
    c
    a
    • 3
    • 12
  • j

    John Kang

    04/07/2023, 2:45 PM
    Hi all, I need help setting up deployments using a Docker container as infrastructure (rather than the local environment in which the agent is running). For context, the deployment works when I don't specify infrastructure. The error I receive is:
    14:31:42.716 | INFO | Flow run 'enigmatic-octopus' - Downloading flow code from storage at 'generic_wholesale_cert_folder'
    14:31:43.776 | ERROR | Flow run 'enigmatic-octopus' - Flow could not be retrieved from deployment.
    Traceback (most recent call last):
      File "<frozen importlib._bootstrap_external>", line 839, in exec_module
      File "<frozen importlib._bootstrap_external>", line 975, in get_code
      File "<frozen importlib._bootstrap_external>", line 1032, in get_data
    FileNotFoundError: [Errno 2] No such file or directory: 'sql_queries/sql_query_pull.py'
    The above exception was the direct cause of the following exception:
    • 1
    • 4
  • b

    Bebeto Nyamwamu

    04/11/2023, 6:37 AM
    Hello, I need help with the following stack trace:
    Flow could not be retrieved from deployment.
    Traceback (most recent call last):
      File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/prefect/engine.py", line 247, in retrieve_flow_then_begin_flow_run
        flow = await load_flow_from_flow_run(flow_run, client=client)
      File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
        return await fn(*args, **kwargs)
      File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/prefect/deployments.py", line 159, in load_flow_from_flow_run
        await storage_block.get_directory(from_path=deployment.path, local_path=".")
      File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/prefect/filesystems.py", line 553, in get_directory
        return await self.filesystem.get_directory(
      File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/prefect/filesystems.py", line 310, in get_directory
        return self.filesystem.get(from_path, local_path, recursive=True)
      File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/fsspec/asyn.py", line 113, in wrapper
        return sync(self.loop, func, *args, **kwargs)
      File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/fsspec/asyn.py", line 98, in sync
        raise return_result
      File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/fsspec/asyn.py", line 53, in _runner
        result[0] = await coro
      File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/fsspec/asyn.py", line 561, in _get
        return await _run_coros_in_chunks(
      File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/fsspec/asyn.py", line 269, in _run_coros_in_chunks
        await asyncio.gather(*chunk, return_exceptions=return_exceptions),
      File "/usr/lib/python3.10/asyncio/tasks.py", line 408, in wait_for
        return await fut
      File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/gcsfs/core.py", line 1266, in _get_file
        await self._get_file_request(u2, lpath, callback=callback, **kwargs)
      File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/decorator.py", line 221, in fun
        return await caller(func, *(extras + args), **kw)
      File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/gcsfs/retry.py", line 115, in retry_request
        return await func(*args, **kwargs)
      File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/gcsfs/core.py", line 1253, in _get_file_request
        f2.write(data)
    OSError: [Errno 28] No space left on device
    It's deployed and running on GCP.
  • a

    Aaron

    04/11/2023, 2:15 PM
    I think your last line there tells you what you need to know
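The last line of that traceback, `OSError: [Errno 28] No space left on device`, indicates the disk filled up while the flow code was being downloaded. A small standard-library check of free space on the machine running the agent could look like this sketch; `free_gib` and `has_room` are hypothetical helper names.

```python
import shutil


def free_gib(path: str = ".") -> float:
    """Free space, in GiB, on the filesystem that contains `path`."""
    return shutil.disk_usage(path).free / 2**30


def has_room(path: str, needed_gib: float) -> bool:
    """True if at least `needed_gib` GiB are free at `path`."""
    return free_gib(path) >= needed_gib


if __name__ == "__main__":
    print(f"{free_gib('.'):.1f} GiB free in the working directory")
```

Running this in the directory the flow downloads into (the traceback shows `local_path="."`) would show how much headroom is left there.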
  • s

    sundeep

    04/29/2023, 4:46 PM
    I am trying to run a deployment via Docker. However, I receive the following error:
    FileNotFoundError: [Errno 2] No such file or directory: '/opt/prefect/flows'
    Which is surprising because I don't use the above location to reference the flow. This is what my Dockerfile looks like:
    FROM python:3.8-slim-buster
    
    ARG PREFECT_API_KEY
    ENV PREFECT_API_KEY=$PREFECT_API_KEY
    
    ARG PREFECT_API_URL
    ENV PREFECT_API_URL=$PREFECT_API_URL
    
    ARG GCP_DATASET_NAME
    ENV GCP_DATASET_NAME=$GCP_DATASET_NAME
    
    ARG GCP_DATASET_TABLE_NAME
    ENV GCP_DATASET_TABLE_NAME=$GCP_DATASET_TABLE_NAME
    
    ARG GCP_PROJECT_ID
    ENV GCP_PROJECT_ID=$GCP_PROJECT_ID
    
    ARG GCP_REGION
    ENV GCP_REGION=$GCP_REGION
    
    COPY poetry.lock .
    COPY pyproject.toml .
    
    RUN pip install poetry --trusted-host pypi.python.org --no-cache-dir
    RUN poetry config virtualenvs.create false
    RUN poetry install --no-root --without dev
    
    
    RUN mkdir scripts
    COPY scripts/ scripts
    
    RUN mkdir config
    COPY config/ config
    
    RUN mkdir -p dbt/xetra
    COPY dbt/xetra dbt/xetra
    Any idea why Prefect is looking for the flow in the /opt/prefect/flows directory? I am running this via the Cloud
    a
    r
    k
    • 4
    • 20
  • j

    John Kang

    05/10/2023, 1:11 PM
    I was at PyCon last month and met some folks from Temporal (https://temporal.io/) who work on workflows similar to Prefect. I looked at their documentation but could use help deciphering their advantage over Prefect. Does anyone know the pros/cons of Temporal vs. Prefect? I ask because we're using Prefect to automate our data pipelines and even some of our application data refreshes (replacing celery workers). I'm wondering what advantage Temporal would provide over Prefect in this regard.
    👀 3
    y
    l
    • 3
    • 3
  • f

    flapili

    05/10/2023, 2:19 PM
    Hi, is it possible to deduplicate queued jobs with Prefect? For now I'm using queue.Queue and thread locks so that there is at most 1 job with url = https://google.fr running and at most 1 job with this parameter waiting in the queue.
  • j

    juandavidlozano

    05/10/2023, 11:15 PM
    Hi, I have an issue when writing to Google Cloud Storage. When I use `upload_from_path` in my code, you will see that I am passing the same variable, path, as both the `from_path` and the `to_path`, but for some reason Prefect changes the structure of the `to_path` variable. Here is the code I have that builds the path:
    @task()
    def write_local(df: pd.DataFrame, color: str, dataset_file: str) -> Path:
        """Write DataFrame out locally as parquet file"""
        Path(f"data/{color}").mkdir(parents=True, exist_ok=True)
        path = Path(f"data/{color}/{dataset_file}.parquet")
        df.to_parquet(path, compression="gzip")
        return path
    
    
    @task
    def write_gcs(path: Path) -> None:
        """Upload local parquet file to GCS"""
        gcs_block = GcsBucket.load("zoom-gcs")
        gcs_block.upload_from_path(from_path=path, to_path=path)
        return
    You can see in the second task, `write_gcs`, that both of the paths are the same variable, called `path`, which is just a path structure that originally has this value: `'data/yellow/yellow_tripdata_2021-01.parquet'`. The Prefect flow runs, but after it runs, in the details of the flow (the second picture I am attaching) the path for GCS has changed to `'data\\yellow\\yellow_tripdata_2021-01.parquet'`. I have no idea why this is happening, and because of it you can see in picture 1 that it saves the file with that weird name instead of creating the folders in GCS. Any help on why this might be happening?
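The backslashes are consistent with how `pathlib` renders paths on Windows: converting a Windows path object to a string uses `\` as the separator, whereas object-store keys use `/`. That this is what happens inside the upload call is an assumption. The sketch below uses `PureWindowsPath` so the behaviour can be reproduced on any operating system.

```python
from pathlib import PureWindowsPath

# PureWindowsPath reproduces Windows path behaviour on any operating system.
local = PureWindowsPath("data/yellow/yellow_tripdata_2021-01.parquet")

as_string = str(local)       # separators are rendered as backslashes
as_posix = local.as_posix()  # separators are rendered as forward slashes

print(as_string)
print(as_posix)
```

If this is the cause, passing `path.as_posix()` as the `to_path` argument would be worth trying, assuming the upload method accepts a plain string there.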
    m
    • 2
    • 3
  • m

    Matthieu Lhonneux

    05/11/2023, 12:35 PM
    Hi all, I use persist_result for my tasks. However, I have two agents, and when retrying a flow it may happen that the flow no longer runs on the same agent, so I can't use local storage (PREFECT_HOME/storage). I don't want to use S3. Is it possible to use Prefect's blocks to store these results? Or, even better, to use the Postgres DB? Thanks
    d
    • 2
    • 1
  • d

    Devin

    05/30/2023, 2:37 PM
    Hi everyone, I am running into some issues this morning with some of my deployments. I think it may be related to low disk space on our server, but our deployments/flows are failing with no logging. There is a State message to the right that says "Submission failed. FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmp9zwn92g4prefect'". I think this has something to do with the space issue that we are working on addressing, but I am curious why there would be no logging?
  • a

    Austin Weisgrau

    06/01/2023, 5:42 PM
    Has anyone set up a Zap in Zapier to trigger a prefect deployment via webhook? I'm having trouble figuring out how to authenticate a post request from Zapier securely
  • c

    Chandan Maruthi

    06/05/2023, 8:45 AM
    Using Scrapy and CrawlerProcess in Prefect 2: today I learned that Scrapy's CrawlerProcess complains about needing to be the main process if it is used in tasks or subflows. I had to move the Scrapy process under the main flow for it to work.
    gratitude thank you 1
    👍 2
  • a

    Adrian Brudaru

    06/22/2023, 3:58 PM
    Just wanna share the existence of this (NEW) open source Python library for loading data (from JSON to SQL) with automatic normalisation and schema evolution. How schema evolution works: https://dlthub.com/docs/reference/explainers/schema-evolution Existing pipelines: https://dlthub.com/docs/dlt-ecosystem/verified-sources A pipeline building guide that gives you an idea of what is possible: https://dlthub.com/docs/getting-started/build-a-data-pipeline It comes with versioning too. It is very scalable, so it can run on tiny cloud functions or large machines and utilize resources well. It can of course run on Prefect 🙂 It supports these destinations: https://dlthub.com/docs/dlt-ecosystem/destinations The next release adds Snowflake and Parquet to the storage destinations. Feedback welcome. The paradigm it is for: https://dlthub.com/docs/blog/automating-data-engineers
    • 1
    • 1
  • k

    Kohjunwei J

    06/27/2023, 5:21 AM
    Hi there, I just started using Prefect and have a question about scheduling flows for automatic deployment. I'm currently using Prefect Cloud and sending the flow to it from my IDE (VS Code). But I noticed that after I closed my IDE, the scheduled flow, which is supposed to run daily at 17:00, fails to run. It looks like I need to have an active agent running (?) in order for my flows to work. Is there any other way I can let the flow run automatically without keeping the IDE and the agent open on my local machine? Thanks!
    ✅ 1
    r
    • 2
    • 6
  • d

    Dan Cabrol

    06/28/2023, 4:18 PM
    Hey everyone, quick question. I've been using Prefect for a little while, with cron scheduling. I would like to know whether Prefect can do something like: run every 10 min until the flow succeeds. Basically, I have a flow that scrapes some data that is updated between 10 and 12am. I could update my script and add a check at the beginning that stops the script if the data has already been received, but that's not optimal imo, as it would still run the code 10+ times for nothing once I already have the data. Thanks!!! Dan
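The "run every 10 minutes until it succeeds" behaviour can be sketched as a plain polling loop. This is an illustration under assumptions, not a Prefect feature: `poll_until_ready` is a hypothetical helper, `fetch` is an assumed callable that returns `None` while the data is not yet available, and the `sleep` parameter exists only so the wait can be replaced in tests.

```python
import time
from typing import Callable, Optional


def poll_until_ready(
    fetch: Callable[[], Optional[str]],
    interval_seconds: float = 600.0,  # every 10 minutes
    max_attempts: int = 12,           # roughly a two-hour window
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Call `fetch` until it returns data or the attempts run out."""
    for attempt in range(1, max_attempts + 1):
        data = fetch()
        if data is not None:
            return data  # success: stop polling immediately
        if attempt < max_attempts:
            sleep(interval_seconds)
    return None  # data never appeared within the window
```

With this shape a single scheduled run starts at the beginning of the window and stops as soon as the data arrives, instead of many scheduled runs that each re-check. Prefect 2's `retries` and `retry_delay_seconds` options might achieve a similar effect if the flow raises while the data is missing; that is worth checking against the documentation.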
    r
    t
    • 3
    • 2
  • b

    Bebeto Nyamwamu

    07/06/2023, 10:22 AM
    Hello, is there a tutorial for multiple deployments using GCS as the cloud storage, with `prefect.yaml` configurations and a work pool and queue arrangement? Please share the steps and details on this.
    👍 3