java8+redis+springboot分布式布隆过滤器实现
分布式BloomFilter实现
BloomFilter
请求到达正式业务之前, 判断该请求是否有效
维护⼀个⼤的bit数组, 把有效key的⼀次或多次的hash索引位置标志已存在. 当有请求进来时, 计算进来的key的hash索引, 判断每⼀个索引的值是否为true.
常⽤于处理缓存穿透问题
选⽤redis实现的好处
部署⾼可⽤节点时, 减少每⼀个节点的开销
redis可以持久化, 避免因服务器宕机导致需要重新灌数据的开销
redis中可实现的数组长度更长,⼤数据量下,hash索引可散列的范围更⼤
实现
索引个数,bit数组长度
1. 使⽤google中根据原有数据长度计算索引个数, bit数组长度的⽅法
hash算法
1. 使⽤⼀个key衍⽣出多个key, 根据对bit长度求余的hash算法
initBloomFilter⽅法
1. 根据数据的长度, 计算hash个数,和bit数组长度, 将以上信息和容错率信息保存到redis中(这样可以实现多个过滤器)
2. 使⽤通道将信息,所有数据压到redis中,使⽤通道可以提⾼效率,减少⽹络开销(测试时初始数据,可以减少75%时间)
代码
import org.springframework.beans.factory.annotation.Autowired;
import org.tion.RedisConnection;
import org.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.List;
@Component
public class BloomFilterUtil {
/**
* 布隆过滤器信息key的后缀
*/
private final static String BLOOM_FILTER_INFO_SUFFIX =":info";
/**
* 布隆过滤器信息容错率的key
*/
private final static String FPP ="fpp";
/**
/**
* 布隆过滤器信息bit数组长度的key
*/
private final static String NUM_BITS ="numBits";
/**
* 布隆过滤器信息hash次数的key
*/
private final static String NUM_HASH_FUNCTIONS ="numHashFunctions";
/**
* 布隆过滤器数组key的后缀
*/
private final static String BLOOM_FILTER_BIT_ARRAY_SUFFIX =":bitarray";
/**
* 注⼊redistemplate
*/
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 添加⼀个值
* @param bloomFilterName bloomFilter名称
* @param key 添加的key
* @throws Exception 没有到当前过滤器
*/
public void put(String bloomFilterName, String key)throws Exception {
long[] hashIndexArray =hash(bloomFilterName, key);
StringBuilder sbName =new StringBuilder(bloomFilterName);
sbName.append(BLOOM_FILTER_BIT_ARRAY_SUFFIX);springboot推荐算法
for(long hashIndex : hashIndexArray){
redisTemplate.opsForValue().String().intern(), hashIndex,true);
}
}
/**
* 判断当前key是否可能存在
* @param bloomFilterName bloomFilter名称
* @param key 判断的key
* @return 是否可能存在
* @throws Exception 没有到bloomFilter
*/
public boolean mightContain(String bloomFilterName, String key)throws Exception {
boolean keyIsContain =false;
long[] hashIndexArray =hash(bloomFilterName, key);
StringBuilder sbName =new StringBuilder(bloomFilterName);
sbName.append(BLOOM_FILTER_BIT_ARRAY_SUFFIX);
for(long hashIndex : hashIndexArray){
if(keyIsContain = redisTemplate.opsForValue().String().intern(), hashIndex)){ return keyIsContain;
}
}
return keyIsContain;
}
/**
* 计算key的hash
* 根据名称拿到bloomFilter信息
* @param key
* @return
*/
private long[]hash(String bloomFilterName, String key)throws Exception {
long numBits;
int numHashFunctions;
int numHashFunctions;
StringBuilder sbName =new StringBuilder(bloomFilterName);
sbName.append(BLOOM_FILTER_INFO_SUFFIX);
Object numBitsString = redisTemplate.opsForHash().String().intern(), NUM_BITS);
Object numHashFunctionsString = redisTemplate.opsForHash().String().intern(), NUM_HASH_FUNCTIONS);
if(ObjectUtils.isEmpty(numBitsString)|| ObjectUtils.isEmpty(numHashFunctionsString)){
// redis中不到当前BloomFilter信息
throw new Exception();
}else{
numBits = Integer.String());
numHashFunctions = Integer.String());
}
return hash(key, numHashFunctions, numBits);
}
/**
* 计算key的hash(动态key + 求余法)
* @return
*/
private long[]hash(String key,int numHashFunctions,long numBits){
long[] hashIndexArray =new long[numHashFunctions];
for(int i =0, j = hashIndexArray.length; i < j; i++){
hashIndexArray[i]=(key.hashCode()+ numHashFunctions)% numBits;
}
return hashIndexArray;
}
/**
* 计算bit数组长度
* 来⾃google
*
* @param expectedInsertions 已有数据长度
* @param fpp 容错率
* @return 每个hash索引个数
*/
static long optimalNumOfBits(long expectedInsertions,double fpp){
if(fpp ==0.0D){
fpp =4.9E-324D;
}
return(long)((double)(-expectedInsertions)* Math.log(fpp)/(Math.log(2.0D)* Math.log(2.0D)));
}
/**
* 计算每个key的index个数
* 来⾃google
*
* @param expectedInsertions 已有数据长度
* @param numBits bit数组长度
* @return 每个key的index个数
*/
static int optimalNumOfHashFunctions(long expectedInsertions,long numBits){
return Math.max(1,(int) und((double) numBits /(double) expectedInsertions * Math.log(2.0D)));
}
/**
* 默认容错率0.03
*/
public boolean initBloomFilter(String bloomFilterName, List<String> dataList){
return initBloomFilter(bloomFilterName, dataList,0.03d);
}
/**
* 初始化数据
* @param bloomFilterName 过滤器名称
* @param dataList 数据id列表
* @param fpp 容错率
* @param fpp 容错率
* @return 成功失败
*/
public boolean initBloomFilter(String bloomFilterName, List<String> dataList,double fpp){
int numHashFunctions;
long expectedInsertions, numBits;
expectedInsertions = dataList.size();
numBits =optimalNumOfBits(expectedInsertions, fpp);
numHashFunctions =optimalNumOfHashFunctions(expectedInsertions, numBits);
/
/ 1.executePipelined 重写⼊参 RedisCallback 的doInRedis⽅法
List<Object> resultList = utePipelined((RedisConnection connection)->{
// 2.connection 打开管道
connection.openPipeline();
// 3.1 保存当前bloomFilter信息
connection.hSet((bloomFilterName + BLOOM_FILTER_INFO_SUFFIX).getBytes(), Bytes(), String.valueOf(fpp).getBytes());
connection.hSet((bloomFilterName + BLOOM_FILTER_INFO_SUFFIX).getBytes(), Bytes(), String.valueOf(numBits).getBytes());
connection.hSet((bloomFilterName + BLOOM_FILTER_INFO_SUFFIX).getBytes(), NUM_Bytes(), String.valueOf(numHash Functions).getBytes());
// 3.connection 给本次管道内添加要⼀次性执⾏的多条命令
for(String key : dataList){
long[] hashIndexArray =hash(key, numHashFunctions, numBits);
for(long hashIndex : hashIndexArray){
connection.setBit((bloomFilterName + BLOOM_FILTER_BIT_ARRAY_SUFFIX).getBytes(), hashIndex,true);
}
}
// 4.关闭管道不需要close 否则拿不到返回值
connection.closePipeline();
// 这⾥⼀定要返回null,最终pipeline的执⾏结果,才会返回给最外层
return null;
});
return true;
}
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论