异步httpclient(httpasyncclient)的使⽤与总结
参考:
1. 前⾔
应⽤层的⽹络模型有同步与异步。同步意味当前线程是阻塞的,只有本次请求完成后才能进⾏下⼀次请求;异步意味着所有的请求可以同时塞⼊缓冲区,不阻塞当前的线程;
httpclient在4.x之后开始提供基于nio的异步版本httpasyncclient,httpasyncclient借助了Java并发库和nio进⾏封装(虽说NIO是同步⾮阻塞IO,但是HttpAsyncClient提供了回调的机制,与netty类似,所以可以模拟类似于AIO的效果),其调⽤⽅式⾮常便捷,但是其中也有许多需要注意的地⽅。2. pom⽂件
本⽂依赖4.1.2,当前最新的客户端版本是4.1.3maven repository 地址
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-nio</artifactId>
<version>4.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.2</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
3. 简单的实例
public class TestHttpClient {
public static void main(String[] args){
RequestConfig requestConfig = RequestConfig.custom()
.
setConnectTimeout(50000)
.setSocketTimeout(50000)
.setConnectionRequestTimeout(1000)
.build();
//配置io线程
IOReactorConfig ioReactorConfig = IOReactorConfig.custom().
Runtime().availableProcessors())
.setSoKeepAlive(true)
.build();
//设置连接池⼤⼩
ConnectingIOReactor ioReactor=null;
try {
ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
} catch (IOReactorException e) {
e.printStackTrace();
}
PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(ioReactor); connManager.setMaxTotal(100);
connManager.setDefaultMaxPerRoute(100);
final CloseableHttpAsyncClient client = HttpAsyncClients.custom().
setConnectionManager(connManager)
.setDefaultRequestConfig(requestConfig)
.build();
//构造请求
String url = "127.0.0.1:9200/_bulk";
HttpPost httpPost = new HttpPost(url);
StringEntity entity = null;
try {
String a = "{ \"index\": { \"_index\": \"test\", \"_type\": \"test\"} }\n" +
"{\"name\": \"上海\",\"age\":33}\n";
entity = new StringEntity(a);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
httpPost.setEntity(entity);
//start
client.start();
//异步请求
while(true){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Back implements FutureCallback<HttpResponse>{
private long start = System.currentTimeMillis();
Back(){
}
public void completed(HttpResponse httpResponse) {
try {
System.out.println("cost is:"+(System.currentTimeMillis()-start)+":"+Entity())); } catch (IOException e) {
e.printStackTrace();
}
}
public void failed(Exception e) {
}
public void cancelled() {
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
71
72
73
74
75
76
77
78
79
80
81
82
83
4. ⼏个重要的参数
4.1 TimeOut(3个)的设置
ConnectTimeout : 连接超时,连接建⽴时间,三次握⼿完成时间。
SocketTimeout : 请求超时,数据传输过程中数据包之间间隔的最⼤时间。
ConnectionRequestTimeout : 使⽤连接池来管理连接,从连接池获取连接的超时时间。
在实际项⽬开发过程中,这三个值可根据具体情况设置。
(1) 下⾯针对ConnectionRequestTimeout的情况进⾏分析
实验条件:设置连接池最⼤连接数为1,每⼀个异步请求从开始到回调的执⾏时间在100ms以上;
实验过程:连续发送2次请求
public class TestHttpClient {
public static void main(String[] args){
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(50000)
.setSocketTimeout(50000)
.
setConnectionRequestTimeout(10)//设置为10ms
.build();
//配置io线程
IOReactorConfig ioReactorConfig = IOReactorConfig.custom().
Runtime().availableProcessors())
.setSoKeepAlive(true)
.build();
//设置连接池⼤⼩
ConnectingIOReactor ioReactor=null;
try {
ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
} catch (IOReactorException e) {
e.printStackTrace();
}
PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(ioReactor); connManager.setMaxTotal(1);//最⼤连接数设置1
connManager.setDefaultMaxPerRoute(1);//per route最⼤连接数设置1
final CloseableHttpAsyncClient client = HttpAsyncClients.custom().
setConnectionManager(connManager)
.setDefaultRequestConfig(requestConfig)
.build();
//构造请求
String url = "127.0.0.1:9200/_bulk";
List<HttpPost> list = new ArrayList<HttpPost>();
for(int i=0;i<2;i++){
HttpPost httpPost = new HttpPost(url);
StringEntity entity = null;
String a = "{ \"index\": { \"_index\": \"test\", \"_type\": \"test\"} }\n" +
"{\"name\": \"上海\",\"age\":33}\n";
entity = new StringEntity(a);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
httpPost.setEntity(entity);
list.add(httpPost);
}
client.start();
for(int i=0;i<2;i++){
<(i), new Back());
}
while(true){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Back implements FutureCallback<HttpResponse>{
private long start = System.currentTimeMillis();
Back(){
}
public void completed(HttpResponse httpResponse) {
try {
System.out.println("cost is:"+(System.currentTimeMillis()-start)+":"+Entity())); } catch (IOException e) {
e.printStackTrace();
}
}
public void failed(Exception e) {
e.printStackTrace();
}
public void cancelled() {
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
reactor线程模型的特点15
16
17
18
19
20
21

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。