• r

    Richard Alexander

    2 weeks ago
    I am running into multiple use cases where a pub/sub pattern could be useful. Within a flow, I know that I can start multiple subflows, but that requires me to know beforehand which subflows might pertain to the main flow's output. Is there a pub/sub-like pattern in Prefect that would allow certain flows to "subscribe" to, or be triggered by, the completion of other flows without manually specifying the relationship in the original flow?
    Sahil Rangwala
    7 replies
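One way to picture the pattern asked about above is a small in-process registry: downstream handlers subscribe to a topic, and the upstream function only publishes its result without knowing who is listening. This is a plain-Python sketch, not a Prefect API; `subscribe`, `publish`, and the topic name `main_flow.completed` are hypothetical names chosen for illustration.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List

# Hypothetical in-process registry: topic name -> subscribed handlers.
_subscribers: DefaultDict[str, List[Callable[[Any], Any]]] = defaultdict(list)


def subscribe(topic: str) -> Callable[[Callable[[Any], Any]], Callable[[Any], Any]]:
    """Decorator that registers a handler for a topic."""
    def register(handler: Callable[[Any], Any]) -> Callable[[Any], Any]:
        _subscribers[topic].append(handler)
        return handler
    return register


def publish(topic: str, payload: Any) -> List[Any]:
    """Call every handler subscribed to the topic and collect the results."""
    return [handler(payload) for handler in _subscribers[topic]]


@subscribe("main_flow.completed")
def send_report(result: dict) -> str:
    return f"report for {result['rows']} rows"


@subscribe("main_flow.completed")
def refresh_cache(result: dict) -> str:
    return f"cache refreshed for {result['table']}"


def main_flow() -> List[Any]:
    result = {"table": "orders", "rows": 3}
    # The publisher does not name its subscribers.
    return publish("main_flow.completed", result)


outputs = main_flow()
```

New subscribers can be added without touching `main_flow`, which is the decoupling the question is after. Across separate flow runs the registry would have to live somewhere shared, which this sketch does not attempt.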
  • s


    1 week ago
    Looking for solutions on how to "call a task from within a task":
    • My main task, "Get data from sql", will get the data.
    • Within that function, I run another function to generate the SQL statement from the specifics sent into the parent function (such as a date from a list of datetimes).
    • Since I cannot decorate the SQL-generating function with @task, how can I run it and still see it in Prefect?
    I see from the docs that my options are either 1) use sql.fn() or 2) not decorate it with @task, neither of which will generate a task run. Here is a mockup:
    @task
    def sql_builder(date):
        statement = ...  # statement generator with date and tables
        return statement
    def get_data(date):
        sql = sql_builder(date)
        get_data(sql, con)
    7 replies
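One arrangement that may help here is to let the flow call both functions, so each can be a task in its own right rather than one being nested inside the other. The sketch below uses no-op stand-ins for Prefect's `task` and `flow` decorators so that it runs without Prefect installed; the table name `events`, the in-memory SQLite connection, and the function `daily_flow` are assumptions for illustration.

```python
import sqlite3
from datetime import date


def task(fn):
    """No-op stand-in for prefect.task (assumption: swap in the real decorator)."""
    return fn


def flow(fn):
    """No-op stand-in for prefect.flow (assumption: swap in the real decorator)."""
    return fn


@task
def sql_builder(day: date) -> tuple:
    # Return the statement and its parameters separately so the date is bound,
    # not interpolated into the SQL text.
    return "SELECT name FROM events WHERE day = ? ORDER BY name", (day.isoformat(),)


@task
def get_data(statement: str, params: tuple, con: sqlite3.Connection) -> list:
    return [row[0] for row in con.execute(statement, params)]


@flow
def daily_flow(day: date, con: sqlite3.Connection) -> list:
    # Both steps are called from the flow, so neither task calls the other.
    statement, params = sql_builder(day)
    return get_data(statement, params, con)


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE events (name TEXT, day TEXT)")
con.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [("a", "2022-10-01"), ("b", "2022-10-02"), ("c", "2022-10-01")],
)
rows = daily_flow(date(2022, 10, 1), con)
```

With the real decorators, each call made from the flow would be expected to appear as its own task run, which is what the mockup is trying to achieve.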
  • m

    Mike He

    1 week ago
    Hello everyone. I have a problem with the Prefect Agent: it cannot fetch the flow code from a deployment that uses an FTP RemoteFileSystem storage block. The test case is as follows:
    • Bring up an FTP server with no directory named
    !!! the deployment will overwrite the directory !!! Note down the following 4 credentials for connecting to the FTP server:
    FTP_HOST = ""
    FTP_PORT = 21
    FTP_USERNAME = "prefect"
    FTP_PASSWORD = "--your-password"
    • Test code structure:
    ┣ .prefectignore
    ┣ deploy.py
    ┗ my_flow.py
    • The code is as follows; please replace the 4
    parameters with your FTP credentials:
    # codes/.prefectignore
    # codes/deploy.py
    from prefect.deployments import Deployment
    from prefect.filesystems import RemoteFileSystem
    from my_flow import main_flow
    FTP_HOST = ""
    FTP_PORT = 21
    FTP_USERNAME = "prefect"
    FTP_PASSWORD = "--your-password"
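    ftp_storage_block = RemoteFileSystem(
        basepath=...,  # required ftp:// base path; its value is missing from this post
        settings={
            "host": FTP_HOST,
            "port": FTP_PORT,
            "username": FTP_USERNAME,
            "password": FTP_PASSWORD,
        },
    )
    ftp_storage_block.save("ftp-localhost", overwrite=True)
    deployment = Deployment.build_from_flow(
        main_flow, name="Main Flow", storage=ftp_storage_block
    )
    if __name__ == "__main__":
        deployment.apply()  # assumed body: register the deployment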
    # codes/my_flow.py
    from prefect import flow
    from prefect.logging import get_run_logger
    @flow
    def main_flow():
        logger = get_run_logger()
    • Start the Orion server:
    prefect orion start
    • Create the deployment:
    cd codes
    and run
    python deploy.py
    • Start an agent:
    prefect agent start --work-queue "default"
    • Run the deployment now with defaults through the Prefect Orion UI.
    Error messages:
    17:15:22.238 | INFO    | prefect.agent - Submitting flow run '681f7ee9-ad7c-4961-bebc-6381a954e0b4'
    17:15:22.320 | INFO    | prefect.infrastructure.process - Opening process 'copper-unicorn'...
    17:15:22.326 | INFO    | prefect.agent - Completed submission of flow run '681f7ee9-ad7c-4961-bebc-6381a954e0b4'
    17:15:25.478 | ERROR   | Flow run 'copper-unicorn' - Flow could not be retrieved from deployment.
    Traceback (most recent call last):
      File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\engine.py", line 256, in retrieve_flow_then_begin_flow_run
        flow = await load_flow_from_flow_run(flow_run, client=client)
      File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\client.py", line 103, in with_injected_client
        return await fn(*args, **kwargs)
      File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\deployments.py", line 54, in load_flow_from_flow_run
        await storage_block.get_directory(from_path=deployment.path, local_path=".")
      File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\filesystems.py", line 296, in get_directory     
        return self.filesystem.get(from_path, local_path, recursive=True)
      File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\fsspec\spec.py", line 801, in get
        self.get_file(rpath, lpath, **kwargs)
      File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\fsspec\implementations\ftp.py", line 136, in get_file   
        outfile = open(lpath, "wb")
    PermissionError: [Errno 13] Permission denied: 'C:/Users/Mike/AppData/Local/Temp/tmpz2x16tivprefect'
    17:15:25.827 | INFO    | prefect.infrastructure.process - Process 'copper-unicorn' exited cleanly.
    I have also written a post asking this question on Discourse (link).
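The last frame of the traceback is a plain `open(lpath, "wb")`, and the path in the error looks like a temporary directory rather than a file. Assuming that reading is right, the failure can be reproduced without FTP or Prefect: opening a directory for writing raises an `OSError` subclass (`PermissionError` on Windows, `IsADirectoryError` on Linux and macOS). The `suffix="prefect"` below is only there to mimic the path in the log.

```python
import tempfile

with tempfile.TemporaryDirectory(suffix="prefect") as tmp_dir:
    try:
        # Same call shape as the failing frame in fsspec's ftp.py.
        with open(tmp_dir, "wb"):
            pass
        error_name = None
    except OSError as exc:
        error_name = type(exc).__name__
```

If this is what happens on the agent, the download is being asked to write a single file onto a directory path, which would point at how the recursive FTP `get` resolves its local target rather than at file permissions.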
  • m

    Mike He

    1 week ago
    I tried to patch the
    with the following code the other day. But after some digging into the
    source code, I am not sure whether the problem should be classified as directly related to
    or the
    . Also, the patch is a little bit ugly and I don't want to patch every agent environment that way XD. So I am asking here: is this only the case for me on the Windows platform? And if the error is universal, will the Prefect team fix it in the future, or wait for
    to fix that? BTW: I have also raised an issue to
    here. Summary: Added 2 lines to
    , as well as a new function
    # prefect/filesystems.py
    class RemoteFileSystem(WritableFileSystem, WritableDeploymentStorage):
        async def get_directory(
            self, from_path: Optional[str] = None, local_path: Optional[str] = None
        ) -> None:
            """
            Downloads a directory from a given remote path to a local directory.
            Defaults to downloading the entire contents of the block's basepath to the current working directory.
            """
            if from_path is None:
                from_path = str(self.basepath)
            else:
                from_path = self._resolve_path(from_path)
            if local_path is None:
                local_path = Path(".").absolute()
            if urllib.parse.urlsplit(self.basepath).scheme == 'ftp':  # Add
                return await self.get_directory_ftp(from_path, local_path)  # Add
            return self.filesystem.get(from_path, local_path, recursive=True)
        async def get_directory_ftp(
            self, from_path: Optional[str] = None, local_path: Optional[str] = None
        ) -> None:
            from_path_raw = urllib.parse.urlsplit(from_path).path
            for file_directory_item in self.filesystem.ls(from_path):
                type_ = file_directory_item["type"]
                name = file_directory_item["name"]
                other_path = name[len(from_path_raw) :]
                if other_path.startswith("/"):  # Change to relative path
                    other_path = other_path[1:]
                dest_path = Path(local_path).joinpath(other_path)
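                if type_ == "directory":
                    if (not dest_path.exists()) or dest_path.is_file():
                        dest_path.mkdir(parents=True)  # assumed: create the missing local directory
                    await self.get_directory_ftp(name, dest_path)
                if type_ == "file":
                    try:
                        self.filesystem.get_file(name, dest_path)
                    except Exception:  # assumed exception type
                        print(f"FTP Error downloading {name} to {dest_path}")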
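The path handling in `get_directory_ftp` is the part most likely to go wrong, so here it is in isolation: strip the remote base path from each listed name, drop the leading slash, and join the remainder onto the local directory. `local_destination` is a hypothetical helper, the example paths are made up, and `PurePosixPath` is used only to keep the result the same on every platform.

```python
from pathlib import PurePosixPath


def local_destination(name: str, from_path_raw: str, local_path: str) -> PurePosixPath:
    """Map a remote listing entry to its destination under local_path."""
    other_path = name[len(from_path_raw):]
    if other_path.startswith("/"):  # change to a relative path
        other_path = other_path[1:]
    return PurePosixPath(local_path).joinpath(other_path)


dest = local_destination("/prefect/sub/my_flow.py", "/prefect", "/tmp/run")
```

Dropping the leading slash matters: joining an absolute path onto `local_path` would discard `local_path` entirely.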
  • Surat Mukker

    Surat Mukker

    1 week ago
    Hello, two new questions:
    1. Our use case requires that only one flow run of a deployment executes at a given time, i.e. if a flow run is running behind for any reason, we do not want the next flow run of the same deployment to start executing even if it is scheduled to run; we would prefer to have the next run cancelled. Is there a setting for this in Prefect 2.x? Using queues to control this concurrency would require us to create a queue per deployment, which is not something we want: we have many deployments, and creating a new queue for each one would make for a very complex system. Is there another way we are missing?
    2. Prefect 2 schedules the next 100 runs of a deployment when it is scheduled. Is there a setting we can use to reduce this number?
    Surat Mukker
    7 replies
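The behaviour described in the first question, skipping a run when the previous one is still in progress, can be pictured with a non-blocking lock. This is a generic Python sketch of the semantics, not a Prefect setting; `run_exclusive` and its return values are hypothetical.

```python
import threading

_run_lock = threading.Lock()


def run_exclusive(work) -> str:
    """Run work() only if no other run holds the lock; otherwise skip it."""
    if not _run_lock.acquire(blocking=False):
        return "skipped"
    try:
        work()
        return "completed"
    finally:
        _run_lock.release()


results = []


def slow_run() -> None:
    # While this run holds the lock, an overlapping run is skipped.
    results.append(run_exclusive(lambda: None))


results.append(run_exclusive(slow_run))
results.append(run_exclusive(lambda: None))
```

A `threading.Lock` only guards runs inside one process. Runs started by separate agent processes would need a shared lock of some kind, which is outside this sketch.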
  • m

    Misha Badov

    2 days ago
    General Prefect question that I couldn’t find an answer to online: what kind of QPS can we expect the scheduler to handle?
    Michael Adkins
    4 replies
  • m

    Misha Badov

    2 days ago
    Can the scheduler be horizontally scaled if needed?
    Michael Adkins
    2 replies
  • Bertangela Loret de Mola

    Bertangela Loret de Mola

    2 days ago
    Hi there. I'm using Prefect v1. I'm adding a task to a custom task file (library), but when executing the flow from Prefect Cloud, I'm getting the error: Failed to load and execute Flow's environment: ImportError("cannot import name 'task_name' from 'tasks.myLib' (/opt/prefect/tasks/myLib.py)") The only thing I found is that the task definition in tasks/myLib.py is separated by one blank line from the previous one. Is it mandatory to separate task definitions with at least 2 blank lines?
    Bertangela Loret de Mola
    1 reply
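On the blank-line question: Python's grammar does not require two blank lines between definitions; that spacing is a PEP 8 style convention. A quick check, using a made-up module source in which the hypothetical functions `first_task` and `task_name` are separated by a single blank line:

```python
import types

source = (
    "def first_task():\n"
    "    return 1\n"
    "\n"  # a single blank line between definitions
    "def task_name():\n"
    "    return 2\n"
)

# Compile and execute the source as if it were an imported module.
module = types.ModuleType("myLib_demo")
exec(compile(source, "myLib_demo.py", "exec"), module.__dict__)
names = [name for name in ("first_task", "task_name") if hasattr(module, name)]
```

Both names are defined, so the spacing is unlikely to be the cause. The `ImportError` may instead mean that the copy of `tasks/myLib.py` under `/opt/prefect/tasks/` does not contain the new task, for example if the execution environment was built before the task was added.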
  • John Kang

    John Kang

    1 day ago
    I'm having a task fail when running on an agent within a flow. The task takes the current working directory and uploads that as a string value into a string block. Any idea why this could be? This started when I updated to 2.4.1
    John Kang
    7 replies