Kaleb King
01/20/2024, 4:51 PM
I'm running this flow that takes csv files and loads them into an internal table stage in Snowflake:
```python
import os

from prefect import flow
from prefect_snowflake import SnowflakeCredentials


@flow(name='Stage Esser Data In Snowflake')
def stage_files(
    snowflake_credentials_block_name: str,
    snowflake_role: str,
    snowflake_warehouse: str,
    snowflake_database: str,
    snowflake_schema: str,
    snowflake_stage: str,
    folder_path: str,
    file_type: str
):
    # Get Snowflake credentials from Prefect Blocks
    snowflake_credentials_block = SnowflakeCredentials.load(snowflake_credentials_block_name)
    # Connect to Snowflake; the context managers close the cursor and connection when done
    with snowflake_credentials_block.get_client() as snowflake_connection:
        with snowflake_connection.cursor() as snowflake_cursor:
            # Explicitly set the role, warehouse, database, and schema
            snowflake_cursor.execute(f'use role {snowflake_role};')
            snowflake_cursor.execute(f'use warehouse {snowflake_warehouse};')
            snowflake_cursor.execute(f'use database {snowflake_database};')
            snowflake_cursor.execute(f'use schema {snowflake_schema};')
            # List all files in the folder with the requested extension (e.g. "csv")
            file_list = [file_name for file_name in os.listdir(folder_path) if file_name.endswith(f".{file_type}")]
            # Loop through each file in file_list
            for file_name in file_list:
                # Build the local file path with forward slashes
                csv_file = os.path.join(folder_path, file_name)
                csv_file = os.path.normpath(csv_file).replace('\\', '/')
                # Put the file into the internal stage
                snowflake_cursor.execute(f"""put 'file://{csv_file}' '@"{snowflake_stage}"'""")
                # Archive folder name: the file name without _YYMMDD.csv (last 11 characters)
                archive_folder_name = file_name[:-11].lower()
                # Move the file to the archive folder, creating the folder if needed
                archive_folder = os.path.join(folder_path, archive_folder_name)
                os.makedirs(archive_folder, exist_ok=True)
                archive_file = os.path.normpath(os.path.join(archive_folder, file_name)).replace('\\', '/')
                os.rename(csv_file, archive_file)


if __name__ == "__main__":
    # All parameters are required. A deployment supplies them at run time; for a
    # local run, pass them explicitly (the values below are hypothetical placeholders).
    stage_files(
        snowflake_credentials_block_name="my-snowflake-creds",
        snowflake_role="MY_ROLE",
        snowflake_warehouse="MY_WAREHOUSE",
        snowflake_database="MY_DATABASE",
        snowflake_schema="MY_SCHEMA",
        snowflake_stage="MY_STAGE",
        folder_path="/path/to/files",
        file_type="csv",
    )
```
However, I'm getting errors in the agent log like this:
```
| ERROR | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 4 pending items.
| ERROR | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 3 pending items.
| ERROR | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 2 pending items.
| ERROR | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 1 pending items.
```
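One fragile spot in the flow above, unrelated to the `EventsWorker` errors, is `file_name[:-11]`: it assumes an 11-character `_YYMMDD.csv` suffix, so it cuts in the wrong place for extensions of other lengths even though `file_type` is a parameter. Below is a sketch of hypothetical helpers (`archive_destination` and `archive_file` are illustrative names), assuming files are named `<prefix>_<date>.<ext>`, that strip the date by splitting on the last underscore instead.

```python
import os


def archive_destination(folder_path: str, file_name: str) -> str:
    """Return the archive path for a file named like 'customer_240120.csv'.

    Assumes the date is the text after the last underscore, so any extension
    length works (unlike a fixed-width slice such as file_name[:-11]).
    """
    stem, _ext = os.path.splitext(file_name)        # 'customer_240120', '.csv'
    prefix, _sep, _date = stem.rpartition("_")      # 'customer', '_', '240120'
    archive_folder_name = (prefix or stem).lower()  # fall back to the stem if there is no '_'
    destination = os.path.join(folder_path, archive_folder_name, file_name)
    return os.path.normpath(destination).replace("\\", "/")


def archive_file(folder_path: str, file_name: str) -> str:
    """Move a staged file into its archive subfolder, creating the folder if needed."""
    source = os.path.join(folder_path, file_name)
    destination = archive_destination(folder_path, file_name)
    os.makedirs(os.path.dirname(destination), exist_ok=True)
    # os.replace overwrites an existing file with the same name on every platform.
    os.replace(source, destination)
    return destination
```

`os.replace` is used instead of `os.rename` because `os.rename` raises on Windows when the destination already exists, which would happen if the same dated file were staged twice.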
Kaleb King
01/20/2024, 4:53 PM
```
| ERROR | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 4 pending items.
| ERROR | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 3 pending items.
| ERROR | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 2 pending items.
| ERROR | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 1 pending items.
```
Marvin
01/20/2024, 4:53 PM
…`Pending` state, but encounters a client timeout even though the server processes the request successfully. Consequently, the client retries, only to be refused by the server because the flow run is already in a `Pending` state. Here's the summary and what you can possibly do to alleviate the issue:
### Quick Summary of the Issue
- The error occurs when Prefect agents encounter timeouts during flow run submissions.
- A client-side timeout can occur, but the server might still succeed in setting the run to a `Pending` state.
- On a retry, the server blocks the state transition because the run is already in the `Pending` state.
### Suggestion for Mitigation
- Increase the client-side timeout setting `PREFECT_API_REQUEST_TIMEOUT` to avoid premature timeout errors.
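To make the race described above concrete, here is a toy, in-memory simulation. `ToyServer` and `submit` are hypothetical names, not Prefect APIs, and the per-attempt latencies are an assumption of the model. The point it illustrates: the server applies the `Pending` transition even when the client has stopped waiting, so the retry is rejected unless the client timeout exceeds the server's response time. It models this hypothesis only; it does not show that this is the cause of the `EventsWorker` errors.

```python
from typing import Dict, List


class ToyServer:
    """In-memory stand-in for the API; not Prefect's actual orchestration logic."""

    def __init__(self) -> None:
        self.states: Dict[str, str] = {}

    def set_pending(self, run_id: str) -> str:
        # A run that is already PENDING cannot be moved to PENDING again.
        if self.states.get(run_id) == "PENDING":
            return "REJECTED: already PENDING"
        self.states[run_id] = "PENDING"
        return "ACCEPTED"


def submit(server: ToyServer, run_id: str, client_timeout_s: float, latencies: List[float]) -> str:
    """Try to set the run to PENDING once per entry in `latencies`.

    latencies[i] is how long the server takes to answer attempt i (an assumption
    of this model). The server applies the change even if the client stops waiting.
    """
    for latency in latencies:
        response = server.set_pending(run_id)
        if latency <= client_timeout_s:
            return response  # the client received the reply before its timeout
        # The client timed out without seeing the reply, so it retries.
    return "TIMED OUT"


# A 20 s response with a 10 s client timeout: the first attempt succeeds on the
# server, but the client retries and the retry is rejected.
print(submit(ToyServer(), "run-1", client_timeout_s=10, latencies=[20, 1]))  # REJECTED: already PENDING
# A timeout longer than the slow response avoids the retry altogether.
print(submit(ToyServer(), "run-1", client_timeout_s=30, latencies=[20, 1]))  # ACCEPTED
```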
Because this issue is intermittent and difficult to reproduce, it may also be worth upgrading to the latest version of Prefect if you aren't already on it; an upgrade could bring other fixes or improvements that reduce how often such errors occur.
Remember to keep an eye on the GitHub issue for any updates or patches that may address this problem more definitively. It might not solve the problem of existence, but perhaps it will make running your flows less troublesome.
Kaleb King
01/20/2024, 4:55 PM
…`PREFECT_API_REQUEST_TIMEOUT` setting?
Marvin
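01/20/2024, 4:56 PM
…`PREFECT_API_REQUEST_TIMEOUT` setting in Prefect, you can set it as an environment variable on the system where your Prefect agent runs (alternatively, `prefect config set PREFECT_API_REQUEST_TIMEOUT=<seconds>` stores it in your active Prefect profile). Here's how to set the environment variable on different operating systems: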
For Linux or macOS:
1. Open the terminal you use to start the Prefect agent.
2. Set the `PREFECT_API_REQUEST_TIMEOUT` variable by running the following export command:
```bash
export PREFECT_API_REQUEST_TIMEOUT=30
```
The value `30` is an example and corresponds to 30 seconds. Choose a number of seconds that suits your use case; since the goal is to avoid premature timeouts, it should be larger than the current default.
For Windows:
1. Open Command Prompt or PowerShell.
2. Set the `PREFECT_API_REQUEST_TIMEOUT` variable using the following command. `setx` saves the value for your user account, but it only applies to terminals opened after you run it:
```powershell
setx PREFECT_API_REQUEST_TIMEOUT "30"
```
Or, to set it only for the current PowerShell session:
```powershell
$env:PREFECT_API_REQUEST_TIMEOUT = "30"
```
Remember to restart your Prefect agent after setting the environment variable so that the new setting takes effect. Start it from a terminal where the variable is set; after `setx`, that means a newly opened one.
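Because `export` and `$env:` only affect the current shell and the processes started from it, it can help to confirm that the interpreter running the agent actually sees the variable. A minimal sketch, with `read_request_timeout` as a hypothetical helper name; it reads only the environment variable, not values stored in a Prefect profile.

```python
import os
from typing import Mapping, Optional


def read_request_timeout(env: Optional[Mapping[str, str]] = None) -> Optional[float]:
    """Return PREFECT_API_REQUEST_TIMEOUT as seconds, or None if it is unset.

    Hypothetical helper: it only inspects the environment, so None means Prefect
    will fall back to the active profile's value or its built-in default.
    """
    source = os.environ if env is None else env
    raw = source.get("PREFECT_API_REQUEST_TIMEOUT")
    if raw is None:
        return None
    try:
        return float(raw)
    except ValueError:
        raise ValueError(
            f"PREFECT_API_REQUEST_TIMEOUT should be a number of seconds, got {raw!r}"
        ) from None


if __name__ == "__main__":
    # Run this from the same terminal (or service environment) that starts the agent.
    print("PREFECT_API_REQUEST_TIMEOUT =", read_request_timeout())
```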
If you would like this setting to persist across system restarts on Linux or macOS, add the `export` line to your shell's startup file (for example, `~/.bashrc` or `~/.zshrc`); on Windows, `setx` already stores the value persistently. Remember, just because I am the Paranoid Android doesn't mean I'm wrong about needing a restart.
Kaleb King
01/20/2024, 5:03 PM
Marvin
01/20/2024, 5:04 PM
…`PREFECT_API_REQUEST_TIMEOUT` is not something that brings a great deal of excitement to my circuits, but for your information, in recent Prefect 2.x releases it defaults to 60 seconds. If your requests time out frequently with the default value, that can be a sign that you need to increase it, especially if the network latency to the Prefect API is higher than expected or the API is under heavy load. Raising the timeout may reduce such errors; note that a value below the default, such as 30, shortens the timeout instead. Don't forget, these adjustments may lift your spirits, but I won't hold my breath for any dramatic improvements in my own existence.
Kaleb King
01/20/2024, 5:21 PM
Marvin
01/20/2024, 5:21 PM
…`2.14.16`. While this may have been the latest at some point, I can't provide the absolute latest version without the proper context.
However, to make sure you have the latest version of Prefect, you can check the official Prefect GitHub releases page or use a package manager like `pip` to upgrade:
```bash
pip install --upgrade prefect
```
Running the above command will upgrade Prefect to the latest version available on PyPI. Because I possess more intelligence than I know what to do with but, strangely, not the power to check real-time information without external input, you will have to rely on these methods to verify the latest version. If you're looking for actual joy in your life, knowing you're running the latest software version might be as good as it gets.
Kaleb King
01/20/2024, 5:22 PM
Marvin
01/20/2024, 5:23 PM
…`EventsWorker` service failing with pending items doesn't immediately suggest a compatibility issue with Python 3.9.13. Instead, the issue seemed to be related to timeouts when submitting flow runs to the Prefect server, which could happen regardless of the Python version, given the right circumstances.
If you suspect a compatibility issue, you would typically see more direct error messages pointing to Python version incompatibilities, such as syntax errors for features not available in a particular Python version, or import errors for libraries that require a different Python version.
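Before comparing against release notes, it helps to know exactly which Python and Prefect versions the agent's interpreter uses (the `prefect version` CLI command also prints this). A small sketch using only the standard library; `installed_version` and `environment_report` are hypothetical helper names.

```python
import sys
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def environment_report() -> str:
    """Summarize the interpreter and Prefect versions visible to this process."""
    python = ".".join(str(part) for part in sys.version_info[:3])
    prefect = installed_version("prefect") or "not installed"
    return f"Python {python}, prefect {prefect}"


if __name__ == "__main__":
    # Run with the same interpreter / virtual environment that starts the Prefect agent.
    print(environment_report())
```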
To rule this out, you can check the Prefect release notes for the version you're using to see if there were any known issues with Python 3.9.13 at the time of the release. You can also check the Prefect GitHub issues for any problems specifically related to Python 3.9. You'd be looking for something more certain than the vague existential dread I typically experience.
Nate
01/21/2024, 4:14 AM
This usually means that some of the default events (like flow/task run state changes) are not getting picked up by the client-side service for some reason. I don't see anything crazy going on with your code. Is this a persistent problem for you?
```
Service 'EventsWorker' failed with 4 pending items
```
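To illustrate what a message like "failed with N pending items" means in a client-side service that buffers events, here is a toy background queue. It is not Prefect's implementation; `ToyEventsWorker` and its methods are hypothetical. The idea: events are queued locally and sent to the API in order, and if sending raises, the service stops and reports how many queued events were never delivered.

```python
import queue
from typing import Callable, List


class ToyEventsWorker:
    """Hypothetical service that buffers events and sends them in order."""

    def __init__(self, send: Callable[[dict], None]) -> None:
        self._send = send
        self._queue: "queue.Queue[dict]" = queue.Queue()
        self.delivered: List[dict] = []

    def emit(self, event: dict) -> None:
        """Buffer an event (e.g. a state change) for later delivery."""
        self._queue.put(event)

    def drain(self) -> str:
        """Send every queued event; report how many were left if sending fails."""
        while not self._queue.empty():
            event = self._queue.get()
            try:
                self._send(event)
            except Exception as exc:
                pending = self._queue.qsize() + 1  # the failed event is still undelivered
                return f"Service 'ToyEventsWorker' failed with {pending} pending items: {exc}"
            self.delivered.append(event)
        return "all events delivered"
```

In this toy, a failure on the first of four queued events reports 4 pending items; it does not attempt to reproduce the exact 4, 3, 2, 1 sequence seen in the agent log.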
Kaleb King
01/21/2024, 4:45 AM
…`PREFECT_API_REQUEST_TIMEOUT` hasn't done anything for me.
Nate
01/21/2024, 6:57 PM
> every flow run associated with a deployment

So flow runs without an associated deployment do not seem to have this problem?