使⽤springcloudgateway搭建⽹关(分流,限流,熔断)Spring Cloud Gateway
Spring Cloud Gateway 是 Spring Cloud 的⼀个全新项⽬,该项⽬是基于 Spring 5.0,Spring Boot 2.0 和 Project Reactor 等技术开发的⽹关,它旨在为微服务架构提供⼀种简单有效的统⼀的 API 路由管理⽅式。
Spring Cloud Gateway 作为 Spring Cloud ⽣态系统中的⽹关,⽬标是替代 Netflix Zuul,其不仅提供统⼀的路由⽅式,并且基于 Filter 链的⽅式提供了⽹关基本的功能,例如:安全,监控/指标,和限流。
相关概念:
Route(路由):这是⽹关的基本构建块。它由⼀个 ID,⼀个⽬标 URI,⼀组断⾔和⼀组过滤器定义。如果断⾔为真,则路由匹配。
Predicate(断⾔):这是⼀个 Java 8 的 Predicate。输⼊类型是⼀个 ServerWebExchange。我们可以使⽤它来匹配来⾃ HTTP 请求的任何内容,例如headers 或参数。
Filter(过滤器):这是org.springframework.cloud.gateway.filter.GatewayFilter的实例,我们可以使⽤它修改请求和响应。
⼯作流程:
客户端向 Spring Cloud Gateway 发出请求。如果 Gateway Handler Mapping 中到与请求相匹配的路由,将其发送到 Gateway Web Handler。Handler 再通过指定的过滤器链来将请求发送到我们实际的服务执⾏业务逻辑,然后返回。过滤器之间⽤虚线分开是因为过滤器可能会在发送代理请求之前(“pre”)或之后(“post”)执⾏业务逻辑。
Spring Cloud Gateway 的特征:
基于 Spring Framework 5,Project Reactor 和 Spring Boot 2.0
动态路由
Predicates 和 Filters 作⽤于特定路由
集成 Hystrix 断路器
集成 Spring Cloud DiscoveryClient
易于编写的 Predicates 和 Filters
限流
路径重写
快速上⼿
引⼊spring-boot  2.1.1.RELEASE ,springcloud的版本为 Greenwich.M3
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Greenwich.M3</spring-cloud.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
添加的依赖包如下
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
注意springcloud gateway使⽤的web框架为webflux,和springMVC不兼容。引⼊的限流组件是hystrix。redis底层不再使⽤jedis,⽽是lettuce。
路由断⾔
接下来就是配置了,可以使⽤java代码硬编码配置路由过滤器,也可以使⽤yml配置⽂件配置。下⾯我们⾸先介绍配置⽂件配置⽅式
server.port: 8082
spring:
application:
name: gateway
cloud:
gateway:
routes:
- id: path_route
uri: localhost:8000
order: 0
predicates:
- Path=/foo/**
filters:
- StripPrefix=1
上⾯给出了⼀个根据请求路径来匹配⽬标uri的例⼦,如果请求的路径为/foo/bar,则⽬标uri为 localhost:8000/bar。如果上⾯例⼦中没有加⼀个StripPrefix=1过滤器,则⽬标uri 为localhost:8000/foo/bar,StripPrefix过滤器是去掉⼀个路径。
其他的路由断⾔和过滤器使⽤⽅法请查看官⽹
接下来我们来看⼀下设计⼀个⽹关应该需要的⼀些功能
修改接⼝返回报⽂
因为⽹关路由的接⼝返回报⽂格式各异,并且⽹关也有有⼀些限流、认证、熔断降级的返回报⽂,为了
统⼀这些报⽂的返回格式,⽹关必须要对接⼝的返回报⽂进⾏修改,过滤器代码如下:
package org.gateway.filter.global;
import java.nio.charset.Charset;
import sponse.Response;
activestreams.Publisher;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import Ordered;
import io.buffer.DataBuffer;
import io.buffer.DataBufferFactory;
import io.buffer.DataBufferUtils;
import org.springframework.active.ServerHttpResponse;
import org.springframework.active.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import com.alibaba.fastjson.JSON;
publisher.Flux;
publisher.Mono;
@Component
public class WrapperResponseFilter implements GlobalFilter, Ordered {
@Override
public int getOrder() {
/
/ -1 is response write filter, must be called before that
return -2;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpResponse originalResponse = Response();
DataBufferFactory bufferFactory = originalResponse.bufferFactory();
ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (body instanceof Flux) {
Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;
return super.writeWith(fluxBody.map(dataBuffer -> {
// probably should reuse buffers
byte[] content = new adableByteCount()];
// 释放掉内存
String rs = new String(content, Charset.forName("UTF-8"));
Response response = new Response();
response.setCode("1");
response.setMessage("请求成功");
response.setData(rs);
byte[] newRs = JSONString(response).getBytes(Charset.forName("UTF-8"));
return bufferFactory.wrap(newRs);
}));
}
// if body is not a flux. never got there.
return super.writeWith(body);
}
};
// replace response with decorator
return chain.filter(exchange.mutate().response(decoratedResponse).build());
}
}
需要注意的是order需要⼩于-1,需要先于NettyWriteResponseFilter过滤器执⾏。
有了⼀个这样的过滤器,我们就可以统⼀返回报⽂格式了。
认证
以下提供⼀个简单的认证过滤器
package org.gateway.filter.global;
import java.nio.charset.StandardCharsets;
import sponse.Response;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.active.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import com.alibaba.fastjson.JSON;
publisher.Mono;
@Component
public class AuthFilter implements GlobalFilter{
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String token = Request().getHeaders().getFirst("token");
if ("token".equals(token)) {
return chain.filter(exchange);
}
ServerHttpResponse response = Response();
Response data = new Response();
data.setCode("401");
data.setMessage("⾮法请求");
byte[] datas = JSONString(data).getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = response.bufferFactory().wrap(datas);
response.setStatusCode(HttpStatus.UNAUTHORIZED);
return response.writeWith(Mono.just(buffer));
}
}
限流
springcloud gateway 为我们提供了限流过滤器RequestRateLimiterGatewayFilterFactory,和限流的实现类RedisRateLimiter使⽤令牌桶限流。但是官⽅的不⼀定满⾜我们的需求,所以我们重新写⼀个过滤器(基本和官⽅⼀致),只是将官⽅的返回报⽂改了。
package org.gateway.limiter;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import sponse.Response;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.cloud.gateway.filter.ratelimit.RateLimiter;
import org.springframework.ute.Route;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.active.ServerHttpResponse;
import com.alibaba.fastjson.JSON;
publisher.Mono;
/**
* User Request Rate Limiter filter. See stripe/blog/rate-limiters and
*/
public class RateLimiterGatewayFilterFactory extends AbstractGatewayFilterFactory<RateLimiterGatewayFilterFactory.Config> {
public static final String KEY_RESOLVER_KEY = "keyResolver";
private final RateLimiter defaultRateLimiter;
private final KeyResolver defaultKeyResolver;
public RateLimiterGatewayFilterFactory(RateLimiter defaultRateLimiter,
KeyResolver defaultKeyResolver) {
super(Config.class);
this.defaultRateLimiter = defaultRateLimiter;
this.defaultKeyResolver = defaultKeyResolver;
}
public KeyResolver getDefaultKeyResolver() {
return defaultKeyResolver;
}
public RateLimiter getDefaultRateLimiter() {
return defaultRateLimiter;
}
@SuppressWarnings("unchecked")
@Override
public GatewayFilter apply(Config config) {
KeyResolver resolver = (config.keyResolver == null) ? defaultKeyResolver : config.keyResolver;
RateLimiter<Object> limiter = (config.rateLimiter == null) ? defaultRateLimiter : config.rateLimiter;
return (exchange, chain) -> {
Route route = Attribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
solve(exchange).flatMap(key ->
// TODO: if key is empty?
limiter.Id(), key).flatMap(response -> {
for (Map.Entry<String, String> header : Headers().entrySet()) {
}
if (response.isAllowed()) {
return chain.filter(exchange);
}
ServerHttpResponse rs = Response();
Response data = new Response();
data.setCode("101");
data.setMessage("访问过快");
byte[] datas = JSONString(data).getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = rs.bufferFactory().wrap(datas);
rs.setStatusCode(HttpStatus.UNAUTHORIZED);
return rs.writeWith(Mono.just(buffer));
}));
};
}
public static class Config {
private KeyResolver keyResolver;
private RateLimiter rateLimiter;
private HttpStatus statusCode = HttpStatus.TOO_MANY_REQUESTS;
public KeyResolver getKeyResolver() {
return keyResolver;
}
微服务网关设计
public Config setKeyResolver(KeyResolver keyResolver) {
this.keyResolver = keyResolver;
return this;
}
public RateLimiter getRateLimiter() {
return rateLimiter;
}
public Config setRateLimiter(RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
return this;
}
public HttpStatus getStatusCode() {
return statusCode;
}
public Config setStatusCode(HttpStatus statusCode) {
this.statusCode = statusCode;
return this;
}
}
}
然后限流必须要有⼀个key,根据什么来进⾏限流,ip,接⼝,或者⽤户来进⾏限流,所以我们⾃定义⼀个KeyResolver package org.gateway.limiter;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.web.server.ServerWebExchange;
import com.alibaba.fastjson.JSON;
publisher.Mono;
public class CustomKeyResolver implements KeyResolver {
public static final String BEAN_NAME = "customKeyResolver";
@Override
public Mono<String> resolve(ServerWebExchange exchange) {
return Mono.just(getKey(exchange));
}
/**
*
* @param exchange
* @return
*/
private String getKey(ServerWebExchange exchange) {
LimitKey limitKey = new LimitKey();
limitKey.Request().getPath().toString());
limitKey.Request().getQueryParams().getFirst("biz"));
JSONString(limitKey);
}
}
最后RedisRateLimiter我们也需要重写,因为不⽀持多级限流,原⽣的只会判断⼀个key。代码如下:    /**
* This uses a basic token bucket algorithm and relies on the fact that Redis scripts
* execute atomically. No other operations can run between fetching the count and
* writing the new count.
*/
@Override
public Mono<Response> isAllowed(String routeId, String id) {
if (!()) {
throw new IllegalStateException("RedisRateLimiter is not initialized");
}
LimitConfig limitConfig = getLimitConfig(routeId);
if (limitConfig == null || TokenConfig().size()==0) {
return Mono.just(new Response(true,null));
}
Map<String, Config> conf = TokenConfig();
LimitKey limitKey = JSON.parseObject(id, LimitKey.class);
//api限流
String api = Api();
Config apiConf = (api);
//业务⽅限流
String biz = Biz();
Config bizConf = (biz);
if (apiConf!=null) {
return isSingleAllow(api,routeId,apiConf).flatMap(res -> {
if (res.isAllowed()) {
if(bizConf!=null) {
return isSingleAllow(biz, routeId, bizConf);
}else {
return Mono.just(new Response(true,new HashMap<>()));
}
}else {
return Mono.just(res);
}
} );
}else {
if (bizConf!=null) {
return isSingleAllow(biz, routeId, bizConf);
}else {
return Mono.just(new Response(true,new HashMap<>()));
}
}
}
/**
* 单级限流
* @param api
* @param routeId
* @param apiConf
* @return
*/
private Mono<Response> isSingleAllow(String key, String routeId, Config config) {
// How many requests per second do you want a user to be allowed to do?
int replenishRate = ReplenishRate();
// How much bursting do you want to allow?
int burstCapacity = BurstCapacity();

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。