【Grpc(⼀)】Java何如理解StreamObserver?
刚开始接触Grpc时,桩代码⾥有许多StreamObserver类型,不太清楚是怎么⽤的,这⾥做⼀个记录。
⾸先看下StreamObserver接⼝定义:
public interface StreamObserver<V> {
void onNext(V value);
void onError(Throwable t);
void onCompleted();
}
可以看到,这是⼀个泛型化的回调接⼝,并且从命名上看,该接⼝使⽤了观察者模式。提到回调,或者提到观察者模式,⾸先可以想到,这⾥⾯涉及到两个⾓⾊:观察者和被观察者。被观察者负责在某⼀事件发⽣时调⽤回调函数通知观察者,观察者负责提供回调函数。对于需要被观察的事件,这⾥是作为回调接⼝⾥的泛型变量存在的。这就是这个接⼝的使⽤流程。下⾯结合Grpc⾥的⼏个使⽤场景来看下。
1.unary模式服务端⽅法返回值例⼦:
我们可以看⼀个简单的unary模式的返回值:
@Override
public void getByKey(MyRequest request, StreamObserver<MyResponse> responseObserver) {
int key = Key();
String value = ""; // 计算逻辑
}
该实现⽅法⾥定义了⼀个MyResponse类型的返回值,这个返回值是被关注的对象。⽅法参数⾥有⼀个StreamObserver<MyResponse>的变量,该变量就是回调函数。回调函数由谁提供?也就是观察者是谁?答案是Grpc框架。可以理解为Grpc框架层在关注MyResponse类型的返回值的⽣成,然后使⽤
协议层及io层做数据发送。我们可以⼤致看⼀下回调函数的实现,⽐如onNext⽅法:
@Override
public void onNext(RespT response) {
if (cancelled) {
if (onCancelHandler == null) {
throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
}
return;
}
checkState(!aborted, "Stream was terminated by error, no further calls are allowed");
checkState(!completed, "Stream is already completed, no further calls are allowed");
if (!sentHeaders) {
call.sendHeaders(new Metadata());
sentHeaders = true;
}
call.sendMessage(response);
}
实现类是ServerCallStreamObserverImpl,其中的onNext、onError以及onComplete⽅法均会调⽤内部的ServerCall实例发送消息。具体发送逻辑实现这⾥不做深⼊,总之可以明确的是,这⾥的StreamObserver回调接⼝,其实现逻辑就是将消息发送⾄客户端,这就是观察者的逻辑。再看下被观察者,也就是StreamObserver回调接⼝的调⽤⽅。其实就是实现类⾥返回值的⽣成的逻辑,我们需要根据request取到参数,然后⽣成返回值,调⽤StreamObserver回调接⼝,来通知Grpc框架层发送返回值。⾄此服务端实现⽅法⾥的StreamObserver已经清晰了:被观察的对象就是返回值,Grpc框架层是观察者,提供发送逻辑作为回调函数,实现类是被观察者,每⼀次返回值的⽣成都会调⽤回调函数通知Grpc。还有⼀点,StreamObserver接⼝的定义其实和stream息息相关。我们知道stream模式意味
着可以在⼀个连接中发送多条消息,所以该接⼝提供了onNext回调函数,该函数可以被多次调⽤,每⼀次对onNext的调⽤都代表⼀条消息的发送。如果全部发送完了或者发送出错,那么就需要调⽤onError或者onComplete来告知对⽅本次stream已经结束。所以该接⼝的设计也与stream的概念也完全契合。
2.流式客户端例⼦:
客户端流式与双端流式类似,这⾥只看客户端流式。先看客户端代码:
private static void clientStream() {
StreamObserver<MyResponse> responseData = new StreamObserver<MyResponse>() {
@Override
public void onNext(MyResponse response) {
System.out.println("res: " + Value());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
};
StreamObserver<MyRequest> requestData = ByKeyClientStream(responseData);
for (int i = 0; i < 10; i++) {
}
}
可以看到,客户端侧,需要创建两个observer分别⽤于处理请求与返回值。初次看可能⽐较懵逼,⼊参数返回值类型,并且返回了⼀个请求接下来具体看下,对于请求observer,回调接⼝由Grpc框架层提供,实现类见CallToStreamObserverAdapter,也就是观察者,所做的事情就是将请求数据发送到服务端。回调函数的调⽤⽅,也即被观察者,是我们提供的业务代码,作⽤就是⽣成请求数据,并调⽤回调函数。这个与unary模式下服务端返回值是类似的。对于返回值observer,回调函数由我们来提供,也即观察者,作⽤是处理返回值。回调函数调⽤⽅是Grpc框架,监听到有返回数据时,就调⽤回调函数,通知客户端处理返回值。
服务端侧代码:
@Override
public StreamObserver<MyRequest> getByKeyClientStream(StreamObserver<MyResponse> responseObserver) {
return new StreamObserver<MyRequest>() {
int count = 0;
@Override
public void onNext(MyRequest myRequest) {
System.out.println("recv: " + Key());
count++;
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
}
};
}
⽅法返回值是请求类型的observer。回调函数有我们来提供,作⽤是处理接收到的请求,也就是观察者逻辑。回调函数的调⽤⽅式Grpc框架层,监听到有请求到达时,就会调⽤我们提供的回调函数,通知处理请求。对于返回值的observer与前边类似。
围绕着观察者与被观察者两个⾓⾊看待Grpc⾥的调⽤⽅式,就会清晰很多。
总结:
1.StreamObserver接⼝使⽤了观察者模式的概念,与stream流模式完全契合,⼀次onnext调⽤代表str
eam内⼀次发送;
2.对于需要接受数据的场景,Grpc框架是被观察者;对于需要发送数据的场景,Grpc框架是观察者;
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论