best-practices-coordination-plane
  • l

    link89

    09/09/2022, 2:58 AM
    I am wondering if I can use a CI system (like Jenkins) to run a prefect script instead of using the deployment mechanism, as it is easier and more flexible for me to set up an environment in CI than using the deployment.yml. I can still use PREFECT_ORION_DATABASE_CONNECTION_URL to connect to the database so that I can check the job status in the dashboard. What would I lose if I don’t use the deployment mechanism?
    ✅ 1
    a
    • 2
    • 4
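    A minimal sketch (not from the thread; names are placeholders) of the approach described above - calling the flow function directly from a CI job, with the Prefect API/database settings supplied by the CI environment so the run still shows up in the dashboard:
    # run_from_ci.py - executed by the CI job, e.g. `python run_from_ci.py`
    from prefect import flow

    @flow
    def nightly_etl():
        ...  # the actual work goes here

    if __name__ == "__main__":
        # Assumes PREFECT_API_URL (or PREFECT_ORION_DATABASE_CONNECTION_URL for a
        # shared database) is set in the CI environment, so this run is recorded
        # and visible in the UI without creating a deployment.
        nightly_etl()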
  • l

    Louis Vines

    09/09/2022, 8:42 AM
    Hi All 👋 First of all a disclaimer - I'm a Prefect noob (although have quite deep Airflow experience 🙊) so may just be asking a silly question here that doesn't make sense, so apologies if that is the case. However... I've just joined an early-stage startup and we thought a good first code change could be migrating our prefect code to v2. I'm struggling to work out how to migrate our Prefect Cloud state (past flows, projects etc...) from our v1 instance to our v2 instance. When looking at the migration guide here: https://docs.prefect.io/migration-guide/ it's very much geared towards making our code v2 compliant but doesn't mention migrating our cloud metadata etc. Should I even be doing this? If so, how? Currently when I open the v2 UI it's just empty and is encouraging me to create a brand new workspace...
    ✅ 1
    o
    • 2
    • 1
  • r

    Richard Alexander

    09/09/2022, 2:24 PM
    I am running into multiple use cases where it seems like a pub/sub pattern could be useful. Within a flow, I know that I can start multiple subflows, but that requires me to know beforehand which subflows might pertain to the main flow's output result. Is there a pub/sub like pattern in prefect that would allow certain flows to "subscribe" or be triggered by the completion of other flows without manually specifying the relationship in the original flow?
    👀 1
    ✅ 1
    👍 1
    s
    g
    • 3
    • 7
  • e

    Eric Bolton

    09/09/2022, 2:42 PM
    Running into a problem with a flow whose latest run gets stuck in the "Running" state if a deployment happens to coincide with the run.
    • We are using ECS to run the prefect-agent task that registers and runs the flows.
    • Whenever we make an update through our CI pipeline, a new ECS task is spun up and the old task gets deprovisioned.
    • Even when the old task is marked as "Stopped" in AWS (there is no hardware now running the old prefect-agent), the old flow run still says "Running" in the Prefect UI and hangs. Given our concurrency flags, it blocks all scheduled flow runs until it is manually canceled.
    • Questions:
    ◦ Is there a way for Prefect to detect that the old prefect-agent is now stopped and automatically force-cancel the flow run?
    ◦ Should we be managing our deployment process differently?
    r
    • 2
    • 1
  • j

    John Munera

    09/09/2022, 3:21 PM
    Hello guys, I'm currently running into a problem trying to create my account in Prefect Cloud 2.0 to do the migration. Every time I try, I get a 401 error and I never receive the verification code in my email.
    m
    • 2
    • 3
  • s

    Stefan

    09/12/2022, 11:34 AM
    Looking for solutions on how to "call a task from within a task":
    • My main task, "Get data from sql", will get the data.
    • Within the above function, I run another function to generate the SQL statement with the specifics sent into the parent function (such as a date from a list of datetimes).
    • Since I cannot decorate the SQL-generating function with @task - how can I run it and still see it in Prefect? I see from the docs that my options are either 1) use sql_builder.fn() or 2) not decorate it with @task - neither of which will generate a task run.
    Here is a mockup:
    @task
    def sql_builder(date):
        statement = ...  # statement generator with date and tables
        return statement

    @task
    def get_data(date):
        sql = sql_builder(date)
        get_data(sql, con)
    ✅ 1
    t
    b
    • 3
    • 7
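    A minimal sketch (not from the thread; function bodies are placeholders) of one possible restructuring: let the flow call both tasks, so the SQL-building step gets its own task run and its output is passed into the data-fetching task.
    from prefect import flow, task

    @task
    def sql_builder(date):
        # placeholder: build the statement from the date and table names
        return f"SELECT * FROM my_table WHERE created_at = '{date}'"

    @task
    def get_data(sql):
        # placeholder: execute the statement against the database
        print(sql)

    @flow
    def load_data(dates):
        for date in dates:
            sql = sql_builder(date)  # shows up as its own task run in the UI
            get_data(sql)            # receives the generated statement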
  • s

    Surat Mukker

    09/13/2022, 12:32 AM
    Hello, we are using Prefect 2.x (2.0.4 but planning to switch to 2.3.x very soon) and have a use case where a flow runs every 3 minutes. We do not want to receive a failure notification if up to 2 runs fail in a row, but if the 3rd consecutive failure happens we want to be notified. What is the best way to set this up?
    ✅ 1
    a
    • 2
    • 2
  • s

    Surat Mukker

    09/15/2022, 3:54 AM
    Hello, two new questions: 1. Our use case requires that we have only one flow run of a deployment executing at a given time, i.e. if a flow run is running behind for any reason, we do not want the next flow run of the same deployment to start executing even if it is scheduled to run; we would prefer to have the next run cancelled. Is there a setting for this in Prefect 2.x? Using queues to control this concurrency would require us to create a queue per deployment, which is not something we want since we have many deployments, and creating a new queue for each deployment is going to make for a very complex system. Is there another way we are missing? 2. Prefect 2 schedules the next 100 runs of a deployment when it is scheduled. Is there a setting we can use to reduce this number?
    p
    b
    c
    • 4
    • 7
  • m

    Mike He

    09/16/2022, 9:09 AM
    Hello everyone. I have a problem with the Prefect agent: it cannot fetch the flow code from a deployment that uses an FTP RemoteFileSystem storage block. The test case is as follows:
    • Start an FTP server with no directory named codes (!!! the deployment will overwrite the directory !!!) and note down the following 4 credentials for connecting to the FTP server.
    FTP_HOST = "127.0.0.1"
    FTP_PORT = 21
    FTP_USERNAME = "prefect"
    FTP_PASSWORD = "--your-password"
    • Test code structure:
    codes/
    ┣ .prefectignore
    ┣ deploy.py
    ┗ my_flow.py
    • Code is as follows; please replace the 4 FTP_xxx parameters with your FTP credentials:
    # codes/.prefectignore
    __pycache__
    deploy.py
    
    
    # codes/deploy.py
    from prefect.deployments import Deployment
    from prefect.filesystems import RemoteFileSystem
    
    from my_flow import main_flow
    
    FTP_HOST = "127.0.0.1"
    FTP_PORT = 21
    FTP_USERNAME = "prefect"
    FTP_PASSWORD = "--your-password"
    
    ftp_storage_block = RemoteFileSystem(
        basepath=f"ftp://{FTP_HOST}/codes",
        settings={
            "host": FTP_HOST,
            "port": FTP_PORT,
            "username": FTP_USERNAME,
            "password": FTP_PASSWORD,
        },
    )
    ftp_storage_block.save("ftp-localhost", overwrite=True)
    deployment = Deployment.build_from_flow(
        main_flow, name="Main Flow", storage=ftp_storage_block
    )
    
    if __name__ == "__main__":
        deployment.apply()
    
    
    # codes/my_flow.py
    from prefect import flow
    from prefect.logging import get_run_logger
    
    @flow
    def main_flow():
        logger = get_run_logger()
        logger.info("Hello")
    • prefect orion start
    • cd codes and run python deploy.py
    • Up an agent: prefect agent start --work-queue "default"
    • Run the deployment now with defaults through the Prefect Orion UI.
    Error messages:
    17:15:22.238 | INFO    | prefect.agent - Submitting flow run '681f7ee9-ad7c-4961-bebc-6381a954e0b4'
    17:15:22.320 | INFO    | prefect.infrastructure.process - Opening process 'copper-unicorn'...
    17:15:22.326 | INFO    | prefect.agent - Completed submission of flow run '681f7ee9-ad7c-4961-bebc-6381a954e0b4'
    17:15:25.478 | ERROR   | Flow run 'copper-unicorn' - Flow could not be retrieved from deployment.
    Traceback (most recent call last):
      File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\engine.py", line 256, in retrieve_flow_then_begin_flow_run
        flow = await load_flow_from_flow_run(flow_run, client=client)
      File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\client.py", line 103, in with_injected_client
        return await fn(*args, **kwargs)
      File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\deployments.py", line 54, in load_flow_from_flow_run
        await storage_block.get_directory(from_path=deployment.path, local_path=".")
      File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\filesystems.py", line 296, in get_directory     
        return self.filesystem.get(from_path, local_path, recursive=True)
      File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\fsspec\spec.py", line 801, in get
        self.get_file(rpath, lpath, **kwargs)
      File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\fsspec\implementations\ftp.py", line 136, in get_file   
        outfile = open(lpath, "wb")
    PermissionError: [Errno 13] Permission denied: 'C:/Users/Mike/AppData/Local/Temp/tmpz2x16tivprefect'
    17:15:25.827 | INFO    | prefect.infrastructure.process - Process 'copper-unicorn' exited cleanly.
    I have also written a post asking this question on Discourse (link).
    👍 1
  • m

    Mike He

    09/16/2022, 9:46 AM
    I tried to patch prefect.filesystems.RemoteFileSystem with the following code the other day. But after some digging into the fsspec source code, I am not sure whether the problem should be classified as directly related to Prefect or to fsspec. Also, the patching is a little bit ugly and I don't want to patch it that way for every agent environment XD. So I am asking here: is this the case only for me on the Windows platform? And if the error is universal, will the Prefect team fix it in the future, or wait for fsspec to fix it? BTW: I have also raised an issue with fsspec here. Summary: added 2 lines to RemoteFileSystem.get_directory, as well as a new function RemoteFileSystem.get_directory_ftp
    # prefect/filesystems.py
    ...
    
    class RemoteFileSystem(WritableFileSystem, WritableDeploymentStorage):
    
        ...
    
        async def get_directory(
            self, from_path: Optional[str] = None, local_path: Optional[str] = None
        ) -> None:
            """
            Downloads a directory from a given remote path to a local directory.
    
            Defaults to downloading the entire contents of the block's basepath to the current working directory.
            """
            if from_path is None:
                from_path = str(self.basepath)
            else:
                from_path = self._resolve_path(from_path)
    
            if local_path is None:
                local_path = Path(".").absolute()
    
            if urllib.parse.urlsplit(self.basepath).scheme == 'ftp':  # Add
                return await self.get_directory_ftp(from_path, local_path)  # Add
    
            return self.filesystem.get(from_path, local_path, recursive=True)
    
        async def get_directory_ftp(
            self, from_path: Optional[str] = None, local_path: Optional[str] = None
        ) -> None:
            from_path_raw = urllib.parse.urlsplit(from_path).path
            for file_directory_item in self.filesystem.ls(from_path):
                type_ = file_directory_item["type"]
                name = file_directory_item["name"]
                other_path = name[len(from_path_raw) :]
                if other_path.startswith("/"):  # Change to relative path
                    other_path = other_path[1:]
                dest_path = Path(local_path).joinpath(other_path)
                if type_ == "directory":
                    if (not dest_path.exists()) or dest_path.is_file():
                        dest_path.mkdir()
                    await self.get_directory_ftp(name, dest_path)
                if type_ == "file":
                    try:
                        self.filesystem.get_file(name, dest_path)
                    except:
                        print(f"FTP Error downloading {name} to {dest_path}")
  • m

    Misha B

    09/22/2022, 5:23 PM
    General Prefect question, which I couldn’t seem to find an answer to online. What kinds of QPS can we expect the scheduler to handle?
    m
    • 2
    • 4
  • m

    Misha B

    09/22/2022, 5:23 PM
    Can the scheduler be horizontally scaled if needed?
    ✅ 1
    m
    • 2
    • 2
  • b

    Bertangela Loret de Mola

    09/23/2022, 2:14 AM
    Hi there. I'm using prefect v1. I'm adding a task to a custom task file (library) but when executing the flow from Prefect Cloud, I'm getting the error: Failed to load and execute Flow's environment: ImportError("cannot import name 'task_name' from 'tasks.myLib' (/opt/prefect/tasks/myLib.py)"). The only thing I found is that the task definition in tasks/myLib.py is separated by just one blank line from the previous one. Is it mandatory to separate task definitions with at least 2 blank lines?
    • 1
    • 1
  • j

    John Kang

    09/23/2022, 1:47 PM
    I'm having a task fail when running on an agent within a flow. The task takes the current working directory and uploads that as a string value into a string block. Any idea why this could be? This started when I updated to 2.4.1
    ✅ 1
    a
    • 2
    • 7
  • z

    Zi Yuan

    09/26/2022, 7:54 AM
    Helloo!!! I have a question regarding Prefect 1.0. Is it possible to send notifications to multiple channels in Slack (which means to have multiple SLACK_WEBHOOK_URL)? In UI, it only allowed me to use one url. Many thaanks!
    j
    • 2
    • 3
  • r

    Richard Alexander

    09/27/2022, 2:01 PM
    I'm using prefect 2.3 and having trouble running a new flow. One of my tasks instantiates a class and returns it, then fails with the following error:
    cannot pickle '_thread.lock' object
    I have found some methods to get around this for v1, but I can't find anything for v2. How can I get this flow to run?
    m
    • 2
    • 6
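    A minimal sketch (not from the thread; names are placeholders) of one common workaround for pickling errors: construct the unpicklable object inside the task that uses it, and pass only plain, picklable data between tasks.
    import threading

    from prefect import flow, task

    @task
    def fetch_rows(dsn: str):
        # build the unpicklable resource (client, connection, lock, ...) inside
        # the task that needs it instead of returning it from another task
        lock = threading.Lock()  # stand-in for an unpicklable handle
        with lock:
            rows = [1, 2, 3]  # placeholder for the real query
        return rows  # only plain, picklable data crosses the task boundary

    @flow
    def my_flow():
        rows = fetch_rows("postgresql://...")
        print(rows)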
  • j

    Jon Young

    09/27/2022, 2:34 PM
    Hey team! Any recommendations for the best way to structure the directories of a prefect project? My company is using the medallion architecture. Currently we have one directory for tasks and another for workflows, with a file naming convention tied to the medallion architecture, eg
    b_2_c_task_descriptor.py
    . This might scale up well, but I think it would be better to break tasks and workflows into their own directories. Thoughts? What has scaled well for you?
    ❤️ 2
  • g

    gertjan

    09/28/2022, 1:57 PM
    Hi all. I’m able to stream my program’s logs to the UI by setting this env variable:
    export PREFECT_LOGGING_EXTRA_LOGGERS='my-loggers-name'
    Now, when I run my codebase with multiprocessing, these logs are not shown in the UI. Does anyone have tips on how I can still stream the logs of my-loggers-name to the UI while using multiprocessing? EDIT: the root logger is also ignored.
    r
    • 2
    • 6
  • y

    yair friedman

    09/29/2022, 4:54 AM
    Hi, I am using prefect 1.0. I am trying to run 2 flows in parallel, one of them running on the local executor and one on the LocalDaskExecutor
    ✅ 1
    r
    • 2
    • 1
  • y

    yair friedman

    09/29/2022, 4:54 AM
    I want both flows to run in parallel
  • y

    yair friedman

    09/29/2022, 4:54 AM
    but it seems that one flow is waiting for the other to complete before it starts
  • y

    yair friedman

    09/29/2022, 4:54 AM
    even though it has no dependency on it
  • y

    yair friedman

    09/29/2022, 4:55 AM
    How can I implement this logic?
  • t

    Thomas Fredriksen

    09/30/2022, 8:16 AM
    Hi there, I was wondering if there are any arguments against running an Orion deployment in Kubernetes with a single agent? If I only wish for the jobs to run as KubernetesJobs, it seems to me that having a single agent with an appropriate service account is the only thing necessary for an effective deployment? Furthermore, since we only need a single agent, could we simply have the agent run as a sidecar to the Orion server? I guess we would have to disable pod autoscalers to avoid any issues that may arise when the number of servers (and thus agents) decreases?
    m
    k
    • 3
    • 7
  • t

    Thomas Fredriksen

    10/03/2022, 6:19 PM
    Hi there, I did some brief testing with the Deployment description. Does it support any kind of markup language - HTML, Markdown or similar? To me it seems to be plaintext only?
    ✅ 1
    s
    • 2
    • 2
  • j

    Jordan Charlier

    10/04/2022, 2:51 PM
    Hello Everyone! I use Prefect 1 and I see odd behavior with flow registration: the image is always pushed to AWS ECR, even if there is no change. Is this normal?
    t
    r
    c
    • 4
    • 6
  • r

    Rico Farina

    10/06/2022, 9:55 AM
    Hello everyone, I have been exploring the documentation for Prefect 2 and mostly love how intuitive the concepts and design are. There are only a few questions that come up when I think about how I would set up Prefect as a scalable orchestration tool. I think I have some grasp of them but would really appreciate help clarifying them to make sure I am not going in the wrong direction. I am aiming for a setup that can deal with ever-increasing numbers of flows to orchestrate, and flows with different compute needs (some heavy, some light). What would be the best approach (or combination of them)? 1) Should I increase the number of agents picking work from each work queue? 2) Should I set up task runners to run in a pre-existing Dask/Ray cluster and increase/decrease the compute of the cluster? 3) Should I set the infrastructure to run flows in ephemeral Kubernetes pods and increase/decrease the k8s cluster compute according to need? On a side note: 4) If I have 2 (or more) agents monitoring the same queue, once the first agent has picked a flow from the queue, no other agent will pick it, right? Thanks in advance 🙂
    m
    b
    • 3
    • 4
  • j

    Jaime Raldua Veuthey

    10/06/2022, 6:34 PM
    Hi, I am fairly new to Prefect and would like to orchestrate my current workflow but am not sure where to start. At the moment I have a big Python script that takes 2h to run locally - basically scraping, data analysis and writing to a DB (likely Airtable). My next step will be splitting it into different scripts to have sort of a pipeline. After that, I am not sure if I should try to have the scripts running serverless with Google Cloud Functions or AWS Lambdas, or go directly for containers. Any suggestions, no matter how basic, are very welcome :) Thank you!
    ✅ 1
    a
    • 2
    • 3
  • r

    rkoolbergen

    10/07/2022, 2:06 PM
    Hi, I'm evaluating Prefect 2 together with Snowflake and dbt. I've got all the individual components working - I can run a flow in Prefect and run dbt jobs that transform data in Snowflake. However, when I try to combine the 3 I cannot get it to work. I am using the 'Snowflake Connector' and 'Snowflake Credentials' blocks, as well as the dbt CLI Profile block and the "dbt CLI Snowflake Target Configs" block. Can somebody point me to a good tutorial, or provide a basic example? Thanks! Rogier
    ✅ 1
    a
    • 2
    • 1
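    A minimal sketch (not from the thread) of wiring prefect-dbt into a flow, assuming a dbt CLI Profile block has already been saved; the block name "dbt-cli-profile" and the project path below are placeholders:
    from prefect import flow
    from prefect_dbt.cli import DbtCliProfile
    from prefect_dbt.cli.commands import trigger_dbt_cli_command

    @flow
    def dbt_snowflake_flow():
        # load the saved profile block, which references the Snowflake target configs
        dbt_cli_profile = DbtCliProfile.load("dbt-cli-profile")
        # run dbt with that profile against the project directory
        trigger_dbt_cli_command(
            "dbt run",
            project_dir="path/to/dbt_project",
            overwrite_profiles=True,
            dbt_cli_profile=dbt_cli_profile,
        )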
  • j

    John

    10/08/2022, 7:24 AM
    Since Prefect uploads supporting files to storage (I'm using LocalFileSystem, so /tmp/), any code that relies on relative paths is now broken. What's the best practice on this? Using absolute paths throughout? Changing the working directory? Related threads: thread 1, thread 2, GitHub open issue 6391
    ✅ 1
    a
    • 2
    • 1
a

Anna Geller

10/08/2022, 10:09 AM
Best practice is to use remote storage or Docker image storage; local storage is meant more for local development and single-node deployment (you build on this machine and you deploy to the same machine). Prefect always executes a flow run in a tmp directory and downloads code from the remote storage directory, but afaik it doesn't copy files into tmp for local storage.
👍 1
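A minimal sketch (not from the thread; file names are placeholders) of one way to avoid relative-path breakage when the flow executes from a temporary directory: resolve supporting files against the flow file's own location instead of the current working directory.
from pathlib import Path

from prefect import flow

HERE = Path(__file__).parent  # directory the flow file was loaded from

@flow
def read_config():
    # resolve supporting files relative to the flow file, not the cwd
    config_path = HERE / "config" / "settings.yaml"
    return config_path.read_text()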