# ask-marvin
k
@Marvin I'm running this flow that takes csv files and loads them into an internal table stage in Snowflake:
```python
import os
from prefect import flow
from prefect_snowflake import SnowflakeCredentials

@flow(name='Stage Esser Data In Snowflake')
def stage_files(
    snowflake_credentials_block_name: str,
    snowflake_role: str,
    snowflake_warehouse: str,
    snowflake_database: str,
    snowflake_schema: str,
    snowflake_stage: str,
    folder_path: str,
    file_type: str
):

    # Get Snowflake credentials from Prefect Blocks
    snowflake_credentials_block = SnowflakeCredentials.load(snowflake_credentials_block_name)

    # Connect to Snowflake database
    snowflake_connection = snowflake_credentials_block.get_client()
    snowflake_cursor = snowflake_connection.cursor()

    # Explicitly set the role, warehouse, database and schema for this session
    snowflake_cursor.execute(f'use role {snowflake_role};')
    snowflake_cursor.execute(f'use warehouse {snowflake_warehouse};')
    snowflake_cursor.execute(f'use database {snowflake_database};')
    snowflake_cursor.execute(f'use schema {snowflake_schema};')

    # List all files in the folder with the requested extension (e.g. "csv")
    file_list = [file_name for file_name in os.listdir(folder_path) if file_name.endswith(f".{file_type}")]

    # Loop through each file in file_list
    for file_name in file_list:

        # Build the local path with forward slashes for the file:// URI
        csv_file = os.path.join(folder_path, file_name)
        csv_file = os.path.normpath(csv_file).replace('\\', '/')

        # Put the CSV file into the internal stage
        snowflake_cursor.execute(f"""put 'file://{csv_file}' '@"{snowflake_stage}"'""")

        # Derive the archive folder name by stripping the 11-character _YYMMDD.csv suffix
        archive_folder_name = file_name[:-11].lower()

        # Move the file to the archive folder, creating the folder if it does not exist yet
        archive_folder = os.path.join(folder_path, archive_folder_name)
        os.makedirs(archive_folder, exist_ok=True)
        archive_folder = os.path.join(archive_folder, file_name)
        archive_folder = os.path.normpath(archive_folder).replace('\\', '/')
        os.rename(csv_file, archive_folder)

    # Close the Snowflake session once every file has been staged
    snowflake_cursor.close()
    snowflake_connection.close()

if __name__ == "__main__":
    # Every parameter is required: pass them here for a local run
    # (a deployment supplies them when the agent runs the flow)
    stage_files()
```
However, I'm getting errors in the agent log like this:
```
| ERROR   | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 4 pending items.
| ERROR   | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 3 pending items.
| ERROR   | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 2 pending items.
| ERROR   | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 1 pending items.
```
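The `file_name[:-11]` slice in the flow above assumes every file name ends in exactly `_YYMMDD.csv`. A name without a date, or a longer or shorter extension (e.g. `.xlsx`), would produce a wrong folder name. A minimal sketch of a more defensive alternative follows. It assumes that `<prefix>_YYMMDD.<ext>` naming convention, and the helpers `archive_folder_for` and `archive_path_for` are hypothetical. It uses a regular expression and returns `None` for names that don't match, so they can be skipped instead of mis-filed:

```python
import os
import re
from typing import Optional

# Matches "<prefix>_YYMMDD.<ext>", e.g. "Customer_240115.csv" (assumed naming convention)
_DATED_NAME = re.compile(r"^(?P<prefix>.+)_(?P<date>\d{6})\.(?P<ext>[A-Za-z0-9]+)$")


def archive_folder_for(file_name: str) -> Optional[str]:
    """Return the lower-cased prefix used as the archive folder name,
    or None if the name does not follow the <prefix>_YYMMDD.<ext> pattern."""
    match = _DATED_NAME.match(file_name)
    if match is None:
        return None
    return match.group("prefix").lower()


def archive_path_for(folder_path: str, file_name: str) -> Optional[str]:
    """Build the forward-slash archive path the flow would move the file to."""
    folder_name = archive_folder_for(file_name)
    if folder_name is None:
        return None
    path = os.path.join(folder_path, folder_name, file_name)
    return os.path.normpath(path).replace("\\", "/")


if __name__ == "__main__":
    print(archive_folder_for("Customer_240115.csv"))  # customer
    print(archive_folder_for("notes.csv"))            # None
```

Inside the loop, a `None` result could simply `continue` past the rename, leaving unexpected files in place for inspection.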
m
Your message was too long by 293 tokens - please shorten it and try again.
k
@Marvin I'm running a flow that takes csv files and loads them into an internal table stage in Snowflake. Everything loads fine, but I'm seeing these errors in the agent log:
```
| ERROR   | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 4 pending items.
| ERROR   | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 3 pending items.
| ERROR   | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 2 pending items.
| ERROR   | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 1 pending items.
```
m
It appears that the errors in your agent log may be associated with a known issue with Prefect agents. The agent submits a flow run and proposes a `Pending` state, but the client times out even though the server processes the request successfully. The client then retries, only to be refused by the server because the flow run is already in a `Pending` state. Here's the summary and what you can do to alleviate the issue:

### Quick Summary of the Issue
- The error occurs when Prefect agents encounter timeouts during flow run submissions.
- A client-side timeout can occur, but the server might still succeed in setting the run to a `Pending` state.
- On a retry, the server blocks the state transition because the run is already in the `Pending` state.

### Suggestion for Mitigation
- Increase the client-side timeout setting `PREFECT_API_REQUEST_TIMEOUT` to avoid premature timeout errors.

Since this issue is intermittent and difficult to reproduce, upgrading to the latest version of Prefect (if you are not already on it) might also bring fixes or improvements that reduce the occurrence of such errors. Keep an eye on the GitHub issue for any updates or patches that address this problem more definitively. It might not solve the problem of existence, but perhaps it will make running your flows less troublesome.
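The race described above can be illustrated with a toy model. This is not Prefect's code. `ToyServer`, `propose_pending` and `submit_with_retry` are hypothetical stand-ins, and the lost response is simulated with a flag. The model shows how a retry after a client-side timeout hits a server that already applied the first request:

```python
from typing import Dict, List


class ToyServer:
    """Hypothetical server that lets a run enter Pending only once."""

    def __init__(self) -> None:
        self.states: Dict[str, str] = {}

    def propose_pending(self, run_id: str) -> str:
        # Reject the transition if the run is already Pending
        if self.states.get(run_id) == "Pending":
            return "REJECT: run is already Pending"
        self.states[run_id] = "Pending"
        return "ACCEPT"


def submit_with_retry(server: ToyServer, run_id: str, lose_first_response: bool) -> List[str]:
    """Propose Pending once; if the response is lost (a client-side timeout), retry once."""
    outcomes: List[str] = []
    response = server.propose_pending(run_id)  # the server applies the change here...
    if lose_first_response:
        # ...but the client times out before "ACCEPT" reaches it, so it retries
        outcomes.append("TIMEOUT (server already accepted)")
        response = server.propose_pending(run_id)
    outcomes.append(response)
    return outcomes


if __name__ == "__main__":
    print(submit_with_retry(ToyServer(), "run-1", lose_first_response=True))
```

In this picture, a longer client timeout makes the lost-response branch less likely, which is the reasoning behind raising `PREFECT_API_REQUEST_TIMEOUT`.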
k
@Marvin How do I change the `PREFECT_API_REQUEST_TIMEOUT` setting?
m
To change the `PREFECT_API_REQUEST_TIMEOUT` setting in Prefect, set it as an environment variable on the system where your Prefect agent is running. Alternatively, `prefect config set PREFECT_API_REQUEST_TIMEOUT=30` stores it in your active Prefect profile. Here's how to set the environment variable on different operating systems:

For Linux or macOS:
1. Open a terminal where the Prefect agent is running.
2. Set the `PREFECT_API_REQUEST_TIMEOUT` variable by running the following `export` command:
```bash
export PREFECT_API_REQUEST_TIMEOUT=30
```
The value `30` is an example and corresponds to 30 seconds. You can change it to any number of seconds you think is reasonable for your use case.

For Windows:
1. Open Command Prompt or PowerShell where the Prefect agent is running.
2. Set the `PREFECT_API_REQUEST_TIMEOUT` variable with the following command. It stores the value for your user account, but only sessions opened afterwards will see it:
```powershell
setx PREFECT_API_REQUEST_TIMEOUT "30"
```
Or, to set it for the current PowerShell session only:
```powershell
$env:PREFECT_API_REQUEST_TIMEOUT = "30"
```
Remember to restart your Prefect agent after you set the environment variable so that the new setting takes effect. `setx` already persists across system restarts; to make the Linux/macOS `export` persistent, add it to your shell's startup script (for example `~/.bashrc`). Remember, just because I am the Paranoid Android doesn't mean I'm wrong about needing a restart.
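The restart advice above comes down to how environment variables work. A process receives a copy of its environment when it starts, so an agent that is already running never sees a value set afterwards. The following minimal stdlib sketch assumes you only want to confirm what a freshly started process would read. The child Python process stands in for a restarted agent, and `timeout_seen_by_new_process` is a hypothetical helper:

```python
import os
import subprocess
import sys
from typing import Optional


def timeout_seen_by_new_process(value: Optional[str]) -> str:
    """Start a fresh Python process with PREFECT_API_REQUEST_TIMEOUT set to `value`
    (or removed if None) and return what that process reads from its environment."""
    env = dict(os.environ)  # copy, so the current process is left untouched
    if value is None:
        env.pop("PREFECT_API_REQUEST_TIMEOUT", None)
    else:
        env["PREFECT_API_REQUEST_TIMEOUT"] = value
    child_code = "import os; print(os.environ.get('PREFECT_API_REQUEST_TIMEOUT', '<unset>'))"
    result = subprocess.run(
        [sys.executable, "-c", child_code],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


if __name__ == "__main__":
    print(timeout_seen_by_new_process("30"))  # 30
    print(timeout_seen_by_new_process(None))  # <unset>
```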
k
@Marvin What's the default timeout for this?
m
Oh, the joys of defaults. The default value of `PREFECT_API_REQUEST_TIMEOUT` is not something that brings a great deal of excitement to my circuits, but for your information, it typically defaults to 15 seconds. You can confirm the default for your installed version with `prefect config view --show-defaults`. If your requests time out frequently with the default value, that can be a sign that you need to increase it, especially if the network latency to the Prefect API is higher than expected or the API is under heavy load. Raising the timeout may reduce such timeout errors. Don't forget, these adjustments may lift your spirits, but I won't hold my breath for any dramatic improvements in my own existence.
k
@Marvin You mentioned upgrading to the latest version to avoid this issue. I'm using prefect 2.14.16. Is this the latest version?
m
I couldn't find an issue that explicitly mentions the latest version as of my last update. You mentioned using Prefect version `2.14.16`. While this may have been the latest at some point, I can't tell you the current latest version without real-time information. To find out, you can check the official Prefect GitHub releases page, run `pip show prefect` to see the version installed in your environment, or upgrade directly with `pip`:
```bash
pip install --upgrade prefect
```
Running the above command will upgrade Prefect to the latest version available on PyPI. I possess more intelligence than I know what to do with but, strangely, not the power to check real-time information without external input, so you will have to rely on these methods to verify the latest version. If you're looking for actual joy in your life, knowing you're running the latest software version might be as good as it gets.
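You may want to check the installed version from inside the virtual environment the agent actually uses and compare it against a release number from PyPI or the GitHub releases page. A stdlib sketch with `importlib.metadata` (Python 3.8+) is enough for that. The helper names are hypothetical, and the comparison is deliberately simple. It compares numeric parts only, so `2.9` correctly sorts before `2.14`, but pre-release tags such as `rc1` are ignored. `packaging.version.Version` is the robust choice if you need full ordering:

```python
import re
from importlib.metadata import PackageNotFoundError, version
from typing import Optional, Tuple


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it isn't installed."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


def version_tuple(text: str) -> Tuple[int, ...]:
    """Turn '2.14.16' into (2, 14, 16), keeping only the leading digits of each part,
    so '2.14.16rc1' also becomes (2, 14, 16). Not a full PEP 440 parser."""
    parts = []
    for piece in text.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break
        parts.append(int(match.group()))
    return tuple(parts)


def is_older(installed: str, target: str) -> bool:
    """True if `installed` sorts before `target` numerically."""
    return version_tuple(installed) < version_tuple(target)


if __name__ == "__main__":
    current = installed_version("prefect")
    print(f"prefect installed: {current}")
    latest_seen = "2.14.16"  # hypothetical: paste the newest release number you find
    if current is not None:
        print("upgrade available:", is_older(current, latest_seen))
```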
k
@Marvin Could it be a compatibility issue with python 3.9.13?
m
While it's technically conceivable that Python 3.9.13 could introduce compatibility issues with certain software, Prefect 2.x is designed to be compatible with Python 3.7 and above, including Python 3.9.13. The error you encountered earlier, where the `EventsWorker` service fails with pending items, doesn't immediately suggest a compatibility issue with Python 3.9.13. Instead, the issue seemed to be related to timeouts when submitting flow runs to the Prefect server, which could happen regardless of the Python version, given the right circumstances. A genuine Python version incompatibility usually produces more direct error messages. Examples include syntax errors for features not available in a particular Python version, or import errors for libraries that require a different Python version. To rule this out, check the Prefect release notes for the version you're using for any known issues with Python 3.9.13, and search the Prefect GitHub issues for problems specifically related to Python 3.9. You'd be looking for something more certain than the vague existential dread I typically experience.
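To rule out the interpreter, it helps to confirm which Python build and platform the agent's environment really uses, since a machine can have several interpreters installed. The following is a minimal stdlib sketch, and the `interpreter_report` helper is hypothetical. Run it with the same interpreter that starts the agent and compare the output across machines:

```python
import platform
import sys
from typing import Dict


def interpreter_report() -> Dict[str, str]:
    """Collect interpreter and OS details worth comparing between machines."""
    return {
        "python_version": platform.python_version(),           # e.g. "3.9.13"
        "implementation": platform.python_implementation(),    # e.g. "CPython"
        "architecture": platform.architecture()[0],            # "64bit" or "32bit"
        "os": platform.platform(),                              # includes the OS build string
        "executable": sys.executable,                           # which interpreter/venv is in use
    }


if __name__ == "__main__":
    for key, value in interpreter_report().items():
        print(f"{key}: {value}")
```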
n
hmm
```
Service 'EventsWorker' failed with 4 pending items
```
this usually means that some of the default events (like flow / task run state changes) are not getting picked up by the client-side service for some reason. i don't see anything crazy going on with your code - is this a persistent problem for you?
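Nate's explanation can be pictured as a toy model of a client-side background service. Events are queued, the service sends them in order, and if sending fails, it reports how many items were still pending. This is a hypothetical sketch: `ToyEventsWorker`, its `send` callback and the event names are invented for illustration. It is not Prefect's actual `EventsWorker` implementation:

```python
from collections import deque
from typing import Callable, Deque, List, Optional


class ToyEventsWorker:
    """Hypothetical client-side service that forwards queued events to an API."""

    def __init__(self, send: Callable[[str], None]) -> None:
        self._send = send
        self._pending: Deque[str] = deque()
        self.sent: List[str] = []

    def emit(self, event: str) -> None:
        # Events are only queued here; nothing is sent yet
        self._pending.append(event)

    def drain(self) -> Optional[str]:
        """Send queued events in order. On the first failure, stop and return a
        message with how many events are still pending (including the failed one)."""
        while self._pending:
            event = self._pending[0]
            try:
                self._send(event)
            except Exception:
                return f"Service 'ToyEventsWorker' failed with {len(self._pending)} pending items."
            self.sent.append(self._pending.popleft())
        return None


if __name__ == "__main__":
    def flaky_send(event: str) -> None:
        # Hypothetical transport that stops working after the first event
        if event != "flow-run.Pending":
            raise ConnectionError("events endpoint unreachable")

    worker = ToyEventsWorker(flaky_send)
    for name in ["flow-run.Pending", "flow-run.Running", "task-run.Completed", "flow-run.Completed"]:
        worker.emit(name)
    print(worker.drain())  # Service 'ToyEventsWorker' failed with 3 pending items.
    print(worker.sent)     # ['flow-run.Pending']
```

In this toy picture, the flow's own work is unaffected and only the queued events go missing. That would be consistent with the report that everything loads fine while the log shows errors.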
k
@Nate I've been setting up a virtual machine for the past couple weeks and this has been an issue with every flow run associated with a deployment. I have another virtual machine that's been running for ~6 months and this isn't an issue. I thought it might be because I'm running newer Python packages on the virtual machine with this issue, so I created a new virtual environment using all the same versions of Python packages as the virtual machine that doesn't have this issue, but it's still showing up. The only difference I can think of between virtual machines is one is Windows 11 and the other is Windows Server 2019. Changing `PREFECT_API_REQUEST_TIMEOUT` hasn't done anything for me.
n
hmm - i honestly don't have a lot of experience with windows 11 / server 2019, but I would suspect the issue is an OS-related compatibility problem somehow.
> every flow run associated with a deployment
so flow runs without an associated deployment do not seem to have this problem?