Joël Luijmes
05/20/2021, 7:26 AM
Joël Luijmes
05/20/2021, 7:26 AM
20 May 2021,05:47:40 prefect.WebsiteScraper ERROR Error occurred while scraping https://website.com
Traceback (most recent call last):
  File "/prefect-joell/src/modules/scrapers/base_scraper.py", line 44, in run
    self._scrape()
  File "/Users/joell/joell.dev/prefect-joell/src/flows/scrapers/website.py", line 57, in _scrape
  File "/Users/joell/joell.dev/prefect-joell/env/lib/python3.8/site-packages/backoff/_sync.py", line 94, in retry
  File "/Users/joell/joell.dev/prefect-joell/src/flows/scrapers/website.py", line 136, in _parse_products
  File "/usr/local/lib/python3.8/site-packages/selenium/webdriver/support/wait.py", line 80, in until
    raise TimeoutException(message, screen, stacktrace)
selenium.common.exceptions.TimeoutException: Message:
20 May 2021,05:47:41 prefect.WebsiteScraper INFO Saved dump to local folder
20 May 2021,05:47:41 prefect.CloudTaskRunner ERROR Unexpected error: TimeoutException('', None, None)
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 323, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "/prefect-joell/src/modules/scrapers/base_scraper.py", line 56, in run
    raise exception
  File "/prefect-joell/src/modules/scrapers/base_scraper.py", line 44, in run
    self._scrape()
  File "/Users/joell/joell.dev/prefect-joell/src/flows/scrapers/website.py", line 57, in _scrape
  File "/Users/joell/joell.dev/prefect-joell/env/lib/python3.8/site-packages/backoff/_sync.py", line 94, in retry
  File "/Users/joell/joell.dev/prefect-joell/src/flows/scrapers/website.py", line 136, in _parse_products
  File "/usr/local/lib/python3.8/site-packages/selenium/webdriver/support/wait.py", line 80, in until
    raise TimeoutException(message, screen, stacktrace)
selenium.common.exceptions.TimeoutException: Message:
20 May 2021,05:47:41 prefect.CloudTaskRunner INFO Task 'WebsiteScraper': Finished task run for task with final state: 'Failed'
20 May 2021,05:47:41 prefect.CloudFlowRunner INFO Flow run FAILED: some reference tasks failed.
20 May 2021,05:47:53 prefect.CloudFlowRunner WARNING Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
20 May 2021,05:48:04 prefect.WebsiteScraper INFO No cookie wall
20 May 2021,05:48:08 prefect.CloudFlowRunner WARNING Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
20 May 2021,05:48:08 prefect.WebsiteScraper INFO Category/?start=1416&sz=24 scraped 24 products (1439 total), page 60/123
20 May 2021,05:48:23 prefect.CloudFlowRunner WARNING Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
20 May 2021,05:48:23 prefect.WebsiteScraper INFO Category/?start=1440&sz=24 scraped 24 products (1463 total), page 61/123
20 May 2021,05:48:35 prefect.WebsiteScraper INFO Category/?start=1464&sz=24 scraped 24 products (1487 total), page 62/123
Joël Luijmes
05/20/2021, 7:27 AM
import abc

from prefect import Task


# Base Task
class BaseScraperTask(Task):
    # Init code ...

    def run(self):
        # Init code ...
        try:
            self._scrape()
        except Exception as exception:
            # Exception logging ...
            # NOTE: this log line
            self.logger.info("Saved dump to local folder")
            # Reraise to notify Prefect of failure
            raise exception

    @abc.abstractmethod
    def _scrape(self):
        raise RuntimeError("Not implemented")


# Derived
class WebsiteScraperTask(BaseScraperTask):
    # Init code ...

    def _scrape(self):
        # Code ...
        for page in range(1, page_count + 1):
            # Code ...
            products = self._parse_products(page, current_position)
            # NOTE: this log line
            self.logger.info(
                f"{'/'.join(url.split('/')[3:])} scraped {len(products)} products ({current_position} total), page {page}/{page_count}"
            )
            self._navigate_next_page()
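Stripped of Prefect and Selenium, the control flow above reduces to the sketch below. It is only an illustration in plain Python: BaseScraper, FailingScraper, and the events list (standing in for the task logger) are hypothetical names, and the failure on page 3 is simulated. It shows that once the except block re-raises, the loop in that call does not reach any later page.

```python
class BaseScraper:
    def __init__(self):
        # Stand-in for the task logger so the order of events can be inspected.
        self.events = []

    def run(self):
        try:
            self._scrape()
        except Exception:
            self.events.append("Saved dump to local folder")
            # A bare raise re-raises the active exception with its traceback.
            raise

    def _scrape(self):
        raise NotImplementedError


class FailingScraper(BaseScraper):
    def _scrape(self):
        for page in range(1, 6):
            if page == 3:
                # Simulated failure, standing in for the Selenium timeout.
                raise TimeoutError(f"page {page} timed out")
            self.events.append(f"scraped page {page}")


scraper = FailingScraper()
try:
    scraper.run()
except TimeoutError as exc:
    scraper.events.append(f"caller saw: {exc}")

print(scraper.events)
```

Within a single call to run(), pages 3 to 5 are never logged as scraped, so "scraped" lines that appear after the failure cannot come from that same call.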
Joël Luijmes
05/20/2021, 7:27 AM
"Saved dump to local folder" is printed, after which the task continues to run and scrape more pages, even though the exception should be re-raised.
Kevin Kho
prefect.engine.signals.FAIL. You can raise FAIL and this will stop the flow.
Joël Luijmes
05/20/2021, 12:42 PM
Joël Luijmes
05/20/2021, 12:43 PM
Joël Luijmes
05/20/2021, 12:43 PM
Kevin Kho
try-except kind of logic defeats the things Prefect provides. In this specific case, we have state_handlers, which are functions that execute when the task changes state, including on failure. You would log there as opposed to in the except block.
Kevin Kho
Kevin Kho
Joël Luijmes
05/20/2021, 12:50 PM
Joël Luijmes
05/20/2021, 12:52 PM
Kevin Kho
Joël Luijmes
05/20/2021, 12:55 PM
Kevin Kho
Kevin Kho
Joël Luijmes
05/20/2021, 1:03 PM
Kevin Kho
Kevin Kho
from prefect import Task, Flow
from abc import abstractmethod


class BaseTask(Task):
    @abstractmethod
    def _process(self):
        raise RuntimeError("Not implemented")

    def run(self):
        try:
            self._process()
        except Exception as exception:
            self.logger.info("The exception is reached")
            raise exception


class NextTask(BaseTask):
    def _process(self):
        x = list(range(5))
        x[2] = "test"  # Cause an error
        for i in x:
            self.logger.info(f"Currently processing {i}")
            i = i + 1
        return


next_task = NextTask()

with Flow('test') as flow:
    next_task()

flow.run()
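For comparison, the state handler approach mentioned earlier might look roughly like the sketch below. It assumes Prefect 1.x, where a task state handler is called with (task, old_state, new_state) and returns the state to use. The name log_failure is hypothetical and the function is not taken from the original flow.

```python
def log_failure(task, old_state, new_state):
    """Sketch of a Prefect 1.x-style task state handler (hypothetical name)."""
    # The handler runs on every state change, so act only on a failed state.
    if new_state.is_failed():
        task.logger.error("Task %s failed: %s", task.name, new_state.message)
    # A handler has to return a state; returning new_state leaves it unchanged.
    return new_state
```

With Prefect 1.x installed, it would presumably be attached when the task is instantiated, for example NextTask(state_handlers=[log_failure]), so that run() could call self._process() directly without the try/except.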
Kevin Kho
Joël Luijmes
05/20/2021, 1:28 PM
Joël Luijmes
05/20/2021, 1:29 PM
Kevin Kho
Joël Luijmes
05/20/2021, 1:33 PM