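Learning Gateway: reading the source of the forwarding process (4), final part
Question:
When the gateway is accessed at localhost:8080/spring-nacos/user/info,
how does the request get forwarded to the project at localhost:8081/spring-nacos/user/info?
The DispatcherHandler class is the gateway's request entry point.
(How do we know it is the entry point? Roughly: HTTP request -> Netty thread model handles the read event -> DispatcherHandler.handle is called. I did not study this part myself; I found it on Baidu.)
DispatcherHandler
First, a short introduction to this class: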
public class DispatcherHandler implements WebHandler, ApplicationContextAware {
    // 1: implements the ApplicationContextAware interface, i.e. setApplicationContext(ApplicationContext applicationContext)
    // 2: implements the WebHandler interface, i.e. handle(ServerWebExchange exchange)
}
Here is how it implements these two methods:
1: setApplicationContext:
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
    // initialize the strategies
    initStrategies(applicationContext);
}
// Initialization assigns the handlerMappings, handlerAdapters and resultHandlers fields.
protected void initStrategies(ApplicationContext context) {
    // Collect every HandlerMapping bean, for example:
    // RoutePredicateHandlerMapping: registered with @Bean in GatewayAutoConfiguration
    // RequestMappingHandlerMapping: in WebFluxAutoConfiguration's inner class EnableWebFluxConfiguration, RequestMappingHandlerAdapter ---> createRequestMappingHandlerAdapter()
    // RouterFunctionMapping: in the same class as RequestMappingHandlerMapping above: @Bean RouterFunctionMapping
    // SimpleUrlHandlerMapping: the @Bean HandlerMapping in WebFluxConfigurationSupport
    Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerMapping.class, true, false);
    // sort
    ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
    AnnotationAwareOrderComparator.sort(mappings);
    // make the list unmodifiable
    this.handlerMappings = Collections.unmodifiableList(mappings);
    // Collect the HandlerAdapter beans; there are three:
    // RequestMappingHandlerAdapter: @Bean RequestMappingHandlerAdapter in WebFluxConfigurationSupport
    // HandlerFunctionAdapter: @Bean HandlerFunctionAdapter in WebFluxConfigurationSupport
    // SimpleHandlerAdapter: @Bean SimpleHandlerAdapter in WebFluxConfigurationSupport
    Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerAdapter.class, true, false);
    this.handlerAdapters = new ArrayList<>(adapterBeans.values());
    AnnotationAwareOrderComparator.sort(this.handlerAdapters);
    // Collect the HandlerResultHandler beans, all registered in WebFluxConfigurationSupport:
    // ResponseEntityResultHandler
    // ResponseBodyResultHandler
    // ViewResolutionResultHandler
    // ServerResponseResultHandler
    Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerResultHandler.class, true, false);
    this.resultHandlers = new ArrayList<>(beans.values());
    AnnotationAwareOrderComparator.sort(this.resultHandlers);
}
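The collect, sort, freeze pattern used for handlerMappings can be sketched in plain Java. This is only an illustration, not Spring code: NamedMapping is a hypothetical stand-in for a HandlerMapping bean, and the order values are made up to play the role of whatever AnnotationAwareOrderComparator would read from each bean.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class StrategyInitSketch {

    // Hypothetical stand-in for a HandlerMapping bean; a lower order means higher priority.
    static final class NamedMapping {
        final String name;
        final int order;

        NamedMapping(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    // Same shape as initStrategies: copy the bean map's values, sort them, freeze the list.
    static List<NamedMapping> sortAndFreeze(Map<String, NamedMapping> beansByName) {
        List<NamedMapping> mappings = new ArrayList<>(beansByName.values());
        mappings.sort(Comparator.comparingInt((NamedMapping m) -> m.order));
        return Collections.unmodifiableList(mappings);
    }

    public static void main(String[] args) {
        // Illustrative order values only.
        Map<String, NamedMapping> beans = new LinkedHashMap<>();
        beans.put("a", new NamedMapping("RoutePredicateHandlerMapping", 1));
        beans.put("b", new NamedMapping("RequestMappingHandlerMapping", 0));
        beans.put("c", new NamedMapping("RouterFunctionMapping", -1));
        for (NamedMapping m : sortAndFreeze(beans)) {
            System.out.println(m.order + " " + m.name);
        }
    }
}
```

The sorted order matters because handle() asks the mappings one after another and stops at the first one that returns a handler.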
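2: handle(ServerWebExchange exchange) method
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
    if (this.handlerMappings == null) {
        return createNotFoundError();
    }
    // The main flow; this part is important
    return Flux.fromIterable(this.handlerMappings) // create a Flux over the handler mappings
            .concatMap(mapping -> mapping.getHandler(exchange)) // ask each mapping for a handler, in order, and concatenate the results
            .next() // keep only the first handler that is emitted
            .switchIfEmpty(createNotFoundError()) // no mapping produced a handler
            .flatMap(handler -> invokeHandler(exchange, handler)) // invoke the handler
            .flatMap(result -> handleResult(exchange, result)); // handle the result
}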
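The "first mapping that yields a handler wins" behaviour of concatMap(...).next().switchIfEmpty(...) can be approximated without Reactor. The sketch below is a simplified synchronous model under my own assumptions: each mapping is a plain function from the request path to an optional handler name, and the handler names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

class FirstHandlerSketch {

    // A "mapping" is modelled as: request path -> handler name, or empty when it does not apply.
    static String dispatch(List<Function<String, Optional<String>>> mappings, String path) {
        for (Function<String, Optional<String>> mapping : mappings) {
            Optional<String> handler = mapping.apply(path); // ask the mappings in order
            if (handler.isPresent()) {
                return handler.get();                       // the first handler wins
            }
        }
        // Plays the role of switchIfEmpty(createNotFoundError()).
        throw new IllegalStateException("No matching handler for " + path);
    }

    public static void main(String[] args) {
        Function<String, Optional<String>> annotated =
                path -> path.equals("/local") ? Optional.of("localController") : Optional.empty();
        Function<String, Optional<String>> routes =
                path -> path.startsWith("/spring-nacos/") ? Optional.of("gatewayWebHandler") : Optional.empty();
        System.out.println(dispatch(Arrays.asList(annotated, routes), "/spring-nacos/user/info"));
    }
}
```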
2.1: mapping.getHandler(exchange) method
RoutePredicateHandlerMapping, RequestMappingHandlerMapping, RouterFunctionMapping, SimpleUrlHandlerMapping
All four of these classes go through getHandler(ServerWebExchange exchange):
public Mono<Object> getHandler(ServerWebExchange exchange) {
    return getHandlerInternal(exchange).map(handler -> {
        if (logger.isDebugEnabled()) {
            logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);
        }
        // CORS handling: this presumably checks whether the request satisfies the cross-origin settings.
        ServerHttpRequest request = exchange.getRequest();
        if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {
            CorsConfiguration config = (this.corsConfigurationSource != null ? this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
            CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
            config = (config != null ? config.combine(handlerConfig) : handlerConfig);
            if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
                return REQUEST_HANDLED_HANDLER;
            }
        }
        return handler;
    });
}
2.1.1: getHandlerInternal(exchange)
Every subclass has its own implementation of this method; they are covered separately below.
2.1.1.1: RoutePredicateHandlerMapping.getHandlerInternal
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
    // don't handle requests on management port if set and different than server port
    if (this.managementPortType == DIFFERENT && this.managementPort != null
            && exchange.getRequest().getURI().getPort() == this.managementPort) {
        return Mono.empty();
    }
    // This step sets the attribute org.springframework.cloud.gateway.support.ServerWebExchangeUtils.gatewayHandlerMapper = RoutePredicateHandlerMapping
    exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
    // Look up the route; the method is explained in detail below
    return lookupRoute(exchange)
            // .log("route-predicate-handler-mapping", Level.FINER) //name this
            .flatMap((Function<Route, Mono<?>>) r -> {
                if (logger.isDebugEnabled()) {
                    logger.debug(
                            "Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                }
                // 2.1.1.1.0: put the Route into the exchange attributes
                exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                return Mono.just(webHandler);
            }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                if (logger.isTraceEnabled()) {
                    logger.trace("No RouteDefinition found for ["
                            + getExchangeDesc(exchange) + "]");
                }
            })));
}
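lookupRoute(exchange): looking up the route:
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
    // this.routeLocator is a CachingRouteLocator,
    // and its getRoutes() is ultimately served by RouteDefinitionRouteLocator.
    // All of these are registered with @Bean.
    // A: this.routeLocator.getRoutes() ends up in RouteDefinitionRouteLocator.getRoutes()
    return this.routeLocator.getRoutes()
            // individually filter routes so that filterWhen error delaying is not a
            // problem
            .concatMap(route -> Mono.just(route).filterWhen(r -> {
                // add the current route we are testing
                // (stored as an exchange attribute)
                exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                // This is where the request is matched against the route's predicate conditions.
                // E: r.getPredicate().apply(exchange).
                // r.getPredicate() is the AsyncPredicate<ServerWebExchange> built at point B1; its apply method runs here:
                return r.getPredicate().apply(exchange);
            })
                    // instead of immediately stopping main flux due to error, log and
                    // swallow it
                    .doOnError(e -> logger.error(
                            "Error applying predicate for route: " + route.getId(),
                            e))
                    .onErrorResume(e -> Mono.empty()))
            .next()
            .map(route -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("Route matched: " + route.getId());
                }
                // validate the route; there is no implementation yet
                validateRoute(route, exchange);
                return route;
            });
}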
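To make the matching rule concrete, here is a minimal synchronous sketch of "take the first route whose predicate accepts the request, and treat a failing predicate as a non-match". It is not the gateway's code: SimpleRoute and the route ids are hypothetical, and a plain String path stands in for the ServerWebExchange.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

class RouteLookupSketch {

    // Hypothetical, stripped-down route: just an id and a predicate on the request path.
    static final class SimpleRoute {
        final String id;
        final Predicate<String> predicate;

        SimpleRoute(String id, Predicate<String> predicate) {
            this.id = id;
            this.predicate = predicate;
        }
    }

    static Optional<SimpleRoute> lookupRoute(List<SimpleRoute> routes, String path) {
        for (SimpleRoute route : routes) {
            try {
                if (route.predicate.test(path)) {
                    return Optional.of(route); // like next(): stop at the first match
                }
            } catch (RuntimeException e) {
                // Log and swallow, in the spirit of doOnError + onErrorResume(e -> Mono.empty()).
                System.err.println("Error applying predicate for route: " + route.id);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<SimpleRoute> routes = Arrays.asList(
                new SimpleRoute("broken", p -> { throw new IllegalStateException("boom"); }),
                new SimpleRoute("spring-nacos", p -> p.startsWith("/spring-nacos/")));
        System.out.println(lookupRoute(routes, "/spring-nacos/user/info").map(r -> r.id).orElse("none"));
    }
}
```

A predicate that throws does not abort the lookup; the loop simply moves on to the next route, which is the point of the per-route error handling in lookupRoute.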
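Point A: RouteDefinitionRouteLocator.getRoutes() explained:
@Override
public Flux<Route> getRoutes() {
    Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions()
            .map(this::convertToRoute);
    // this.routeDefinitionLocator.getRouteDefinitions():
    // this.routeDefinitionLocator is a CompositeRouteDefinitionLocator,
    // so getRouteDefinitions() on the composite collects getRouteDefinitions() from each of its delegates,
    // for example the configured routeDefinitions
    // and DiscoveryClientRouteDefinitionLocator.serviceInstances.
    // (the rest of the method is not shown here)
}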
this::convertToRoute explained:
// Converts a RouteDefinition into a Route object
private Route convertToRoute(RouteDefinition routeDefinition) {
    // B: combine the predicates
    AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);
    // C: get the gateway filters
    List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);
    // D: build the Route object
    return Route.async(routeDefinition).asyncPredicate(predicate)
            .replaceFilters(gatewayFilters).build();
}
B: combining the predicates
private AsyncPredicate<ServerWebExchange> combinePredicates(
        RouteDefinition routeDefinition) {
    // Read the value of RouteDefinition.predicates.
    // Where is this property set? From the configuration file:
    // the spring.cloud.gateway.routes[n].predicates property.
    List<PredicateDefinition> predicates = routeDefinition.getPredicates();
    // B1: look up the first predicate. Roughly: the predicate's name is used to find the matching
    // RoutePredicateFactory implementation, which then produces an AsyncPredicate. The code is not
    // listed here; lookup ends with: return factory.applyAsync(config). For a ReadBody predicate
    // that is ReadBodyRoutePredicateFactory.applyAsync(Config config), which returns
    // new AsyncPredicate<ServerWebExchange>() { ... apply(...) ... }. Factories that do not
    // override applyAsync use the RoutePredicateFactory default, which wraps apply(config).
    AsyncPredicate<ServerWebExchange> predicate = lookup(routeDefinition,
            predicates.get(0));
    // Look up the remaining predicates
    for (PredicateDefinition andPredicate : predicates.subList(1,
            predicates.size())) {
        AsyncPredicate<ServerWebExchange> found = lookup(routeDefinition,
                andPredicate);
        // The structure is interesting; as I understand it: (((left1, right1), right2), right3)
        predicate = predicate.and(found);
    }
    // For example, with predicate1, predicate2, predicate3, predicate4
    // the resulting structure is a chain of (left, right) pairs nested to the left:
    // (((predicate1, predicate2), predicate3), predicate4)
    return predicate;
}
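The left-nested shape that this loop produces is easy to check with a small plain-Java model. NamedPredicate below is a hypothetical helper, not a gateway class: it pairs a java.util.function.Predicate with a printable name so that each and() call records how the pair was formed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class CombinePredicatesSketch {

    // Hypothetical helper: a predicate plus a name that records how it was combined.
    static final class NamedPredicate {
        final String name;
        final Predicate<String> test;

        NamedPredicate(String name, Predicate<String> test) {
            this.name = name;
            this.test = test;
        }

        // (left, right): "this" is the left side, the argument is the right side.
        NamedPredicate and(NamedPredicate right) {
            return new NamedPredicate("(" + name + ", " + right.name + ")", test.and(right.test));
        }
    }

    // Same loop shape as combinePredicates: start with the first, fold in the rest.
    static NamedPredicate combine(List<NamedPredicate> predicates) {
        NamedPredicate result = predicates.get(0);
        for (NamedPredicate next : predicates.subList(1, predicates.size())) {
            result = result.and(next);
        }
        return result;
    }

    public static void main(String[] args) {
        NamedPredicate combined = combine(Arrays.asList(
                new NamedPredicate("predicate1", s -> s.startsWith("/spring-nacos/")),
                new NamedPredicate("predicate2", s -> s.length() < 100),
                new NamedPredicate("predicate3", s -> !s.contains("..")),
                new NamedPredicate("predicate4", s -> s.endsWith("/info"))));
        System.out.println(combined.name); // (((predicate1, predicate2), predicate3), predicate4)
        System.out.println(combined.test.test("/spring-nacos/user/info")); // true
    }
}
```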
C: getting the gateway filters
private List<GatewayFilter> getFilters(RouteDefinition routeDefinition) {
    List<GatewayFilter> filters = new ArrayList<>();
    // Read the defaults from the spring.cloud.gateway.default-filters property
    if (!this.gatewayProperties.getDefaultFilters().isEmpty()) {
        filters.addAll(loadGatewayFilters(DEFAULT_FILTERS,
                new ArrayList<>(this.gatewayProperties.getDefaultFilters())));
    }
    // Read the spring.cloud.gateway.routes[n].filters configuration
    if (!routeDefinition.getFilters().isEmpty()) {
        filters.addAll(loadGatewayFilters(routeDefinition.getId(),
                new ArrayList<>(routeDefinition.getFilters())));
    }
    // sort
    AnnotationAwareOrderComparator.sort(filters);
    return filters;
}
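The assembly order can be sketched as "defaults first, then the route's own filters, then one sort by order". The model below is plain Java under stated assumptions: OrderedFilter and the filter names are hypothetical, and the order values are simply each filter's position in its own list, which is my assumption for the example rather than something shown in the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class FilterAssemblySketch {

    // Hypothetical stand-in for an ordered GatewayFilter.
    static final class OrderedFilter {
        final String name;
        final int order;

        OrderedFilter(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    static List<OrderedFilter> assemble(List<OrderedFilter> defaults, List<OrderedFilter> routeFilters) {
        List<OrderedFilter> filters = new ArrayList<>();
        if (!defaults.isEmpty()) {
            filters.addAll(defaults);      // default filters go in first
        }
        if (!routeFilters.isEmpty()) {
            filters.addAll(routeFilters);  // then the filters configured on the route
        }
        // List.sort is stable, so filters with equal order keep their insertion order.
        filters.sort(Comparator.comparingInt((OrderedFilter f) -> f.order));
        return filters;
    }

    public static void main(String[] args) {
        List<OrderedFilter> result = assemble(
                Arrays.asList(new OrderedFilter("default:AddResponseHeader", 1)),
                Arrays.asList(new OrderedFilter("route:StripPrefix", 1),
                        new OrderedFilter("route:RewritePath", 2)));
        for (OrderedFilter f : result) {
            System.out.println(f.order + " " + f.name);
        }
    }
}
```

Because the sort is stable, a default filter and a route filter with the same order keep the "defaults first" arrangement in this model.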
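D: building the Route object, which carries the predicate and the filters together with id, uri, order, metadata and so on:
return new Route(this.id, this.uri, this.order, predicate,
        this.gatewayFilters, this.metadata);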
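E: r.getPredicate().apply(exchange). With a ReadBody predicate this runs the apply() method of the AsyncPredicate returned by ReadBodyRoutePredicateFactory.applyAsync():
public Publisher<Boolean> apply(ServerWebExchange exchange) {
    Class inClass = config.getInClass();
    // Code trimmed; the essential step is the test() call below.
    // For a predicate configured as - Path, the test() that ends up running is the one on the predicate returned by PathRoutePredicateFactory.apply().
    return Mono.just(config.predicate.test(cachedBody));
}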
E1: where does this Config object come from?
It is created in lookup(RouteDefinition route, PredicateDefinition predicate).
// This is where the matching factory, here PathRoutePredicateFactory, is obtained.
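The name-to-factory step can be illustrated with a small registry. This is a sketch under my own assumptions, not the gateway's lookup implementation: the FACTORIES map, the single String argument standing in for the Config object, and the prefix-based "Path" matching are all simplifications invented for the example (real path patterns are richer than a prefix check).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

class PredicateLookupSketch {

    // Hypothetical registry: predicate name -> factory that turns the configured
    // argument (standing in for the Config object) into a predicate on the request path.
    static final Map<String, Function<String, Predicate<String>>> FACTORIES = new HashMap<>();

    static {
        // Simplified "Path" factory: only understands patterns of the form "/prefix/**".
        FACTORIES.put("Path", pattern -> {
            String prefix = pattern.endsWith("/**")
                    ? pattern.substring(0, pattern.length() - 2)
                    : pattern;
            return path -> pattern.endsWith("/**") ? path.startsWith(prefix) : path.equals(prefix);
        });
    }

    // Find the factory by the predicate's name, then let it build the predicate from the config.
    static Predicate<String> lookup(String name, String configuredArg) {
        Function<String, Predicate<String>> factory = FACTORIES.get(name);
        if (factory == null) {
            throw new IllegalStateException("No predicate factory registered under name " + name);
        }
        return factory.apply(configuredArg);
    }

    public static void main(String[] args) {
        Predicate<String> path = lookup("Path", "/spring-nacos/**");
        System.out.println(path.test("/spring-nacos/user/info"));
        System.out.println(path.test("/other/user/info"));
    }
}
```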
