# ask-community
j
Hi there! I’m running into an issue where my code keeps running even though an exception is raised. Understandably, Prefect doesn’t really agree with this. How do I notice it? I can see the exception being logged, and Prefect has already noticed that the task failed. However, the task still continues, and I see more logs. Check the thread for the details ➡️
Logs
20 May 2021,05:47:40 	prefect.WebsiteScraper	ERROR	Error occurred while scraping <https://website.com>
Traceback (most recent call last):
  File "/prefect-joell/src/modules/scrapers/base_scraper.py", line 44, in run
    self._scrape()
  File "/Users/joell/joell.dev/prefect-joell/src/flows/scrapers/website.py", line 57, in _scrape
  File "/Users/joell/joell.dev/prefect-joell/env/lib/python3.8/site-packages/backoff/_sync.py", line 94, in retry
  File "/Users/joell/joell.dev/prefect-joell/src/flows/scrapers/website.py", line 136, in _parse_products
  File "/usr/local/lib/python3.8/site-packages/selenium/webdriver/support/wait.py", line 80, in until
    raise TimeoutException(message, screen, stacktrace)
selenium.common.exceptions.TimeoutException: Message: 

20 May 2021,05:47:41 	prefect.WebsiteScraper	INFO	Saved dump to local folder
20 May 2021,05:47:41 	prefect.CloudTaskRunner	ERROR	Unexpected error: TimeoutException('', None, None)
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 323, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/prefect-joell/src/modules/scrapers/base_scraper.py", line 56, in run
    raise exception
  File "/prefect-joell/src/modules/scrapers/base_scraper.py", line 44, in run
    self._scrape()
  File "/Users/joell/joell.dev/prefect-joell/src/flows/scrapers/website.py", line 57, in _scrape
  File "/Users/joell/joell.dev/prefect-joell/env/lib/python3.8/site-packages/backoff/_sync.py", line 94, in retry
  File "/Users/joell/joell.dev/prefect-joell/src/flows/scrapers/website.py", line 136, in _parse_products
  File "/usr/local/lib/python3.8/site-packages/selenium/webdriver/support/wait.py", line 80, in until
    raise TimeoutException(message, screen, stacktrace)
selenium.common.exceptions.TimeoutException: Message: 

20 May 2021,05:47:41 	prefect.CloudTaskRunner	INFO	Task 'WebsiteScraper': Finished task run for task with final state: 'Failed'
20 May 2021,05:47:41 	prefect.CloudFlowRunner	INFO	Flow run FAILED: some reference tasks failed.
20 May 2021,05:47:53 	prefect.CloudFlowRunner	WARNING	Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
20 May 2021,05:48:04 	prefect.WebsiteScraper	INFO	No cookie wall
20 May 2021,05:48:08 	prefect.CloudFlowRunner	WARNING	Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
20 May 2021,05:48:08 	prefect.WebsiteScraper	INFO	Category/?start=1416&sz=24 scraped 24 products (1439 total), page 60/123
20 May 2021,05:48:23 	prefect.CloudFlowRunner	WARNING	Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
20 May 2021,05:48:23 	prefect.WebsiteScraper	INFO	Category/?start=1440&sz=24 scraped 24 products (1463 total), page 61/123
20 May 2021,05:48:35 	prefect.WebsiteScraper	INFO	Category/?start=1464&sz=24 scraped 24 products (1487 total), page 62/123
Code
import abc

from prefect import Task


# Base Task
class BaseScraperTask(Task):
    # Init code ...

    def run(self):
        # Init code ...
        try:
            self._scrape()
        except Exception as exception:
            # Exception logging ...

            # NOTE: this log line
            self.logger.info("Saved dump to local folder")
            
            # Reraise to notify Prefect of failure
            raise exception
        
    @abc.abstractmethod
    def _scrape(self):
        raise RuntimeError("Not implemented")


# Derived

class WebsiteScraperTask(BaseScraperTask):
    # Init code ... 

    def _scrape(self):
        # Code ...
        for page in range(1, page_count + 1):
            # Code ...
            
            products = self._parse_products(page, current_position)
            
            # NOTE: this log line
            self.logger.info(
                f"{'/'.join(url.split('/')[3:])} scraped {len(products)} products ({current_position} total), page {page}/{page_count}"
            )

            self._navigate_next_page()
Note that the log line `Saved dump to local folder` is printed, after which the task keeps running and scrapes more pages, even though the exception should have been re-raised.
k
Hi @Joël Luijmes, Prefect uses its own signals to communicate changes in state. These signals are also exceptions. You want to use `prefect.engine.signals.FAIL`: you can `raise FAIL` and this will stop the flow.
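Because a Prefect signal is an ordinary exception class, turning an arbitrary error into a FAIL just means raising a different exception from the `except` block. The sketch below runs without Prefect installed: `FailSignal` is a hypothetical stand-in for `prefect.engine.signals.FAIL`, and `DemoScraper` is a made-up class mirroring the `run`/`_scrape` split from the question. In a real Prefect 1.x task you would import `FAIL` instead of defining the stand-in.

```python
class FailSignal(Exception):
    """Hypothetical stand-in for prefect.engine.signals.FAIL (also an Exception subclass)."""


class DemoScraper:
    """Hypothetical task-like class mirroring BaseScraperTask.run from the question."""

    def __init__(self):
        self.pages_scraped = 0

    def _scrape(self):
        for page in range(1, 6):
            if page == 3:
                raise TimeoutError(f"page {page} timed out")
            self.pages_scraped += 1

    def run(self):
        try:
            self._scrape()
        except Exception as exc:
            # Logging/cleanup would go here; then raise the signal instead of
            # the raw error. `from exc` keeps the original error as __cause__.
            raise FailSignal(f"Scrape failed: {exc}") from exc


scraper = DemoScraper()
try:
    scraper.run()
except FailSignal as signal:
    print(signal, "| cause:", repr(signal.__cause__))
    print("pages scraped before failure:", scraper.pages_scraped)
```

Chaining with `from exc` means the original Selenium-style timeout still shows up in the traceback, so switching to a signal does not lose debugging information.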
j
Okay, but I guess my issue lies more with my Python code? (Or maybe a combination with Prefect and/or Selenium that throws things off track.) I’m getting an exception somewhere in the `WebsiteScraperTask._scrape` function. Since that function doesn’t handle the exception, it is propagated to `BaseScraperTask.run`, right? There I log the exception and propagate it by re-raising it, making Prefect aware of the failure (hopefully).
Somehow Prefect is aware of the propagated exception (it marks the task as failed and logs "Flow run is no longer in a running state"). However, my `_scrape` keeps on working. Do you have any clues?
Seems like magic 🪄
k
I think the awkwardness here comes from the fact that, in general, `try`-`except` logic defeats what Prefect provides. In this specific case, we have `state_handlers`, which are functions that execute when the task's state changes, for example when the task fails. You would log there instead of in the `except` block, and let Prefect handle the exceptions on its side.
Prefect is built so you don’t have to handle failures and edit your code like that.
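In Prefect 1.x, a state handler is a callable with the signature `(task, old_state, new_state)`, passed via the `state_handlers` argument and called on every state change; it returns the state to continue with (usually `new_state`). Below is a minimal sketch of a logging handler. The name `log_failure_handler` is hypothetical, and to keep the snippet runnable without Prefect, the usage drives it with small fake task/state objects instead of real Prefect ones. Because the handler receives the task instance, it can read attributes stored on `self`, but not local variables inside `run()`.

```python
import logging


def log_failure_handler(task, old_state, new_state):
    """Hypothetical Prefect 1.x-style state handler: log once the task fails."""
    if new_state.is_failed():
        # `task` is the Task instance, so attributes set on it are reachable here.
        task.logger.info("Task failed: %s", new_state.message)
    return new_state


# Usage without Prefect: fakes exposing only what the handler touches.
class FakeState:
    def __init__(self, failed, message=None):
        self._failed = failed
        self.message = message

    def is_failed(self):
        return self._failed


class FakeTask:
    logger = logging.getLogger("demo")


logging.basicConfig(level=logging.INFO)
result = log_failure_handler(FakeTask(), FakeState(False), FakeState(True, "boom"))

# In a real flow this would be attached as, e.g.,
# WebsiteScraperTask(state_handlers=[log_failure_handler]),
# assuming the subclass's __init__ forwards keyword arguments to Task.__init__.
```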
j
Yes, I’m aware of Prefect functionality such as `state_handlers`, although I’m not sure they are suited to this case, as I need to refer to certain local members of the class during `except`/`finally` (omitted above). The issue I’m trying to describe is that the exception is (I believe) correctly propagated back up to Prefect. However, the code in the `try` block is still running. This shouldn’t happen, right?
Or is this because my re-raise within Prefect does something funky, and I should raise a `FAIL` signal as you describe?
k
Ah, I see what you mean about the local members of the class. Oh, my bad, I re-read it: I thought it was downstream code still running. Yeah, that’s weird.
j
Nope, it is the actual code in the `try` block that keeps on running. So I’m trying to figure out which of these it is:
• Prefect may be doing something that causes this
• I don’t understand Python exceptions
• Some code triggers this (Selenium or the backoff lib)
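On the "I don’t understand Python exceptions" point, plain Python behaves as expected here: once an exception escapes a `for` loop, that loop is finished, `finally` still runs, and a bare `raise` passes the same exception on to the caller. A small pure-Python check, where `scrape` and `run` are hypothetical stand-ins for `_scrape` and `run`:

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("scraper-demo")


def scrape(pages_seen, fail_on_page, page_count=5):
    """Hypothetical stand-in for WebsiteScraperTask._scrape."""
    for page in range(1, page_count + 1):
        if page == fail_on_page:
            raise TimeoutError(f"page {page} timed out")
        pages_seen.append(page)
        logger.info("scraped page %d/%d", page, page_count)


def run(pages_seen, events, fail_on_page=3):
    """Hypothetical stand-in for BaseScraperTask.run."""
    try:
        scrape(pages_seen, fail_on_page)
    except Exception:
        events.append("except")
        logger.info("Saved dump to local folder")
        raise  # bare raise re-raises the same exception object
    finally:
        events.append("finally")  # runs whether or not an exception occurred


pages, events = [], []
try:
    run(pages, events)
except TimeoutError as exc:
    logger.info("caller saw: %s", exc)
```

After this runs, only pages 1 and 2 were scraped, and no further "scraped page" lines appear once the caller has seen the error; so if a real run keeps logging pages afterwards, something other than this call stack must be executing the loop.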
k
Yeah, looking into it more.
OK, I’ll try to replicate it.
j
That’d be great 🙂 I’ve tried debugging locally, but then the flow exits correctly. So maybe it only happens in combination with running from a Kubernetes agent or something 🤷
k
I tried this, but it seems to exit as expected:
from prefect import Task, Flow

from abc import abstractmethod

class BaseTask(Task):

    @abstractmethod
    def _process(self):
        raise RuntimeError("Not implemented")

    def run(self):
        try:
            self._process()
        except Exception as exception:
            self.logger.info("The exception is reached")
            raise exception

class NextTask(BaseTask):

    def _process(self):
        x = list(range(5))
        x[2] = "test"  # A string in the list: `i + 1` below raises TypeError on it
        for i in x:
            self.logger.info(f"Currently processing {i}")
            i = i + 1
        return

next_task = NextTask()

with Flow('test') as flow:
    next_task()

flow.run()
You’re not mapping or parallelizing anything, right? Are you using Dask?
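The reason concurrency matters here: Python cannot stop a running thread from the outside, so if the scraping loop were running in a different thread (or process) from the code that reported the failure, the failure could be recorded while the loop kept logging pages. This is only a hypothetical illustration in plain Python, not a claim about what Prefect or the Kubernetes agent does in this flow; `scrape_pages`, `simulate`, and the `stop_event` cancellation flag are all made up for the sketch.

```python
import threading
import time


def scrape_pages(page_count, log, stop_event):
    """Hypothetical worker loop that logs each page it scrapes."""
    for page in range(1, page_count + 1):
        # Cooperative cancellation: the loop only stops if it checks a flag.
        if stop_event.is_set():
            return
        log.append(f"page {page}/{page_count}")
        time.sleep(0.01)


def simulate(page_count=30, signal_worker=False):
    log, stop_event = [], threading.Event()
    worker = threading.Thread(target=scrape_pages, args=(page_count, log, stop_event))
    worker.start()
    try:
        # The failure happens in the main thread, not inside the worker.
        raise TimeoutError("simulated failure")
    except TimeoutError:
        if signal_worker:
            stop_event.set()
    worker.join()
    return len(log)


print("pages logged, worker not told about the failure:", simulate())
print("pages logged, worker told to stop:", simulate(signal_worker=True))
```

Without the flag, the worker logs every page even though the failure was handled immediately; this is the same symptom as the log lines continuing after "Flow run FAILED".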
j
Nope, nothing of that kind. I haven’t configured an executor at all. I’m running on Kubernetes. I think maybe Selenium interferes, or something is going wrong within backoff (a retry library I hadn’t used before). But at least I can rule out that I’m going crazy: the exception should make the code stop 😛
Thanks for trying to reproduce it. I guess I’m off to dig deeper into debugging, to see whether Selenium or backoff has any influence.
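On the backoff suspicion: in the traceback, backoff’s `retry` wraps `_parse_products`, and a retry decorator of that kind re-invokes the wrapped function within the same call, only letting the exception out once it gives up. The sketch below is a simplified, hand-rolled stand-in, not backoff’s actual implementation; `retry_on`, `parse_products`, and `max_tries` as used here are hypothetical. It shows the ordering: retries happen before the caller sees the error, and nothing resumes once the exception has propagated.

```python
import functools


def retry_on(exc_type, max_tries=3):
    """Hypothetical minimal retry decorator (no waiting between attempts)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_tries + 1):
                try:
                    return func(*args, **kwargs)
                except exc_type:
                    if attempt == max_tries:
                        raise  # give up: only now does the caller see the exception
        return wrapper
    return decorator


attempts = []


@retry_on(TimeoutError, max_tries=3)
def parse_products(page):
    """Hypothetical stand-in for _parse_products that always times out."""
    attempts.append(page)
    raise TimeoutError(f"page {page} timed out")


try:
    parse_products(60)
except TimeoutError as exc:
    print(f"gave up after {len(attempts)} attempts: {exc}")
```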
k
I see… sorry about the initial confusion there.
j
That’s okay, thanks for thinking it through with me 😉 Maybe I’ll figure it out, and I can report back.