Implementing DISTINCT queries with Elasticsearch aggregations

1. The equivalent SQL

A DISTINCT query

SELECT DISTINCT field1, field2 FROM st_type

is equivalent to

SELECT field1, field2 FROM st_type GROUP BY field1, field2
2. A GROUP BY query can in turn be implemented in Elasticsearch with aggregations. The equivalent DSL query is as follows (2147483647 is Integer.MAX_VALUE, used here as the per-level bucket limit):
POST /test_index/test_type/_search
{
  "from": 0,
  "size": 0,
  "aggregations": {
    "field1": {
      "terms": {
        "field": "field1",
        "size": 2147483647
      },
      "aggregations": {
        "field2": {
          "terms": {
            "field": "field2",
            "size": 2147483647
          }
        }
      }
    }
  }
}
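For reference, the response nests each field2 bucket inside its parent field1 bucket; since "size" is 0, no hits are returned, only aggregations. The keys and counts below are made up purely for illustration:

{
  "hits": { "total": 3, "hits": [] },
  "aggregations": {
    "field1": {
      "buckets": [
        {
          "key": "a1",
          "doc_count": 3,
          "field2": {
            "buckets": [
              { "key": "b1", "doc_count": 2 },
              { "key": "b2", "doc_count": 1 }
            ]
          }
        }
      ]
    }
  }
}

Each root-to-leaf bucket path is one distinct (field1, field2) combination; the Java code in section 3 flattens these paths into rows.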
3. The Java implementation:
import com.google.common.collect.ArrayListMultimap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Stack;
import java.util.stream.Collectors;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
/**
*
* @author zhongchenghui
*/
public class EsSearch {
    private static final String CLUSTER_NAME = "test_cluster";
    private static final String ES_ADDRESSES = "192.168.12.1,192.168.12.2,192.168.12.3";
    private static final String INDEX_NAME = "test_index";
    private static final Client ES_CLIENT = ESClientFactory.newInstance(CLUSTER_NAME, ES_ADDRESSES);
    /**
     * Query grouped by multiple fields (the GROUP BY equivalent).
     *
     * @param type
     * @param groupColumnsNames
     * @return
     * @throws Exception
     */
    public List<Map<String, String>> getRowsByGroup(String type, String... groupColumnsNames) throws Exception {
        TermsBuilder allTermsBuilder = createTermsAggregationBuilder(groupColumnsNames);
        SearchResponse response = createSearchRequestBuilder(type, groupColumnsNames)
                .setSize(0).addAggregation(allTermsBuilder).execute().actionGet();
        List<Map<String, String>> rows = new ArrayList<>();
        Terms agg = response.getAggregations().get(groupColumnsNames[0]);
        int rootLayer = 0;
        agg.getBuckets().forEach(bucket -> {
            rows.addAll(getFlatBucket(rootLayer, bucket, groupColumnsNames));
        });
        return rows;
    }
    /**
     * Build the AggregationBuilder level by level.
     * (Integer.MAX_VALUE here is the maximum number of groups at each level.)
     *
     * @param groupColumnsNames
     * @return
     */
    private TermsBuilder createTermsAggregationBuilder(String... groupColumnsNames) {
        TermsBuilder allTermsBuilder = AggregationBuilders.terms(groupColumnsNames[0])
                .field(groupColumnsNames[0]).size(Integer.MAX_VALUE);
        TermsBuilder tmpTermsBuilder = allTermsBuilder;
        for (int i = 1; i < groupColumnsNames.length; i++) {
            TermsBuilder termsBuilder = AggregationBuilders.terms(groupColumnsNames[i])
                    .field(groupColumnsNames[i]).size(Integer.MAX_VALUE);
            tmpTermsBuilder.subAggregation(termsBuilder);
            tmpTermsBuilder = termsBuilder;
        }
        return allTermsBuilder;
    }
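    // For example, a hypothetical call createTermsAggregationBuilder("field1", "field2")
    // produces terms("field1") with a sub-aggregation terms("field2") -- the same
    // nested "aggregations" structure as the DSL query in section 2.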
    /**
     * Build the search request.
     *
     * @param type
     * @param columnNames
     * @return
     */
    private SearchRequestBuilder createSearchRequestBuilder(String type, String... columnNames) {
        SearchRequestBuilder searchRequestBuilder = ES_CLIENT.prepareSearch(INDEX_NAME).setTypes(type).setSize(50000);
        if (Objects.nonNull(columnNames) && columnNames.length > 0) {
            searchRequestBuilder.addFields(columnNames);
        }
        return searchRequestBuilder;
    }
    /**
     * Flatten the aggregation results in the response with a stack (depth-first traversal)
     * and return them as rows.
     *
     * @param layer
     * @param bucket
     * @param groupColumnsNames
     * @return
     */
    private List<Map<String, String>> getFlatBucket(int layer, Terms.Bucket bucket, String... groupColumnsNames) {
        ArrayListMultimap<BucketNode, BucketNode> bucketRowMultimap = ArrayListMultimap.create();
        Stack<BucketNode> nodeStack = new Stack<>();
        BucketNode bucketNode = new BucketNode(layer, groupColumnsNames[layer], bucket);
        nodeStack.add(bucketNode);
        bucketRowMultimap.put(bucketNode, bucketNode);
        while (!nodeStack.isEmpty()) {
            bucketNode = nodeStack.pop();
            List<BucketNode> childrenNodes = getChildrenBucketNodes(bucketNode, groupColumnsNames);
            if (childrenNodes != null && !childrenNodes.isEmpty()) {
                // Each child inherits its parent's route; the parent alone is no longer a complete row.
                List<BucketNode> parentRoute = bucketRowMultimap.removeAll(bucketNode);
                for (BucketNode child : childrenNodes) {
                    nodeStack.push(child);
                    bucketRowMultimap.putAll(child, parentRoute);
                    bucketRowMultimap.put(child, child);
                }
            }
        }
        return convertToRows(bucketRowMultimap.asMap().values());
    }
    /**
     * Get the list of bucket nodes at the next level down.
     *
     * @param parentNode
     * @param groupColumnsNames
     * @return
     */
    private List<BucketNode> getChildrenBucketNodes(BucketNode parentNode, String... groupColumnsNames) {
        int currentLayer = parentNode.layer + 1;
        if (currentLayer < groupColumnsNames.length) {
            String currentAggName = groupColumnsNames[currentLayer];
            Terms currentAgg = parentNode.bucket.getAggregations().get(currentAggName);
            if (Objects.nonNull(currentAgg)) {
                return currentAgg.getBuckets().stream()
                        .map(bucket -> new BucketNode(currentLayer, currentAggName, bucket))
                        .collect(Collectors.toList());
            }
        }
        return null;
    }
    private List<Map<String, String>> convertToRows(Collection<Collection<BucketNode>> bucketRoutes) {
        return bucketRoutes.stream().map(this::convertToRow).collect(Collectors.toList());
    }

    private Map<String, String> convertToRow(Collection<BucketNode> bucketRoute) {
        Map<String, String> row = new HashMap<>();
        bucketRoute.forEach(bucketNode -> row.put(bucketNode.aggName, bucketNode.bucket.getKeyAsString()));
        return row;
    }
    class BucketNode {

        int layer;
        String aggName;
        Terms.Bucket bucket;

        public BucketNode(int layer, String aggName, Terms.Bucket bucket) {
            this.layer = layer;
            this.aggName = aggName;
            this.bucket = bucket;
        }

        @Override
        public String toString() {
            return "BucketNode{" + "layer=" + layer + ", aggName=" + aggName + ", bucket_key=" + bucket.getKeyAsString() + '}';
        }
    }
}
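A minimal usage sketch (the index, type, and field names are the made-up ones from section 2, and the cluster must be reachable):

EsSearch esSearch = new EsSearch();
// Equivalent to: SELECT DISTINCT field1, field2 FROM test_type
List<Map<String, String>> rows = esSearch.getRowsByGroup("test_type", "field1", "field2");
rows.forEach(row -> System.out.println(row.get("field1") + "," + row.get("field2")));

The ESClientFactory used above caches one TransportClient per cluster: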
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author zhongchenghui
*/
public class ESClientFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(ESClientFactory.class);
    private static final ConcurrentHashMap<String, Client> CLIENT_CACHE = new ConcurrentHashMap<>();

    public static Client newInstance(String clusterName, String hostStr) {
        Client client = CLIENT_CACHE.get(clusterName);
        if (client == null) {
            Map<String, Integer> addressMap = getESAddressMap(hostStr);
            Settings settings = Settings.settingsBuilder().put("cluster.name", clusterName).build();
            TransportClient newClient = TransportClient.builder().settings(settings).build();
            addressMap.forEach((host, port) -> {
                try {
                    newClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
                } catch (UnknownHostException ex) {
                    LOGGER.error("Init ES client failure, cluster name is: {}", clusterName, ex);
                }
            });
            client = newClient;
            CLIENT_CACHE.put(clusterName, newClient);
        }
        return client;
    }
    private static Map<String, Integer> getESAddressMap(String hostStr) {
        Map<String, Integer> hostMap = new HashMap<>();
        String[] hosts = hostStr.split(",");
        for (String host : hosts) {
            String[] hostPort = host.trim().split(":");
            Integer port = hostPort.length < 2 ? 9300 : Integer.valueOf(hostPort[1]); // default transport port is 9300
            hostMap.put(hostPort[0], port);
        }
        return hostMap;
    }
}
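Usage is a single call; hosts without an explicit port fall back to 9300 (the addresses here are the made-up ones from the constants above):

Client client = ESClientFactory.newInstance("test_cluster", "192.168.12.1:9300,192.168.12.2,192.168.12.3");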
4. Caveats
a. Because the implementation aggregates level by level, a document whose field1 is null is not aggregated at the next level, even if its field2 has a value. (This can be worked around by substituting a designated string for null; see the sketch after this list.)
b. It is not suitable for very large data sets: the code in section 3 requires that the number of groups per field not exceed Integer.MAX_VALUE.
c. Only the fields that participate in the GROUP BY can be returned.
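For caveat (a), one query-time workaround is the terms aggregation's missing parameter (available since Elasticsearch 2.0), which assigns documents lacking the field to a bucket with a substitute key instead of dropping them. A sketch against the DSL of section 2, where "__NULL__" is an arbitrary placeholder of your choosing:

"terms": {
  "field": "field1",
  "size": 2147483647,
  "missing": "__NULL__"
}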