Thread
#prefect-community
    Joël Luijmes

    Joël Luijmes

    1 year ago
    Hi there! I’m running into an issue where an exception is raised, my code still continues. This however doesn’t really agree with Prefect (understandably). How I notice this? I can see the exception being logged, and Prefect already noticed that the task failed. However, the task still continues, and I see more logs. Check the thread for the details ➡️
    Logs
    20 May 2021,05:47:40 	prefect.WebsiteScraper	ERROR	Error occurred while scraping <https://website.com>
    Traceback (most recent call last):
      File "/prefect-joell/src/modules/scrapers/base_scraper.py", line 44, in run
        self._scrape()
      File "/Users/joell/joell.dev/prefect-joell/src/flows/scrapers/website.py", line 57, in _scrape
      File "/Users/joell/joell.dev/prefect-joell/env/lib/python3.8/site-packages/backoff/_sync.py", line 94, in retry
      File "/Users/joell/joell.dev/prefect-joell/src/flows/scrapers/website.py", line 136, in _parse_products
      File "/usr/local/lib/python3.8/site-packages/selenium/webdriver/support/wait.py", line 80, in until
        raise TimeoutException(message, screen, stacktrace)
    selenium.common.exceptions.TimeoutException: Message: 
    
    20 May 2021,05:47:41 	prefect.WebsiteScraper	INFO	Saved dump to local folder
    20 May 2021,05:47:41 	prefect.CloudTaskRunner	ERROR	Unexpected error: TimeoutException('', None, None)
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 323, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/prefect-joell/src/modules/scrapers/base_scraper.py", line 56, in run
        raise exception
      File "/prefect-joell/src/modules/scrapers/base_scraper.py", line 44, in run
        self._scrape()
      File "/Users/joell/joell.dev/prefect-joell/src/flows/scrapers/website.py", line 57, in _scrape
      File "/Users/joell/joell.dev/prefect-joell/env/lib/python3.8/site-packages/backoff/_sync.py", line 94, in retry
      File "/Users/joell/joell.dev/prefect-joell/src/flows/scrapers/website.py", line 136, in _parse_products
      File "/usr/local/lib/python3.8/site-packages/selenium/webdriver/support/wait.py", line 80, in until
        raise TimeoutException(message, screen, stacktrace)
    selenium.common.exceptions.TimeoutException: Message: 
    
    20 May 2021,05:47:41 	prefect.CloudTaskRunner	INFO	Task 'WebsiteScraper': Finished task run for task with final state: 'Failed'
    20 May 2021,05:47:41 	prefect.CloudFlowRunner	INFO	Flow run FAILED: some reference tasks failed.
    20 May 2021,05:47:53 	prefect.CloudFlowRunner	WARNING	Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
    20 May 2021,05:48:04 	prefect.WebsiteScraper	INFO	No cookie wall
    20 May 2021,05:48:08 	prefect.CloudFlowRunner	WARNING	Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
    20 May 2021,05:48:08 	prefect.WebsiteScraper	INFO	Category/?start=1416&sz=24 scraped 24 products (1439 total), page 60/123
    20 May 2021,05:48:23 	prefect.CloudFlowRunner	WARNING	Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
    20 May 2021,05:48:23 	prefect.WebsiteScraper	INFO	Category/?start=1440&sz=24 scraped 24 products (1463 total), page 61/123
    20 May 2021,05:48:35 	prefect.WebsiteScraper	INFO	Category/?start=1464&sz=24 scraped 24 products (1487 total), page 62/123
    Code
    # Base Task
    class BaseScraperTask(Task):
        # Init code ...
    
        def run(self):
            # Init code ...
            try:
                self._scrape()
            except Exception as exception:
                # Exception logging ...
    
                # NOTE: this log line
                <http://self.logger.info|self.logger.info>("Saved dump to local folder")
                
                # Reraise to notify Prefect of failure
                raise exception
            
        @abc.abstractmethod
        def _scrape(self):
            raise RuntimeError("Not implemented")
    
    
    # Derived
    
    class WebsiteScraperTask(BaseScraperTask):
        # Init code ... 
    
        def _scrape(self):
            # Code ...
            for page in range(1, page_count + 1):
                # Code ...
                
                products = self._parse_products(page, current_position)
                
                # NOTE: this log line
                <http://self.logger.info|self.logger.info>(
                    f"{'/'.join(url.split('/')[3:])} scraped {len(products)} products ({current_position} total), page {page}/{page_count}"
                )
    
                self._navigate_next_page()
    Note the log line,
    Saved dump to local folder
    is printed, after which it continues to operate and scrape more pages. Even though, the exception should be reraised
    Kevin Kho

    Kevin Kho

    1 year ago
    Hi @Joël Luijmes, Prefect uses it’s own signals to communicate changes in state. These signals are exceptions also. You want to use
    prefect.engine.signals.FAIL
    . You can
    raise FAIL
    and this will stop the flow.
    Joël Luijmes

    Joël Luijmes

    1 year ago
    Okay, but I guess my issue lies more with my python code? (Or maybe in combination with prefect and/or selenium, that may put things of track). I’m getting an exception somewhere in WebsiteScraperTask._scrape function, as that function doesn’t handle the exception, it is propagated to the BaseScraperTask.run right? There I log the exception, and propagate it through reraising it, making prefect aware of the failure (hopefully).
    Somehow Prefect is aware of the propagated exception (it marks it as failed, and is logging Flow run in no longer running state). However my _scrape keeps on working. Do you have any clues?
    Seems like magic :magic_wand:
    Kevin Kho

    Kevin Kho

    1 year ago
    I think the awkwardness here comes from the fact that in general the
    try-except
    kind of logic defeats the things Prefect provides. In this specific case, we have
    state_handlers
    that are functions that execute upon failure of the task. You would log out there as opposed to the except block.
    And let Prefect handle the exceptions being passed on it’s side.
    Prefect is built so you don’t have to handle failure and edit your code like that
    Joël Luijmes

    Joël Luijmes

    1 year ago
    Yes I’m aware of the prefect functionalities such as state_handlers. Although I’m not sure if they are suited in this case, as I need to refer to certain local members of the class during except/finally (omitted above). My issue I’m trying to describe is, that the exception is (I believe) correctly propagated back up to Prefect. However, the code in the try block is still running. This shouldn’t happen right?
    Or is this because somehow my reraise within Prefect does something funky? And I should raise a FAIL signal as you describe?
    Kevin Kho

    Kevin Kho

    1 year ago
    Ah I see what you mean with the local members of the class. Ohh my bad. I re-read. I thought it was downstream code still running. Yeah that’s weird.
    Joël Luijmes

    Joël Luijmes

    1 year ago
    Nope, it is the actual code in the try block that keeps on running. So i’m trying to find out : • if Prefect may do something causing this • I don’t understand python excpetions • Some code may trigger this (selenium or backoff lib)
    Kevin Kho

    Kevin Kho

    1 year ago
    Yeah looking into it more
    Ok I’ll try to replicate
    Joël Luijmes

    Joël Luijmes

    1 year ago
    That be great 🙂 I’ve tried locally debugging, but then the flow exits correctly. So maybe in combination when ran from a Kubernets agent or something 🤷
    Kevin Kho

    Kevin Kho

    1 year ago
    I tried this but it seems to be exiting successfully
    from prefect import Task, Flow
    
    from abc import abstractmethod
    
    class BaseTask(Task):
    
        @abstractmethod
        def _process(self):
            raise RuntimeError("Not implemented")
    
        def run(self):
            try:
                self._process()
            except Exception as exception:
                <http://self.logger.info|self.logger.info>("The exception is reached")
                raise exception
    
    class NextTask(BaseTask):
    
        def _process(self):
            x = list(range(5))
            x[2] = "test" # Cause an error
            for i in x:
                <http://self.logger.info|self.logger.info>(f"Currently processing {i}")
                i = i + 1
            return
    
    next_task = NextTask()
    
    with Flow('test') as flow:
        next_task()
    
    flow.run()
    You’re not mapping or parallelizing anything right? Are you using Dask?
    Joël Luijmes

    Joël Luijmes

    1 year ago
    Nope, nothing of that kind. Haven’t configured an executor at all. I’m running on Kubernetes. I think maybe Selenium may interfer or something. is going wrong within backoff (retry library which i hadn’t used before). But at least I can rule out I’m not going crazy and the exception should make the code stop 😛
    Thanks for trying to reproduce, I guess I’m off to dive more into debugging. To see if selenium has any influence or backoff
    Kevin Kho

    Kevin Kho

    1 year ago
    I see….sorry about the initial confusion there.
    Joël Luijmes

    Joël Luijmes

    1 year ago
    That’s okay, thanks for thinking with me 😉 Myabe I’ll find it out and i can report it back