前言
对于爬虫来说,当你的访问频率达到了目标网站的预警值时,就可能触发目标网站的反爬机制。而封禁访问者ip就是很常见的一个反爬机制。
当ip被封禁后,从此ip发出的请求将不能得到正确的响应。这种时候,我们就需要一个代理ip池。
什么是代理ip池?
通俗地比喻一下,它就是一个池子,里面装了很多代理ip。它有如下的行为特征:
- 1.池子里的ip是有生命周期的,它们将被定期验证,其中失效的将被从池子里面剔除。
- 2.池子里的ip是有补充渠道的,会有新的代理ip不断被加入池子中。
- 3.池子中的代理ip是可以被随机取出的。
这样,代理池中始终有多个不断更换的、有效的代理ip,且我们可以随机从池子中取出代理ip,然后让爬虫程序使用代理ip访问目标网站,就可以避免爬虫被ban的情况。
今天,我们就来说一下如何构建自己的代理ip池。而且,我们要做一个比较灵活的代理池,它提供两种代理方式:
- 1.每次都通过http接口提取一个随机代理ip,然后在爬虫中使用此代理ip(大部分代理ip池服务都是这种形式)
- 2.使用squid3代理做请求转发,爬虫设置好squid3代理的地址,每次请求将由squid3自动转发给代理池中的代理
项目已经放到了github上,不想看原理、只想应用的可以直接移步github:open_proxy_pool
地址:https://github.com/AaronJny/open_proxy_pool
原理请往下看。
转载请注明出处:https://blog.csdn.net/aaronjny/article/details/87865942
代理池结构
代理池的组件可以大致描述如下:
- 1.代理IP的获取/补充渠道,定期把获取到的代理ip加入到代理池中
- 2.代理ip的验证机制,定期验证代理池中ip的有效性,并删除掉所有失效的ip
- 3.一个web服务,用以提供获取一个随机代理的api
- 4.squid3的维持脚本,它定期获取代理池中的可用ip,更新squid中的可转发代理列表
- 5.一个调度器,程序的入口,用来协调各组件的运行
如果不是很理解,没关系,请往下看,我会细说。
环境说明
为了实现代理IP池,我们如下的软件环境(列举主要部分):
- 1.redis服务器,用以存放代理池相关数据
- 2.flask,用以实现提取单个随机代理的api
- 3.squid3,用以实现代理转发
组件1-获取代理ip的渠道
我们有很多种渠道获取代理ip。笼统一点来说,可以分为两类,免费代理和收费代理。
免费代理,顾名思义嘛,最大的优点就是免费,不需要什么成本,网上搜一下就能找到。缺点也很明显,免费代理毕竟是免费的,所以质量根本不能保证,大部分无法使用,能用的多数也速度奇慢。
收费代理的质量相对来说就好多了,不同平台的代理质量和价格上都有些出入,可以自行比较。
个人学习的话,如果真的资金优先,可以考虑采集免费代理;如果资金相对充裕,可以花钱买一天或一周的代理使用,价格也不贵。我是比较推荐收费代理的,因为免费代理的质量真的不敢恭维。
企业商用的话,优先考虑收费代理吧,会稳定很多。
我选择的代理服务商是站大爷(http://ip.zdaye.com/),声明一下,我真的没收广告费啊= =。坦言说,站大爷的代理质量只能算一般,不过也够用。有几家的质量比它要好一些,不过好的有限。让我选择站大爷的最大原因是,它支持账号密码访问的模式。
没用过收费代理的朋友可能不清楚,使用收费代理平台的接口,从平台上批量提取代理ip或使用代理时,一般都是要绑定你的机器ip的。比如,你的机器ip是123.123.123.123,你就需要事先在平台上把ip绑定为123.123.123.123,这样,你只能通过IP为123.123.123.123的机器从平台提取ip,提取出的ip也只能由ip为123.123.123.123的机器使用,其他ip的机器都不行。当我们有多台机器的时候,就会非常尴尬了,毕竟不能给每台机器都买一次代理吧,很不划算。
在站大爷上面,除了绑定ip这个方法外,还可以选择使用账号+密码提取/使用代理,选择这个方法的话就不再收到IP地址的限制。讲道理,有点舒服啊= =
我先面的编码以站大爷为例,使用其他代理服务的可自行编写相关脚本,原理和逻辑都是相通的,部分细节上针对处理即可。
购买的细节我也不说了,如果需要购买的话,直接去官网购入短效优质代理
即可。
先放出这部分的完整代码,附有注释。
# -*- coding: utf-8 -*-
# @File : get_ip.py
# @Author: AaronJny
# @Date : 18-12-14 上午10:44
# @Desc : 从指定网站上获取代理ip,
# 我目前在使用站大爷,就以站大爷为例
import requests
import time
import utils
import settings
from gevent.pool import Pool
from gevent import monkey
monkey.patch_all()
class ZdyIpGetter:
""" 从`站大爷`上提取代理ip的脚本,使用其他代理服务的可自行编写相关脚本, 原理和逻辑都是相通的,部分细节上需要针对处理 """
def __init__(self):
# 购买服务时,网站给出的提取ip的api,替换成自己的
self.api_url = 'http://xxxxxxxxxxxxxxxxxxxxxxxxxx'
self.logger = utils.get_logger(getattr(self.__class__, '__name__'))
self.proxy_list = []
self.good_proxy_list = []
self.pool = Pool(5)
self.server = utils.get_redis_client()
def check_proxy(self, proxy):
""" 检查代理是否可用, 并将可用代理加入到指定列表中 :param proxy: :return: """
if settings.USE_PASSWORD:
tmp_proxy = '{}:{}@{}'.format(settings.USERNAME, settings.PASSWORD, proxy)
else:
tmp_proxy = '{}'.format(proxy)
proxies = {
'http': 'http://' + tmp_proxy,
'https': 'https://' + tmp_proxy,
}
try:
# 验证代理是否可用时,访问的是ip138的服务
resp = requests.get('http://2019.ip138.com/ic.asp', proxies=proxies, timeout=10)
# self.logger.info(resp.content.decode('gb2312'))
# 判断是否成功使用代理ip进行访问
assert proxy.split(':')[0] in resp.content.decode('gb2312')
self.logger.info('[GOOD] - {}'.format(proxy))
self.good_proxy_list.append(proxy)
except Exception as e:
self.logger.info('[BAD] - {} , {}'.format(proxy, e.args))
def get_proxy_list(self):
""" 提取一批ip,筛选出可用的部分 注:当可用ip小于两个时,则保留全部ip(不论测试成功与否) :return: """
while True:
try:
res = requests.get(self.api_url, timeout=10).content.decode('utf8')
break
except Exception as e:
self.logger.error('获取代理列表失败!重试!{}'.format(e))
time.sleep(1)
if len(res) == 0:
self.logger.error('未获取到数据!')
elif 'bad' in res:
self.logger.error('请求失败!')
# 检测未考虑到的异常情况
elif res.count('.') != 15:
self.logger.error(res)
else:
self.logger.info('开始读取代理列表!')
for line in res.split():
if ':' in line:
self.proxy_list.append(line.strip())
self.pool.map(self.check_proxy, self.proxy_list)
self.pool.join()
# 当本次检测可用代理数量小于2个时,则认为检测失败,代理全部可用
if len(self.good_proxy_list) < 2:
self.good_proxy_list = self.proxy_list.copy()
self.logger.info('>>>> 完成! <<<<')
def save_to_redis(self):
""" 将提取到的有效ip保存到redis中, 供其他组件访问 :return: """
for proxy in self.good_proxy_list:
self.server.zadd(settings.IP_POOL_KEY, int(time.time()) + settings.PROXY_IP_TTL, proxy)
def fetch_new_ip(self):
""" 获取一次新ip的整体流程控制 :return: """
self.proxy_list.clear()
self.good_proxy_list.clear()
self.get_proxy_list()
self.save_to_redis()
def main(self):
""" 周期获取新ip :return: """
start = time.time()
while True:
# 每 settings.FETCH_INTERVAL 秒获取一批新IP
if time.time() - start >= settings.FETCH_INTERVAL:
self.fetch_new_ip()
start = time.time()
time.sleep(2)
if __name__ == '__main__':
ZdyIpGetter().main()
说一下这里面的关键部分:
- 1.如何保存代理池相关数据?
- 从平台上获取的ip是有生命周期的,一般几分钟后就会失效,所以我们需要用类似于字典的形式保存代理IP和它的过期时间
- 为了更好地容错,我们将从平台上提取到的ip的生命周期统一设置为
settings.PROXY_IP_TTL
,而代理的可用时间一般是大于settings.PROXY_IP_TTL
(我默认设置的是60s)。 - 为了保证处理效率,实际使用的redis数据结构并非散列表(类似于python中的字典),而是zset(有序集合,可以为集合里面的每个元素设置一个分数,并能够分数来筛选区间内的元素)。这里,代理ip是zset中的元素,过期时间是元素的分数,参考上面的
save_to_redis(self)
代码。
- 2.如何验证提取到的ip是否可用?
- 提取到的ip有些可能是不能用的,所以我先进行了验证,再将有效的加入到代理池中
- 使用ip138的接口验证代理是否生效
- 校验之后,如果可用的ip非常少或全部失败,我倾向于认为是检验手段出了问题,并认为此批ip均为正常的,加入到代理池中
组件2-检验并清理过期ip
因为我给每个加入代理池的ip都设置了过期时间,所以检查代理ip是否有效这个操作,也并非真的去检验ip本身,而是检查它的过期时间。
我们需要清除掉过期时间<当前时间的ip,而zset可以快速实现此操作。
# -*- coding: utf-8 -*-
# @File : delele_ip.py
# @Author: AaronJny
# @Date : 18-12-14 上午11:15
# @Desc : 过期ip清理器
import utils
import settings
import time
class ExpireIpCleaner:
def __init__(self):
self.logger = utils.get_logger(getattr(self.__class__, '__name__'))
self.server = utils.get_redis_client()
def clean(self):
""" 清理代理池中的过期ip :return: """
self.logger.info('开始清理过期ip')
# 计算清理前代理池的大小
total_before = int(self.server.zcard(settings.IP_POOL_KEY))
# 清理
self.server.zremrangebyscore(settings.IP_POOL_KEY, 0, int(time.time()))
# 计算清理后代理池的大小
total_after = int(self.server.zcard(settings.IP_POOL_KEY))
self.logger.info('完毕!清理前可用ip {},清理后可用ip {}'.format(total_before, total_after))
def main(self):
""" 周期性的清理过期ip :return: """
while True:
self.clean()
self.logger.info('*' * 40)
time.sleep(settings.CLEAN_INTERVAL)
if __name__ == '__main__':
ExpireIpCleaner().main()
定期进行检测和清理,很简单,没有什么需要说的。
组件3-获取随机ip的web接口
不得不说,使用flask开发简单的接口真的是太舒服了,简洁而快速。这个web服务提供两个小功能:
- 1.获取一个随机的可用代理ip
- 2.查看当前代理池中可用的代理ip的数量
# -*- coding: utf-8 -*-
# @File : web_api.py
# @Author: AaronJny
# @Date : 18-12-14 上午11:22
# @Desc : 提供http接口的web程序
import utils
import settings
import flask
import random
import time
redis_client = utils.get_redis_client()
ip_pool_key = settings.IP_POOL_KEY
app = flask.Flask(__name__)
@app.route('/random/')
def random_ip():
""" 获取一个随机ip :return: """
# 获取redis中仍可用的全部ip
proxy_ips = redis_client.zrangebyscore(ip_pool_key, int(time.time()),
int(time.time()) + settings.PROXY_IP_TTL * 10)
if proxy_ips:
ip = random.choice(proxy_ips)
# 如果ip需要密码访问,则添加
if settings.USE_PASSWORD:
ip = '{}:{}@{}'.format(settings.USERNAME, settings.PASSWORD, ip.decode('utf8'))
return ip
else:
return ''
@app.route('/total/')
def total_ip():
""" 统计池中可用代理的数量 :return: """
total = redis_client.zcard(ip_pool_key)
if total:
return str(total)
else:
return '0'
def main():
""" 程序运行入口 :return: """
app.run('0.0.0.0', port=settings.API_WEB_PORT)
if __name__ == '__main__':
app.run('0.0.0.0', port=settings.API_WEB_PORT)
都很简单,就不细说了。
组件4-squid的维持、更新脚本
处理http的接口外,我们还可以使用squid做代理转发,这样,在爬虫程序中就不需要再频繁地更换代理IP地址,直接填上squid的地址,它会自动帮你转发给其他代理ip。
这个脚本提供如下功能:
- 1.从代理池中读取所有可用代理ip,作为可转发的代理列表写入到squid的配置文件中,并通过命令使squid重新加载配置文件。这样,squid一直使用最新可用的那些代理ip。
- 2.当squid服务异常时,通过命令杀死所有squid进程,并重新开启,保证服务正常运行
下面的代码中使用了名为squid.conf
的文件,此文件在github上,是关于squid的一些配置。如果需要对squid进行深度定制,需要自行修改这个文件。
# -*- coding: utf-8 -*-
# @File : squid_keeper.py
# @Author: AaronJny
# @Date : 18-12-14 上午11:27
# @Desc : 维持squid3使用可用ip的脚本
import utils
import settings
import time
import os
import subprocess
class SquidKeeper:
def __init__(self):
self.logger = utils.get_logger(getattr(self.__class__, '__name__'))
self.server = utils.get_redis_client()
self.ip_pool_key = settings.IP_POOL_KEY
# 区别对待使用密码和不使用密码的配置模板
if settings.USE_PASSWORD:
self.peer_conf = "cache_peer %s parent %s 0 no-query proxy-only login={}:{} never_direct allow all round-robin weight=1 connect-fail-limit=2 allow-miss max-conn=5\n".format(
settings.USERNAME, settings.PASSWORD)
else:
self.peer_conf = "cache_peer %s parent %s 0 no-query proxy-only never_direct allow all round-robin weight=1 connect-fail-limit=2 allow-miss max-conn=5\n"
def read_new_ip(self):
""" 从redis中读取全部有效ip :return: """
self.logger.info('读取代理池中可用ip')
proxy_ips = self.server.zrangebyscore(settings.IP_POOL_KEY, int(time.time()),
int(time.time()) + settings.PROXY_IP_TTL * 10)
return proxy_ips
def update_conf(self, proxy_list):
""" 根据读取到的代理ip,和现有配置文件模板, 生成新的squid配置文件并重新加载,让squid使用最新的ip。 :param proxy_list: :return: """
self.logger.info('准备加载到squid中')
with open('squid.conf', 'r') as f:
squid_conf = f.readlines()
squid_conf.append('\n# Cache peer config\n')
for proxy in proxy_list:
ip, port = proxy.decode('utf8').split(':')
squid_conf.append(self.peer_conf % (ip, port))
with open('/etc/squid/squid.conf', 'w') as f:
f.writelines(squid_conf)
failed = os.system('squid -k reconfigure')
# 这是一个容错措施
# 当重新加载配置文件失败时,会杀死全部相关进行并重试
if failed:
self.logger.info('squid进程出现问题,查找当前启动的squid相关进程...')
p = subprocess.Popen("ps -ef | grep squid | grep -v grep | awk '{print $2}'", shell=True,
stdout=subprocess.PIPE, universal_newlines=True)
p.wait()
result_lines = [int(x.strip()) for x in p.stdout.readlines()]
self.logger.info('找到如下进程:{}'.format(result_lines))
if len(result_lines):
for proc_id in result_lines:
self.logger.info('开始杀死进程 {}...'.format(proc_id))
os.system('kill -s 9 {}'.format(proc_id))
self.logger.info('全部squid已被杀死,开启新squid进程...')
os.system('service squid restart')
time.sleep(3)
self.logger.info('重新加载ip...')
os.system('squid -k reconfigure')
self.logger.info('当前可用IP数量 {}'.format(len(proxy_list)))
def main(self):
""" 周期性地更新squid的配置文件, 使其使用最新的代理ip :return: """
while True:
proxy_list = self.read_new_ip()
self.update_conf(proxy_list)
self.logger.info('*' * 40)
time.sleep(settings.SQUID_KEEPER_INTERVAL)
if __name__ == '__main__':
SquidKeeper().main()
组件5-调度器
调度器是程序的入口,也是对以上各个组件的控制和整合。
它的主要功能是:
- 1.使用子进程分别开启各个组件
- 2.在某个组件异常退出后,重启它
- 3.接收到终止信号时,关闭所有存活的组件进程后再退出
# -*- coding: utf-8 -*-
# @File : scheduler.py
# @Author: AaronJny
# @Date : 18-12-14 上午11:41
# @Desc : 调度中心,所有组件在这里被统一启动和调度
import utils
import settings
from get_ip import ZdyIpGetter
from delele_ip import ExpireIpCleaner
from web_api import app
from squid_keeper import SquidKeeper
from multiprocessing import Process
import time
class Scheduler:
logger = utils.get_logger('Scheduler')
@staticmethod
def fetch_ip():
"""
获取新ip的进程
:return:
"""
while True:
try:
ZdyIpGetter().main()
except Exception as e:
print(e.args)
@staticmethod
def clean_ip():
"""
定期清理过期ip的进程
:return:
"""
while True:
try:
ExpireIpCleaner().main()
except Exception as e:
print(e.args)
@staticmethod
def squid_keep():
"""
维持squid使用最新ip的进程
:return:
"""
while True:
try:
SquidKeeper().main()
except Exception as e:
print(e.args)
@staticmethod
def api():
"""
提供web接口的进程
:return:
"""
app.run('0.0.0.0', settings.API_WEB_PORT)
def run(self):
process_list = []
try:
# 只启动打开了开关的组件
if settings.IP_GETTER_OPENED:
# 创建进程对象
fetch_ip_process = Process(target=Scheduler.fetch_ip)
# 并将组件进程加入到列表中,方便在手动退出的时候杀死
process_list.append(fetch_ip_process)
# 开启进程
fetch_ip_process.start()
if settings.EXPIRE_IP_CLEANER_OPENED:
clean_ip_process = Process(target=Scheduler.clean_ip)
process_list.append(clean_ip_process)
clean_ip_process.start()
if settings.SQUID_KEEPER_OPENED:
squid_keep_process = Process(target=Scheduler.squid_keep)
process_list.append(squid_keep_process)
squid_keep_process.start()
if settings.WEB_API_OPENED:
api_process = Process(target=Scheduler.api)
process_list.append(api_process)
api_process.start()
# 一直执行,直到收到终止信号
while True:
time.sleep(1)
except KeyboardInterrupt:
# 收到终止信号时,关闭所有进程后再退出
self.logger.info('收到终止信号,正在关闭所有进程...')
for process in process_list:
if process.is_alive():
process.terminate()
self.logger.info('关闭完成!结束程序!')
if __name__ == '__main__':
Scheduler().run()
公用方法和配置
将各组件公用的方法和配置抽取出来,做了集中。
# -*- coding: utf-8 -*-
# @File : utils.py
# @Author: AaronJny
# @Date : 18-12-14 上午11:07
# @Desc :
from redis import StrictRedis, ConnectionPool
import settings
import logging
def get_redis_client():
""" 获取一个redis连接 :return: """
server_url = settings.REDIS_SERVER_URL
return StrictRedis(connection_pool=ConnectionPool.from_url(server_url))
def get_logger(name=__name__):
""" 获取一个logger,用以格式化输出信息 :param name: :return: """
logger = logging.getLogger(name)
logger.handlers.clear()
logger.setLevel(logging.INFO)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s: - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
# 使用StreamHandler输出到屏幕
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger
涉及到的所有配置,可以根据情况进行修改:
# -*- coding: utf-8 -*-
# @File : settings.py
# @Author: AaronJny
# @Date : 18-12-14 上午11:13
# @Desc :
# 代理池redis键名
IP_POOL_KEY = 'open_proxy_pool'
# redis连接,根据实际情况进行配置
REDIS_SERVER_URL = 'redis://:your_password@your_host:port/db_name'
# api对外端口
API_WEB_PORT = 9102
# 代理是否需要通过密码访问,当此项为False时可无视USERNAME和PASSWORD的配置
USE_PASSWORD = True
# 用户名
# 注意:用户名密码是指代理服务方提供给你,用以验证访问授权的凭证。
# 无密码限制时可无视此项,并将USE_PASSWORD改为False
USERNAME = 'your_username'
# 密码
PASSWORD = 'your_password'
# ***********功能组件开关************
# 打开web api功能,不使用web api的话可以关闭
WEB_API_OPENED = True
# 打开squid代理转发服务的维持脚本,不使用squid的话可以关闭
SQUID_KEEPER_OPENED = True
# 打开清理过期ip的脚本,如果池内的代理ip永远不会失效的话可以关闭
EXPIRE_IP_CLEANER_OPENED = True
# 打开定时获取ip并检查的脚本,如果不需要获取新ip的话可以关闭
IP_GETTER_OPENED = True
# ***********************************
# 清理代理ip的频率,如下配置代表每两次之间间隔6秒
CLEAN_INTERVAL = 6
# 获取代理ip的频率,根据api的请求频率限制进行设置
# 比如`站大爷`的频率限制是10秒一次,我就设置成了12秒
FETCH_INTERVAL = 12
# squid从redis中加载新ip的频率
SQUID_KEEPER_INTERVAL = 12
# 代理ip的生命周期,即一个新ip在多久后将被删除,单位:秒
PROXY_IP_TTL = 60
运行
到这里,编码就完成了。打开终端,切换到项目根目录,输入python3 scheduler.py
运行即可。建议使用screen后台运行。
给出一个运行的截图(有机器在调用接口,我把ip隐藏了):
结尾
感谢支持,有问题欢迎拍砖~
今天的文章如何构建一个自己的代理ip池分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/32418.html