python爬取抖⾳号和评论_Python爬取抖⾳APP,竟然只需要
⼗⾏代码
环境说明
环境:
python 3.7.1
centos 7.4
pip 10.0.1
部署
[root@localhost ~]#python3.7--versionPython3.7.1[root@localhost ~]#
[root@localhost ~]# pip3 install douyin
有时候因为⽹络原因会安装失败,这时重新执⾏上⾯的命令即可,直到安装完成。
导⼊douyin模块
[root@localhost ~]# python3.7>>>import douyin>>>
导⼊如果报错的话,可能douyin模块没有安装成功。
下⾯我们开始爬…爬抖⾳⼩视频和⾳乐咯
[root@localhost douyin]#python3.7dou.py
⼏分钟后…我们来看看爬的成果
可以看到视频配的⾳乐被存储成了 mp3 格式的⽂件,抖⾳视频存储成了 mp4 ⽂件。
嗯…不错,哈哈。
py脚本
作者说,能爬抖⾳上所有热门话题和⾳乐下的相关视频都爬取到,并且将爬到的视频下载下来,同时还要把视频所配的⾳乐也单独下载下来,不仅如此,所有视频的相关信息如发布⼈、点赞数、评论数、发布时间、发布⼈、发布地点等等信息都需要爬取下来,并存储到MongoDB 数据库。
importdouyinfromdouyin.structuresimportTopic, Music# 定义视频下载、⾳频下载、MongoDB 存储的处理器video_file_handler = douyin.handlers.VideoFileHandler(folder='./videos')music_file_handler =
douyin.handlers.MusicFileHandler(folder='./musics')#mongo_handler = douyin.handlers.MongoHandler()# 定义下载器,并将三个处理器当做参数传递#downloader = douyin.downloaders.VideoDownloader([mongo_handler, video_file_handler,
music_file_handler])downloader = douyin.downloaders.VideoDownloader([video_file_handler, music_file_handler])# 循环爬取抖⾳热榜信息并下载存储d():foriteminresult.data:# 爬取热门话题和热门⾳乐下⾯的所有视频,每个话题或⾳乐最多爬取 10 个相关视频。downloader.download(item.videos(max=10))
由于我这⾥没有mongodb所以,把这mongodb相关的配置给注释掉了。
====以下摘⾃作者====
代码解读
本库依赖的其他库有:
aiohttp:利⽤它可以完成异步数据下载,加快下载速度
dateparser:利⽤它可以完成任意格式⽇期的转化
motor:利⽤它可以完成异步 MongoDB 存储,加快存储速度下载app里的视频
requests:利⽤它可以完成最基本的 HTTP 请求模拟
tqdm:利⽤它可以进⾏进度条的展⽰
数据结构定义
如果要做⼀个库的话,⼀个很重要的点就是对⼀些关键的信息进⾏结构化的定义,使⽤⾯向对象的思维对某些对象进⾏封装,抖⾳的爬取也不例外。
在抖⾳中,其实有很多种对象,⽐如视频、⾳乐、话题、⽤户、评论等等,它们之间通过某种关系联系在⼀起,例如视频中使⽤了某个配乐,那么视频和⾳乐就存在使⽤关系;⽐如⽤户发布了视频,那么⽤户和视频就存在发布关系,我们可以使⽤⾯向对象的思维对每个对象进⾏封装,⽐如视频的话,就可以定义成如下结构:
classVideo(Base):def__init__(self, **kwargs):"""
init video object
:param kwargs:
"""super().__init__()        self.id = ('id')        self.desc = ('desc')        self.author = ('author')      self.music = ('music')        self.like_count = ('like_count')        selfment_count =
<('comment_count')        self.share_count = ('share_count')        self.hot_count =
<('hot_count')        ...        self.address = ('address')def__repr__(self):"""
video to str
:return: str
"""return'>'% (self.id, self.desc[:10].strip()ifself.descelseNone)
这⾥将⼀些关键的属性定义成 Video 类的⼀部分,包括 id 索引、desc 描述、author 发布⼈、music 配乐等等,其中 author 和 music 并不是简单的字符串的形式,它也是单独定义的数据结构,⽐如 author 就是 User 类型的对象,⽽ User 的定义⼜是如下结构:
classUser(Base):def__init__(self, **kwargs):"""
init user object
:param kwargs:
"""super().__init__()        self.id = ('id')        der = ('gender')        self.name = ('name')      ate_time = ('create_time')        self.birthday = ('birthday')        ...def__repr__(self):"""
user to str
:return:
"""return'>'% (self.alias, self.name)
所以说,通过属性之间的关联,我们就可以将不同的对象关联起来,这样显得逻辑架构清晰,⽽且我们也不⽤⼀个个单独维护字典来存储了,其实这就和 Scrapy ⾥⾯的 Item 的定义是类似的。
请求和重试
实现爬取的过程就不必多说了,这⾥⾯其实⽤到的就是最简单的抓包技巧,使⽤ Charles 直接进⾏抓包即可。抓包之后便可以观察到对应的接⼝请求,然后进⾏模拟即可。
所以问题就来了,难道我要⼀个接⼝写⼀个请求⽅法吗?另外还要配置 Headers、超时时间等等的内容,那岂不是太费劲了,所以,我们可以将请求的⽅法进⾏单独的封装,这⾥我定义了⼀个 fetch ⽅法:
def_fetch(url, **kwargs):"""
fetch api response
:param url: fetch url
:param kwargs: other requests params
:return: json of response
"""response = (url, **kwargs)ifresponse.status_code !=200:raiserequests.ConnectionError('Expected status
code 200, but got {}'.format(response.status_code))returnresponse.json()
这个⽅法留了⼀个必要参数,即 url,另外其他的配置我留成了 kwargs,也就是可以任意传递,传递
之后,它会依次传递给 requests 的请求⽅法,然后这⾥还做了异常处理,如果成功请求,即可返回正常的请求结果。
定义了这个⽅法,在其他的调⽤⽅法⾥⾯我们只需要单独调⽤这个 fetch ⽅法即可,⽽不需要再去关⼼异常处理,返回类型了。
好,那么定义好了请求之后,如果出现了请求失败怎么办呢?按照常规的⽅法,我们可能就会在外⾯套⼀层⽅法,然后记录调⽤ fetch ⽅法请求失败的次数,然后重新调⽤ fetch ⽅法进⾏重试,但这⾥可以告诉⼤家⼀个更好⽤的库,叫做 retrying,使⽤它我们可以通过定义⼀个装饰器来完成重试的操作。
⽐如我可以使⽤ retry 装饰器这么装饰 fetch ⽅法:
fromretryingimportretry@retry(stop_max_attempt_number=retry_max_number,
wait_random_min=retry_min_random_wait,wait_random_max=retry_max_random_wait,
retry_on_exception=need_retry)def_fetch(url, **kwargs):pass
这⾥使⽤了装饰器的四个参数:
stop_max_attempt_number:最⼤重试次数,如果重试次数达到该次数则放弃重试
wait_random_min:下次重试之前随机等待时间的最⼩值
wait_random_max:下次重试之前随机等待时间的最⼤值
retry_on_exception:判断出现了怎样的异常才重试
这⾥ retry_on_exception 参数指定了⼀个⽅法,叫做 need_retry,⽅法定义如下:
defneed_retry(exception):"""
need to retry
:param exception:
:return:
"""result = isinstance(exception, (requests.ConnectionError, requests.ReadTimeout))ifresult:        print('Exception',
type(exception),'occurred, ')returnresult
这⾥判断了如果是 requests 的 ConnectionError 和 ReadTimeout 异常的话,就会抛出异常进⾏重试,否则不予重试。
所以,这样我们就实现了请求的封装和⾃动重试,是不是⾮常 Pythonic?
下载处理器的设计
为了下载视频,我们需要设计⼀个下载处理器来下载已经爬取到的视频链接,所以下载处理器的输⼊就是⼀批批的视频链接,下载器接收到这些链接,会将其进⾏下载处理,并将视频存储到对应的位置,另外也可以完成⼀些信息存储操作。
在设计时,下载处理器的要求有两个,⼀个是保证⾼速的下载,另⼀个就是可扩展性要强,下⾯我们分别来针对这两个特点进⾏设计:
⾼速下载,为了实现⾼速的下载,要么可以使⽤多线程或多进程,要么可以⽤异步下载,很明显,后者是更有优势的。
扩展性强,下载处理器要能下载⾳频、视频,另外还可以⽀持数据库等存储,所以为了解耦合,我们可以将视频下载、⾳频下载、数据库存储的功能独⽴出来,下载处理器只负责视频链接的主要逻辑处理和分配即可。
为了实现⾼速下载,这⾥我们可以使⽤ aiohttp 库来完成,另外异步下载我们也不能⼀下⼦下载太多,不然⽹络波动太⼤,所以我们可以设置 batch 式下载,可以避免同时⼤量的请求和⽹络拥塞,主要的下载函数如下:
defdownload(self, inputs):"""
download video or video lists
:param data:
:return:
"""ifisinstance(inputs, types.GeneratorType):        temps = []forresultininputs:            print('Processing', result,'...')
temps.append(result)iflen(temps) == self.batch:                self.process_items(temps)                temps = []else:        inputs = inputsifisinstance(inputs, list)else[inputs]        self.process_items(inputs)
这个 download ⽅法设计了多种数据接收类型,可以接收⼀个⽣成器,也可以接收单个或列表形式的视频对象数据,接着调⽤了
process_items ⽅法进⾏了异步下载,其⽅法实现如下:
defprocess_items(self, objs):"""
process items
:param objs: objs
:return:
"""# define progress barwithtqdm(total=len(objs))asself.bar:# init event looploop = _event_loop()# get num of batchestotal_step = il(len(objs) / self.batch))# for every batchforstepinrange(total_step):            start, end = step
* self.batch, (step +1) * self.batch            print('Processing %d-%d of files'% (start +1, end))# get batch of objsobjs_batch =
objs[start: end]# define tasks and run looptasks =
[sure_future(self.process_item(obj))forobjinobjs_batch]fortaskintasks:
task.add_done_callback(self.update_progress)            loop.run_until_complete(asyncio.wait(tasks))
这⾥使⽤了 asyncio 实现了异步处理,并通过对视频链接进⾏分批处理保证了流量的稳定性,另外还使⽤了 tqdm 实现了进度条的显⽰。
我们可以看到,真正的处理下载的⽅法是 process_item,这⾥⾯会调⽤视频下载、⾳频下载、数据库存储的⼀些组件来完成处理,由于我
们使⽤了 asyncio 进⾏了异步处理,所以 process_item 也需要是⼀个⽀持异步处理的⽅法,定义如下:
asyncdefprocess_item(self, obj):"""
process item
:param obj: single obj
:return:
"""ifisinstance(obj, Video):        print('Processing', obj,'...')forhandlerinself.handlers:ifisinstance(handler,
Handler):awaithandler.process(obj)
这⾥我们可以看到,真正的处理逻辑都在⼀个个 handler ⾥⾯,我们将每个单独的功能进⾏了抽离,定义成了⼀个个 Handler,这样可以
实现良好的解耦合,如果我们要增加和关闭某些功能,只需要配置不同的 Handler 即可,⽽不需要去改动代码,这也是设计模式的⼀个解
耦思想,类似⼯⼚模式。
Handler 的设计
刚才我们讲了,Handler 就负责⼀个个具体功能的实现,⽐如视频下载、⾳频下载、数据存储等等,所以我们可以将它们定义成不同的
Handler,⽽视频下载、⾳频下载⼜都是⽂件下载,所以⼜可以利⽤继承的思想设计⼀个⽂件下载的 Handler,定义如下:
fromos.pathimportjoin,
existsfromosimportmakedirsfromdouyin.peimportmime_to_extimportaiohttpclassFileHand folder):"""
init save folder
:param folder:
"""super().__init__()        self.folder = folderifnotexists(self.folder):            makedirs(self.folder)asyncdef_process(self, obj,
**kwargs):"""
download to file
:param url: resource url
:param name: save name
:param kwargs:
:return:
"""print('Downloading', obj,'...')        kwargs.update({'ssl':False})
kwargs.update({'timeout':10})asyncwithaiohttp.ClientSession()(obj.play_url,
**kwargs)asresponse:ifresponse.status ==200:                    extension = mime_to_ext(('Content-Type'))
full_path = join(self.folder,'%s.%s'% (obj.id, extension))withopen(full_path,'wb')asf:
f.t.read())                    print('Downloaded file to', full_path)else:                    print('Cannot
download %s, response status %s'% (obj.id, response.status))asyncdefprocess(self, obj, **kwargs):"""
process obj
:param obj:
:
param kwargs:
:return:
"""returnawaitself._process(obj, **kwargs)
这⾥我们还是使⽤了 aiohttp,因为在下载处理器中需要 Handler ⽀持异步操作,这⾥下载的时候就是直接请求了⽂件链接,然后判断了⽂
件的类型,并完成了⽂件保存。
视频下载的 Handler 只需要继承当前的 FileHandler 即可:
fromdouyin.handlersimportFileHandlerfromdouyin.structuresimportVideoclassVideoFileHandler(FileHandler):asyncdefprocess( obj, **kwargs):"""
process video obj
:param obj:
:param kwargs:
:return:
"""ifisinstance(obj, Video):returnawaitself._process(obj, **kwargs)
这⾥其实就是加了类别判断,确保数据类型的⼀致性,当然⾳频下载也是⼀样的。
异步 MongoDB 存储
上⾯介绍了视频和⾳频处理的 Handler,另外还有⼀个存储的 Handler 没有介绍,那就是 MongoDB 存储,平常我们可能习惯使⽤PyMongo 来完成存储,但这⾥我们为了加速,需要⽀持异步操作,所以这⾥有⼀个可以实现异步 MongoDB 存储的库,叫做 Motor,其
实使⽤的⽅法差不太多,MongoDB 的连接对象不再是 PyMongo 的 MongoClient 了,⽽是 Motor 的 AsyncIOMotorClient,其他的配
置基本类似。
在存储时使⽤的是 update_one ⽅法并开启了 upsert 参数,这样可以做到存在即更新,不存在即插⼊的功能,保证数据的不重复性。
整个 MongoDB 存储的 Handler 定义如下:
_asyncioimportAsyncIOMotorClientfromdouyin.structuresimport*classMo conn_uri=None, db='douyin'):"""

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。