Soham
07/15/2024, 11:23 PM
ConnectError: [Errno 8] nodename nor servname provided, or not known.
Does Prefect not accommodate intermittent connectivity issues? Does it require a constant internet connection?
Marvin
07/15/2024, 11:24 PM
Soham
07/15/2024, 11:25 PM
Marvin
07/15/2024, 11:25 PM
```python
import os

from prefect import flow, task

# Local cache for temporarily storing inputs (in memory only)
local_cache = []


# Function to simulate connectivity check
def is_connected() -> bool:
    # Simulate network check (replace with actual check).
    # `ping -c 1` is the Unix form; on Windows the flag is `-n 1`.
    return os.system("ping -c 1 google.com") == 0


@task(retries=3, retry_delay_seconds=5)
def process_data(data: int) -> int:
    if not is_connected():
        raise ConnectionError("No internet connection")
    # Process data (placeholder for actual processing logic)
    result = data * 2
    return result


@task
def cache_data_locally(data: int) -> int:
    local_cache.append(data)
    return data


@task
def sync_data():
    if is_connected():
        for data in local_cache:
            # Call the undecorated function; this skips the task's retries
            process_data.fn(data)
        local_cache.clear()


@flow
def my_flow(data: int):
    try:
        result = process_data(data)
    except Exception:
        # A failed task re-raises its own exception after retries run out
        result = cache_data_locally(data)
    sync_data()
    return result


# To run the flow
if __name__ == "__main__":
    my_flow(5)
```
This example includes:
- A connectivity check (replace with an actual network check).
- A process_data task that doubles the input value, with retry logic.
- A cache_data_locally task to store the data in a local list if the connection is down.
- A sync_data task to process and clear the local cache when connectivity is restored.
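For the connectivity check in the first bullet, one possible replacement for the ping call is a plain TCP connection attempt. This is only a sketch under assumptions: the function name `can_reach` and the default host, port, and timeout are illustrative and not something Prefect provides, so point it at whatever endpoint your flow actually depends on.

```python
import socket


def can_reach(host: str = "8.8.8.8", port: int = 53, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        # create_connection resolves the host and opens a TCP socket
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts
        return False
```

The body of `is_connected` could then return `can_reach()` instead of shelling out to `ping`, which avoids depending on platform-specific ping flags.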
Feel free to adapt this to your specific requirements, and refer to the Prefect documentation for more advanced use cases.
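As one example of such an adaptation, the in-memory list is lost when the process exits, so cached inputs would not survive a restart. The sketch below shows a file-backed alternative, assuming a JSON file is acceptable storage. The path `pending.json` and the helper names `cache_to_disk` and `drain_cache` are hypothetical and not part of Prefect.

```python
import json
from pathlib import Path

CACHE_FILE = Path("pending.json")  # hypothetical location


def cache_to_disk(data: int, path: Path = CACHE_FILE) -> None:
    """Append one pending input to the JSON file at path."""
    pending = json.loads(path.read_text()) if path.exists() else []
    pending.append(data)
    path.write_text(json.dumps(pending))


def drain_cache(path: Path = CACHE_FILE) -> list:
    """Return all pending inputs and remove the cache file."""
    if not path.exists():
        return []
    pending = json.loads(path.read_text())
    path.unlink()
    return pending
```

Inside `sync_data`, the values returned by `drain_cache()` could be processed in place of iterating over `local_cache`.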