Implementing a multi-threaded crawler in Python with parameters passed via a configuration file

A mini targeted web crawler

Requirements

This mini targeted crawler implements the following functionality: starting from a set of seed links, it crawls pages in breadth-first order and saves to disk every page whose URL matches a given pattern.

Specifically:

  1. Command-line argument handling must be supported, namely: -h (help), -v (version), -c (configuration file)

  2. Pages must be crawled in breadth-first order

  3. A failure to fetch or parse a single page must not bring the whole program down; log the error and continue

  4. The program must exit gracefully once all crawling tasks are finished.

  5. Both relative and absolute paths must be handled when extracting links from HTML (see the sketch after this list).

  6. Each page is stored as a separate file, named after its URL; special characters in the URL must be escaped.

  7. Multi-threaded, parallel crawling must be supported.

  8. The code should be readable and maintainable; pay attention to how modules, classes and functions are designed and divided

  9. PS: the Python CM committee provides a test site for crawling: http://pycm.baidu.com:8081
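
As a quick illustration of requirements 5 and 6, the sketch below (Python 2, to match the project code; the URLs are made up for the example) shows how urlparse.urljoin resolves relative and absolute links against the page they were found on, and how urllib.quote_plus turns a URL into a file-system-safe name:

import urllib
import urlparse

base = 'http://pycm.baidu.com:8081/page1.html'            # hypothetical page the links were found on
# Relative links are resolved against the base; absolute links pass through unchanged.
print(urlparse.urljoin(base, 'img/1.gif'))                # http://pycm.baidu.com:8081/img/1.gif
print(urlparse.urljoin(base, '/img/2.png'))               # http://pycm.baidu.com:8081/img/2.png
print(urlparse.urljoin(base, 'http://example.com/3.jpg')) # absolute URL is kept as-is

# Escaping the URL gives a name that is safe to use as a file name (requirement 6).
print(urllib.quote_plus('http://pycm.baidu.com:8081/img/1.gif'))
# -> http%3A%2F%2Fpycm.baidu.com%3A8081%2Fimg%2F1.gif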


Program description

Py-files:

run_main.py : the executable entry point of the project

mini_spider.py : starts the crawling threads

config_args.py : loads the configuration from spider.conf

Url.py : the class representing a URL

crawl_thread.py : a single crawling-thread unit

html_parse.py : a class that parses HTML and extracts URLs

downloader.py : a class that downloads a page

log.py : used for logging

Cfg-files:

urls : stores the seed URLs (depth 0)

spider.conf : stores the general crawling configuration

Dirs:

log : used for saving log files

output : used for saving the crawled pages

test : contains all the unit-test scripts

How to run:

1: change into this dir

2: run python run_main.py -c spider.conf, or just python run_main.py (which uses the default spider.conf)


Code walkthrough

The spider.conf configuration file
[spider]
url_list_file = ./urls
output_directory = ./output
max_depth = 1
crawl_interval = 0.3
crawl_timeout = 2
target_url = .*.(gif|png|jpg|bmp)$
thread_count = 8
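
config_args.py, which loads these values, is not shown in the walkthrough below. As a rough idea of what it does, here is a minimal sketch of reading such a file with Python 2's ConfigParser module (the get_* methods used later, e.g. get_max_depth(), would simply wrap lookups like these):

import ConfigParser

cp = ConfigParser.ConfigParser()
cp.read('spider.conf')

url_list_file = cp.get('spider', 'url_list_file')         # './urls'
output_directory = cp.get('spider', 'output_directory')   # './output'
max_depth = cp.getint('spider', 'max_depth')               # 1
crawl_interval = cp.getfloat('spider', 'crawl_interval')   # 0.3
crawl_timeout = cp.getfloat('spider', 'crawl_timeout')     # 2
target_url = cp.get('spider', 'target_url')                # the regex for target pages
thread_count = cp.getint('spider', 'thread_count')         # 8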
run_main.py
import argparse
import logging

import mini_spider
import log

if __name__ == '__main__':
    """
    Main program, the entry point
    """
    log.init_log('./log/mini_spider')
    logging.info('%-35s' % ' * miniSpider is starting ... ')
    # ********************** start ***********************
    # 1. set up the command-line arguments of the program
    parser = argparse.ArgumentParser(description='This is a mini spider program!')
    parser.add_argument('-v',
                        '--version',
                        action='version',
                        version='%(prog)s 1.0.0')

    parser.add_argument('-c',
                        '--config_file',
                        action='store',
                        dest='CONF_PATH',
                        default='spider.conf',
                        help='Set configuration file path')

    args = parser.parse_args()

    # 2. create an instance of MiniSpider and start crawling
    mini_spider_inst = mini_spider.MiniSpider(args.CONF_PATH)
    init_success = mini_spider_inst.initialize()
    if init_success:
        mini_spider_inst.pre_print()
        mini_spider_inst.run_threads()

    # ********************* end **************************
    logging.info('%-35s' % ' * miniSpider is ending ...')

As you can see, the main program really does just two things: first it takes in the arguments (the configuration file), and then it runs the multi-threaded crawler (a separate crawler class).
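
log.py is also not walked through here; it only exposes init_log(), which is called once at startup. A minimal sketch of what such an init could look like (a hypothetical implementation that writes rotating log files under ./log, not necessarily the repo's exact code):

import logging
import logging.handlers


def init_log(log_path, level=logging.INFO):
    """Attach a rotating file handler so other modules can simply call logging.info()."""
    formatter = logging.Formatter('%(levelname)s: %(asctime)s - %(message)s')
    handler = logging.handlers.TimedRotatingFileHandler(log_path + '.log',
                                                        when='D',
                                                        backupCount=7)
    handler.setFormatter(formatter)
    logger = logging.getLogger()
    logger.setLevel(level)
    logger.addHandler(handler)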

Let's look at argument parsing first:

The argparse module is used here. A short argparse demo:

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--verbosity", help="increase output verbosity")
args = parser.parse_args()
if args.verbosity:
    print("verbosity turned on")

Running it and its output:
$ python3 prog.py --verbosity 1
verbosity turned on

$ python3 prog.py
(no output; --verbosity is an optional argument here)

$ python3 prog.py --help
usage: prog.py [-h] [--verbosity VERBOSITY]
options:
  -h, --help            show this help message and exit
  --verbosity VERBOSITY
                        increase output verbosity

$ python3 prog.py --verbosity
usage: prog.py [-h] [--verbosity VERBOSITY]
prog.py: error: argument --verbosity: expected one argument

How dest works in argparse:

dest: if dest is provided, e.g. dest='CONF_PATH', the parsed value can then be accessed as args.CONF_PATH.
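
For example (a minimal stand-alone snippet, not part of the project code):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config_file', dest='CONF_PATH', default='spider.conf')
args = parser.parse_args(['-c', 'my_spider.conf'])
print(args.CONF_PATH)   # -> my_spider.conf (without dest it would be args.config_file)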

Now for the multi-threaded crawler:

First, the mini_spider module is used to create a MiniSpider instance:

mini_spider_inst = mini_spider.MiniSpider(args.CONF_PATH)

Then initialize() is called, init_success = mini_spider_inst.initialize(), which simply copies the values from the configuration file into instance attributes.

Finally the threads are launched with mini_spider_inst.run_threads().

mini_spider.py
import Queue
import threading
import os
import logging
import re

import termcolor

import url_object
import config_args
import crawl_thread


class MiniSpider(object):
    """
    This class is a crawler-master-class for operating several crawling threads
    Attributes:
        checking_url     : queue of URLs waiting to be crawled
        checked_url      : set of URLs that have already been crawled
        config_file_path : path of the configuration file
        error_url        : set of URLs that could not be fetched
        lock             : thread lock
    """
    def __init__(self, config_file_path='spider.conf'):
        """
        Initialize variables
        """
        self.checking_url = Queue.Queue(0)
        self.checked_url = set()
        self.error_url = set()
        self.config_file_path = config_file_path
        self.lock = threading.Lock()

    def initialize(self):
        """
        Initialize ConfigArgs parameters
        Returns:
            True / False : True if the configuration file loads correctly, otherwise False
        """
        config_arg = config_args.ConfigArgs(self.config_file_path)  # read values from the config file
        is_load = config_arg.initialize()
        if not is_load:
            self.program_end('there is no conf file !')
            return False

        self.url_list_file = config_arg.get_url_list_file()
        self.output_dir = config_arg.get_output_dir()
        self.max_depth = config_arg.get_max_depth()
        self.crawl_interval = config_arg.get_crawl_interval()
        self.crawl_timeout = config_arg.get_crawl_timeout()
        self.target_url = config_arg.get_target_url()
        self.thread_count = config_arg.get_thread_count()
        self.tag_dict = config_arg.get_tag_dict()
        self.url_pattern = re.compile(self.target_url)

        seedfile_is_exist = self.get_seed_url()
        return seedfile_is_exist

    def pre_print(self):
        """
        Print the configuration when the MiniSpider instance is created
        """
        print termcolor.colored('* MiniSpider Configurations list as follows:', 'green')
        print termcolor.colored('* %-25s : %s' % ('url_list_file :',
                                                  self.url_list_file),
                                'green'
                                )

        print termcolor.colored('* %-25s : %s' % ('output_directory:',
                                                  self.output_dir),
                                'green'
                                )

        print termcolor.colored('* %-25s : %s' % ('max_depth :',
                                                  self.max_depth),
                                'green'
                                )

        print termcolor.colored('* %-25s : %s' % ('crawl_interval :',
                                                  self.crawl_interval),
                                'green'
                                )

        print termcolor.colored('* %-25s : %s' % ('crawl_timeout :',
                                                  self.crawl_timeout),
                                'green'
                                )

        print termcolor.colored('* %-25s : %s' % ('target_url :',
                                                  self.target_url),
                                'green'
                                )

        print termcolor.colored('* %-25s : %s' % ('thread_count :',
                                                  self.thread_count),
                                'green'
                                )

    def get_seed_url(self):
        """
        get the seed urls from the seed-url file
        Returns:
            True / False : True if the seed file exists, otherwise False
        """
        if not os.path.isfile(self.url_list_file):
            logging.error(' * seedfile is not existing !!!')
            self.program_end('there is no seedfile !')
            return False

        with open(self.url_list_file, 'rb') as f:
            lines = f.readlines()

        for line in lines:
            if line.strip() == '':
                continue

            url_obj = url_object.Url(line.strip(), 0)
            self.checking_url.put(url_obj)
        return True

    def program_end(self, info):
        """
        Print summary information when the program exits
        Args:
            info : the reason for exiting
        Returns:
            none
        """
        print termcolor.colored('* crawled page num : {}'.format(len(self.checked_url)), 'green')
        logging.info('crawled pages num : {}'.format(len(self.checked_url)))
        print termcolor.colored('* error page num : {}'.format(len(self.error_url)), 'green')
        logging.info('error page num : {}'.format(len(self.error_url)))
        print termcolor.colored('* finish_reason :' + info, 'green')
        logging.info('reason of ending :' + info)
        print termcolor.colored('* program is ended ... ', 'green')
        logging.info('program is ended ... ')

    def run_threads(self):
        """
        Set up the thread pool and start the threads
        """
        args_dict = {}
        args_dict['output_dir'] = self.output_dir
        args_dict['crawl_interval'] = self.crawl_interval
        args_dict['crawl_timeout'] = self.crawl_timeout
        args_dict['url_pattern'] = self.url_pattern
        args_dict['max_depth'] = self.max_depth
        args_dict['tag_dict'] = self.tag_dict

        for index in xrange(self.thread_count):
            thread_name = 'thread - %d' % index
            # CrawlerThread inherits from threading.Thread
            thread = crawl_thread.CrawlerThread(thread_name,
                                                self.process_request,
                                                self.process_response,
                                                args_dict)

            thread.setDaemon(True)  # daemon threads exit as soon as the MainThread ends
            thread.start()
            print termcolor.colored('thread %s starts working' % index, 'yellow')
            logging.info('thread %s starts working' % index)

        self.checking_url.join()
        self.program_end('normal exits ')

    def is_visited(self, url_obj):
        """
        check whether a new url_obj has already been visited (either in checked_url or error_url)
        Args:
            url_obj : a Url object
        Returns:
            True / False - True if it has been visited, otherwise False
        """
        checked_url_list = self.checked_url.union(self.error_url)

        for checked_url_ in checked_url_list:
            if url_obj.get_url() == checked_url_.get_url():
                return True

        return False

    def process_request(self):
        """
        Pre-task callback for the worker threads:
        take a url object out of the checking_url task queue
        Returns:
            url_obj : the url object that was taken out
        """
        url_obj = self.checking_url.get()
        return url_obj

    def process_response(self, url_obj, flag, extract_url_list=None):
        """
        Post-task callback for the worker threads:
        parse the HTML source and put the next level of URLs into checking_url
        Args:
            extract_url_list : the set of URLs extracted from the page
            url_obj          : the url object of the downloaded page
            flag             : flag describing how the download went
                - 0  : downloaded successfully, not a pattern-matching page
                - 1  : downloaded successfully and matches the target pattern (an image)
                - -1 : the download failed
                - 2  : a non-target URL with depth >= max_depth
        """
        if self.lock.acquire():
            if flag == -1:
                self.error_url.add(url_obj)

            elif flag == 0:
                self.checked_url.add(url_obj)
                # put the extracted links into checking_url
                for ex_url in extract_url_list:
                    next_url_obj = url_object.Url(ex_url, int(url_obj.get_depth()) + 1)
                    if not self.is_visited(next_url_obj):
                        self.checking_url.put(next_url_obj)

            elif flag == 1:
                self.checked_url.add(url_obj)
            self.checking_url.task_done()
            self.lock.release()
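
One detail worth calling out: run_threads() never joins the worker threads themselves. The workers are daemon threads, and graceful exit (requirement 4) relies on Queue's task_done()/join() contract: checking_url.join() blocks until task_done() has been called once for every item that was ever put() into the queue. A minimal stand-alone sketch of that pattern (Python 2, with made-up names, not the project code):

import Queue
import threading

tasks = Queue.Queue()
for i in range(5):
    tasks.put(i)


def worker():
    while True:
        item = tasks.get()        # blocks until an item is available
        print('processed %s' % item)
        tasks.task_done()         # exactly one task_done() per successful get()


for _ in range(2):
    t = threading.Thread(target=worker)
    t.setDaemon(True)             # daemon threads die together with the main thread
    t.start()

tasks.join()                      # returns once every queued item has been marked done
print('all tasks finished, the main thread can now exit')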
crawl_thread.py
import threading
import logging
import urllib
import re
import time
import os

import downloader
import html_parser


class CrawlerThread(threading.Thread):
    """
    This class is a crawler thread for crawling pages by breadth-first-crawling
    Attributes:
        process_request  : pre-task callback
        process_response : post-task callback
        output_dir       : directory where target pages are stored
        crawl_interval   : interval between crawls
        crawl_timeout    : timeout of a single crawl
        target_url       : pattern of the target links
        max_depth        : maximum crawling depth
        tag_dict         : dictionary of link tags
    """
    def __init__(self, name, process_request, process_response, args_dict):

        super(CrawlerThread, self).__init__(name=name)
        self.process_request = process_request
        self.process_response = process_response
        self.output_dir = args_dict['output_dir']
        self.crawl_interval = args_dict['crawl_interval']
        self.crawl_timeout = args_dict['crawl_timeout']
        self.url_pattern = args_dict['url_pattern']
        self.max_depth = args_dict['max_depth']
        self.tag_dict = args_dict['tag_dict']

    def run(self):
        """
        The actual work done by the thread
        """
        while 1:
            url_obj = self.process_request()
            time.sleep(self.crawl_interval)

            logging.info('%-12s : get a url in depth : ' %
                         threading.currentThread().getName() + str(url_obj.get_depth()))

            if self.is_target_url(url_obj.get_url()):
                flag = -1
                if self.save_target(url_obj.get_url()):
                    flag = 1
                self.process_response(url_obj, flag)
                continue

            if url_obj.get_depth() < self.max_depth:
                downloader_obj = downloader.Downloader(url_obj, self.crawl_timeout)
                response, flag = downloader_obj.download()  # flag = 0 or -1

                if flag == -1:  # download failed
                    self.process_response(url_obj, flag)
                    continue

                if flag == 0:  # download succeeded
                    content = response.read()
                    url = url_obj.get_url()
                    soup = html_parser.HtmlParser(content, self.tag_dict, url)
                    extract_url_list = soup.extract_url()

                    self.process_response(url_obj, flag, extract_url_list)
            else:
                flag = 2  # a normal (non-target) URL whose depth >= max_depth
                self.process_response(url_obj, flag)

    def is_target_url(self, url):
        """
        Check whether a url matches the target-url pattern
        Args:
            url : the url to check
        Returns:
            True/False : True if it matches, otherwise False
        """
        found_aim = self.url_pattern.match(url)
        if found_aim:
            return True
        return False

    def save_target(self, url):
        """
        save the target-url page into the output directory
        Args:
            url : the url of the page to save
        Returns:
            True / False : True if the page was saved, otherwise False
        """
        if not os.path.isdir(self.output_dir):
            os.mkdir(self.output_dir)

        file_name = urllib.quote_plus(url)
        if len(file_name) > 127:
            file_name = file_name[-127:]
        target_path = "{}/{}".format(self.output_dir, file_name)
        try:
            urllib.urlretrieve(url, target_path)
            return True
        except IOError as e:
            logging.warn(' * Save target Failed: %s - %s' % (url, e))
            return False
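
downloader.py and html_parse.py are not shown in this walkthrough. From the call sites above, Downloader(url_obj, timeout).download() returns a (response, flag) pair, with flag 0 on success and -1 on failure. A minimal sketch of what that could look like (a hypothetical urllib2-based implementation, not necessarily the repo's exact code):

import logging
import urllib2


class Downloader(object):
    """Fetch a single page and report success or failure through a flag."""
    def __init__(self, url_obj, timeout):
        self.url_obj = url_obj
        self.timeout = timeout

    def download(self):
        """
        Returns:
            (response, 0) on success, where response is a file-like object
            (None, -1)    if the request fails for any reason
        """
        try:
            response = urllib2.urlopen(self.url_obj.get_url(), timeout=self.timeout)
            return response, 0
        except Exception as e:
            logging.error(' * download failed: %s - %s' % (self.url_obj.get_url(), e))
            return None, -1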

For the multi-threaded part, a custom class CrawlerThread is defined that inherits from threading.Thread.

The derived class overrides the run() method of threading.Thread; no other methods (apart from the constructor) should be overridden in the subclass. In other words, only __init__() and run() are overridden. To use it, create an instance of the subclass and call start() on it, and the thread runs (start() is what invokes run()).
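
A minimal stand-alone sketch of that pattern (not the project code):

import threading


class HelloThread(threading.Thread):
    """Only __init__() and run() are overridden."""
    def __init__(self, name):
        super(HelloThread, self).__init__(name=name)

    def run(self):
        # run() holds the work; it is invoked when start() is called
        print('%s is running' % self.getName())


t = HelloThread('worker-0')
t.start()   # start() spawns the new thread and calls run() inside it
t.join()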

Reference (GitHub): https://github.com/DrCubic/MiniSpider

Summary

This article showed how a Python program can take its parameters from a configuration file and crawl pages that match a given pattern breadth-first with multiple threads.


