利⽤dubbo打造真正的nodejs端的微服务体系
Java在微服务⽅⾯的⽣态⽐较完善,国内常见的有:
基于consul的spring⽣态。nest-cloud就是基于consul来搭建微服务。
阿⾥巴巴的dubbo与sofa。
基本上国内的Java微服务都是使⽤以上的⽣态。⽽nodejs为了能够与Java微服务互通,⽬前还没有⾮常完善的体系,除了nest-cloud的微服务体系。今天,我主要为了讲解如何在nodejs上通过dubbo打造与Java互通的微服务体系。
我们的主⾓是
如果不是TS写的,都不好意思拿出来说。
TCP通讯
所有的微服务基本都基于TCP的长连接,我们要解决的问题主要是以下⼏个:
TCP数据发送与接收时候的粘包与拆包问题
TCP连接的⼼跳检测以及重连重试机制
数据传输时候的序列化与发序列化算法
使⽤注册中⼼的订阅机制
TCP传输具有⾮常可靠的安全性,不像UDP传输那样会丢包,所以微服务间的通讯基本使⽤TCP去完成,⼀旦连接,通讯速度是⾮常快的。
服务的注册与发现
⼀般的,在dubbo中,我们使⽤ 来实现服务注册与发现,但是也有⽣态是使⽤Redis来实现的。不过今天我们就讲使⽤ZK的场景下的微服务。
其实ZK就是⼀个云端的KEY/VALUE存储器,同时具备了订阅通知的功能。这⾥推荐使⽤,这个库已相当稳定,每周下载量也不少,值得放⼼使⽤。
使⽤dubbo.ts创建注册中⼼的连接是⾮常简单的:
import { Registry, RegistryInitOptions } from 'dubbo.ts';
分布式和微服务的关系
const registry = new Registry({
host: '127.0.0.1:2181' // zk地址
} as RegistryInitOptions);
t(); // 连接
registry.close(); // 断开
提供服务
纵观整个NPM,没有出服务提供者的库。⼀般来说,使⽤dubbo的nodejs程序,仅仅只是调⽤java⽅服务的,⽆法提供基于nodejs的微服务的注册与被调⽤。也就是说,通过dubbo.ts我们可以像java⼀样,在注册中⼼注册我们的微服务,供nodejs或者java来调⽤。
在dubbo.ts中,我们可以这样来玩:
1. 创建服务提供者对象
import {
Provider,
ProviderInitOptions,
ProviderServiceChunkInitOptions
} from 'dubbo.ts';
const provider = new Provider({
application: 'test',
dubbo_version: '2.0.2',
port: 8080,
pid: process.pid,
registry: registry,
heartbeat?: 60000,
} as ProviderInitOptions);
2. 为其添加微服务接⼝定义
class CUATOM_SERVICE {
xxx() {}
ddd() {}
}
provider.addService(CUATOM_SERVICE, {
interface: 'xxx',
version: 'x.x.x',
group; 'xxxx',
methods: ['xxx', 'ddd'],
timeout: 3000
} as ProviderServiceChunkInitOptions);
// ..,.
3. 启动服务⾃动注册到中⼼或者卸载服务
await provider.listen();
await provider.close();
如果你有zk的监控平台,那么你可以在平台上看到微服务已经被注册上去了。
消费者
消费者就是⽤来连接微服务,通过⽅法及参数来获得最终数据的。它通过ZK⾃动发现服务后连接服务,当服务被注销的时候也⾃动注销连接。当请求服务的时候有如下规则:
如果服务⽅法超时,将⾃动重试。
如果重试的时候,微服务提供者是多个,那么重试的时候将择优选择不同的相同微服务接⼝调⽤。
如果重试时候,微服务提供者只有⼀个,那么重试这个接⼝N次。
如果重试没有提供者,将报no prividers错误。
创建消费者对象:
import { Consumer } from 'dubbo.ts';
const consumer = new Consumer({
application: 'dist',
dubbo_version: '2.0.2',
pid: process.pid,
registry: registry,
});
await consumer.listen();
await consumer.close();
连接微服务获取数据
const invoker = ('com.mifa.stib.service.ProviderService');
const java = require('js-to-java');
type resultData = {
name: string,
age: number,
}
const result = await invoker.invoke<resultData>('testRpc', [
javabine('com.mifa.stibmon.RpcData', {
"name":"gxh",
"age":"18",
})
])
通过很简单的调⽤,我们就能获得微服务数据。NPM上所有消费者的设计都⼤同⼩异,有的还做了熔断处理。这⾥本架构没有做处理,⽤户可以根据需求⾃⾏完成。
架构成AOP模式
使⽤TS创建类似java的注解⾮常⽅便。⽆⾮利⽤Provider.addService的参数做⽂章,具体⽤户可以⾃⾏设计。这⾥我给⼤家看⼀个我司的最终使⽤例⼦(你也可以参考的注解设计来完成):
import { provide, inject } from 'injection';
import { rpc } from '@nelts/dubbo';
import { RPC_INPUT_SCHEMA, MIN_PROGRAM_TYPE, error } from '@node/com.stib.utils'; // 私有源上的包,参考时候可忽略功能
import WX from './wx';
import * as ioredis from 'ioredis';
@provide('User')
@rpc.interface('com.mifa.stib.service.User')
@rpc.version('1.0.0')
export default class UserService {
@inject('wx')
private wx: WX;
@inject('redis')
private redis: ioredis.Redis;
@hod
@rpc.middleware(OutputConsole)
login(req: RPC_INPUT_SCHEMA) {
switch (req.headers.platform) {
case MIN_PROGRAM_TYPE.WX:
if (de) return deSession(de);
return this.wx.jsLogin(req.data, req.headers.appName);
case MIN_PROGRAM_TYPE.WX_SDK: return this.wx.sdkLogin(de, req.headers.appName);
default: throw error('不⽀持的登录类型');
}
}
@hod
async status(req: RPC_INPUT_SCHEMA) {
if (!req.headers.userToken) throw error('401 Not logined', 401);
const value = (req.headers.userToken);
if (!value) throw error('401 Not logined', 401);
const user = dis.hgetall(value);
if (!value) throw error('401 Not logined', 401);
user.sex = Number(user.sex);
user.id = undefined;
user.unionid = undefined;
return user;
}
}
async function OutputConsole(ctx, next) {
console.log('in middleware');
await next()
}
我推荐使⽤midway.js的injection模块来设计服务间的IOC模型,这样更有利于开发与维护。
Swagger
⼀般来说,在java中,swagger是建⽴在微服务上的,spring的⼀整套swagger都是纯HTTP的。我参考了dubbo的swagger⽅式,觉得单服务单swagger的模式并不有利于开发者查阅,所以,我们约定了⼀种分布式swagger模式。在duboo.ts已内置。
微服务swagger⽅法,采⽤zookeeper⾃管理⽅案。通过微服务启动,收集interface与method信息上报到⾃定义zookeeper节点来完成数据上报。前端服务,可以通过读取这个节点信息来获得具体的接⼝与⽅法。
上报格式:
/swagger/{subject}/{interface}/exports/{base64 data}
url参数:
subject 总项⽬命名节点名
interface 接⼝名
base64 data 它是⼀个记录该接⼝下⽅法和参数的数组(最终base64化),见以下参数格式。
base64 data 参数详解:
type Base64DataType = {
description?: string, // 该接⼝的描述
group: string, // 组名如果没有组,请使⽤字符串`-`
version: string, // 版本名如果没有版本,请使⽤字符串 `0.0.0`
methods: [
{
name: string, // ⽅法名
summary?: string, // ⽅法描述,摘要
input: Array<{ $class: string, $schema: JSONSCHEMA; }>, // ⼊参
output: JSONSCHEMA // 出参
},
// ...
]
}
最终将数据base64后再进⾏encodeURIComponent操作,最后插⼊zookeeper的节点即可。
在Provider程序中,我们可以这样使⽤来发布到zookeeper:
import { SwaggerProvider, Provider } from 'dubbo.ts';
const swagger = new SwaggerProvider('subject name', provider as Provider);
await swagger.publish(); // 发布
await swagger.unPublish(); // 卸载
使⽤SwaggerConsumer调⽤分布式swgger后得到的数据。
import { SwaggerConsumer, Registry } from 'dubbo.ts';
const swgger = new SwaggerConsumer('subject name', registry as Registry);
const resultTree = ();
我们来看⼀个基于@nelts/dubbo的实例,在具体微服务的service上,我们可以这样写
import { provide, inject } from 'injection';
import { rpc } from '@nelts/dubbo';
import { RPC_INPUT_SCHEMA, MIN_PROGRAM_TYPE, error, RpcRequestParameter, RpcResponseParameter } from '@node/com.stib.utils'; import WX from './wx';
import * as ioredis from 'ioredis';
import Relations from './relations';
import { tableName as WxTableName } from '../tables/stib.user.wx';
@provide('User')
@rpc.interface('com.mifa.stib.service.UserService')
@rpc.version('1.0.0')
@rpc.description('⽤户中⼼服务接⼝')
export default class UserService {
@inject('wx')
private wx: WX;
@inject('redis')
private redis: ioredis.Redis;
@inject('relation')
private rel: Relations;
@hod
@rpc.summay('⽤户统⼀登录')

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。