From 037998bf6731b37f6205299b5f64fd2aef5c0d44 Mon Sep 17 00:00:00 2001 From: lhaarman Date: Tue, 19 May 2026 14:26:14 +0000 Subject: [PATCH 1/3] Add configuration for skippable domains such as social media --- config/config_template.yaml | 1 + src/crawl/HesitantCrawler.py | 11 ++++++++++- src/scrape/__init__.py | 7 ++++++- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/config/config_template.yaml b/config/config_template.yaml index c53d1fe..c414ac6 100644 --- a/config/config_template.yaml +++ b/config/config_template.yaml @@ -14,6 +14,7 @@ requests: input: input_dir: ../input input_files: + skip_domains: urls: urls.txt keywords: keywords.txt url_max: 100 diff --git a/src/crawl/HesitantCrawler.py b/src/crawl/HesitantCrawler.py index 74a9de4..7ae3d74 100644 --- a/src/crawl/HesitantCrawler.py +++ b/src/crawl/HesitantCrawler.py @@ -21,7 +21,8 @@ def __init__( fetcher: HTMLFetcher, target_keywords: List[str], add_sitemapurls: bool = False, - max_depth: int = 1): + max_depth: int = 1, + skip_domains: List[str] = []): """ Depth-limited Search Targeted Crawler Crawler class for obtaining urls from start_url. @@ -64,6 +65,10 @@ def __init__( self.target_keywords = target_keywords logging.info(f"The targeted crawl will look for given keywords: {', '.join(self.target_keywords)}") + # Skip domains + self.skip_domains = skip_domains + logging.info(f"The targeted crawl will skip domains: {', '.join(self.skip_domains)}") + # Excluded URLs which contain: self._unsupported = ( ".ics", ".mng", ".pct", ".bmp", ".gif", ".jpg", ".jpeg", ".png", ".pst", ".psp", ".tif", ".tiff", ".drw", ".dxf", ".eps", @@ -91,6 +96,10 @@ def skip_this_url(self, url: str) -> bool: # prevent duplicate crawl from trailing forward slash in URL url = url.rstrip('/') if url.endswith('/') else url + if any([skip_domain in url for skip_domain in self.skip_domains]): + logging.debug(f"Skip {url}, because domain is in skip-list") + return True # skip + # Do not revisit pages if url in self._visited: logging.debug(f"Skip {url}, because we have visited it before") diff --git a/src/scrape/__init__.py b/src/scrape/__init__.py index ded790d..4ebab41 100644 --- a/src/scrape/__init__.py +++ b/src/scrape/__init__.py @@ -16,12 +16,17 @@ def build_webfocusedscraper(user_agent: str) -> IScraper: with open(f"{CONFIG.input.input_dir}/{CONFIG.input.input_files.keywords}", 'r', encoding='utf-8') as file_in: target_keywords = [line.rstrip() for line in file_in] + with open(f"{CONFIG.input.input_dir}/{CONFIG.input.input_files.skip_domains}", 'r', encoding='utf-8') as file_in: + skip_domains = [line.rstrip() for line in file_in] + fetcher = HTMLFetcher(user_agent=user_agent) crawler = HesitantCrawler( fetcher=fetcher, target_keywords=target_keywords, add_sitemapurls=CONFIG.crawl.use_sitemap, - max_depth=CONFIG.crawl.max_depth) + max_depth=CONFIG.crawl.max_depth, + skip_domains=skip_domains + ) htmlparser = HTMLBodyParser() return Scraper( From cab73b4427708a16e0c4c20d8b7948a5bdc60cc5 Mon Sep 17 00:00:00 2001 From: lhaarman Date: Thu, 21 May 2026 11:59:09 +0000 Subject: [PATCH 2/3] Basic setup scrapy with multiprocessing; TODO testing --- src/crawl/HesitantCrawler.py | 3 + src/crawl/__init__.py | 3 +- src/crawl/base.py | 3 +- src/crawl/scrapymodules/HesitantSpider.py | 141 ++++++++++++++++++ .../scrapymodules/ScrapyCrawlMiddleware.py | 27 ++++ src/crawl/scrapymodules/ScrapyResult.py | 7 + src/crawl/scrapymodules/__init__.py | 3 + src/main.py | 1 - src/main_scrapy.py | 118 +++++++++++++++ src/util/__init__.py | 3 +- src/util/urls.py | 35 +++++ 11 files changed, 340 insertions(+), 4 deletions(-) create mode 100644 src/crawl/scrapymodules/HesitantSpider.py create mode 100644 src/crawl/scrapymodules/ScrapyCrawlMiddleware.py create mode 100644 src/crawl/scrapymodules/ScrapyResult.py create mode 100644 src/crawl/scrapymodules/__init__.py create mode 100644 src/main_scrapy.py create mode 100644 src/util/urls.py diff --git a/src/crawl/HesitantCrawler.py b/src/crawl/HesitantCrawler.py index 7ae3d74..2703494 100644 --- a/src/crawl/HesitantCrawler.py +++ b/src/crawl/HesitantCrawler.py @@ -96,6 +96,9 @@ def skip_this_url(self, url: str) -> bool: # prevent duplicate crawl from trailing forward slash in URL url = url.rstrip('/') if url.endswith('/') else url + # prevent duplicate crawl from '#' such as '#content', '#main', etc. + url = url.rstrip("#") if url.contains("#") else url + if any([skip_domain in url for skip_domain in self.skip_domains]): logging.debug(f"Skip {url}, because domain is in skip-list") return True # skip diff --git a/src/crawl/__init__.py b/src/crawl/__init__.py index 77999db..6fe4b71 100644 --- a/src/crawl/__init__.py +++ b/src/crawl/__init__.py @@ -1,2 +1,3 @@ from .base import ICrawler, NoCrawler, BaseCrawler, CrawlResult -from .HesitantCrawler import HesitantCrawler \ No newline at end of file +from .HesitantCrawler import HesitantCrawler +from .scrapymodules import ScrapyResult \ No newline at end of file diff --git a/src/crawl/base.py b/src/crawl/base.py index 36b51ca..1a9f576 100644 --- a/src/crawl/base.py +++ b/src/crawl/base.py @@ -2,7 +2,7 @@ from typing import NamedTuple, List import logging from urllib.parse import urlparse - +from scrapy.http import Response from fetch import IFetcher @@ -11,6 +11,7 @@ class CrawlResult(NamedTuple): source: str targeted: bool = None first_keyword_hit: str = None + crawl_depth: int = 0 class ICrawler(ABC): diff --git a/src/crawl/scrapymodules/HesitantSpider.py b/src/crawl/scrapymodules/HesitantSpider.py new file mode 100644 index 0000000..e535647 --- /dev/null +++ b/src/crawl/scrapymodules/HesitantSpider.py @@ -0,0 +1,141 @@ +from typing import List +import scrapy +import validators +from urllib.parse import urlparse, urljoin +import logging +import re +from .ScrapyResult import ScrapyResult + +class HesitantSpider(scrapy.Spider): + name = "hesitant-spider" + + # Define custom settings as a class attribute + custom_settings = { + "USER_AGENT": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", + "AUTOTHROTTLE_ENABLED": True, # Auto throttle to maximize speed without risking blocks + "AUTOTHROTTLE_START_DELAY": 1.0, # Start slow to "warm up" + "AUTOTHROTTLE_MAX_DELAY": 10.0, # Never wait more than 10s + "AUTOTHROTTLE_TARGET_CONCURRENCY": 1.0, # Aim for 1 request per worker at a time + "DOWNLOAD_DELAY": 0, # Let Autothrottle handle the delay + } + + def __init__( + self, + start_urls: str, + target_keywords: List[str] = [], + add_sitemap_urls: bool = False, + max_depth: int = 1, + skip_domains: List[str] = [], + *args, **kwargs + ): + super(HesitantSpider, self).__init__(*args, **kwargs) + + self.start_urls = start_urls + self.logger.debug(f"Init start_urls: {self.start_urls}") + self.max_depth = max_depth + self.logger.debug(f"Init max depth: {self.max_depth}") + self.skip_domains = skip_domains + self.logger.debug(f"Init skip domains: {self.skip_domains}") + self.target_keywords = target_keywords + self.logger.debug(f"Init target keywords: {self.target_keywords}") + + self._unsupported = ( + ".ics", ".mng", ".pct", ".bmp", ".gif", ".jpg", ".jpeg", ".png", ".pst", ".psp", ".tif", ".tiff", ".drw", ".dxf", ".eps", + ".woff2", ".svg", ".mp3", ".wma", ".ogg", ".wav", ".ra", ".aac", ".mid", ".aiff", ".3gp", ".asf", ".asx", ".avi", ".mp4", + ".woff", ".mpg", ".qt", ".rm", ".swf", ".wmv", ".m4a", ".css", ".pdf", ".doc", ".docx", ".exe", ".bin", ".rss", ".zip", + ".rar", ".msu", ".flv", ".dmg", ".xls", ".xlsx", ".ico", ".mng?download=true", ".pct?download=true", ".bmp?download=true", + ".gif?download=true", ".jpg?download=true", ".jpeg?download=true", ".png?download=true", ".pst?download=true", + ".psp?download=true", ".tif?download=true", ".tiff?download=true", ".ai?download=true", ".drw?download=true", + ".dxf?download=true", ".eps?download=true", ".ps?download=true", ".svg?download=true", ".mp3?download=true", + ".wma?download=true", ".ogg?download=true", ".wav?download=true", ".ra?download=true", ".aac?download=true", + ".mid?download=true", ".au?download=true", ".aiff?download=true", ".3gp?download=true", ".asf?download=true", + ".asx?download=true", ".avi?download=true", ".mov?download=true", ".mp4?download=true", ".mpg?download=true", + ".qt?download=true", ".rm?download=true", ".swf?download=true", ".wmv?download=true", ".m4a?download=true", + ".css?download=true", ".pdf?download=true", ".doc?download=true", ".exe?download=true", ".bin?download=true", + ".rss?download=true", ".zip?download=true", ".rar?download=true", ".msu?download=true", ".flv?download=true", + ".dmg?download=true") + self.logger.debug(f"URLs will be excluded if they contain any in path:{', '.join(self._unsupported)}") + + self.results = [] + self.visited = set() + + if max_depth < 0: + self.logger.debug("Only urls from starting_url can be found, max_depth < 0") + + def url_is_target(self, url: str) -> bool: + for keyword in self.target_keywords: + first_keyword_hit = re.search(keyword, url) + if first_keyword_hit is not None: + return True + + def skip_this_url(self, url: str) -> bool: + """Function to see if we have already visited url""" + + if not validators.url(url): + return True + + if any(ext in url for ext in self._unsupported): + self.logger.debug(f"Skip {url}, because extension is unsupported") + return True + + # prevent duplicate crawl from trailing forward slash in URL + url = url.rstrip('/') if url.endswith('/') else url + + # prevent duplicate crawl from '#' such as '#content', '#main', etc. + url = url.rstrip("#") if "#" in url else url + + if any([skip_domain in url for skip_domain in self.skip_domains]): + self.logger.debug(f"Skip {url}, because domain is in skip-list") + return True # skip + + # Do not revisit pages + if url in self.visited: + self.logger.debug(f"Skip {url}, because we have visited it before") + return True # skip + return False + + async def start(self): + for start_url in self.start_urls: + yield scrapy.Request(url=start_url, callback=self.parse, meta={"depth": 0}) + + def parse(self, response): + self.logger.debug(f"Parsing url: {response.url}") + self.visited.add(response.url) + + yield {"url": response.url, "html":response.text[:10]} + current_depth = response.meta.get("depth", 0) + if not self.url_is_target(response.url) and current_depth >= self.max_depth: + return + + # Process the current page + if self.url_is_target(response.url): + # Add results + self.results.append( + ScrapyResult( + url=response.url, + status=response.status, + text=response.text[:1], + crawl_depth=current_depth + ) + ) + + # Reset current depth because we found target at current page + current_depth = 0 + + # Extract and follow links + for link in response.css("a::attr(href)").getall(): + url = urljoin(response.url, link) + + # Keep crawling restricted to the start domain and avoid skipped domains + if self.skip_this_url(url): + continue + + yield scrapy.Request( + url=url, + callback=self.parse, + meta={"depth": current_depth + 1} + ) + + def closed(self, reason): + """Optional: Scrapy built-in method called when the spider finishes""" + print(f"Spider closed because of: {reason}. Total collected pages: {len(self.results)}") \ No newline at end of file diff --git a/src/crawl/scrapymodules/ScrapyCrawlMiddleware.py b/src/crawl/scrapymodules/ScrapyCrawlMiddleware.py new file mode 100644 index 0000000..7ce42ea --- /dev/null +++ b/src/crawl/scrapymodules/ScrapyCrawlMiddleware.py @@ -0,0 +1,27 @@ +import logging +from scrapy.exceptions import IgnoreRequest + +exceptions = [ + ".txt", + ".xml", + ".rss" +] + + +class TextTypeFilterMiddleware: + """ + Drops any response that isn't HTML or XHTML. + """ + def process_response(self, request, response, spider): + if any([response.url.endswith(exception) for exception in exceptions]): + logging.debug(f"Making exception bypass for url: {response.url}") + return response + content_type = response.headers.get('Content-Type', b'').decode('utf-8').lower() + + # Only allow HTML-based content + if 'text/html' not in content_type and 'application/xhtml+xml' not in content_type and 'application/xml' not in content_type: + logging.info(f"\t\tTextTypeFilterMiddleware: Skipping non-text content: {response.url} ({content_type})") + # Returning None tells Scrapy to drop this response entirely + raise IgnoreRequest("Not Text type response, ignore request") + + return response diff --git a/src/crawl/scrapymodules/ScrapyResult.py b/src/crawl/scrapymodules/ScrapyResult.py new file mode 100644 index 0000000..f1be36b --- /dev/null +++ b/src/crawl/scrapymodules/ScrapyResult.py @@ -0,0 +1,7 @@ +from typing import NamedTuple + +class ScrapyResult(NamedTuple): + url: str + status: str + text: str + crawl_depth: int = 0 diff --git a/src/crawl/scrapymodules/__init__.py b/src/crawl/scrapymodules/__init__.py new file mode 100644 index 0000000..12459fc --- /dev/null +++ b/src/crawl/scrapymodules/__init__.py @@ -0,0 +1,3 @@ +from .HesitantSpider import HesitantSpider +from .ScrapyResult import ScrapyResult +from .ScrapyCrawlMiddleware import TextTypeFilterMiddleware \ No newline at end of file diff --git a/src/main.py b/src/main.py index e7a9b4f..27c7fb3 100644 --- a/src/main.py +++ b/src/main.py @@ -45,4 +45,3 @@ def main(): # CONFIG = setup("../config/config.yaml") # df = pd.read_parquet(f"{CONFIG.output.output_dir}/20260304_080625", engine="pyarrow") # print(df.head()) - diff --git a/src/main_scrapy.py b/src/main_scrapy.py new file mode 100644 index 0000000..a154eb9 --- /dev/null +++ b/src/main_scrapy.py @@ -0,0 +1,118 @@ +import os +import logging +import numpy as np +import multiprocessing +import sys +from datetime import datetime + +from scrapy.crawler import CrawlerProcess + +from util import setup, normalize_url +from crawl.scrapymodules import HesitantSpider + +CONFIG = setup("config/config.yaml") + + +def spawn_spider_process(urls, keywords, skip_domains, process_id, log_level, logfile): + print(f"Args: urls: {urls}, keywords: {keywords}, skip_domains: {skip_domains}, process_id: {process_id} ") + print(f"Starting crawling process (PID: {process_id}, OSPID: {os.getpid()}) for {urls}!") + project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) + if project_root not in sys.path: + sys.path.insert(0, project_root) + + process = CrawlerProcess( + settings={ + "ROBOTSTXT_OBEY": True, + "LOG_LEVEL": "INFO", + "LOG_FILE": logfile, + "DOWNLOADER_MIDDLEWARES": { + "src.crawl.scrapymodules.ScrapyCrawlMiddleware.TextTypeFilterMiddleware": 543 # High priority + }, + "DOWNLOAD_CONTENT_TYPES": ["text/html", "application/xhtml+xml"] + } + ) + + root_logger = logging.getLogger() + root_logger.setLevel(log_level) + root_logger.handlers = [] + + fileHandler = logging.FileHandler(logfile) + fileHandler.setLevel(log_level) + root_logger.addHandler(fileHandler) + + # Remove console output + # We get the logger that Scrapy uses and remove all handlers that print to the console + scrapy_logger = logging.getLogger('scrapy') + for handler in scrapy_logger.handlers[:]: + scrapy_logger.removeHandler(handler) + + # (Optional) If you want to be extremely thorough, silence the engine too + logging.getLogger('twisted').handlers = [] + + spiderCrawler = process.create_crawler(HesitantSpider) + + process.crawl( + spiderCrawler, + start_urls=urls, + max_depth=1, + target_keywords=keywords, + skip_domains=skip_domains + ) + + if len(urls) == 0: + return [] + + try: + process.start() + except Exception as e: + print(f"Got here! Error {e}") + + if spiderCrawler.spider is not None: + print(f"Returning results of length for PID {process_id}: {len(spiderCrawler.spider.results)}") + return spiderCrawler.spider.results + + +if __name__ == "__main__": + + # Input URLs + file_urls = f"{CONFIG.input.input_dir}/{CONFIG.input.input_files.urls}" + logging.info(f"Reading list of base-urls from file: {file_urls}") + with open(file_urls, 'r', encoding='utf-8') as file_in: + urls = [line.rstrip() for line in file_in] + + # Normalize URLs + urls = [*map(normalize_url, urls)] + + # Keywords + file_keywords = f"{CONFIG.input.input_dir}/{CONFIG.input.input_files.keywords}" + logging.info(f"Reading list of keywords from file: {file_keywords}") + with open(file_keywords, 'r', encoding='utf-8') as file_in: + target_keywords = [line.rstrip() for line in file_in] + + # Skip domains + file_skip_domains = f"{CONFIG.input.input_dir}/{CONFIG.input.input_files.skip_domains}" + logging.info(f"Reading list of skip_domains from file: {file_skip_domains}") + with open(file_skip_domains, 'r', encoding='utf-8') as file_in: + skip_domains = [line.rstrip() for line in file_in] + + num_workers = min([len(urls), 6]) + batch_size = len(urls) // num_workers if len(urls) > num_workers else 1 + url_chunks = np.array_split(urls, num_workers) + + chunked_args = [] + + logfile = f"output/logs/log_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log" + + for i in range(0, num_workers): + chunked_args.append( + (url_chunks[i], target_keywords, skip_domains, i, logging.INFO, logfile) + ) + + print("# Workers:", num_workers) + + print("# Cores available:", multiprocessing.cpu_count()) + with multiprocessing.Pool(processes=num_workers) as pool: + results = sum(pool.starmap(spawn_spider_process, chunked_args), []) + + print("Results:", results) + print("#Results:", len(results)) diff --git a/src/util/__init__.py b/src/util/__init__.py index 46cd90e..bb2a8cc 100644 --- a/src/util/__init__.py +++ b/src/util/__init__.py @@ -1 +1,2 @@ -from .setup import setup \ No newline at end of file +from .setup import setup +from .urls import normalize_url \ No newline at end of file diff --git a/src/util/urls.py b/src/util/urls.py new file mode 100644 index 0000000..62cb112 --- /dev/null +++ b/src/util/urls.py @@ -0,0 +1,35 @@ +from urllib.parse import urlparse, urlunparse +import re + + +# Normalize URL to make sure crawler can handle it without issue +def normalize_url(url): + # Handle case where there is no scheme at all + if not re.match(r'^[a-zA-Z]+://', url): + url = 'https://' + url + + parsed = urlparse(url) + + # 2. Force HTTPS + scheme = 'https' + + # 3. Handle the domain (netloc) + netloc = parsed.netloc.lower() + + # Remove existing 'www.' to re-add cleanly + if netloc.startswith('www.'): + netloc = netloc[4:] + + netloc = 'www.' + netloc + + # Reconstruct URL + new_url = urlunparse(( + scheme, + netloc, + parsed.path, + parsed.params, + parsed.query, + parsed.fragment + )) + + return new_url From 2c65ebe78659184e084844d07a7c5b3dbf2020b6 Mon Sep 17 00:00:00 2001 From: lhaarman Date: Thu, 28 May 2026 08:53:49 +0000 Subject: [PATCH 3/3] temp commit; saving result --- src/analysis/analyze_results.py | 2 + src/crawl/HesitantCrawler.py | 4 +- src/crawl/scrapymodules/HesitantSpider.py | 161 ++++++++++++++++++---- src/crawl/scrapymodules/ScrapyResult.py | 6 +- src/main.py | 5 + src/main_scrapy.py | 45 ++++-- src/parse/HTML.py | 2 +- 7 files changed, 183 insertions(+), 42 deletions(-) diff --git a/src/analysis/analyze_results.py b/src/analysis/analyze_results.py index fb52cb1..62b5843 100644 --- a/src/analysis/analyze_results.py +++ b/src/analysis/analyze_results.py @@ -137,6 +137,8 @@ def __iter__(self): logging.debug(f"Total number of base-urls with scraped content: {len(get_baseurls(df=total))}.") logging.debug(f"Total number of pages downloaded: {total.shape[0]}.") + dfs.to_parquet("output/output.parquet") + gr = total.groupby(by='base_url', as_index=False)['url'].count() gr = gr.rename(columns={'url': 'pages', 'base_url': 'count'}) gr = gr.groupby(by='pages', as_index=False).count() diff --git a/src/crawl/HesitantCrawler.py b/src/crawl/HesitantCrawler.py index 2703494..9d07853 100644 --- a/src/crawl/HesitantCrawler.py +++ b/src/crawl/HesitantCrawler.py @@ -97,11 +97,11 @@ def skip_this_url(self, url: str) -> bool: url = url.rstrip('/') if url.endswith('/') else url # prevent duplicate crawl from '#' such as '#content', '#main', etc. - url = url.rstrip("#") if url.contains("#") else url + url = url.rstrip("#") if "#" in url else url if any([skip_domain in url for skip_domain in self.skip_domains]): logging.debug(f"Skip {url}, because domain is in skip-list") - return True # skip + return True # skip # Do not revisit pages if url in self._visited: diff --git a/src/crawl/scrapymodules/HesitantSpider.py b/src/crawl/scrapymodules/HesitantSpider.py index e535647..e7b35c5 100644 --- a/src/crawl/scrapymodules/HesitantSpider.py +++ b/src/crawl/scrapymodules/HesitantSpider.py @@ -1,18 +1,20 @@ from typing import List import scrapy import validators -from urllib.parse import urlparse, urljoin -import logging +from urllib.parse import urljoin, urlparse import re +import pandas as pd from .ScrapyResult import ScrapyResult +from parse import HTMLBodyParser + class HesitantSpider(scrapy.Spider): name = "hesitant-spider" - + # Define custom settings as a class attribute custom_settings = { "USER_AGENT": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", - "AUTOTHROTTLE_ENABLED": True, # Auto throttle to maximize speed without risking blocks + "AUTOTHROTTLE_ENABLED": True, # Auto throttle to maximize speed without risking blocks "AUTOTHROTTLE_START_DELAY": 1.0, # Start slow to "warm up" "AUTOTHROTTLE_MAX_DELAY": 10.0, # Never wait more than 10s "AUTOTHROTTLE_TARGET_CONCURRENCY": 1.0, # Aim for 1 request per worker at a time @@ -26,19 +28,37 @@ def __init__( add_sitemap_urls: bool = False, max_depth: int = 1, skip_domains: List[str] = [], + skip_paths: List[str] = [], + allowed_top_level_domains: List[str] = [".com"], + batch_size: int = 100, + output_file: str = "output.parquet", + max_jumps: int = 1, *args, **kwargs ): super(HesitantSpider, self).__init__(*args, **kwargs) - + self.start_urls = start_urls self.logger.debug(f"Init start_urls: {self.start_urls}") self.max_depth = max_depth self.logger.debug(f"Init max depth: {self.max_depth}") self.skip_domains = skip_domains self.logger.debug(f"Init skip domains: {self.skip_domains}") + self.skip_paths = skip_paths + self.logger.debug(f"Init skip domains: {self.skip_paths}") + self.allowed_top_level_domains = allowed_top_level_domains + self.logger.debug(f"Init allowed_top_level_domains: {self.allowed_top_level_domains}") self.target_keywords = target_keywords self.logger.debug(f"Init target keywords: {self.target_keywords}") + self.batch_size = batch_size + self.batch_counter = 0 + self.logger.debug(f"Init batch_size: {self.batch_size}") + self.max_jumps = max_jumps + + self.output_file = output_file + self.logger.debug(f"Init output file: {self.output_file}") + + self._htmlparser = HTMLBodyParser() self._unsupported = ( ".ics", ".mng", ".pct", ".bmp", ".gif", ".jpg", ".jpeg", ".png", ".pst", ".psp", ".tif", ".tiff", ".drw", ".dxf", ".eps", ".woff2", ".svg", ".mp3", ".wma", ".ogg", ".wav", ".ra", ".aac", ".mid", ".aiff", ".3gp", ".asf", ".asx", ".avi", ".mp4", @@ -55,73 +75,149 @@ def __init__( ".rss?download=true", ".zip?download=true", ".rar?download=true", ".msu?download=true", ".flv?download=true", ".dmg?download=true") self.logger.debug(f"URLs will be excluded if they contain any in path:{', '.join(self._unsupported)}") - + + self.batch = [] self.results = [] self.visited = set() - + if max_depth < 0: self.logger.debug("Only urls from starting_url can be found, max_depth < 0") + async def start(self): + for start_url in self.start_urls: + yield scrapy.Request( + url=start_url, + callback=self.parse, + meta={ + "base_url": start_url, + "current_start": start_url, + "depth": 0, + "jumps": 0 + } + ) + + def save_batch(self): + if len(self.batch) == 0: + self.logger.debug("Tried to save batch without any results..") + return + + df = pd.DataFrame({ + "base_url": [res.base_url for res in self.batch], + "url": [res.url for res in self.batch], + "first_keyword_hit": [res.first_keyword_hit for res in self.batch], + "content": [res.content for res in self.batch], + "crawl_depth": [res.crawl_depth for res in self.batch], + "schema_indicator": [res.schema_indicator for res in self.batch] # TODO now always false + }) + + df.to_parquet( + self.output_file.replace(".parquet", f"_{self.batch_counter}.parquet") # TODO name + ) + + self.batch_counter += 1 + + self.batch = [] + self.logger.debug("Saved batch to parquet") + def url_is_target(self, url: str) -> bool: + parsed_url = urlparse(url).path for keyword in self.target_keywords: - first_keyword_hit = re.search(keyword, url) + first_keyword_hit = re.search(keyword, parsed_url) if first_keyword_hit is not None: - return True + self.logger.debug(f"For {url} keyword hit: {first_keyword_hit.group(0)}") + return True, first_keyword_hit.group(0) + + return False, None def skip_this_url(self, url: str) -> bool: """Function to see if we have already visited url""" + # Do not revisit pages + if url in self.visited: + self.logger.debug(f"Skip {url}, because we have visited it before") + return True # skip + + # Only visit valid urls if not validators.url(url): return True + # Only visit pages with supported extensions if any(ext in url for ext in self._unsupported): self.logger.debug(f"Skip {url}, because extension is unsupported") return True + # Only visit pages on allowed top-level domains + url_netloc = urlparse(url).netloc.lower() + + if not any([url_netloc.endswith(toplevel_domain) for toplevel_domain in self.allowed_top_level_domains]): + self.logger.debug(f"Skip {url} with netloc {url_netloc}, because top-level domain is not in allowed list") + return True + # prevent duplicate crawl from trailing forward slash in URL url = url.rstrip('/') if url.endswith('/') else url # prevent duplicate crawl from '#' such as '#content', '#main', etc. url = url.rstrip("#") if "#" in url else url + # Skip domains on skip-list if any([skip_domain in url for skip_domain in self.skip_domains]): self.logger.debug(f"Skip {url}, because domain is in skip-list") return True # skip - # Do not revisit pages - if url in self.visited: - self.logger.debug(f"Skip {url}, because we have visited it before") - return True # skip + # skip pre-defined paths + for skip_path in self.skip_paths: + if any([path == skip_path for path in urlparse(url).path.split("/")]): + self.logger.debug(f"Skip {url} because path {urlparse(url).path} contains skip-path: {skip_path}") + return True + return False - async def start(self): - for start_url in self.start_urls: - yield scrapy.Request(url=start_url, callback=self.parse, meta={"depth": 0}) - def parse(self, response): - self.logger.debug(f"Parsing url: {response.url}") self.visited.add(response.url) - yield {"url": response.url, "html":response.text[:10]} current_depth = response.meta.get("depth", 0) - if not self.url_is_target(response.url) and current_depth >= self.max_depth: + + url_is_targeted, first_keyword_hit = self.url_is_target(response.url) + + if not url_is_targeted and current_depth >= self.max_depth: return + jumps = response.meta.get("jumps", 0) + + parsed_url = urlparse(response.url) + current_netloc = parsed_url.netloc.lower().rsplit(".", 1)[0] + meta_netloc = urlparse(response.meta.get("current_start")).netloc.lower().rsplit(".", 1)[0] + if current_netloc != meta_netloc: + self.logger.debug(f"Adding jump from {jumps} to {jumps + 1} going with base url: {meta_netloc} to {current_netloc}") + jumps += 1 + # TODO do not add jump if response is a HTTP 300 redirect + + if jumps > self.max_jumps: + self.logger.debug(f"Ending crawl path due to exceeding jumps ({jumps}/{self.max_jumps}) for {response.url}, base url: {response.meta.get("base_url")}") + return + + self.logger.debug(f"Parsing url: {response.url}, targeted: {url_is_targeted}, depth: {current_depth}, jumps: {jumps}") + # Process the current page - if self.url_is_target(response.url): + if url_is_targeted: # Add results - self.results.append( - ScrapyResult( + result = ScrapyResult( + base_url=str(response.meta.get("base_url")), url=response.url, status=response.status, - text=response.text[:1], + first_keyword_hit=first_keyword_hit, + content=self._htmlparser.parse(html=response.text), crawl_depth=current_depth ) - ) + self.batch.append(result) + self.results.append(result) + + if len(self.batch) >= self.batch_size: + self.save_batch() # Reset current depth because we found target at current page current_depth = 0 - + # Extract and follow links for link in response.css("a::attr(href)").getall(): url = urljoin(response.url, link) @@ -131,11 +227,16 @@ def parse(self, response): continue yield scrapy.Request( - url=url, - callback=self.parse, - meta={"depth": current_depth + 1} + url=url, + callback=self.parse, + meta={ + "base_url": response.meta.get("base_url"), + "current_start": f"{parsed_url.scheme}://{parsed_url.netloc}", + "depth": current_depth + 1, + "jumps": jumps + } ) def closed(self, reason): """Optional: Scrapy built-in method called when the spider finishes""" - print(f"Spider closed because of: {reason}. Total collected pages: {len(self.results)}") \ No newline at end of file + print(f"Spider closed because of: {reason}. Total collected pages: {len(self.batch)}") \ No newline at end of file diff --git a/src/crawl/scrapymodules/ScrapyResult.py b/src/crawl/scrapymodules/ScrapyResult.py index f1be36b..bba5c23 100644 --- a/src/crawl/scrapymodules/ScrapyResult.py +++ b/src/crawl/scrapymodules/ScrapyResult.py @@ -1,7 +1,11 @@ from typing import NamedTuple + class ScrapyResult(NamedTuple): + base_url: str url: str + first_keyword_hit: str status: str - text: str + content: str crawl_depth: int = 0 + schema_indicator: bool = False diff --git a/src/main.py b/src/main.py index 27c7fb3..ce2e3f4 100644 --- a/src/main.py +++ b/src/main.py @@ -37,10 +37,15 @@ def main(): logging.info("Config:") logging.info(OmegaConf.to_yaml(CONFIG)) + start_time = time.perf_counter() + main() logging.info("Exiting with no error") + end_time = time.perf_counter() + + print("Runtime: ", end_time - start_time) # # Read the output files by using the following syntax: # CONFIG = setup("../config/config.yaml") # df = pd.read_parquet(f"{CONFIG.output.output_dir}/20260304_080625", engine="pyarrow") diff --git a/src/main_scrapy.py b/src/main_scrapy.py index a154eb9..532b73b 100644 --- a/src/main_scrapy.py +++ b/src/main_scrapy.py @@ -4,6 +4,7 @@ import multiprocessing import sys from datetime import datetime +import time from scrapy.crawler import CrawlerProcess @@ -13,7 +14,7 @@ CONFIG = setup("config/config.yaml") -def spawn_spider_process(urls, keywords, skip_domains, process_id, log_level, logfile): +def spawn_spider_process(urls, keywords, skip_domains, process_id, log_level, logfile, output_file): print(f"Args: urls: {urls}, keywords: {keywords}, skip_domains: {skip_domains}, process_id: {process_id} ") print(f"Starting crawling process (PID: {process_id}, OSPID: {os.getpid()}) for {urls}!") project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) @@ -54,9 +55,18 @@ def spawn_spider_process(urls, keywords, skip_domains, process_id, log_level, lo process.crawl( spiderCrawler, start_urls=urls, - max_depth=1, + max_depth=2, target_keywords=keywords, - skip_domains=skip_domains + skip_domains=skip_domains, + output_file=output_file, + allowed_top_level_domains=[".com", ".nl", ".ai", ".de", ".be", ".eu", ".io"], + skip_paths=[ + "shop", "cart", "clients", "testimonials", "search", + "query", "calendar", "events", "archive", "news", + "blog", "media", "articles", "profile", "legal", + "tos", "products", "winkel", "winkelwagen", "archief", + "nieuws", "artikelen", "producten", "faq" + ] ) if len(urls) == 0: @@ -68,12 +78,12 @@ def spawn_spider_process(urls, keywords, skip_domains, process_id, log_level, lo print(f"Got here! Error {e}") if spiderCrawler.spider is not None: - print(f"Returning results of length for PID {process_id}: {len(spiderCrawler.spider.results)}") + spiderCrawler.spider.save_batch() + print(f"Returning results of length for PID {process_id} ({len(urls)} URLs: {urls}): {len(spiderCrawler.spider.results)} ({len(spiderCrawler.spider.visited)} visited)") return spiderCrawler.spider.results if __name__ == "__main__": - # Input URLs file_urls = f"{CONFIG.input.input_dir}/{CONFIG.input.input_files.urls}" logging.info(f"Reading list of base-urls from file: {file_urls}") @@ -95,7 +105,8 @@ def spawn_spider_process(urls, keywords, skip_domains, process_id, log_level, lo with open(file_skip_domains, 'r', encoding='utf-8') as file_in: skip_domains = [line.rstrip() for line in file_in] - num_workers = min([len(urls), 6]) + max_workers = 16 + num_workers = min([len(urls), max_workers]) batch_size = len(urls) // num_workers if len(urls) > num_workers else 1 url_chunks = np.array_split(urls, num_workers) @@ -103,16 +114,34 @@ def spawn_spider_process(urls, keywords, skip_domains, process_id, log_level, lo logfile = f"output/logs/log_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log" + time_part = datetime.now().strftime("%Y%m%d_%H%M%S") + if not os.path.exists(f"{CONFIG.output.output_dir}/{time_part}"): + os.makedirs(f"{CONFIG.output.output_dir}/{time_part}") + for i in range(0, num_workers): chunked_args.append( - (url_chunks[i], target_keywords, skip_domains, i, logging.INFO, logfile) + ( + url_chunks[i], + target_keywords, + skip_domains, + i, + logging.DEBUG, + logfile, + f"{CONFIG.output.output_dir}/{time_part}/worker_{i}.parquet" + ) ) print("# Workers:", num_workers) + start_time = time.perf_counter() + print("# Cores available:", multiprocessing.cpu_count()) with multiprocessing.Pool(processes=num_workers) as pool: results = sum(pool.starmap(spawn_spider_process, chunked_args), []) - print("Results:", results) + end_time = time.perf_counter() + + # print("Results:", results) print("#Results:", len(results)) + + print("Runtime: ", end_time - start_time) diff --git a/src/parse/HTML.py b/src/parse/HTML.py index a352f54..3237415 100644 --- a/src/parse/HTML.py +++ b/src/parse/HTML.py @@ -44,7 +44,7 @@ def parse(self, html: str) -> str: for tag in soup(self._disregard): tag.decompose() text = soup.get_text(separator="\n", strip=True) - logging.debug(f"First 100 characters of text extracted: {text[0:100]}") + #logging.debug(f"First 100 characters of text extracted: {text[0:100]}") return text except Exception as e: # Handle exceptions