__init__.py
baiduspider.parser.__init__ (special)
Parser
__init__(self) (special)
Baidu search parser.
Source code in baiduspider\parser\__init__.py
def __init__(self) -> None:
"""百度搜索解析器"""
super().__init__()
self.webSubParser = WebSubParser()
parse_baike(self, content)
Parses the page source of a Baidu Baike (encyclopedia) search.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
content | str | HTML source of the Baidu Baike search page, already decoded as UTF-8 | required |
Returns:
Type | Description |
---|---|
dict | The parsed results |
Source code in baiduspider\parser\__init__.py
def parse_baike(self, content: str) -> dict:
"""解析百度百科搜索的页面源代码.
Args:
content (str): 已经转换为UTF-8编码的百度百科搜索HTML源码
Returns:
dict: 解析后的结果
"""
code = self._minify(content)
# 创建BeautifulSoup对象
soup = (
BeautifulSoup(code, "html.parser")
.find("div", class_="body-wrapper")
.find("div", class_="searchResult")
)
# 获取百科总数
total = int(
soup.find("div", class_="result-count")
.text.strip("百度百科为您找到相关词条约")
.strip("个")
)
# 获取所有结果
container = soup.findAll("dd")
results = []
for res in container:
# 链接
url = "https://baike.baidu.com" + self._format(
res.find("a", class_="result-title")["href"]
)
# 标题
title = self._format(res.find("a", class_="result-title").text)
# 简介
des = self._format(res.find("p", class_="result-summary").text)
# 更新日期
upd_date = self._format(res.find("span", class_="result-date").text)
# 生成结果
results.append(
{"title": title, "des": des, "upd_date": upd_date, "url": url}
)
return {"results": results, "total": total}
parse_jingyan(self, content)
Parses the page source of a Baidu Jingyan (experience) search.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
content | str | HTML source of the Baidu Jingyan search page, already decoded as UTF-8 | required |
Returns:
Type | Description |
---|---|
dict | The parsed results |
Source code in baiduspider\parser\__init__.py
def parse_jingyan(self, content: str) -> dict:
"""解析百度经验搜索的页面源代码.
Args:
content (str): 已经转换为UTF-8编码的百度经验搜索HTML源码
Returns:
dict: 解析后的结果
"""
# 最小化代码
code = self._minify(content)
bs = BeautifulSoup(code, "html.parser")
total = int(
bs.find("div", class_="result-num")
.text.split("约", 1)[-1]
.split("个", 1)[0]
.replace(",", "")
)
# 加载搜索结果
data = bs.find("div", class_="search-list").findAll("dl")
results = []
for res in data:
# 标题
title = self._format(res.find("dt").find("a").text)
# 链接
url = "https://jingyan.baidu.com/" + res.find("dt").find("a")["href"]
# 简介
des = self._format(
res.find("dd")
.find("div", class_="summary")
.find("span", class_="abstract")
.text
)
# 获取发布日期和分类,位于`<span class="cate"/>`中
_ = res.find("dd").find("div", class_="summary").find("span", class_="cate")
tmp = self._format(_.text).split("-")
# 发布日期
pub_date = self._format(tmp[1]).replace("/", "-")
# 分类
category = self._format(tmp[-1]).strip("分类:").split(">")
# 发布者
publisher = {
"name": self._format(_.find("a").text),
"url": "https://jingyan.baidu.com" + _.find("a")["href"],
}
# 支持票数
votes = int(
self._format(
res.find("dt").find("span", class_="succ-times").text
).strip("得票")
)
# 是否为原创经验
try:
res.find("span", class_="i-original").text
original = True
except:
original = False
# 是否为优秀经验
try:
res.find("span", class_="i-good-exp").text
outstanding = True
except:
outstanding = False
# 生成结果
result = {
"title": title,
"url": url,
"des": des,
"pub_date": pub_date,
"category": category,
"votes": votes,
"publisher": publisher,
"is_original": original,
"is_outstanding": outstanding,
}
results.append(result) # 加入结果到集合中
# 获取分页
# pages_ = bs.find("div", class_="pager-wrap").findAll("a", class_="pg-btn")
# if not pages_:
# return {"results": results, "pages": 1}
# if "下一页" in pages_[-1].text:
# pages_ = pages_[:-1]
# pages = int(self._format(pages_[-1].text))
return {"results": results, "total": total}
parse_news(self, content)
Parses the page source of a Baidu News (资讯) search.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
content | str | HTML source of the Baidu News search page, already decoded as UTF-8 | required |
Returns:
Type | Description |
---|---|
dict | The parsed results |
Source code in baiduspider\parser\__init__.py
def parse_news(self, content: str) -> dict:
"""解析百度资讯搜索的页面源代码.
Args:
content (str): 已经转换为UTF-8编码的百度资讯搜索HTML源码
Returns:
dict: 解析后的结果
"""
bs = BeautifulSoup(self._format(content), "html.parser")
# 搜索结果总数
total = int(
bs.find("div", id="wrapper_wrapper")
.find("span", class_="nums")
.text.split("资讯", 1)[-1]
.split("篇", 1)[0]
.replace(",", "")
)
# 搜索结果容器
data = (
bs.find("div", id="content_left")
.findAll("div")[1]
.findAll("div", class_="result-op")
)
results = []
for res in data:
# 标题
title = self._format(res.find("h3").find("a").text)
# 链接
url = res.find("h3").find("a")["href"]
# 简介
des = (
res.find("div", class_="c-span-last")
.find("span", class_="c-color-text")
.text
)
_ = res.find("div", class_="c-span-last")
# 作者
author = _.find("span", class_="c-gap-right").text
# 发布日期
try:
date = _.find("span", class_="c-color-gray2").text
except AttributeError:
date = None
# 封面图片
try:
cover = res.find("div", class_="c-img-radius-large").find("img")["src"]
except:
cover = None
# 生成结果
result = {
"title": title,
"author": author,
"date": date,
"des": des,
"url": url,
"cover": cover,
}
results.append(result) # 加入结果
# 获取所有页数
# pages_ = bs.find("div", id="page").findAll("a")
# # 过滤页码
# if "< 上一页" in pages_[0].text:
# pages_ = pages_[1:]
# if "下一页 >" in pages_[-1].text:
# pages_ = pages_[:-1]
return {"results": results, "total": total}
parse_pic(self, content)
Parses the page source of a Baidu Image search.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
content | str | HTML source of the Baidu Image search page, already decoded as UTF-8 | required |
Returns:
Type | Description |
---|---|
dict | The parsed results |
Source code in baiduspider\parser\__init__.py
@handle_err
def parse_pic(self, content: str) -> dict:
"""解析百度图片搜索的页面源代码.
Args:
content (str): 已经转换为UTF-8编码的百度图片搜索HTML源码
Returns:
dict: 解析后的结果
"""
# 从JavaScript中加载数据
# 因为JavaScript很像JSON(JavaScript Object Notation),所以直接用json加载就行了
# 还有要预处理一下,把函数和无用的括号过滤掉
error = None
try:
data = json.loads(
content.split("flip.setData('imgData', ")[1]
.split("flip.setData(")[0]
.split("]);")[0]
.replace(");", "")
.replace("<\\/strong>", "</strong>")
.replace("\\'", "'")
.replace('\\"', "'"),
strict=False,
)
except Exception as err:
error = err
if type(err) in [IndexError, AttributeError]:
raise ParseError("Invalid HTML content.")
finally:
if error:
raise ParseError(str(error))
soup = BeautifulSoup(content, "html.parser")
total = int(
soup.find("div", id="resultInfo")
.text.split("约")[-1]
.split("张")[0]
.replace(",", "")
)
results = []
for _ in data["data"][:-1]:
if _:
# 标题
title = str(_["fromPageTitle"]).encode("utf-8").decode("utf-8")
# 去除标题里的HTML
title = unescape(self._remove_html(title))
# 链接
url = _["objURL"]
# 来源域名
host = _["fromURLHost"]
# 生成结果
result = {"title": title, "url": url, "host": host}
results.append(result) # 加入结果
# 获取分页
# bs = BeautifulSoup(content, "html.parser")
# pages_ = bs.find("div", id="page").findAll("span", class_="pc")
# pages = []
# for _ in pages_:
# pages.append(int(_.text))
return {
"results": results,
# 取最大页码
# "pages": max(pages),
"total": total,
}
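A self-contained sketch of the extraction step used above: the image data is embedded in the page's JavaScript as the argument of flip.setData('imgData', ...), so after trimming the call syntax it can be loaded as JSON. The page snippet below is fabricated for illustration only.

```python
# Fabricated page snippet, illustrating only the imgData extraction chain.
import json

page = (
    "var a = 1;flip.setData('imgData', "
    '{"data": [{"objURL": "http://example.com/1.jpg", '
    '"fromPageTitle": "demo", "fromURLHost": "example.com"}, {}]}'
    ");flip.setData('fcadData', {});"
)

raw = (
    page.split("flip.setData('imgData', ")[1]
    .split("flip.setData(")[0]
    .split("]);")[0]
    .replace(");", "")
)
img_data = json.loads(raw, strict=False)
# The trailing empty element is what the [:-1] slice in parse_pic() skips.
print(img_data["data"][0]["objURL"])  # http://example.com/1.jpg
```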
parse_video(self, content)
Parses the page source of a Baidu Video search.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
content | str | HTML source of the Baidu Video search page, already decoded as UTF-8 | required |
Returns:
Type | Description |
---|---|
dict | The parsed results |
Source code in baiduspider\parser\__init__.py
def parse_video(self, content: str) -> dict:
"""解析百度视频搜索的页面源代码.
Args:
content (str): 已经转换为UTF-8编码的百度视频搜索HTML源码
Returns:
dict: 解析后的结果
"""
bs = BeautifulSoup(content, "html.parser")
# 锁定结果div
data = bs.findAll("div", class_="video_short")
if len(data) == 0:
return {"results": None}
results = []
for res in data:
__ = res.find("div", class_="video_small_intro")
_ = __.find("a")
# 标题
title = self._format(_.text)
# 链接
url = _["href"]
# 封面图片链接
img = res.find("img", class_="border-radius")["src"].rsplit("?", 1)[0]
# 时长
length_ = res.find("span", class_="video_play_timer").text
_ = [int(i) for i in length_.split(":")]
if len(_) < 3:
length_ = time(minute=_[0], second=_[1])
else:
length_ = time(_[0], _[1], _[2])
# 简介
try:
des = __.find("div", class_="c-color-text").text
except AttributeError:
des = None
# 来源
try:
origin = self._format(__.find("span", class_="wetSource").text).strip(
"来源:"
)
except AttributeError:
origin = None
# 发布时间
pub_time: str = __.findAll("span", class_="c-font-normal")[-1].text.strip(
"发布时间:"
)
try:
__ = [int(i) for i in pub_time.split("-")]
except ValueError:
__ = self._convert_time(pub_time, True)
pub_time = datetime(__[0], __[1], __[2])
# 生成结果
result = {
"title": title,
"url": url,
"img": img,
"length": length_,
"des": des,
"origin": origin,
"pub_time": pub_time,
}
results.append(result) # 加入结果
return {"results": results}
parse_web(self, content, exclude)
Parses the page source of a Baidu web search.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
content | str | HTML source of the Baidu web search page, already decoded as UTF-8. | required |
exclude | list | Special result blocks (widgets) to exclude from parsing. | required |
Returns:
Type | Description |
---|---|
dict | The parsed results |
Source code in baiduspider\parser\__init__.py
def parse_web(self, content: str, exclude: list) -> dict:
"""解析百度网页搜索的页面源代码.
Args:
content (str): 已经转换为UTF-8编码的百度网页搜索HTML源码.
exclude (list): 要屏蔽的控件.
Returns:
dict: 解析后的结果
"""
soup = BeautifulSoup(content, "html.parser")
if soup.find("div", id="content_left") is None:
return {"results": [], "pages": 0, "total": 0}
# 获取搜索结果总数
tmp1 = soup.findAll("div", class_="result-molecule")
idx_ = 0
ele = None
while ele is None and idx_ < len(tmp1):
tmp = tmp1[idx_].findAll("span")
found = False
for t in tmp:
if "百度为您找到相关结果" in t.text:
ele = t
found = True
break
if found:
break
idx_ += 1
num = int(
str(ele.text).strip("百度为您找到相关结果").strip("约").strip("个").replace(",", "")
)
# 定义预结果(运算以及相关搜索)
pre_results = []
# 预处理新闻
if "news" not in exclude:
news = soup.find("div", class_="result-op", srcid="19")
news_detail = self.webSubParser.parse_news_block(news)
# 预处理短视频
if "video" not in exclude:
video = soup.find("div", class_="op-short-video-pc")
video_results = self.webSubParser.parse_video_block(video)
# 预处理运算
if "calc" not in exclude:
calc = soup.find("div", class_="op_new_cal_screen")
# 预处理相关搜索
if "related" not in exclude:
try:
_related = soup.findAll("table")[-1].findAll("td")
except AttributeError:
_related = []
related = []
# 一个一个append相关搜索
for _ in _related:
if _.text:
related.append(self._format(_.text))
# 预处理百科
if "baike" not in exclude:
baike = soup.find("div", class_="c-container", tpl="bk_polysemy")
baike = self.webSubParser.parse_baike_block(baike)
# 预处理贴吧
if "tieba" not in exclude:
tieba = BeautifulSoup(content, "html.parser").find("div", srcid="10")
tieba = self.webSubParser.parse_tieba_block(tieba)
if "music" not in exclude:
music = BeautifulSoup(content, "html.parser").find(
"div", class_="result-op", tpl="yl_music_song"
)
music = self.webSubParser.parse_music_block(music)
# 预处理博客
article_tags = BeautifulSoup(content, "html.parser").findAll("article")
if "blog" not in exclude:
blog = None
for tmp in article_tags:
if tmp["class"][-1].startswith("open-source-software-blog"):
blog = tmp
break
blog = self.webSubParser.parse_blog_block(blog)
# 预处理码云
if "gitee" not in exclude:
gitee = None
for tmp in article_tags:
if tmp["class"][-1].startswith("osc-gitee"):
gitee = tmp
break
gitee = self.webSubParser.parse_gitee_block(gitee)
# 加载贴吧
if "tieba" not in exclude and tieba:
pre_results.append(dict(type="tieba", result=tieba))
# 加载博客
if "blog" not in exclude and blog:
pre_results.append(dict(type="blog", result=blog))
# 加载码云
if "gitee" not in exclude and gitee:
pre_results.append(dict(type="gitee", result=gitee))
# 加载搜索结果总数
# 已经移动到根字典中
# if num != 0:
# pre_results.append(dict(type="total", result=num))
# 加载运算
if "calc" not in exclude and calc:
pre_results.append(
dict(
type="calc",
process=str(
calc.find("p", class_="op_new_val_screen_process")
.find("span")
.text
),
result=str(
calc.find("p", class_="op_new_val_screen_result")
.find("span")
.text
),
)
)
# 加载相关搜索
if "related" not in exclude and related:
pre_results.append(dict(type="related", results=related))
# 加载资讯
if "news" not in exclude and news_detail:
pre_results.append(dict(type="news", results=news_detail))
# 加载短视频
if "video" not in exclude and video_results:
pre_results.append(dict(type="video", results=video_results))
# 加载百科
if "baike" not in exclude and baike:
pre_results.append(dict(type="baike", result=baike))
# 加载音乐
if "music" not in exclude and music:
pre_results.append(dict(type="music", result=music))
# 预处理源码
soup = BeautifulSoup(content, "html.parser")
results = soup.findAll("div", class_="result")
res = []
for result in results:
des = None
try:
result["tpl"]
except:
continue
soup = BeautifulSoup(self._minify(str(result)), "html.parser")
# 链接
href = soup.find("a").get("href").strip()
# 标题
title = self._format(str(soup.find("a").text))
# 时间
try:
time = self._format(
soup.findAll("div", class_="c-abstract")[0]
.find("span", class_="newTimeFactor_before_abs")
.text
)
except (AttributeError, IndexError):
time = None
try:
# 简介
des = soup.find_all("div", class_="c-abstract")[0].text
soup = BeautifulSoup(str(result), "html.parser")
des = self._format(des).lstrip(str(time)).strip()
except IndexError:
try:
des = des.replace("mn", "")
except (UnboundLocalError, AttributeError):
des = None
if time:
time = time.split("-")[0].strip()
# 因为百度的链接是加密的了,所以需要一个一个去访问
# 由于性能原因,分析链接部分暂略
# if href is not None:
# try:
# # 由于性能原因,这里设置1秒超时
# r = requests.get(href, timeout=1)
# href = r.url
# except:
# # 获取网页失败,默认换回原加密链接
# href = href
# # 分析链接
# if href:
# parse = urlparse(href)
# domain = parse.netloc
# prepath = parse.path.split('/')
# path = []
# for loc in prepath:
# if loc != '':
# path.append(loc)
# else:
# domain = None
# path = None
try:
result["tpl"]
except:
pass
is_not_special = (
result["tpl"]
not in [
"short_video_pc",
"sp_realtime_bigpic5",
"bk_polysemy",
"tieba_general",
"yl_music_song",
]
and result.find("article") is None
)
domain = None
if is_not_special: # 确保不是特殊类型的结果
# 获取可见的域名
try:
domain = (
result.find("div", class_="c-row")
.find("div", class_="c-span-last")
.find("div", class_="se_st_footer")
.find("a", class_="c-showurl")
.text
)
except Exception:
try:
domain = (
result.find("div", class_="c-row")
.find("div", class_="c-span-last")
.find("p", class_="op-bk-polysemy-move")
.find("span", class_="c-showurl")
.text
)
except Exception:
try:
domain = (
result.find("div", class_="se_st_footer")
.find("a", class_="c-showurl")
.text
)
except:
domain = None
if domain:
domain = domain.replace(" ", "")
# 百度快照
snapshot = result.find("a", class_="kuaizhao")
if snapshot is not None:
snapshot = snapshot["href"]
# 加入结果
if title and href and is_not_special:
res.append(
{
"title": title,
"des": des,
"origin": domain,
"url": href,
"time": time,
"snapshot": snapshot,
"type": "result",
}
)
soup = BeautifulSoup(content, "html.parser")
soup = BeautifulSoup(str(soup.findAll("div", id="page")[0]), "html.parser")
# 分页
# pages_ = soup.findAll("span", class_="pc")
# pages = []
# for _ in pages_:
# pages.append(int(_.text))
# # 如果搜索结果仅有一页时,百度不会显示底部导航栏
# # 所以这里直接设置成1,如果不设会报错`TypeError`
# if not pages:
# pages = [1]
# 设置最终结果
result = pre_results
result.extend(res)
return {
"results": result,
# 最大页数
# "pages": max(pages),
"total": num,
}
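A usage sketch, assuming the web search page has been saved locally (the file name is hypothetical). The exclude names come from the checks in the code above: "news", "video", "calc", "related", "baike", "tieba", "music", "blog" and "gitee".

```python
# Hypothetical usage sketch; "web_search.html" is an assumed local file.
from baiduspider.parser import Parser

with open("web_search.html", encoding="utf-8") as f:
    data = Parser().parse_web(f.read(), exclude=["calc", "related", "music"])

print(data["total"])
for item in data["results"]:
    if item.get("type") == "result":   # ordinary web results
        print(item["title"], item["origin"], item["url"])
    else:                              # special blocks: news, video, baike, ...
        print("special block:", item["type"])
```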
parse_wenku(self, content)
Parses the JSON response of the Baidu Wenku (document library) search API.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
content | str | JSON data returned by the Baidu Wenku search API, already decoded as UTF-8 | required |
Returns:
Type | Description |
---|---|
dict | The parsed results |
Source code in baiduspider\parser\__init__.py
def parse_wenku(self, content: str) -> dict:
"""解析百度文库搜索的页面源代码。
Args:
content (str): 已经转换为UTF-8编码的百度文库搜索API接口JSON数据
Returns:
dict: 解析后的结果
"""
results = []
pages = 0
_ = json.loads(content)
if _["status"]["msg"] != "success":
raise RuntimeError
for res in _["data"]["normalResult"]:
info = res["docInfo"]
author = res["authorInfo"]
title = (
info["title"]
.replace("<em>", "")
.replace("</em>", "")
.replace(" - 百度文库", "")
)
des = info["content"].replace("<em>", "").replace("</em>", "")
pub_date = strftime("%Y-%m-%d", localtime(int(info["createTime"])))
page_num = info["pageNum"]
score = info["qualityScore"]
downloads = info["downloadCount"]
url = info["url"]
is_vip = info["flag"] == 28
u_name = author["uname"]
u_url = f"https://wenku.baidu.com/u/{u_name}?uid={author['uid']}"
results.append(
{
"title": title,
"des": des,
"pub_date": pub_date,
"pages": page_num,
"quality": score,
"downloads": downloads,
"url": url,
"is_vip": is_vip,
"uploader": {"name": u_name, "url": u_url},
}
)
pages = math.ceil(
(_["data"]["total"] - len(_["data"]["normalResult"])) / 10 + 1
)
return {"results": results, "pages": pages}
parse_zhidao(self, content)
Parses the page source of a Baidu Zhidao (Q&A) search.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
content | str | HTML source of the Baidu Zhidao search page, already decoded as UTF-8 | required |
Returns:
Type | Description |
---|---|
dict | The parsed results |
Source code in baiduspider\parser\__init__.py
def parse_zhidao(self, content: str) -> dict:
"""解析百度知道搜索的页面源代码.
Args:
content (str): 已经转换为UTF-8编码的百度知道搜索HTML源码
Returns:
dict: 解析后的结果
"""
bs = BeautifulSoup(self._minify(content), "html.parser")
# 搜索结果总数
total = int(
bs.find("div", class_="wgt-picker")
.find("span", class_="f-lighter")
.text.split("共", 1)[-1]
.split("条结果", 1)[0]
.replace(",", "")
)
# 所有搜索结果
list_ = bs.find("div", class_="list").findAll("dl")
results = []
for item in list_:
# 屏蔽企业回答
if "ec-oad" in item["class"]:
continue
# print(item.prettify() + '\n\n\n\n\n\n\n')
# 标题
title = item.find("dt").text.strip("\n")
# 链接
try:
url = item.find("dt").find("a")["href"]
except KeyError:
url = item.find("dt").find("a")["data-href"]
if item.find("dd", class_="video-content") is not None:
# 问题
__ = item.find("dd", class_="summary")
question = __.text.strip("问:") if __ is not None else None
item = item.find("div", class_="right")
tmp = item.findAll("div", class_="video-text")
# # 简介
# des = self._format(tmp[2].text)
answer = None
# 回答者
answerer = tmp[0].text.strip("\n").strip("回答:\u2002")
# 发布日期
date = self._format(tmp[1].text.strip("时间:"))
# 回答总数
count = None
# 赞同数
try:
agree = int(tmp[2].text.strip("获赞:\u2002").strip("次"))
except ValueError:
agree = 0
answer = tmp[2].text.strip()
type_ = "video"
else:
# 回答
__ = item.find("dd", class_="answer")
answer = __.text.strip("答:") if __ is not None else None
# 问题
__ = item.find("dd", class_="summary")
question = __.text.strip("问:") if __ is not None else None
tmp = item.find("dd", class_="explain").findAll("span", class_="mr-8")
# 发布日期
date = (
item.find("dd", class_="explain").find("span", class_="mr-7").text
)
# 回答总数
try:
count = int(str(tmp[-1].text).strip("\n").strip("个回答"))
except:
count = None
# 回答者
answerer = tmp[-2].text.strip("\n").strip("回答者:\xa0")
# 赞同数
__ = item.find("dd", class_="explain").find("span", class_="ml-10")
agree = int(__.text.strip()) if __ is not None else 0
type_ = "normal"
# 生成结果
result = {
"title": title,
"question": question,
"answer": answer,
"date": date,
"count": count,
"url": url,
"agree": agree,
"answerer": answerer,
# "type": type_
}
results.append(result) # 加入结果
# 获取分页
# wrap = bs.find("div", class_="pager")
# pages_ = wrap.findAll("a")[:-2]
# if "下一页" in pages_[-1].text:
# pages = pages_[-2].text
# else:
# pages = pages_[-1].text
return {
"results": results,
# 取最大页码
# "pages": int(pages),
"total": total,
}
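A closing usage sketch, again on a locally saved page (the file name is hypothetical). Video-style answers parsed above have count set to None, so guard against that when aggregating.

```python
# Hypothetical usage sketch; "zhidao_search.html" is an assumed local file.
from baiduspider.parser import Parser

with open("zhidao_search.html", encoding="utf-8") as f:
    data = Parser().parse_zhidao(f.read())

answers = sum(item["count"] or 0 for item in data["results"])
print(data["total"], "results,", answers, "answers counted")
```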