抖⾳python上的代码视频_资深程序员:⼗⾏Python代码教你抖音python入门教程
爬取抖⾳视频!程序员接私活的网站
环境说明
环境:
python 3.7.1
centos 7.4
pip 10.0.1
部署
[root@localhost ~]# python3.7 --version
Python 3.7.1
[root@localhost ~]#
[root@localhost ~]# pip3 install douyin
有时候因为⽹络原因会安装失败,这时重新执⾏上⾯的命令即可,直到安装完成。
导⼊douyin模块
[root@localhost ~]# python3.7
>>>import douyin
>>>
导⼊如果报错的话,可能douyin模块没有安装成功。
下⾯我们开始爬…爬抖⾳⼩视频和⾳乐咯
[root@localhost douyin]# python3.7 dou.py
⼏分钟后…我们来看看爬的成果
可以看到视频配的⾳乐被存储成了 mp3 格式的⽂件,抖⾳视频存储成了 mp4 ⽂件。
嗯…不错,哈哈。
py脚本
作者说,能爬抖⾳上所有热门话题和⾳乐下的相关视频都爬取到,并且将爬到的视频下载下来,同时还要把视频所配的⾳乐也单独下载下来,不仅如此,所有视频的相关信息如发布⼈、点赞数、评论数、发布时间、发布⼈、发布地点等等信息都需要爬取下来,并存储到MongoDB 数据库。
import douyin
from douyin.structures import Topic, Music
# 定义视频下载、⾳频下载、MongoDB 存储的处理器
video_file_handler = douyin.handlers.VideoFileHandler(folder='./videos')
music_file_handler = douyin.handlers.MusicFileHandler(folder='./musics')
#mongo_handler = douyin.handlers.MongoHandler()
# 定义下载器,并将三个处理器当做参数传递
#downloader = douyin.downloaders.VideoDownloader([mongo_handler, video_file_handler, music_
file_handler])
downloader = douyin.downloaders.VideoDownloader([video_file_handler, music_file_handler])
# 循环爬取抖⾳热榜信息并下载存储
for result in d():
for item in result.data:
# 爬取热门话题和热门⾳乐下⾯的所有视频,每个话题或⾳乐最多爬取 10 个相关视频。
downloader.download(item.videos(max=10))
由于我这⾥没有mongodb所以,把这mongodb相关的配置给注释掉了。
代码解读
本库依赖的其他库有:
aiohttp:利⽤它可以完成异步数据下载,加快下载速度
dateparser:利⽤它可以完成任意格式⽇期的转化
motor:利⽤它可以完成异步 MongoDB 存储,加快存储速度
requests:利⽤它可以完成最基本的 HTTP 请求模拟
tqdm:利⽤它可以进⾏进度条的展⽰
数据结构定义
如果要做⼀个库的话,⼀个很重要的点就是对⼀些关键的信息进⾏结构化的定义,使⽤⾯向对象的思维对某些对象进⾏封装,抖⾳的爬取也不例外。
在抖⾳中,其实有很多种对象,⽐如视频、⾳乐、话题、⽤户、评论等等,它们之间通过某种关系联系在⼀起,例如视频中使⽤了某个配乐,那么视频和⾳乐就存在使⽤关系;⽐如⽤户发布了视频,那么⽤户和视频就存在发布关系,我们可以使⽤⾯向对象的思维对每个对象进⾏封装,⽐如视频的话,就可以定义成如下结构:
class Video(Base):
def __init__(self, **kwargs):
"""
init video object
:param kwargs:
"""
super().__init__()
self.id = ('id')
self.desc = ('desc')
self.author = ('author')
self.music = ('music')
self.like_count = ('like_count')
selfment_count = ('comment_count')
self.share_count = ('share_count')
self.hot_count = ('hot_count')
...
self.address = ('address')
def __repr__(self):
"""
video to str
:return: str
"""
return '>' % (self.id, self.desc[:10].strip() if self.desc else None)
这⾥将⼀些关键的属性定义成 Video 类的⼀部分,包括 id 索引、desc 描述、author 发布⼈、music 配乐等等,其中 author 和 music 并不是简单的字符串的形式,它也是单独定义的数据结构,⽐如 author 就是 User 类型的对象,⽽ User 的定义⼜是如下结构:
class User(Base):
def __init__(self, **kwargs):
"""
init user object
:param kwargs:
"""
super().__init__()
self.id = ('id')
self.name = ('name')
分页查询实现方法>算24点的诀窍ate_time = ('create_time')
self.birthday = ('birthday')
...
def __repr__(self):
"""
user to str
:return:
"""
return '>' % (self.alias, self.name)
所以说,通过属性之间的关联,我们就可以将不同的对象关联起来,这样显得逻辑架构清晰,⽽且我们也不⽤⼀个个单独维护字典来存储了,其实这就和 Scrapy ⾥⾯的 Item 的定义是类似的。
请求和重试
实现爬取的过程就不必多说了,这⾥⾯其实⽤到的就是最简单的抓包技巧,使⽤ Charles 直接进⾏抓包即可。抓包之后便可以观察到对应的接⼝请求,然后进⾏模拟即可。
所以问题就来了,难道我要⼀个接⼝写⼀个请求⽅法吗?另外还要配置 Headers、超时时间等等的内容,那岂不是太费劲了,所以,我们可以将请求的⽅法进⾏单独的封装,这⾥我定义了⼀个 fetch ⽅法:
def _fetch(url, **kwargs):
"""
fetch api response
:param url: fetch url
:param kwargs: other requests params
:return: json of response
"""
response = (url, **kwargs)
if response.status_code != 200:
raise requests.ConnectionError('Expected status code 200, but got {}'.format(response.status_code))
return response.json()
这个⽅法留了⼀个必要参数,即 url,另外其他的配置我留成了 kwargs,也就是可以任意传递,传递之后,它会依次传递给 requests 的请求⽅法,然后这⾥还做了异常处理,如果成功请求,即可返回正常的请求结果。
定义了这个⽅法,在其他的调⽤⽅法⾥⾯我们只需要单独调⽤这个 fetch ⽅法即可,⽽不需要再去关⼼异常处理,返回类型了。
好,那么定义好了请求之后,如果出现了请求失败怎么办呢?按照常规的⽅法,我们可能就会在外⾯套⼀层⽅法,然后记录调⽤ fetch ⽅法请求失败的次数,然后重新调⽤ fetch ⽅法进⾏重试,但这⾥可以告诉⼤家⼀个更好⽤的库,叫做 retrying,使⽤它我们可以通过定义⼀个装饰器来完成重试的操作。
java面试⽐如我可以使⽤ retry 装饰器这么装饰 fetch ⽅法:
from retrying import retry
@retry(stop_max_attempt_number=retry_max_number, wait_random_min=retry_min_random_wait,
wait_random_max=retry_max_random_wait, retry_on_exception=need_retry)
def _fetch(url, **kwargs):
pass
这⾥使⽤了装饰器的四个参数:
stop_max_attempt_number:最⼤重试次数,如果重试次数达到该次数则放弃重试
wait_random_min:下次重试之前随机等待时间的最⼩值
wait_random_max:下次重试之前随机等待时间的最⼤值
retry_on_exception:判断出现了怎样的异常才重试
这⾥ retry_on_exception 参数指定了⼀个⽅法,叫做 need_retry,⽅法定义如下:
def need_retry(exception):
"""
need to retry
:param exception:
:return:
"""
result = isinstance(exception, (requests.ConnectionError, requests.ReadTimeout))
if result:
print('Exception', type(exception), 'occurred, ')
return result
这⾥判断了如果是 requests 的 ConnectionError 和 ReadTimeout 异常的话,就会抛出异常进⾏重试,否则不予重试。
所以,这样我们就实现了请求的封装和⾃动重试,是不是⾮常 Pythonic?
下载处理器的设计
为了下载视频,我们需要设计⼀个下载处理器来下载已经爬取到的视频链接,所以下载处理器的输⼊就是⼀批批的视频链接,下载器接收到这些链接,会将其进⾏下载处理,并将视频存储到对应的位置,另外也可以完成⼀些信息存储操作。
在设计时,下载处理器的要求有两个,⼀个是保证⾼速的下载,另⼀个就是可扩展性要强,下⾯我们分别来针对这两个特点进⾏设计:
⾼速下载,为了实现⾼速的下载,要么可以使⽤多线程或多进程,要么可以⽤异步下载,很明显,后者是更有优势的。
扩展性强,下载处理器要能下载⾳频、视频,另外还可以⽀持数据库等存储,所以为了解耦合,我们可以将视频下载、⾳频下载、数据库存储的功能独⽴出来,下载处理器只负责视频链接的主要逻辑处理和分配即可。
为了实现⾼速下载,这⾥我们可以使⽤ aiohttp 库来完成,另外异步下载我们也不能⼀下⼦下载太多,不然⽹络波动太⼤,所以我们可以设置 batch 式下载,可以避免同时⼤量的请求和⽹络拥塞,主要的下载函数如下:
def download(self, inputs):
"""
download video or video lists
:param data:
:return:
"""
if isinstance(inputs, types.GeneratorType):
temps = []
for result in inputs:
print('Processing', result, '...')
temps.append(result)
if len(temps) == self.batch:
self.process_items(temps)
temps = []
obtrusiveelse:
inputs = inputs if isinstance(inputs, list) else [inputs]
self.process_items(inputs)
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论