Powered by Linen
prefect-community

    Will

    10/29/2021, 3:10 PM
    Hi all, I'm looking at configuring logging for our prefect setup. We use [structlog](https://www.structlog.org) across our python services to format our log entries which are typically sent to cloudwatch log groups; these are then ingested by centralised logging (ELK stack). I'd like to know if it's possible to intercept all logs getting sent to Prefect cloud and duplicate them to stdout, applying formatting (we use JSON). Probably a complicated request; any help much appreciated!
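A minimal sketch of the duplication part. It uses only the standard library's `logging` and `json` rather than structlog, and it assumes (as in Prefect 1.x) that Prefect's loggers are children of the `prefect` logger and keep the default `propagate=True`. It adds a second handler that writes one JSON object per line to stdout. Prefect's own Cloud handler is left untouched, so records are duplicated rather than intercepted. `JsonFormatter` and `add_stdout_json_handler` are illustrative names, not Prefect APIs.

```python
import json
import logging
import sys
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object (one line per record)."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            # Include the formatted traceback when the record carries exception info
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)


def add_stdout_json_handler(logger_name: str = "prefect", stream=None) -> logging.Handler:
    """Attach a JSON handler to `logger_name`; records from child loggers propagate to it."""
    handler = logging.StreamHandler(stream if stream is not None else sys.stdout)
    handler.setFormatter(JsonFormatter())
    logging.getLogger(logger_name).addHandler(handler)
    return handler
```

The call has to happen in the process that executes the flow run. Where that is depends on your storage and agent setup.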

    Sahil Chopra

    10/29/2021, 5:25 PM
    Are there any examples of how to use the Email Task with inputs taken from another task? I have a `monitor_scrape` task that monitors the status of a long-running job. When this task completes, I want to send an email with the stats output by that task. I’m passing the output of the `monitor_scrape` task into my email task as the message, but the email seems to fire independently (see diagram). Any pointers on what I might be doing wrong? Flow code:
    from prefect import task, Flow
    from prefect.engine.state import Running, Cancelled, Looped, Failed
    from prefect.tasks.notifications import EmailTask

    completion_email_task = EmailTask(email_to='xxx', email_to_cc='xxx')

    @task()
    def monitor_scraper_run_status() -> Output:
        ...  # task code here

    def create_flow(flow_name: str) -> Flow:
        with Flow(flow_name,
                  state_handlers=[gmail_notifier(only_states=[Running, Cancelled, Looped, Failed])]) as flow:
            output = monitor_scraper_run_status()
            completion_email_task(subject="<SUBJECT_GOES_HERE>", msg=output.__repr__())
        return flow

    Payam K

    10/29/2021, 8:00 PM
    Hello all. I get this error in the Prefect UI. I am trying to run a job on Fargate using a custom image from ECR: An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Fargate requires task definition to have execution role ARN to support ECR images.

    Jacob Bedard

    10/29/2021, 11:02 PM
    I'm sure I'm doing something wrong here, but I get the oddest message when I try to schedule a flow that I registered:
    Unexpected error: TypeError("cannot pickle '_thread.lock' object")
    Can someone let me know what this is if they've had this same issue? My flow runs ok from the machine where I have the agent, but I can't get it to run when I attempt scheduling or even doing an ad-hoc run from the prefect cloud UI
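The error can be reproduced with the standard library alone: an object that holds a `threading.Lock` cannot be pickled, and many client objects (database connections, connection pools) hold one internally. A plausible explanation, assumed here because the flow code wasn't shared, is that the flow gets pickled for storage and something like that was created at module level and captured by the flow. A direct run on the agent machine would never pickle it, which would explain why that works. The usual workaround is to create the client inside the task, or to make the holder drop the unpicklable part when pickled, as in this sketch. `LazyClient` and `can_pickle` are hypothetical names.

```python
import pickle
import threading


def can_pickle(obj) -> bool:
    """True if `obj` survives pickle.dumps; a held lock raises TypeError."""
    try:
        pickle.dumps(obj)
        return True
    except TypeError:
        return False


class LazyClient:
    """Keeps only picklable settings; creates the lock-holding resource on first use."""

    def __init__(self, dsn: str):
        self.dsn = dsn
        self._resource = None  # stand-in for a real client that owns a thread lock

    @property
    def resource(self):
        if self._resource is None:
            self._resource = threading.Lock()
        return self._resource

    def __getstate__(self):
        # Drop the resource when pickling; it is rebuilt lazily after unpickling
        state = self.__dict__.copy()
        state["_resource"] = None
        return state
```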

    haf

    10/30/2021, 4:07 PM
    I had a flow with DAG like
    A -> B[0..N] -> C[0..N]
    Sometimes some index N fails and then C never runs, because at least one of the upstream Mapped tasks failed. However, I'd always like to run C for the indices that worked; and the flow itself I want to be marked successful despite a few mappings failing. How could I configure this?

    William Grim

    10/30/2021, 7:26 PM
    Does anyone here have a problem with the Prefect agent not flushing stdout, or output just getting lost, when doing something like `prefect agent local start -l mylabel --show-flow-logs 2>&1 | tee /my/file/here.log`? I've got a very long-running flow, and I don't see any output. Is Prefect buffering? Do I need to use `stdbuf` when running the process?
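When a Python process writes to a pipe instead of a terminal, its stdout is block-buffered, so output can sit in memory for a long time before `tee` sees it. Whether that is the cause here is an assumption. `stdbuf` adjusts C stdio buffering, which Python 3's own `io` layer does not use, so it is unlikely to help. Setting `PYTHONUNBUFFERED=1` in the agent's environment (child processes inherit it) or using `print(..., flush=True)` are the usual fixes. The sketch shows the in-process equivalent, switching a text stream to line buffering with `io.TextIOWrapper.reconfigure` (Python 3.7+). `enable_line_buffering` is a hypothetical helper name.

```python
import sys


def enable_line_buffering(stream=None) -> bool:
    """Switch a text stream (default: sys.stdout) to line buffering.

    Returns False when the stream has no reconfigure() method, e.g. when it has
    been replaced by an object that is not an io.TextIOWrapper.
    """
    stream = stream if stream is not None else sys.stdout
    reconfigure = getattr(stream, "reconfigure", None)
    if reconfigure is None:
        return False
    reconfigure(line_buffering=True)  # flush automatically after every newline
    return True
```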

    Mike Lev

    10/30/2021, 7:44 PM
    hey all trying to run dependent flows where I pass the output of a flow to the next flow… the docs on dependent flows were not super specific any ideas on how to approach this use case?

    mr Memedi

    10/30/2021, 10:13 PM
    Hello prefecters, I was wondering if it's possible to run only part of a flow. Let's say there are 3 tasks in a flow and you are only interested in re-running tasks 2 & 3.

    Fina Silva-Santisteban

    10/31/2021, 2:37 PM
    Hi everyone! Is there a way to get/download the timeline summary of a flow run? E.g. a table and/or bar chart with the completion time for each task? That would help with benchmarking and identifying bottlenecks. If I take one of our examples, you can see that there’s a task in the middle that takes a long time to run, and some that take less. It gives me an idea of potential bottlenecks and how the completion times relate to each other, so that’s good, but I don’t know how long each task actually takes unless I hover over each value. The first impression of the task in the middle being a bottleneck is still valid, but since it only takes 1 minute to run we might be ok with that bottleneck. A summary, at least in table form, would help out a lot!
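A rough sketch of the table part. It assumes you have already fetched the task runs for a flow run, for example from the GraphQL API's `task_run` records, selecting each run's task name, `map_index`, `start_time` and `end_time`. The dictionary keys below are assumptions mirroring those fields, and the sample input in the test is made up purely for illustration. The sketch computes each run's duration and lists the runs slowest first.

```python
from datetime import datetime


def _parse(ts: str) -> datetime:
    # fromisoformat (Python 3.7-3.10) does not accept a trailing "Z", so normalise it
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def duration_table(task_runs):
    """Return (label, seconds) rows for finished runs, slowest first."""
    rows = []
    for run in task_runs:
        if not run.get("start_time") or not run.get("end_time"):
            continue  # skip runs that never started or have not finished
        seconds = (_parse(run["end_time"]) - _parse(run["start_time"])).total_seconds()
        label = run["task_name"]
        if run.get("map_index", -1) != -1:  # assumes -1 marks an unmapped run
            label += f"[{run['map_index']}]"
        rows.append((label, seconds))
    return sorted(rows, key=lambda row: row[1], reverse=True)


def print_table(rows) -> None:
    width = max((len(label) for label, _ in rows), default=4)
    print(f"{'task'.ljust(width)}  seconds")
    for label, seconds in rows:
        print(f"{label.ljust(width)}  {seconds:8.1f}")
```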

    Eric Feldman

    10/31/2021, 2:57 PM
    Hi 🙂 I want to pass a `pd.DataFrame` to a task, but it fails when querying GraphQL, since a `pd.DataFrame` isn’t JSON serializable:
    ~/Library/Caches/pypoetry/virtualenvs/recipe-t7nMXg6Y-py3.8/lib/python3.8/site-packages/prefect/client/client.py in graphql(self, query, raise_on_error, headers, variables, token, retry_on_api_error)
        550             server=self.api_server,
        551             headers=headers,
    --> 552             params=dict(query=parse_graphql(query), variables=json.dumps(variables)),
        553             token=token,
        554             retry_on_api_error=retry_on_api_error,
    I can pass `df.to_dict()` and deserialize it, but I'd rather not.

    Todd de Quincey

    10/31/2021, 5:39 PM
    Hi all, I'm exploring using Prefect for a project instead of Airflow, but a bit confused about what Prefect Cloud provides. I have setup a Prefect Cloud account, registered my first flow etc, but I then need to either have a local or Docker etc agent running. If I am on the Prefect Cloud free tier, does this not include an agent as part of the tier? Or is the idea that we need to setup our own server for an agent and then register this with Prefect Cloud? Many thanks Todd

    Harish

    10/31/2021, 10:16 PM
    Hi all, I was exploring Prefect Automations and was wondering if we could run a flow when another flow has been cancelled from the UI? I'm trying to catch the cases where a user cancels a flow from the UI, either as a separate flow or in a state handler.

    Daniel Siles

    11/01/2021, 9:15 AM
    Hi! I’m trying to map over 1k URLs (which will eventually be 1 million), but even a simple task takes too much time.
    import time
    from prefect import task, Flow

    @task()
    def transform_url(url):
        try:
            if '?' in url:
                return url[:url.find('?')]
        except Exception:
            pass
        return url

    urls = [...]  # list with 1k urls

    with Flow("flow") as flow:
        transformed_urls = transform_url.map(urls)

    start = time.time()
    flow.run()
    print(time.time() - start)
    Execution time: 31.85s. Is there a way to reduce the overhead of a task?
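Each mapped child is a full task run with its own state tracking, so per-item overhead dominates when the actual work is a string slice. A common approach is to map over batches instead of single items. The sketch shows only the plain-Python part, with `chunked` and `transform_batch` as hypothetical names. In the flow you would decorate `transform_batch` with `@task` and call something like `transform_batch.map(chunked(urls, 1000))`; that wiring is an assumption and is not exercised here. Assuming every element is a string, the try/except can also go, since `str.split` cannot raise for a string.

```python
from typing import Iterable, List


def chunked(items: List[str], size: int) -> List[List[str]]:
    """Split a list into consecutive batches of at most `size` items."""
    if size < 1:
        raise ValueError("size must be >= 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


def strip_query(url: str) -> str:
    # Everything before the first '?'; the URL is returned unchanged if there is none
    return url.split("?", 1)[0]


def transform_batch(batch: Iterable[str]) -> List[str]:
    """Do the per-URL work for a whole batch inside a single task run."""
    return [strip_query(url) for url in batch]
```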

    Marko Herkaliuk

    11/01/2021, 9:27 AM
    Hi, I noticed this situation several times last week. When registering an updated flow, we get notifications for 10 flow runs moving to the Cancelled state. It looks as if, during registration, it creates a new flow (the cancelled flow has a different ID), then realizes that such a flow already exists, cancels its 10 scheduled runs, and then updates the old flow. This is not critical, but maybe someone else has encountered it. We use Cloud.

    Eric Feldman

    11/01/2021, 12:23 PM
    Hi, I have a very small flow (visualization attached), but executing this flow's tasks takes around 6 seconds for only 1 record. Why is it so slow? Can I make it faster somehow?

    Adam Brusselback

    11/01/2021, 2:18 PM
    With prefect server and localagent is there any way to limit job execution concurrency? If I get 1000 jobs submitted in one min, but that server can really only handle executing 150 at a time before it falls over, what ways do I have of improving this situation?

    Jean-Michel Provencher

    11/01/2021, 3:00 PM
    Hi 👋, is there a way in the CLI to actually deregister a flow? How do people manage flows at scale in their CI/CD when they want to remove an existing flow to make sure it doesn’t execute in the future?

    Joe

    11/01/2021, 3:40 PM
    Hi. I'm trying to set up the Prefect server locally using the output from `prefect server config`. I tried to create a project in the UI, but I get an "uh-oh" error - looking at the GraphQL response I can see the error is "Variable "$tenantId" of non-null type "UUID!" must not be null.". I think a tenant is a team, and if I try to update the team name from the profile page I see a similar GraphQL error: "Field update_tenant_name_input.tenant_id of required type UUID! was not provided". If I try to create a project using the CLI (`prefect create project "Test Project"`), I get the error: "You have not set an API key for authentication" - I can't see a way to set up an API key that isn't related to Prefect Cloud. If I try to create a tenant with the CLI (`prefect server create-tenant --name="Test"`), I get: "To create a tenant with Prefect Cloud, please signup at ...". Have I somehow missed creating the 'default' team - is there a way I can fix that?

    Payam K

    11/01/2021, 3:42 PM
    Here is general question that I am really wondering about. What is the benefit of using Prefect and ECR Fargate together to run a machine learning end to end project while you can use SageMaker pipelines and parallelize your work? I like using Prefect and want to understand the benefits that I am not aware of. Thanks.

    Joe Still

    11/01/2021, 5:31 PM
    Howdy again, I am trying to wrap my head around how to deliver MLOps Level 1 using Prefect for the automated pipeline. Can anyone point me to a resource that addresses dev/stage/prod pipeline deployment with Prefect? For Airflow I believe I can treat it like a common application deployment building my DAGs into a docker image I deploy at each stage. Maybe I just don’t understand how Prefect flows are managed in this way or am thinking incorrectly about pipeline promotion through dev/stage/prod. Thoughts?

    Vamsi Reddy

    11/01/2021, 6:01 PM
    Hi everyone, I am trying to send Slack notifications for each and every task, whether it succeeds or fails. Do I have to specify the state handler for each task, or can I specify it once on the flow? I.e.
    with Flow(FLOW_NAME, state_handlers=[post_to_slack]):
    or
    @task(log_stdout=True, max_retries=3, retry_delay=datetime.timedelta(minutes=5), state_handlers=[post_to_slack])
    def some_task():
       return something
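For reference, a Prefect 1.x state handler is a callable taking `(obj, old_state, new_state)` and returning the state to keep. A handler passed to `Flow(state_handlers=...)` fires on flow-run state changes, while one passed to `@task(state_handlers=...)` fires for that task's changes. Per-task notifications therefore need the task-level argument on each task; that is my reading of the docs and worth verifying. The sketch keeps Prefect out so it runs standalone: `make_slack_handler` builds a `post_to_slack` handler that hands its message to `send`, a hypothetical stand-in for whatever actually posts to your Slack webhook. Prefect 1.x also ships `prefect.utilities.notifications.slack_notifier`, which may cover this without custom code.

```python
from typing import Callable


def make_slack_handler(send: Callable[[str], None]):
    """Build an (obj, old_state, new_state) handler that reports success or failure."""

    def post_to_slack(obj, old_state, new_state):
        # Only report finished outcomes; Pending/Running transitions are ignored
        if new_state.is_successful():
            send(f"{obj.name} succeeded")
        elif new_state.is_failed():
            send(f"{obj.name} failed: {new_state.message}")
        return new_state  # a state handler must return the state to use

    return post_to_slack
```

Usage would be along the lines of `post_to_slack = make_slack_handler(send)` followed by `@task(state_handlers=[post_to_slack])` on each task.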

    Andrey Kozhevnikov

    11/01/2021, 6:20 PM
    Hi everyone

    Guozheng Kuang

    11/01/2021, 6:23 PM
    Hi Everyone, has anyone used BQTask before?: https://docs.prefect.io/api/latest/tasks/gcp.html

    Guozheng Kuang

    11/01/2021, 6:23 PM
    Is there any example code that I can follow?

    Vamsi Reddy

    11/01/2021, 6:37 PM
    Hi Everyone, is it possible to have a task that will only run if a flow fails? will i be able to use the state of a flow to run a task after the flow fails ?

    Kevin Kho

    11/01/2021, 9:30 PM
    Hello everyone, we have formed a Prefect Community Trivia team and joined a league. Anyone want to join? If you can’t make the designated time slot, you can still do it through a form and your score still counts towards the team score.

    Phil

    11/01/2021, 10:01 PM
    Hi folks, I am relatively new to prefect and may have a beginner question that I hope you can help me out. I have not been able to answer it with the forum or documentation by myself so far. This is my use case: I want to extract some symbols from my internal database (A) to extract external data from the web for those symbols in the form of a Pandas DataFrame (B). After that I transform and enrich the extracted external data by some information also as a Pandas DataFrame (C). Tasks B and C should be processed in parallel by calling the mapping function. Task A is called by the normal run method. Now I would like to have the results from Task C reduced to a list of DataFrames to load them to disk as one feather file (D). According to the documentation the reduce step is done automatically if I call the run method and the task expects a list. Up to task C, everything works fine. But I’ve some trouble with the reduce step and my custom class for task D. Task D expects a list of DataFrames (as expected from the reduce step) and calls the pandas.concat function to merge the results back into one DataFrame to finally save it to disk as a single file. But if I run the flow with my class, the following error occurs:
    TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
    If I got it right, Task D only gets a reference to the previous task object instead of the reduced list of DataFrames. The same logic with the task decorator, on the other hand, works without any problems. There is no difference between the two versions of the logic, which is why I am quite confused by the different results. Please, can someone help me out? It's probably just a small thing, but I just can't find it 😕

    Donny Flynn

    11/01/2021, 10:25 PM
    ❓ I have a Flow with three tasks, but I also have 10 secrets to retrieve from Prefect Cloud. When I tried to push those retrievals one level of abstraction below the flow, like:
    with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
         Task_with_secret_pulling()
    the task failed with this error message: `Error during execution of task: ValueError("Could not infer an active Flow context while creating edge to <Task: LessThan>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `LessThan(...).run(...)`")`. Then I brought all the secrets to the flow level, which created them as separate tasks, but I am curious: is it Prefect's intention to have each "get" of a Secret be a separate task?

    Todd de Quincey

    11/02/2021, 6:36 AM
    Passing runtime arguments via the functional API at task instantiation

    Charles Hunt

    11/02/2021, 9:04 AM
    Hi - I am looking for a bit of help. I have created some python flows and tasks which mostly execute against the k8 agent on azure just fine, the task makes a REST put call on each flow, but some longer running calls get an error after 30 mins, and an exception is raised. I am certain the REST endpoint is ok, because I can run a call which lasts 1 hour without any problem outside of the flow. I am pretty new to python; but I will do my best to describe the whole scenario. The exception appears to be raised within the urllib3 library - the timeout I set is a single value of 14400 seconds (4 hours) but I have 4 flows out of 20+ that are long running and each of them blows up after 30 mins. I have isolated the problem to either the python code or the k8 agent, but I do not know which. any suggestions would be really appreciated! thanks and regards charles

Anna Geller

11/02/2021, 9:44 AM
Hi @Charles Hunt, could you perhaps move the traceback and all subsequent messages to this thread so that we can keep the main channel a bit cleaner? 🙂 It looks like there can be several issues here, but we need to look at them step by step: 1. Before moving to Kubernetes, could you show me your Flow so that I could reproduce locally and check if there are no issues there? 2. Then, when it comes to the Kubernetes agent on Azure - how did you start it? did you assign any labels to this agent? When it comes to the exception handling: • you generally don’t need to do that in Prefect - instead of suppressing errors, Prefect will use it to provide visibility when tasks in your Flow fail, and let you take action based on that e.g. set notification on failure • what is this POST request doing - does it return something immediately?

Charles Hunt

11/02/2021, 10:49 AM
the python code to call the REST api is:
#call the simpleingest endpoint
response = requests.post(apiendpointurl, headers=headers, data = requestdata, timeout=14400)
the exception is caught in a try-except block with
except:
     logger.error("General runtime error during API call.")
     raise RuntimeError("General runtime error during API call.")
What I would like to happen is to avoid the exception being raised at all and the request to wait nicely until the REST api has completed
I am using the requests urllib3 library, and the error (and traceback) I get is:
Task 'ingest_task': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
    response.begin()
  File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [Errno 104] Connection reset by peer
# prefect libraries for accessing prefect runtime api
import prefect
from prefect import task, Flow
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun
from prefect.client import Secret

# psycopg2 libraries for accessing PostgreSQL
import psycopg2
from psycopg2 import connect, sql
import psycopg2.extras
from psycopg2.extras import RealDictCursor

# used for making API calls
import requests
# used for date functions
from datetime import datetime, timedelta

# have already run `prefect backend cloud` CLI command on localhost to set orchestration backend for Prefect Cloud
# set flow config to log to cloud server
prefect.config.cloud.send_flow_run_logs = True

@task(log_stdout=True)
def ingest_task():
    logger = prefect.context.get("logger")
    apiendpointurl = 'http://10.10.10.10:5000/SimpleIngest'
    headers = {
        'Content-type':'application/json',
        'api-key':'secretkey'
    }

    # get db config
    dbconfig = Secret("fsdb").get()

    # set up default hwm date and days
    hwmdate = (datetime.today() - timedelta(days = 1)).strftime('%Y/%m/%d')
    days = 0
    
    # get db hwm
    with connect(f"dbname={dbconfig['dbname']} host={dbconfig['host']} user={dbconfig['user']} password={dbconfig['password']} sslmode=require", cursor_factory=RealDictCursor) as pgcon:
        with pgcon.cursor() as pgcursor:

            # run the sql
            pgcursor.execute("SELECT max(next_max_hwm) as hwm FROM staging.high_water_marks where source_table = 'AAAAA_BBBBBB';")
            if pgcursor.rowcount > 0 :
                # extract the result
                pgrecord = pgcursor.fetchone()
                logger.info(f"Control: High water mark date: {pgrecord['hwm']}")
                hwm = datetime.date(pgrecord['hwm'])                
                hwmdate = hwm.strftime('%Y/%m/%d')
                days = (datetime.date(datetime.today()) - hwm).days    
                logger.info(f"Number of days delta: {days}")
                if days > 0 :
                    requestdata = f"{{'JobName': 'delta_prod','DataJobId': 105, 'JobPathId': 'YYYYYYYY', 'DataJobType': 'Bounces', 'StartDate': '{hwmdate}', 'NumberOfDays': {days}, 'Identifiers': '1D3E4DAC-00000-0000-0000-10BD9A9869BC,ZZZ'}}"

                    # log the endpoint
                    logger.info(apiendpointurl)

                    # log the inputs
                    logger.info(requestdata)
                    
                    try:
                        #call the simpleingest endpoint
                        response = requests.post(apiendpointurl, headers=headers, data = requestdata, timeout=14400)

                        if "completedSuccess" not in response.text:
                            logger.error(response.json())
                            raise RuntimeError('SmartDataLoader API did not complete successfully')      

                    except RuntimeError as exc:
                        logger.error("API runtime failure.")
                        raise RuntimeError from exc
                    except requests.exceptions.Timeout as exc:
                        logger.error("API Timeout")
                        raise TimeoutError from exc
                    except:
                        logger.error("General runtime error during API call.")
                        raise RuntimeError("General runtime error during API call.")
                    else:
                        logger.info(response.json())
                else:
                    logger.info("Up to date with delta")

with Flow("git-prod-bronto-bounces-flow") as flow:
    ingest_task()

flow.storage = GitHub(
    repo="XXXXXXX",
    path="YYYYYYYY/flows/production/bronto/bronto_bounces_delta_flow.py",
    access_token_secret="GITHUB_ACCESS_TOKEN"
    )
flow.run_config = KubernetesRun(labels=["prod"],env={"EXTRA_PIP_PACKAGES": "psycopg2-binary requests"})
flow.register(project_name="YYYYYYYY")
Hi @Anna Geller, thank you for your response, and sorry about posting follow-on messages on the main thread - I had not intended to do that; I thought I had posted in the question thread.
@Anna Geller I have posted the Python code - this is a code pattern I have deployed 20+ times, and it works very reliably on flows that take less than 30 mins to complete.
To answer your questions: 1. the flow is posted above 2. I am not sure how the agent was started - it was deployed by a colleague - I expect using the default labels.
The requests.post makes a request to a listener which goes off to perform an ingest task - when it is complete, a response is issued with status and error information.
As you can probably see in the code, although it traps exceptions, it raises them again to ensure the flow is flagged with a failure.
03:26:13 ERROR ingest_task: General runtime error during API call.
03:26:13 ERROR CloudTaskRunner: Task 'ingest_task': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
    response.begin()
  File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 756, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/usr/local/lib/python3.7/site-packages/urllib3/util/retry.py", line 532, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/usr/local/lib/python3.7/site-packages/urllib3/packages/six.py", line 769, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
    response.begin()
  File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<string>", line 64, in ingest_task
  File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 119, in post
    return request('post', url, data=data, json=json, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 61, in request
    return session.request(method=method, url=url, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
    logger=self.logger,
  File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "<string>", line 78, in ingest_task
RuntimeError: General runtime error during API call.
My hunch is that either some configuration/option is missing on the requests call, so the long-running HTTP connection is reset by the Python stack, or the container aborts the connection. But I don't know enough about either Python or the container config to tell whether either is correct. The testing I have done on the API host indicates that it is not resetting the connection, as I can execute a request running for more than 1 hour without issue using a tool I wrote in C# to make ad hoc requests against the API.

Anna Geller

11/02/2021, 11:28 AM
First of all, thank you so much for moving all the information into the thread! 🙏 I researched this a bit, and it looks like your issue may occur due to the Azure Load Balancer dropping connections in an AKS Kubernetes cluster, as described here. The issue was already reported by the community in this thread - sharing in case you want to have a look. Main findings: • Azure has given the advice to increase the idle timeout on the load balancer, but according to a person from the community, this solution did not fix the problem, and the connection reset was still appearing after roughly 4 minutes • There is an open PR to fix this: https://github.com/PrefectHQ/prefect/pull/5066

Charles Hunt

11/02/2021, 11:37 AM
@Anna Geller The REST API used to be hosted by the Azure web application service, which indeed does have a 4-minute timeout; we ran into that and have since mitigated it by re-hosting the API on an Azure Linux VM, so the 4-minute problem has been worked around. This issue occurs at 30 minutes. As I mentioned, I can run a long-running request against the Azure VM for at least 60 minutes without any problem. The AKS-related post mentions a 4-minute threshold, but I don't know if that is the issue or not. I will try to test the Python code outside of the k8 agent to see if it works.

Anna Geller

11/02/2021, 11:41 AM
@Charles Hunt you’re 100% correct with that. If this is urgent for you, perhaps you can try the fix from the PR (i.e. deploy a new AKS Kubernetes agent using the code from the PR) and chime in with your findings. Alternatively, as a temporary workaround, perhaps you can (as you already mentioned): • create a VM on Azure, start a LocalAgent there and run this flow on a local agent, until the AKS Azure issue is fixed and released • temporarily disable load balancer for this specific cluster, and see if the issue occurs again

Charles Hunt

11/02/2021, 1:04 PM
@Anna Geller Thank you for the ideas. I think we will try running a local agent on the VM - do you have a link to a resource on how to set that up?

Anna Geller

11/02/2021, 1:07 PM
Sure! But it’s really a single command after installing Prefect:
prefect agent local start
But you may want to set up a Supervisord to make it run as a background process: • Docs: https://docs.prefect.io/orchestration/agents/local.html#flow-configuration • Supervisor: https://docs.prefect.io/orchestration/agents/local.html#using-with-supervisor

Charles Hunt

11/05/2021, 12:00 PM
@Anna Geller I have set up a local agent on the Azure VM and it is working well - all the tasks that worked against the K8 agent are still performing well, but curiously I now have almost the opposite problem with the longer-running flows. The local agent is installed and running on the same host as the REST endpoint listener. The Python REST call waits indefinitely even though I can see that the REST endpoint has completed the request and logged a successful (no error) response - the Python-based REST call just sits and waits and waits. I think I must be making a silly mistake, but the same code pattern works well for the shorter-running tasks.
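import socket
from urllib3.connection import HTTPConnection

# 0x10 is TCP_KEEPALIVE on macOS; on Linux the equivalent option is TCP_KEEPIDLE
# (0x10 means something else there), so prefer the named constant when it exists.
TCP_KEEPIDLE = getattr(socket, "TCP_KEEPIDLE", 0x10)
HTTPConnection.default_socket_options = HTTPConnection.default_socket_options + [
    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),  # enable TCP keepalive probes
    (socket.IPPROTO_TCP, TCP_KEEPIDLE, 60),       # first probe after 60 s idle
]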
I found this bit of code which suggests that the keepalive needs to be configured on the HTTPConnection
I'm going to give it a go

Anna Geller

11/05/2021, 12:10 PM
Does this job run for a very long time?

Charles Hunt

11/05/2021, 12:10 PM
the REST endpoint takes about 10 mins (600 secs) to respond if there is data available to process.

Anna Geller

11/05/2021, 12:10 PM
My idea was that, since you mentioned you now run your agent on the same host as the API, you could change this to localhost:
apiendpointurl = 'http://10.10.10.10:5000/SimpleIngest'

Charles Hunt

11/05/2021, 12:11 PM
ok - I will try
apiendpointurl = 'http://localhost:5000/SimpleIngest'

Anna Geller

11/05/2021, 12:15 PM
overall, it’s hard for me to help since I don’t know your API. But I would try determining which component is a root cause here. You can try running this ingest job as a normal Python script without Prefect so that you can check whether this is a Prefect issue or a general DevOps configuration issue with the VM or API. If Prefect is not an issue, then you could investigate the API and networking a bit more.
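Following that suggestion, here is a minimal sketch of a standalone check that makes a single POST outside Prefect and reports how it ended and how long it took. It uses the standard library's `urllib.request` instead of `requests`, so it also takes `requests`/`urllib3` out of the picture. `timed_post` is a hypothetical helper; you would pass in the URL, headers and payload from the flow.

```python
import time
import urllib.error
import urllib.request


def timed_post(url: str, body: bytes, timeout: float, headers=None):
    """POST `body` to `url` and return (outcome, elapsed_seconds, detail)."""
    request = urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers=headers or {"Content-Type": "application/json"},
    )
    start = time.monotonic()
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            detail = response.read().decode("utf-8", errors="replace")
            outcome = f"HTTP {response.status}"
    except urllib.error.HTTPError as exc:
        # The server answered, but with a 4xx/5xx status
        outcome, detail = f"HTTP {exc.code}", str(exc)
    except OSError as exc:
        # URLError, connection resets and socket timeouts are all OSError subclasses
        outcome, detail = type(exc).__name__, str(exc)
    return outcome, time.monotonic() - start, detail
```

For example, `timed_post('http://10.10.10.10:5000/SimpleIngest', requestdata.encode('utf-8'), timeout=14400, headers=headers)` run from the agent's host would show whether a long request still ends in a `ConnectionResetError` without Prefect involved.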

Charles Hunt

11/05/2021, 12:26 PM
I have successfully been able to run long running API requests against the VM based endpoint on azure - so I eliminated that as the root cause - I believe now the issue resides in the python urllib3 stack with perhaps the complication of some azure networking.
so I am trying the "localhost" idea in the hope the urllib3 python stack behaves better
I am not a native python developer, but of course it is the language of choice for writing the flows - I feel I have got some way to getting what I need from this, but it is just a few troubling flows that are tripping me up and it all revolves around the REST api request.
I just tested the flow using the "localhost:5000" - and it seemed to work - the response came back after 10 mins
Thank you @Anna Geller - your ideas really helped me.

Anna Geller

11/05/2021, 12:32 PM
Nice work! Great to hear that!