Swift对象创建(PUTObject)过程详解——基于副本策略(⼀)swift中创建对象,即PUT object,根据选定的存储策略将对象内容写⼊⾄相应的服务器(object server)。我们重点关注object controller和object servers之间的通信过程,其它从略。
在proxy-server上对client发送来的HTTP请求进⾏解析、wsgi环境变量进⾏设置、认证及相应错误处理过程从略。唯⼀需要说明的是,对外部client 通过HTTP请求发送来的对象(object),swift会将其具体内容存放于环境变量env中,即:
对象创建最终会定位到BaseObjectController类中的PUT⽅法:
@public
@cors_validation
@delay_denial
def PUT(self, req):
"""HTTP PUT request handler."""
if req.if_none_match is not None and '*' not in req.if_none_match:
# Sending an etag with if-none-match isn't currently supported
return HTTPBadRequest(request=req, content_type='text/plain',
body='If-None-Match only supports *')
container_info = ainer_info(
self.account_name, ainer_name, req)
policy_index = ('X-Backend-Storage-Policy-Index',
container_info['storage_policy'])
obj_ring = _object_ring(policy_index)
container_nodes = container_info['nodes']
container_partition = container_info['partition']
partition, nodes = _nodes(
self.account_name, ainer_name, self.object_name)
# pass the policy index to storage nodes via req header
req.headers['X-Backend-Storage-Policy-Index'] = policy_index
req.acl = container_info['write_acl']
# is request authorized
if 'swift.authorize' viron:
aresp = viron['swift.authorize'](req)
if aresp:
return aresp
if not container_info['nodes']:
return HTTPNotFound(request=req)
# update content type in case it is missing
self._update_content_type(req)
# check constraints on object name and request headers
error_response = check_object_creation(req, self.object_name) or \
check_content_type(req)
if error_response:
return error_response
self._update_x_timestamp(req)
# check if versioning is enabled and handle copying previous version
self._handle_object_versions(req)
# check if request is a COPY of an existing object
source_header = ('X-Copy-From')
if source_header:
error_response, req, data_source, update_response = \
self._handle_copy_request(req)
if error_response:
return error_response
else:
reader = viron['wsgi.input'].read
data_source = iter(lambda: reader(self.app.client_chunk_size), '')
update_response = lambda req, resp: resp
# check if object is set to be automaticaly deleted (i.e. expired)
req, delete_at_container, delete_at_part, \
delete_at_nodes = self._config_obj_expiration(req)
# add special headers to be handled by storage nodes
outgoing_headers = self._backend_requests(
req, len(nodes), container_partition, container_nodes,
delete_at_container, delete_at_part, delete_at_nodes)
# send object to storage nodes
resp = self._store_object(
req, data_source, nodes, partition, outgoing_headers)
return update_response(req, resp)
下⾯我们就从这七⼗来⾏代码来详细探讨此PUT操作究竟做了什么。其中错误处理和有些有明显注释的代码不深究,专注于主要流程。
1.获取node及partition信息
要执⾏PUT object,⾸先便要获取关于此object的必要信息,它从哪⾥来(前⽂提到过,env中直接暂存对象内容)、它要放到哪⾥去,同时也要知道它所在container的相应信息,因为PUT object会增加container内容,影响其元数据信息,故需要对相应存放container信息的数据表进⾏更新,这必然要涉及到存放container信息数据表partition及具体存放结点信息。
10-18⾏,是这⼀过程的具体代码。我们可以看到,继承⾃Controller类的container_info⽅法被⾸先调⽤,此函数相当于读取container元数据信息缓存,返回⼀个字典,包含相应container的partition、nodes及policy_index信息。即某⼀个具体container(容器)的所在区、存放结点列表、所使⽤的存储策略(副本or纠删码)。
还会根据object所在container具体存储策略,获得其object ring,然后根据ring中定义的映射规则,取得该object所在区及存放结点列表(line. 17-18)。
2. 认证、更新、检查
认证(line. 26-29),根据env中'swift.authorize'存放的认证函数,校验token,确认请求的合法性。
更新请求信息(line. 21-23,35,43),更新HTTP请求中相应信息,包括content_type(内容类型)、timestamp(时间戳)等等。
检查,包括检查object命名与请求是否合法(line. 38-41),检查object版本信息(line. 46),检查是否从已有对象拷贝(line. 49-58),检查是否设定⾃动删除时间(line. 61-62)。
虽然没有研究过关于对象⾃动删除的代码,但我猜想对象的⾃动删除要额外启动服务进程(类似于swift-object-server之类),定期对所有servers上object delete_time进⾏轮询,到达相应截⽌时间后删除相应对象。
还有⼀点需要注意的便是,若要创建的object不是从已有对象拷贝,将定义⼀个对object内容进⾏读取的迭代器data_source(line. 56-57)。
3.⽣成请求头
虽然⽣成请求头和接下来的上传数据在上⾯的PUT代码中,均只封装成⼀个函数,但它们会调⽤其它许多函数。这两个流程也是PUT object 操作的核⼼。
先附上65⾏_backend_request()函数的源码:
def _backend_requests(self, req, n_outgoing,
container_partition, containers,
delete_at_container=None, delete_at_partition=None,
delete_at_nodes=None):
headers = [ate_request_headers(req,additional=req.headers)
for _junk in range(n_outgoing)]
for i, container in enumerate(containers):
i = i % len(headers)
headers[i]['X-Container-Partition'] = container_partition
headers[i]['X-Container-Host'] = csv_append(
headers[i].get('X-Container-Host'),
'%(ip)s:%(port)s' % container)
headers[i]['X-Container-Device'] = csv_append(
headers[i].get('X-Container-Device'),
container['device'])
for i, node in enumerate(delete_at_nodes or []):
i = i % len(headers)
headers[i]['X-Delete-At-Container'] = delete_at_container
headers[i]['X-Delete-At-Partition'] = delete_at_partition
headers[i]['X-Delete-At-Host'] = csv_append(
headers[i].get('X-Delete-At-Host'),
'%(ip)s:%(port)s' % node)
headers[i]['X-Delete-At-Device'] = csv_append(
headers[i].get('X-Delete-At-Device'),
node['device'])
return headers
这个函数的⽬的在于,针对要上传的object所在container的存放信息,⽣成若⼲个HTTP请求头,⽤于object上传成功后container信息的更新。object存放在多少个node上,便⽣成多少相应的HTTP请求头,即len(nodes)。可以预料,对container信息的更新,将会在接收PUT object的若⼲个object-server上并发。
在_backend_request()函数中,⾸先⽣成最基本的HTTP请求头列表headers,headers = [header_for_node1, header_for_node2,......],其中的每⼀个元素都是⼀个完整的HTTP请求头字典,最初的请求头仅包括时间戳、x-trans-id、user-agent等信息。
接下来的两个for循环,完成的⼯作类似,第⼀个for循环将container更新所需要的信息平均分配给head
ers列表中的header;第⼆个for循环将定时删除所需要的信息平均分配给headers中的header。我们重点讲讲第⼀个for循环,第⼆个,类⽐即可。
line. 9的取余操作,保证了将container信息更新平均分配⾄存放该object的结点进⾏并发,若container信息存放的结点数⼩于存放object的结点,那么headers列表中后⾯的⼀些header将只有基本初始化信息,接收这些header的node不⽤负责container信息的更新。
line. 11进⾏container partition信息的设定,每个container的partition信息均相同,所以不⽤担⼼它们会打架(被覆盖)。
line. 12-14和line. 15-17分别设定header中需要更新的container信息所在的主机地址(X-Container-Host)和设备名(X-Container-Device)。container信息是通过数据表存放的,所以container信息的更新最终会化为对相应服务器相应设备上数据表的update操作。csv_append函数会将不同Host和Device⽤逗号隔开,相应node接收到这样的header后,将会利⽤两者信息定位到所有相应数据表,完成update。
通过这整个for循环,将每个存放object的node更新container信息的进⾏了分配和设定,简单明了。
4.上传数据
_store_object()是PUT流程中最复杂也最重要的函数,直接负责向相应的存储结点上传数据。值得⼀提的是,使⽤纠删码作为存储策略的话,将会重定义_store_object()函数。这⾥仅就原有_store_object()进⾏探讨,重定义的_store_object()留待以后讨论。_store_object()执⾏具体流程如下:    def _store_object(self, req, data_source, nodes, partition,
outgoing_headers):
policy_index = ('X-Backend-Storage-Policy-Index')
policy = _by_index(policy_index)
if not nodes:
return HTTPNotFound()
# RFC2616:8.2.3 disallows 100-continue without a body
if (t_length > 0) or req.is_chunked:
expect = True
else:
expect = False
conns = self._get_put_connections(req, nodes, partition,
outgoing_headers, policy, expect)
min_conns = quorum_size(len(nodes))
try:
# check that a minimum number of connections were established and
# meet all the correct conditions set in the request
self._check_failure_put_connections(conns, req, nodes, min_conns)
# transfer data
self._transfer_data(req, data_source, conns, nodes)
# get responses
statuses, reasons, bodies, etags = self._get_put_responses(
req, conns, nodes)
except HTTPException as resp:
return resp
finally:
for conn in conns:
conn.close()
if len(etags) > 1:
self.(
_('Object servers returned %s mismatched etags'), len(etags))
object to
return HTTPServerError(request=req)
etag = etags.pop() if len(etags) else None
resp = self.best_response(req, statuses, reasons, bodies,
_('Object PUT'), etag=etag)
resp.last_modified = il(
float(Timestamp(req.headers['X-Timestamp'])))
return resp
其中除第⼀个操作获取存储策略之外,其它每个流程都对应⼀个具体的函数。_get_put_connections建⽴从proxy-server到存放object的node之间的http连接,其中利⽤eventlet中的GreenPile实现并发:
pile = GreenPile(len(nodes))
for nheaders in outgoing_headers:
if expect:
nheaders['Expect'] = '100-continue'
pile.spawn(self._connect_put_node, node_iter, partition,
req.swift_entity_path, nheaders,
self.app.logger.thread_locals)
具体connection的建⽴,在_connect_put_node()中:
with ConnectionTimeout(_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'PUT', path, headers)
其中,node_iter迭代器在不同的nodes之间进⾏遍历,创建链接失败即跳过,保证min_conns个链接建⽴成功即可PUT数据。但这⾥有两个问题:(1)node_iter如何在多个_get_put_node间起作⽤,难道是和协程的实际顺序执⾏有关?(2)创建链接数满⾜min_conns可传输数据,但要保证数据指定冗余,数据必须存放在所链接所有结点上,那么链接不成功那些结点上数据什么时候以何种⽅式上传?
(关于这个问题,研究了半天之后,发现它和swift中采⽤的NWR策略相关,⾄于剩余冗余的上传初步估计是额外的数据同步服务直接执⾏对象的拷贝)接下来对建⽴的connections进⾏检查,查看其返回状态,及_check_min_conn()验证成功创建的连接是否满⾜最低需求,错误则直接返回异常状态码。
_transfer_data()在上传数据的过程中直接使⽤了继承⾃GreenPool类的ContextPool类,ContextPool类增添了退出时杀死正在运⾏的协程的⽅
法。_transfer_data()中使⽤队列(Queue)定义缓冲区暂存上传的数据,其中使⽤绿⾊线程池(ContextPool)定义多个协程执⾏_send_file()将Queue中chunk传输⾄建⽴的HTTP链接,每个storage _transfer_data()后⾯的过程便是将上传的object数据写⼊conns队列中。node和proxy之间建⽴的HTTP链接conn都拥有独⽴的队列,副本模式下数据同时写⼊活动的诸多conns中队列。
for conn in list(conns):
if not conn.failed:
conn.queue.put('%x\r\n%s\r\n' % (len(chunk), chunk)
if req.is_chunked else chunk)
最后获得响应,从获得的诸多response中选取⼀定数量的(quorum_size)最优响应,⽣成对⽤户的响应返回,PUT object⾄此终结。
通过对PUT Object这⼀过程的学习,发现⾃⼰对ring部分代码⾮常陌⽣,有些地⽅完全弄不明⽩,映射关系和node管理完全靠ring的原理猜测。⽐如三副本存取且整个系统仅有⼀个storage node,那么系统会报错吗?如果正常运⾏node及node_iter⼜是何种状态?
特别是后期,我还打算对ring进⾏功能扩展,代码都没弄懂就谈扩展,⽆异痴⼈说梦。
还有,NWR策略相关内容还要进⼀步研究,整理出博⽂。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。