Joël Luijmes
05/20/2021, 7:26 AM
Joël Luijmes
05/20/2021, 7:26 AM
```
20 May 2021,05:47:40 	prefect.WebsiteScraper	ERROR	Error occurred while scraping https://website.com
Traceback (most recent call last):
  File "/prefect-joell/src/modules/scrapers/base_scraper.py", line 44, in run
    self._scrape()
  File "/Users/joell/joell.dev/prefect-joell/src/flows/scrapers/website.py", line 57, in _scrape
  File "/Users/joell/joell.dev/prefect-joell/env/lib/python3.8/site-packages/backoff/_sync.py", line 94, in retry
  File "/Users/joell/joell.dev/prefect-joell/src/flows/scrapers/website.py", line 136, in _parse_products
  File "/usr/local/lib/python3.8/site-packages/selenium/webdriver/support/wait.py", line 80, in until
    raise TimeoutException(message, screen, stacktrace)
selenium.common.exceptions.TimeoutException: Message: 
20 May 2021,05:47:41 	prefect.WebsiteScraper	INFO	Saved dump to local folder
20 May 2021,05:47:41 	prefect.CloudTaskRunner	ERROR	Unexpected error: TimeoutException('', None, None)
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 323, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/prefect-joell/src/modules/scrapers/base_scraper.py", line 56, in run
    raise exception
  File "/prefect-joell/src/modules/scrapers/base_scraper.py", line 44, in run
    self._scrape()
  File "/Users/joell/joell.dev/prefect-joell/src/flows/scrapers/website.py", line 57, in _scrape
  File "/Users/joell/joell.dev/prefect-joell/env/lib/python3.8/site-packages/backoff/_sync.py", line 94, in retry
  File "/Users/joell/joell.dev/prefect-joell/src/flows/scrapers/website.py", line 136, in _parse_products
  File "/usr/local/lib/python3.8/site-packages/selenium/webdriver/support/wait.py", line 80, in until
    raise TimeoutException(message, screen, stacktrace)
selenium.common.exceptions.TimeoutException: Message: 
20 May 2021,05:47:41 	prefect.CloudTaskRunner	INFO	Task 'WebsiteScraper': Finished task run for task with final state: 'Failed'
20 May 2021,05:47:41 	prefect.CloudFlowRunner	INFO	Flow run FAILED: some reference tasks failed.
20 May 2021,05:47:53 	prefect.CloudFlowRunner	WARNING	Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
20 May 2021,05:48:04 	prefect.WebsiteScraper	INFO	No cookie wall
20 May 2021,05:48:08 	prefect.CloudFlowRunner	WARNING	Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
20 May 2021,05:48:08 	prefect.WebsiteScraper	INFO	Category/?start=1416&sz=24 scraped 24 products (1439 total), page 60/123
20 May 2021,05:48:23 	prefect.CloudFlowRunner	WARNING	Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
20 May 2021,05:48:23 	prefect.WebsiteScraper	INFO	Category/?start=1440&sz=24 scraped 24 products (1463 total), page 61/123
20 May 2021,05:48:35 	prefect.WebsiteScraper	INFO	Category/?start=1464&sz=24 scraped 24 products (1487 total), page 62/123
```
Joël Luijmes
05/20/2021, 7:27 AM
```python
import abc

from prefect import Task


# Base Task
class BaseScraperTask(Task):
    # Init code ...
    def run(self):
        # Init code ...
        try:
            self._scrape()
        except Exception as exception:
            # Exception logging ...
            # NOTE: this log line
            self.logger.info("Saved dump to local folder")

            # Reraise to notify Prefect of failure
            raise exception

    @abc.abstractmethod
    def _scrape(self):
        raise RuntimeError("Not implemented")


# Derived
class WebsiteScraperTask(BaseScraperTask):
    # Init code ...
    def _scrape(self):
        # Code ...
        for page in range(1, page_count + 1):
            # Code ...

            products = self._parse_products(page, current_position)

            # NOTE: this log line
            self.logger.info(
                f"{'/'.join(url.split('/')[3:])} scraped {len(products)} products ({current_position} total), page {page}/{page_count}"
            )
            self._navigate_next_page()
```
Joël Luijmes
05/20/2021, 7:27 AM
`Saved dump to local folder`
Kevin Kho
`prefect.engine.signals.FAIL`
`raise FAIL`
Joël Luijmes
05/20/2021, 12:42 PM
Joël Luijmes
05/20/2021, 12:43 PM
Joël Luijmes
05/20/2021, 12:43 PM
Kevin Kho
`try-except`
`state_handlers`
Kevin Kho
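On Kevin's `prefect.engine.signals.FAIL` suggestion: in Prefect 1.x, `from prefect.engine import signals` followed by `raise signals.FAIL("...")` inside a task marks it Failed with that message, while any other uncaught exception also ends as Failed (as the `Unexpected error` line in the log shows). The sketch below is Prefect-free so it runs on its own: `TaskFailed` is a hypothetical stand-in for `FAIL`, and `parse_products` / `scrape_pages` are hypothetical simplifications of the scraper. It shows two things: the page loop stops at the first exception, and `raise ... from exc` keeps the original `TimeoutError` attached as `__cause__`.

```python
import logging

logger = logging.getLogger("scraper_sketch")


class TaskFailed(Exception):
    """Hypothetical stand-in for prefect.engine.signals.FAIL; not Prefect's class."""


def parse_products(page: int) -> list:
    """Hypothetical parser that times out on page 3, like the Selenium wait in the log."""
    if page == 3:
        raise TimeoutError(f"products did not load on page {page}")
    return [f"product-{page}-{n}" for n in range(24)]


def scrape_pages(page_count: int, scraped: list) -> None:
    """Scrape pages in order; once an exception escapes the loop, no later page runs."""
    try:
        for page in range(1, page_count + 1):
            products = parse_products(page)
            scraped.append(page)
            logger.info("page %d/%d: scraped %d products", page, page_count, len(products))
    except Exception as exc:
        logger.info("Saved dump to local folder")
        # `from exc` keeps the original TimeoutError reachable as __cause__.
        raise TaskFailed(f"scraping stopped at page {len(scraped) + 1}") from exc


# Usage: pages 1 and 2 succeed, page 3 fails, pages 4 and 5 are never attempted.
scraped_pages: list = []
try:
    scrape_pages(5, scraped_pages)
except TaskFailed as err:
    caught = err
```

Wrapping the error gives the failure a readable message without losing the Selenium-style traceback underneath it.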
Kevin Kho
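Kevin's other pointer, `state_handlers`, lets side effects such as saving the dump move out of the `try-except`. In Prefect 1.x a task accepts `state_handlers=[...]`; each handler is called as `handler(task, old_state, new_state)` and returns the state to keep. The following is a toy, Prefect-free emulation under that assumption: `run_with_handlers`, `save_dump_on_failure` and the plain-string states are hypothetical (real Prefect states are `State` objects, checked with methods such as `new_state.is_failed()`).

```python
from typing import Callable, List

# Hypothetical handler shape: (task_name, old_state, new_state) -> state to keep.
Handler = Callable[[str, str, str], str]

events: List[str] = []


def save_dump_on_failure(task_name: str, old_state: str, new_state: str) -> str:
    """Hypothetical handler: act only when the task moves into a failed state."""
    if new_state == "Failed":
        events.append(f"{task_name}: saved dump ({old_state} -> {new_state})")
    return new_state  # hand back the state the runner should keep


def run_with_handlers(task_name: str, fn: Callable[[], None], handlers: List[Handler]) -> str:
    """Toy runner: call every handler on each state transition."""

    def transition(old: str, new: str) -> str:
        for handler in handlers:
            new = handler(task_name, old, new)
        return new

    state = transition("Pending", "Running")
    try:
        fn()
        return transition(state, "Success")
    except Exception:
        # Like a task runner, turn the exception into a Failed state instead of re-raising.
        return transition(state, "Failed")


def failing_scrape() -> None:
    """Hypothetical task body that fails the way the Selenium wait did."""
    raise TimeoutError("Message: ")


final_state = run_with_handlers("WebsiteScraper", failing_scrape, [save_dump_on_failure])
```

Because the runner converts the exception into a `Failed` state, the handler fires exactly once, on the `Running -> Failed` transition, and the task body no longer needs its own dump-saving `except` branch.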
Joël Luijmes
05/20/2021, 12:50 PM
Joël Luijmes
05/20/2021, 12:52 PM
Kevin Kho
Joël Luijmes
05/20/2021, 12:55 PM
Kevin Kho
Kevin Kho
Joël Luijmes
05/20/2021, 1:03 PM
Kevin Kho
Kevin Kho
```python
from abc import abstractmethod

from prefect import Task, Flow


class BaseTask(Task):
    @abstractmethod
    def _process(self):
        raise RuntimeError("Not implemented")

    def run(self):
        try:
            self._process()
        except Exception as exception:
            self.logger.info("The exception is reached")
            raise exception


class NextTask(BaseTask):
    def _process(self):
        x = list(range(5))
        x[2] = "test"  # Cause an error
        for i in x:
            self.logger.info(f"Currently processing {i}")
            i = i + 1
        return


next_task = NextTask()
with Flow('test') as flow:
    next_task()
flow.run()
```
Kevin Kho
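Kevin's repro can also be traced without a Prefect flow. This sketch keeps the same control flow in plain functions; `process`, `run` and the `logs` list are hypothetical stand-ins for `_process`, `run` and `self.logger`. The loop logs the first three values, fails on `"test" + 1`, and the `except` branch logs before re-raising, so the remaining values 3 and 4 are never reached.

```python
logs: list = []


def process() -> None:
    """Stand-in for NextTask._process: same deliberate type error."""
    x = list(range(5))
    x[2] = "test"  # cause an error
    for i in x:
        logs.append(f"Currently processing {i}")
        i = i + 1  # "test" + 1 raises TypeError on the third value


def run() -> None:
    """Stand-in for BaseTask.run: log, then re-raise."""
    try:
        process()
    except Exception:
        logs.append("The exception is reached")
        raise  # bare raise re-raises the same exception object as `raise exception`


try:
    run()
except TypeError as exc:
    error = exc
```

A bare `raise` is the usual idiom here; `raise exception` re-raises the same object too, but adds the re-raise line to the traceback (visible as `line 56, in run` in the log above).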
Joël Luijmes
05/20/2021, 1:28 PM
Joël Luijmes
05/20/2021, 1:29 PM
Kevin Kho
Joël Luijmes
05/20/2021, 1:33 PM