PythonNumpy⾼效数据处理(⼀)——获取数组B中每个元素在数组A中的索引
Python Numpy⾼效数据处理(⼀)——获取数组B中每个元素在数组A中的索引值
Python Numpy⾼效数据处理(⼀)
做算法的,在⼯作中经常需要处理⼤量的数据,python中的for循环实在是太蠢了,写这篇博客主要是记录⼀下平时在处理⼤批量数据时,抛弃了for循环的新⽅法。本⽂基于python3.7
获取数组B中每个元素在数组A中的索引值
引⽤头
import numpy as np
import time
传统⽅法:for循环⾥使⽤list的index功能:
如下:
def get_index_from_array2(from_array, purpose_array):
from_list =list(from_array)
p_idx = np.in1d(purpose_array, from_array)#筛选出 purpose_array 存在于 from_array 中存在的项,防⽌不到索引⽽报错
purpose_idx_in_from =-np.ones(purpose_array.shape).astype(int)#初始化待返回的索引数组为-1,长度对应purpose_array
in_len = p_idx.sum()#得到筛选之后的总数量
tmp_p_array = purpose_array[p_idx]#筛选之后的数组
tmp_idx = np.zeros(in_len).astype(int)#临时索引,长度对应筛选之后的数组数组全部赋值为1
for i in range(in_len):#循环得到所有可获取的索引值
tmp_idx[i]= from_list.index(tmp_p_array[i])
purpose_idx_in_from[p_idx]= tmp_idx            #待返回的索引数组赋值
return purpose_idx_in_from
针对所有可能的情况,A可能包含B,也可能不完全包含B,所以先做了筛选,只针对筛选出来的部分查询索引值
调⽤测试:
purpose_array = np.array(['b','t','d','g','f','f','g','b','g','f','p','d','f','r','g','w','t','d','e','b','e'])
from_array = np.array(['e','c','f','a','e','g','f','a','b','d','d','e'])
idx = get_index_from_array2(from_array, purpose_array)
print(idx)
pos_idx =(idx !=-1)
test_a = purpose_array[pos_idx]
test_b = from_array[idx[pos_idx]]
print((test_a == test_b).all())
测试中,去掉了索引是-1的部分,剩余部分进⾏⽐对,运⾏结果如下:
[ 8 -1  9  5  2  2  5  8  5  2 -1  9  2 -1  5 -1 -1  9  0  8  0]
True
结果完全⼀致!
改进1:采⽤map函数
直接上代码:
def get_index_from_array3(from_array, purpose_array):
purpose_array = np.array(purpose_array)
from_array = np.array(from_array)
from_list =list(from_array)
p_idx = np.in1d(purpose_array, from_array)
purpose_idx_in_from =-np.ones(purpose_array.shape).astype(int)
tmp_p_array = purpose_array[p_idx]
tmp_idx =map(from_list.index, tmp_p_array)
purpose_idx_in_from[p_idx]=list(tmp_idx)
return purpose_idx_in_from
和上⾯没有太多区别,只是for循环⽤map代替,测试结果⼀致,不过多赘述
改进2:采⽤numpy数组操作,避开循环
还是先上代码:
def get_index_from_array(from_array, purpose_array):
purpose_array = np.array(purpose_array)
from_array = np.array(from_array)
purpose_idx_in_from =-np.ones(purpose_array.shape).astype(int)#初始化待返回的索引数组为-1,长度对应purpose_array
p_idx = np.in1d(purpose_array, from_array)#筛选出 purpose_array 存在于 from_array 中存在的项
union_array = np.hstack((from_array, purpose_array[p_idx]))#合并from_array 和从 purpose_array 中筛选出来的数组
_, union_idx, union_inv = np.unique(union_array, return_index=True, return_inverse=True)#unique函数得到索引值
purpose_idx_in_from[p_idx]= union_idx[union_inv[len(from_array):]]#待返回的索引数组赋值
return purpose_idx_in_from
这⾥介绍⼀下numpy中的unique函数,⾮常好⽤:
unique函数默认返回⼀个包含不重复的数组,并且包含原数组全部元素。该函数包含三个可选参
数,return_index,return_inverse,return_count。看个例⼦:
from_array = np.array(['e','c','f','a','e','g','f','a','b','d','d','e'])
test = np.unique(from_array,True,True,True)
print(test)
返回结果:
(array(['a', 'b', 'c', 'd', 'e', 'f', 'g'], dtype='<U1'),
array([3, 8, 1, 9, 0, 2, 5], dtype=int64),
array([4, 2, 5, 0, 4, 6, 5, 0, 1, 3, 3, 4], dtype=int64),
array([2, 1, 1, 2, 3, 2, 1], dtype=int64))
返回了四个数组:
第⼀个数组a,包含原数组所有元素的不重复的数组;
第⼆个数组b,每个元素在原数组中第⼀次出现的索引,和a中的元素⼀⼀对应,即from_array[b]等于 a,
例如‘d’在原数组中在索引9的位置第⼀次出现,‘f’则是在索引2的位置第⼀次出现;
第三个数组c,原数组映射到a中的索引值。即a[c]等于from_array;
第四个数组d,统计a中元素在原数组中出现的次数。
这⾥⾯b和c很有意思,能玩出很多花样来,有兴趣可以⾃⼰研究⼀下
回到原来的代码,这⾥主要的思想就是,把purpose_array中的元素合并到from_array的后⾯,合并之前需要清洗掉from_array中不存在的数据。这样对联合后的数组进⾏unique操作,不会影响到from_array部分的结果。我们可以对⽐看⼀下:
purpose_array = np.array(['b','t','d','g','f','f','g','b','g','f','p','d','f','r','g','w','t','d','e','b','e'])
from_array = np.array(['e','c','f','a','e','g','f','a','b','d','d','e'])
test = np.unique(from_array, return_index=True, return_inverse=True)
print(test)
p_idx = np.in1d(purpose_array, from_array)
union_array = np.hstack((from_array, purpose_array[p_idx]))
test2 = np.unique(union_array, return_index=True, return_inverse=True)
print(test2)
结果如下:
(array(['a', 'b', 'c', 'd', 'e', 'f', 'g'], dtype='<U1'), array([3, 8, 1, 9, 0, 2, 5], dtype=int64), array([4, 2, 5, 0, 4, 6, 5, 0, 1, 3, 3, 4], dtype=int64), array([2, 1, 1, 2, 3, 2, 1], dtype=int64))
(array(['a', 'b', 'c', 'd', 'e', 'f', 'g'], dtype='<U1'), array([3, 8, 1, 9, 0, 2, 5], dtype=int64), array([4, 2, 5, 0, 4, 6, 5, 0, 1, 3, 3, 4, 1, 3, 6, 5, 5, 6, 1, 6, 5, 3,
5, 6, 3, 4, 1, 4], dtype=int64))
因为新的联合数组没有增加新的元素,也没有打乱from_array的顺序,所以得到的a、b数组还是不变。from_array[b]等于 a依然成⽴。⽽根据第三个数组的特性,a[c]等于原数组union_array,该数组包含from_array和purpose_array[p_idx]两部分,取后半部分就是purpose_array了。即c2 = c[len(from_array) :],a[c2]等于purpose_array[p_idx]。
即from_array[b][c2]等于筛选过的purpose_array!⽬标达成,我们就是要from_array到purpose_array的索引。
该索引值即b[c2]
测试⼀下:
a, b, c = test2
c2 = c[len(from_array):]
index = b[c2]
print(from_array[index])
print(purpose_array[p_idx])
结果:
['b' 'd' 'g' 'f' 'f' 'g' 'b' 'g' 'f' 'd' 'f' 'g' 'd' 'e' 'b' 'e']
['b' 'd' 'g' 'f' 'f' 'g' 'b' 'g' 'f' 'd' 'f' 'g' 'd' 'e' 'b' 'e']
结果⼀致!
效率对⽐
贴上完整测试代码,⽬标数组长度为100000,数字范围0~9999
索引数组长度为20000,数字范围相同
以此来制造尽量复杂的情况,测试效率和结果是否正确
purpose_array = np.random.randint(0,10000,100000)
from_array = np.random.randint(0,10000,20000)
t1 = time.time()
purpose_idx_in_from1 = get_index_from_array(from_array, purpose_array)
t2 = time.time()
print(t2 - t1)
print(purpose_idx_in_from1)
idx = purpose_idx_in_from1[purpose_idx_in_from1 !=-1]
a = from_array[idx]
b = purpose_array[purpose_idx_in_from1 !=-1]
print((a == b).all(),'\n')
t1 = time.time()
purpose_idx_in_from2 = get_index_from_array2(from_array, purpose_array)
t2 = time.time()
print(t2 - t1)
print(purpose_idx_in_from2)
idx = purpose_idx_in_from2[purpose_idx_in_from2 !=-1]
a = from_array[idx]
b = purpose_array[purpose_idx_in_from2 !=-1]
print((a == b).all(),'\n')
t1 = time.time()
purpose_idx_in_from3 = get_index_from_array3(from_array, purpose_array)
t2 = time.time()
print(t2 - t1)
print(purpose_idx_in_from3)
idx = purpose_idx_in_from3[purpose_idx_in_from3 !=-1]
a = from_array[idx]
b = purpose_array[purpose_idx_in_from3 !=-1]
print((a == b).all(),'\n')
print((purpose_idx_in_from1 == purpose_idx_in_from2).all())
print((purpose_idx_in_from1 == purpose_idx_in_from3).all())
运⾏结果如下:
0.032914161682128906
[ 1820    -1 14529 ... 12893    -1 11580]
True
12.497856140136719
[ 1820    -1 14529 ... 12893    -1 11580]
True
12.462677478790283
[ 1820    -1 14529 ... 12893    -1 11580]
True
True
True
⾸先,三者结果测试正确,且三者完全⼀致
效率⽅⾯,传统⽅法则⽤了接近12.5秒(未使⽤多线程并⾏),map函数的⽅法让我感到意外,居然没有提⾼。⽽⽤我⾃⼰的⽅法,只花了0.033秒,提升了三四百倍的时间,果然是能不⽤for循环,千⽅百计也不要⽤。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。