tangled
garrison.tngl.sh / anyscraper
an async framework for scraping and crawling the web
respect status 429; default html scraper
garrison.tngl.sh · 7 months ago
d8b7eaf3 · c14b5925
+140 -43 · 6 changed files
anyscraper/crawler.py
anyscraper/robotstxt.py
anyscraper/scrape.py
anyscraper/url.py
pyproject.toml
uv.lock
anyscraper/crawler.py (+63 -14)
···
     Optional,
     Callable,
     Awaitable,
-    Iterable,
-    AsyncIterator,
+    Tuple,
     Any,
     Self,
 )
···
     status: int
     message: str

+@dataclass
+class FetchSetTempDelay:
+    retry_after: float
+    status: int = 429
+

 async def fetch(
     url: Url, method: str = "GET", json: Any = None
-) -> FetchResult | FetchError:
+) -> FetchResult | FetchError | FetchSetTempDelay:
     """
     safely fetches text for crawling purposes
     on failure, logs details and returns None
     """
     try:
         async with aiohttp.ClientSession() as session:
-            fetch_log.info(f"fetching {url}")
             async with session.request(method, str(url), json=json) as response:
-                if response.status != 200:
-                    fetch_log.warn(f"status {response.status} at {url}")
+                fetch_log.warn(f"status {response.status} at {url}")
+                if response.status == 429:
+                    # too many retries
+                    retry_after = float(response.headers['retry-after'])
+                    return FetchSetTempDelay(retry_after=retry_after)
+                elif response.status != 200:
                     text = await response.text()
                     return FetchError(status=response.status, message=text)

-                mime = response.headers["content-type"]
+                mime = response.headers.get("content-type", "text/plain")
                 text = await response.text()

                 return FetchResult(mime=mime, text=text)
     except Exception as e:
         fetch_log.error(f"{e}")
-        return FetchError(status=-1, message=f"{e}")
+        return FetchError(status=-1, message=f"{e.__class__.__name__}: {e}")


 @dataclass
···
     text: str
     crawled: datetime
     url: Url
-    source_url: Url
+    source_url: Optional[Url]


 @dataclass(frozen=True)
 class DomainInfo:
     robots: RobotsTxt
+    # TODO why not just switch storing the delay_s to just storing the next
+    # allowed fetch time? this would be flexible, and allow dumping to db easily
     crawl_delay_s: int
     last_fetched: Optional[datetime] = None

-    def delay_needed(self, at: datetime) -> Optional[int]:
+    def delay_needed(self, at: datetime) -> Optional[float]:
         if self.last_fetched is None or self.crawl_delay_s == 0.0:
             return None

···
             return None
         return needed

+    def with_delay(self, new_delay_s: int):
+        return DomainInfo(
+            robots=self.robots,
+            crawl_delay_s=new_delay_s,
+            last_fetched=self.last_fetched,
+        )
+

 @dataclass(frozen=True)
 class QueuedUrl:
···
         order_queued_url: Callable[
             [QueuedUrl, QueuedUrl], bool
         ] = QueuedUrl.default_order,
-        default_crawl_delay_s: float = 0.5,
+        default_crawl_delay_s: int = 1,
     ):
         # these should not be touched during run
         self.user_agent = user_agent
···
         self.default_crawl_delay_s = default_crawl_delay_s

         self.domains: Dict[Domain, DomainInfo] = {}
+        # tracks 429 crawl delays
+        self.domain_delays: Dict[Domain, Tuple[datetime, float]] = {}
         # TODO MRSW
         self.domains_lock = asyncio.Lock()

···

         return info

+    async def add_retry_delay(self, domain: Domain, delay_s: float):
+        now = datetime.now()
+        async with self.domains_lock:
+            if domain in self.domain_delays:
+                prev_dt, prev_delay_s = self.domain_delays[domain]
+                if (now.timestamp() + delay_s) > (prev_dt.timestamp() + prev_delay_s):
+                    self.domain_delays[domain] = (now, delay_s)
+            else:
+                self.domain_delays[domain] = (now, delay_s)
+
     async def get_domain_info(self, domain: Domain) -> DomainInfo:
         """fetches or retrieves stored domain info for a url"""
         async with self.domains_lock:
             if domain in self.domains:
                 # already parsed robots
-                return self.domains[domain]
+                info = self.domains[domain]
+
+                if domain in self.domain_delays:
+                    from_dt, delay_s = self.domain_delays[domain]
+                    passed_s = datetime.now().timestamp() - from_dt.timestamp()
+                    needed_s = delay_s - passed_s
+                    if needed_s > 0:
+                        return info.with_delay(int(needed_s))
+                    else:
+                        # delay expired
+                        del self.domain_delays[domain]
+
+                return info

             # must parse robots
             res = await fetch(domain.at("robots.txt"))
···
                 disable: /
                 """
             elif res.status == 200:
+                assert isinstance(res, FetchResult)
                 text = res.text

             info = await self.configure_domain(domain, text)
···
         if delay_needed is not None:
             await asyncio.sleep(delay_needed)

-        return await fetch(url, method, json)
+        res = await fetch(url, method, json)
+        if isinstance(res, FetchSetTempDelay):
+            await self.add_retry_delay(url.domain, res.retry_after)
+            return FetchError(status=429, message="too many requests")
+
+        return res

     async def _crawl(self, worker_name: str, queued: QueuedUrl):
         """
···
         res = await fetch(queued.url, method=queued.method, json=queued.json)
         if isinstance(res, FetchError):
             return
+        elif isinstance(res, FetchSetTempDelay):
+            await self.add_retry_delay(queued.url.domain, res.retry_after)
+            return

         crawl_res = CrawlResult(
             mime=res.mime,
···
             source_url=queued.source_url,
             crawled=datetime.now(),
         )
-        await self.crawlback(self, crawl_res, *self.crawlback_kwargs)
+        await self.crawlback(self, crawl_res, **self.crawlback_kwargs)

     async def _worker(self, name: str):
         while True:
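
Usage sketch (illustrative, not part of the commit): how a caller is expected to distinguish the three fetch outcomes after this change. The import paths and the example URL are assumptions; fetch, FetchResult, FetchError and FetchSetTempDelay are the names introduced or touched above. Crawler.fetch itself never surfaces FetchSetTempDelay: it records the retry-after delay for the domain and returns FetchError(status=429) instead.

# sketch only: exercising the module-level fetch() shown above
import asyncio
from anyscraper.crawler import fetch, FetchResult, FetchError, FetchSetTempDelay
from anyscraper.url import Url

async def demo():
    res = await fetch(Url.from_str("https://example.com/"))
    if isinstance(res, FetchSetTempDelay):
        # 429 path: retry_after is the parsed retry-after header, in seconds
        print(f"rate limited, retry in {res.retry_after}s")
    elif isinstance(res, FetchError):
        print(f"fetch failed: {res.status} {res.message}")
    else:
        assert isinstance(res, FetchResult)
        print(res.mime, len(res.text))

asyncio.run(demo())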
anyscraper/robotstxt.py (+3 -3)
···
         self.sitemaps = []
         self.disallow = []
         self.allow = []
-        self.crawl_delay: Optional[float] = None
+        self.crawl_delay: Optional[int] = None

         self.load_internet_rules(
             (m.group(1).lower(), m.group(2)) for m in ROBOTS_RE.finditer(text)
···
                 continue

             try:
-                self.crawl_delay = float(target)
+                self.crawl_delay = int(target)
             except:
                 pass
         else:
···
             case "disallow":
                 self.disallow.append(target)
             case "crawl-delay":
-                self.crawl_delay = float(target)
+                self.crawl_delay = int(target)

     def save_rules(self) -> Iterator[Tuple[str, str]]:
         """useful for dumping to db"""
anyscraper/scrape.py (+49 -12)
···
 helpful scrapers for tasks common across websites
 """

-from typing import Iterator, Tuple
-from collections import namedtuple, Counter
+from typing import cast, Iterator, AsyncIterator, Tuple, List
+from collections import Counter
+from dataclasses import dataclass
 from bs4 import BeautifulSoup
-from .crawler import Crawler
+from bs4.element import PageElement
+from .crawler import Crawler, FetchResult, FetchError
 from .url import Url, Domain
 from .logger import Logger

 log_anchors = Logger("scrape-anchors")
 log_sitemap = Logger("scrape-sitemap")

+@dataclass
+class HtmlScrape:
+    rel_links: List[Url]
+    anchor_hrefs: List[Url]
+
+def is_lang(tag, lang):
+    actual = tag.get('lang')
+    return actual is None or actual == lang

-def anchors(text: str) -> Iterator[Url]:
-    """parses <a href="..."> urls from html"""
-    log_anchors.error("unimplemented")
+def html(text: str, domain: Domain, lang="en") -> HtmlScrape:
+    """
+    parses common link types from raw html
+    domain is used for relative links
+    """
+    soup = BeautifulSoup(text, 'lxml')
+
+    rel_links = []
+    for link_tag in soup.select('link[rel][href]'):
+        if not is_lang(link_tag, lang):
+            continue
+
+        if link_tag.get('rel') == "next":
+            url = Url.from_quoted_str(cast(str, link_tag.get('href')), domain)
+            rel_links.append(url)

+    anchor_hrefs = []
+    for a_tag in soup.select('a[href]'):
+        if not is_lang(a_tag, lang):
+            continue
+
+        url = Url.from_quoted_str(cast(str, a_tag.get('href')), domain)
+        anchor_hrefs.append(url)
+
+    return HtmlScrape(rel_links=rel_links, anchor_hrefs=anchor_hrefs)

 # TODO support multiple ways of discovering sitemap: https://www.standard-sitemap.org/deployment.php#linking

···
 def _parse_sitemap_0_9(soup: BeautifulSoup, lang: str) -> Iterator[Tuple[str, str]]:
     """yields ("sitemap" | "url", url_string)"""
     for sitemap in soup.find_all("sitemap"):
-        yield ("sitemap", sitemap.loc.string)
+        yield ("sitemap", sitemap.loc.string)  # type: ignore

     for urlset_url in soup.find_all("url"):
-        yield ("url", urlset_url.loc.string)
+        yield ("url", urlset_url.loc.string)  # type: ignore


 # TODO this should probably be redesigned to allow threaded handling of stupid large sitemaps
 async def sitemap(
     c: Crawler, domain: Domain, lang: str = "en"
-) -> Iterator[Tuple[Url, Url]]:
+) -> AsyncIterator[Tuple[Url, Url]]:
     """
     recursively finds urls from a domain's sitemap(s) as specified in their
     robots.txt (if they exist). super helpful for seeding a new domain.
···

         res = await c.fetch(sitemap_url)
         if res.status != 200:
+            assert isinstance(res, FetchError)
             log_sitemap.warn(
                 f"on fetching {sitemap_url}: (status {res.status}) {res.message}"
             )
             continue
+
+        assert isinstance(res, FetchResult)

         try:
             soup = BeautifulSoup(res.text, "xml")

             # check sitemap xmlns
-            first_el = next(iter(soup))
             xmlns = SITEMAP_SCHEMA_HTTP_SITEMAPS_0_9
-            if "xmlns" in first_el:
-                xmlns = Url.from_str(first_el["xmlns"])
+
+            xmlns_el = soup.find(attrs={'name':'xmlns'})
+            if xmlns_el is not None:
+                assert xmlns_el is PageElement
+                xmlns = Url.from_str(xmlns_el.get("xmlns"))

             result_iter = []
             if xmlns in [
···
                 url = Url.from_quoted_str(url_str)
             except Exception:
                 log_sitemap.warn(f"bad url in sitemap: {url_str}")
+                continue

             match kind:
                 case "url":
anyscraper/url.py (+23 -12)
···
 from typing import Optional
 from urllib.parse import urlparse
+from dataclasses import dataclass
 import regex as re

 CONTROL_RE = re.compile(r"(%[0-9a-f]{2})", flags=re.IGNORECASE)
···
 class Domain:
     def __init__(self, url: str):
         res = urlparse(url)
-        self.clean = f"{res.scheme}://{res.netloc}/"
+        scheme = "http" if res.scheme == '' else res.scheme
+        self.clean = f"{scheme}://{res.netloc}/"

     def __eq__(self, other):
         return isinstance(other, Domain) and other.clean == self.clean
···
     def at(self, route: str):
         if route[0] != "/":
             route = "/" + route
-        return Url.from_parts(self, route)
+        return Url(self, route)


+@dataclass
 class Url:
-    @classmethod
-    def from_parts(cls, domain: Domain, route: str):
-        self = cls.__new__(cls)
-        self.domain = domain
-        self.route = route
-        return self
+    """represents a url or a domain with a rule"""
+
+    domain: Domain
+    route: str

     @classmethod
-    def from_str(cls, url_str: str):
+    def from_str(cls, url_str: str, domain: Optional[Domain]=None):
         """parses url"""
         res = urlparse(url_str)

+        if res.netloc != '':
+            domain = Domain(url_str)
+
         route = res.path
         if len(res.query) > 0:
             route += "?" + res.query

-        return Url.from_parts(Domain(url_str), route)
+        assert isinstance(domain, Domain)
+        return Url(domain, route)

     @classmethod
-    def from_quoted_str(cls, url_str: str):
+    def from_quoted_str(cls, url_str: str, domain: Optional[Domain]=None):
         """parses url that may contain %XX quotes"""
-        return Url.from_str(unquote_urllike(url_str))
+        return Url.from_str(unquote_urllike(url_str), domain=domain)
+
+    def matches(self, other) -> bool:
+        """check if this url matches other, assuming other.route is a rule"""
+        assert isinstance(other, Url)
+        return self.domain == other.domain \
+            and match_rule(other.route, self.route) is not None

     def __eq__(self, other):
         return (
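
Usage sketch (illustrative, not part of the commit) of the reworked Url/Domain surface: Url is now a dataclass, from_str and from_quoted_str take an optional Domain for relative paths, Domain falls back to http when the scheme is missing, and matches() compares a url against a rule-style Url. Example values are assumptions; match_rule's rule syntax is defined elsewhere in url.py.

# sketch only: the reworked Url / Domain API
from anyscraper.url import Url, Domain

d = Domain("//example.com/somewhere")                       # missing scheme now defaults to http
page = Url.from_str("/docs/intro?ref=home", domain=d)       # relative paths need an explicit Domain
absolute = Url.from_str("https://example.com/docs/intro")   # Domain derived from the url itself

rule = d.at("docs/")               # Domain.at() now builds Url(self, route) directly
print(page.matches(rule))          # True only if match_rule("/docs/", page.route) matches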
pyproject.toml (+1 -1)
···
 [project]
-name = "anyscrape"
+name = "anyscraper"
 version = "0.1.0"
 description = "webscraping util"
 requires-python = ">=3.13"
uv.lock (+1 -1)
···
 ]

 [[package]]
-name = "anyscrape"
+name = "anyscraper"
 version = "0.1.0"
 source = { virtual = "." }
 dependencies = [