WebRTC进阶流媒体服务器开发(五)Mediasoup源码分析之Mediasoup启动过程
补充:JS部分和C++代码见
mediasoup C++:C++部分,⽤于处理流媒体传输,包括lib与worker两部分
--->lib:⼀些js⽂件组成,主要⽤于对mediasoup的管理⼯作
--->worker:C++核⼼代码
⼀:Mediasoup启动详解---JS部分
见:
(⼀)启动⽂件server.js
run();

/**
 * Entry point of server.js (trimmed to the part that matters here):
 * launches the mediasoup Worker processes.
 */
async function run()
{
  // Run the mediasoup Workers (this is what starts the child processes).
  await runMediasoupWorkers();
}

/**
 * Creates `config.mediasoup.numWorkers` mediasoup Workers, stores each one in
 * the `mediasoupWorkers` array (defined elsewhere in server.js) and installs,
 * per worker, a 'died' handler and a periodic resource-usage log.
 *
 * @returns {Promise<void>} Resolves once every worker has been created.
 */
async function runMediasoupWorkers()
{
  const { numWorkers } = config.mediasoup;

  // One worker process per iteration; the article notes numWorkers follows
  // the number of CPU cores (the value itself comes from the config file).
  for (let i = 0; i < numWorkers; ++i)
  {
    // Spawn the worker process and wait until it reports it is running.
    const worker = await mediasoup.createWorker(
      {
        logLevel   : config.mediasoup.workerSettings.logLevel,
        logTags    : config.mediasoup.workerSettings.logTags,
        rtcMinPort : Number(config.mediasoup.workerSettings.rtcMinPort),
        rtcMaxPort : Number(config.mediasoup.workerSettings.rtcMaxPort)
      });

    // A dead worker is not recovered: log it and terminate the whole server
    // two seconds later.
    worker.on('died', () =>
    {
      logger.error(
        'mediasoup Worker died, exiting  in 2 seconds... [pid:%d]', worker.pid);

      setTimeout(() => process.exit(1), 2000);
    });

    // Keep the worker in the shared array.
    mediasoupWorkers.push(worker);

    // Log worker resource usage every X seconds.
    setInterval(async () =>
    {
      // Periodically query the worker process; per the article, this request
      // travels over the channel to the worker.
      const usage = await worker.getResourceUsage();

      logger.info('mediasoup Worker resource usage [pid:%d]: %o', worker.pid, usage);
    }, 120000);
  }
}
(⼆)调⽤index.js中的createWorker⽅法创建进程
/**
 * Creates a Worker (which spawns the mediasoup-worker child process) and
 * resolves once that worker signals it started successfully.
 *
 * @param {object} [options]
 * @param {string} [options.logLevel='error']
 * @param {string[]} [options.logTags]
 * @param {number} [options.rtcMinPort=10000]
 * @param {number} [options.rtcMaxPort=59999]
 * @param {string} [options.dtlsCertificateFile]
 * @param {string} [options.dtlsPrivateKeyFile]
 * @param {object} [options.appData={}] - Custom application data.
 * @returns {Promise<Worker>} Resolves with the worker on its '@success'
 *   event; rejects with the value carried by its '@failure' event.
 * @throws {TypeError} If `appData` is given and is not an object.
 */
async function createWorker({ logLevel = 'error', logTags, rtcMinPort = 10000, rtcMaxPort = 59999, dtlsCertificateFile, dtlsPrivateKeyFile, appData = {} } = {}) {
    logger.debug('createWorker()');
    if (appData && typeof appData !== 'object') {
        throw new TypeError('if given, appData must be an object');
    }
    const worker = new Worker_1.Worker({
        logLevel,
        logTags,
        rtcMinPort,
        rtcMaxPort,
        dtlsCertificateFile,
        dtlsPrivateKeyFile,
        appData
    });
    // The Worker constructor only starts the child process; the outcome is
    // reported later through the private '@success' / '@failure' events.
    return new Promise((resolve, reject) => {
        worker.on('@success', () => {
            // Emit observer event.
            observer.safeEmit('newworker', worker);
            resolve(worker);
        });
        worker.on('@failure', reject);
    });
}
(三)调用Worker.js中的Worker类创建进程
该⽂件存在于:
类⽅法概括:
class Worker extends EnhancedEventEmitter_1.EnhancedEventEmitter {
constructor()——构造函数
get pid()——获得Worker进程的ID
get closed()——确认Worker是否关闭
get appData()——返回custom的数据
set appData()——当设置⽆效时抛出异常信息
get observer()——开启观察者模式
close()——关闭Worker
async dump()——转存Worker
async getResourceUsage()——获得worker进程资源使⽤信息
async updateSettings()——更新设置
async createRouter()——创建房间
}
类主要⽅法分析:
// Path of the worker executable that will be spawned:
//   1. env MEDIASOUP_WORKER_BIN, when it is set;
//   2. otherwise the Debug build when env MEDIASOUP_BUILDTYPE is 'Debug';
//   3. otherwise the Release build.
const workerBin = process.env.MEDIASOUP_WORKER_BIN
    ? process.env.MEDIASOUP_WORKER_BIN
    : process.env.MEDIASOUP_BUILDTYPE === 'Debug'
        ? path.join(__dirname, '..', 'worker', 'out', 'Debug', 'mediasoup-worker')
        : path.join(__dirname, '..', 'worker', 'out', 'Release', 'mediasoup-worker');

/**
 * Node-side handle of one mediasoup-worker child process.
 *
 * Only the methods analysed in the article are shown here (constructor,
 * close and createRouter).
 */
class Worker extends EnhancedEventEmitter_1.EnhancedEventEmitter {
    /**
     * Spawns the worker process and wires up its channels and event handlers.
     *
     * @param {object} options
     * @param {string} [options.logLevel]
     * @param {string[]} [options.logTags]
     * @param {number} [options.rtcMinPort]
     * @param {number} [options.rtcMaxPort]
     * @param {string} [options.dtlsCertificateFile]
     * @param {string} [options.dtlsPrivateKeyFile]
     * @param {object} [options.appData]
     * @emits died - (error: Error)
     * @emits @success
     * @emits @failure - (error: Error)
     */
    constructor({ logLevel, logTags, rtcMinPort, rtcMaxPort, dtlsCertificateFile, dtlsPrivateKeyFile, appData }) {
        super();
        // Closed flag.
        this._closed = false;
        // Routers set.
        this._routers = new Set();
        // Observer instance.
        this._observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
        logger.debug('constructor()');
        let spawnBin = workerBin;
        let spawnArgs = [];
        // When valgrind is requested, valgrind becomes the spawned program and
        // the worker binary is passed to it as an argument.
        if (process.env.MEDIASOUP_USE_VALGRIND === 'true') {
            spawnBin = process.env.MEDIASOUP_VALGRIND_BIN || 'valgrind';
            if (process.env.MEDIASOUP_VALGRIND_OPTIONS) {
                spawnArgs = spawnArgs.concat(process.env.MEDIASOUP_VALGRIND_OPTIONS.split(/\s+/));
            }
            spawnArgs.push(workerBin);
        }
        // Translate the given settings into command line arguments, skipping
        // any that are missing or of the wrong type.
        if (typeof logLevel === 'string' && logLevel) {
            spawnArgs.push(`--logLevel=${logLevel}`);
        }
        for (const logTag of (Array.isArray(logTags) ? logTags : [])) {
            if (typeof logTag === 'string' && logTag) {
                spawnArgs.push(`--logTag=${logTag}`);
            }
        }
        if (typeof rtcMinPort === 'number' && !Number.isNaN(rtcMinPort)) {
            spawnArgs.push(`--rtcMinPort=${rtcMinPort}`);
        }
        if (typeof rtcMaxPort === 'number' && !Number.isNaN(rtcMaxPort)) {
            spawnArgs.push(`--rtcMaxPort=${rtcMaxPort}`);
        }
        if (typeof dtlsCertificateFile === 'string' && dtlsCertificateFile) {
            spawnArgs.push(`--dtlsCertificateFile=${dtlsCertificateFile}`);
        }
        if (typeof dtlsPrivateKeyFile === 'string' && dtlsPrivateKeyFile) {
            spawnArgs.push(`--dtlsPrivateKeyFile=${dtlsPrivateKeyFile}`);
        }
        // Join with a space so the logged command line is readable.
        logger.debug('spawning worker process: %s %s', spawnBin, spawnArgs.join(' '));
        // Start the child process (Node implements spawn on top of libuv).
        this._child = child_process_1.spawn(
        // command: the program to start
        spawnBin,
        // args: the arguments assembled above
        spawnArgs,
        // options
        {
            env: {
                // The worker's main() refuses to run without this variable.
                MEDIASOUP_VERSION: '3.7.11',
                // The worker also inherits every variable of this process.
                ...process.env
            },
            // Do not detach the child from the Node process.
            detached: false,
            // fd 0 (stdin)   : Just ignore it.
            // fd 1 (stdout)  : Pipe it for 3rd libraries that log their own stuff.
            // fd 2 (stderr)  : Same as stdout.
            // fd 3 (channel) : Producer Channel fd.
            // fd 4 (channel) : Consumer Channel fd.
            // fd 5 (channel) : Producer PayloadChannel fd.
            // fd 6 (channel) : Consumer PayloadChannel fd.
            // These pipes are how the JS side and the C++ worker communicate.
            stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe'],
            windowsHide: true
        });
        this._pid = this._child.pid;
        // The two channels below talk to the C++ worker; the C++ main() uses
        // the same fd numbers (3/4 and 5/6).
        this._channel = new Channel_1.Channel({
            producerSocket: this._child.stdio[3],
            consumerSocket: this._child.stdio[4],
            pid: this._pid
        });
        this._payloadChannel = new PayloadChannel_1.PayloadChannel({
            // NOTE: TypeScript does not like more than 5 fds.
            // @ts-ignore
            producerSocket: this._child.stdio[5],
            // @ts-ignore
            consumerSocket: this._child.stdio[6]
        });
        this._appData = appData;
        // Becomes true once the startup outcome (success or failure) is known,
        // so later exit/error events are reported as 'died' instead.
        let spawnDone = false;
        // Listen for 'running' notification.
        this._channel.once(String(this._pid), (event) => {
            if (!spawnDone && event === 'running') {
                spawnDone = true;
                logger.debug('worker process running [pid:%s]', this._pid);
                this.emit('@success');
            }
        });
        this._child.on('exit', (code, signal) => {
            this._child = undefined;
            this.close();
            if (!spawnDone) {
                spawnDone = true;
                // 42 is the exit code the worker uses for a settings error.
                if (code === 42) {
                    logger.error('worker process failed due to wrong settings [pid:%s]', this._pid);
                    this.emit('@failure', new TypeError('wrong settings'));
                }
                else {
                    logger.error('worker process failed unexpectedly [pid:%s, code:%s, signal:%s]', this._pid, code, signal);
                    this.emit('@failure', new Error(`[pid:${this._pid}, code:${code}, signal:${signal}]`));
                }
            }
            else {
                logger.error('worker process died unexpectedly [pid:%s, code:%s, signal:%s]', this._pid, code, signal);
                this.safeEmit('died', new Error(`[pid:${this._pid}, code:${code}, signal:${signal}]`));
            }
        });
        this._child.on('error', (error) => {
            this._child = undefined;
            this.close();
            if (!spawnDone) {
                spawnDone = true;
                logger.error('worker process failed [pid:%s]: %s', this._pid, error.message);
                this.emit('@failure', error);
            }
            else {
                logger.error('worker process error [pid:%s]: %s', this._pid, error.message);
                this.safeEmit('died', error);
            }
        });
        // Be ready for 3rd party worker libraries logging to stdout.
        this._child.stdout.on('data', (buffer) => {
            for (const line of buffer.toString('utf8').split('\n')) {
                if (line) {
                    workerLogger.debug(`(stdout) ${line}`);
                }
            }
        });
        // In case of a worker bug, mediasoup will log to stderr.
        this._child.stderr.on('data', (buffer) => {
            for (const line of buffer.toString('utf8').split('\n')) {
                if (line) {
                    workerLogger.error(`(stderr) ${line}`);
                }
            }
        });
    }
    /**
     * Closes the Worker: kills the child process if it is still alive, closes
     * both channels, notifies every Router and emits 'close' on the observer.
     * Calling it again once closed does nothing.
     */
    close() {
        if (this._closed) {
            return;
        }
        logger.debug('close()');
        this._closed = true;
        // Kill the worker process.
        if (this._child) {
            // Remove event listeners but leave a fake 'error' handler to avoid
            // propagation.
            this._child.removeAllListeners('exit');
            this._child.removeAllListeners('error');
            this._child.on('error', () => { });
            this._child.kill('SIGTERM');
            this._child = undefined;
        }
        // Close the Channel instance.
        this._channel.close();
        // Close the PayloadChannel instance.
        this._payloadChannel.close();
        // Close every Router.
        for (const router of this._routers) {
            router.workerClosed();
        }
        this._routers.clear();
        // Emit observer event.
        this._observer.safeEmit('close');
    }
    /**
     * Creates a Router (a "room" in the article's terms) in the worker.
     *
     * @param {object} [options]
     * @param {object[]} [options.mediaCodecs]
     * @param {object} [options.appData={}] - Custom application data.
     * @returns {Promise<Router>} The new Router, already tracked by this Worker.
     * @throws {TypeError} If `appData` is given and is not an object.
     */
    async createRouter({ mediaCodecs, appData = {} } = {}) {
        logger.debug('createRouter()');
        if (appData && typeof appData !== 'object') {
            throw new TypeError('if given, appData must be an object');
        }
        // This may throw.
        const rtpCapabilities = ortc.generateRouterRtpCapabilities(mediaCodecs);
        const internal = { routerId: uuid_1.v4() };
        // Ask the worker process, over the channel, to create the router.
        await this._channel.request('worker.createRouter', internal);
        const data = { rtpCapabilities };
        const router = new Router_1.Router({
            internal,
            data,
            channel: this._channel,
            payloadChannel: this._payloadChannel,
            appData
        });
        this._routers.add(router);
        // Stop tracking the router as soon as it closes.
        router.on('@close', () => this._routers.delete(router));
        // Emit observer event.
        this._observer.safeEmit('newrouter', router);
        return router;
    }
}
exports.Worker = Worker;
⾄此,在JS部分实现了多进程的启动
⼆:Mediasoup启动详解---C++部分
下⾯分析进程中实现的主要业务:
(⼀)进⼊C++源⽂件⽬录
(⼆)分析main⽅法
static constexpr int ConsumerChannelFd{ 3 };
static constexpr ProducerChannelFd{ 4
static constexpr 5 };
static constexpr  };
int main(int argc, char* argv[])
{
// Ensure we are called by our Node library.
if (!std::getenv("MEDIASOUP_VERSION"))  //先检测版本问题,如果为null,则表⽰不是由nodejs产⽣的,⽽是进⼊⽬录中使⽤命令⾏直接产⽣的(但是没有设置环境变量是⽆法启动的)    {
MS_ERROR_STD("you don't seem to be my real father!");
std::_Exit(EXIT_FAILURE);
}
std::string version = std::getenv("MEDIASOUP_VERSION");  //获取版本信息
auto statusCode = run_worker(  //开始运⾏进程
argc,
argv,
version.c_str(),
ConsumerChannelFd,
switch (statusCode)  //判断返回码
{
case0:
std::_Exit(EXIT_SUCCESS);
case1:
std::_Exit(EXIT_FAILURE);
case42:
std::_Exit(42);
}
}
(三)分析run_worker⽅法,运⾏worker进程
extern"C"int run_worker(
char* argv[],
const char* version,
int consumerChannelFd,
int producerChannelFd,
int payloadConsumeChannelFd,
int payloadProduceChannelFd)
{
// Initialize libuv stuff (we need it for the Channel).
DepLibUV::ClassInit();  //使⽤libUV,进⾏全局初始化
// Channel socket (it will be handled and deleted by the Worker).
Channel::ChannelSocket* channel{ nullptr };  //通道socket,与前⾯的
// PayloadChannel socket (it will be handled and deleted by the Worker).
PayloadChannel::PayloadChannelSocket* payloadChannel{ nullptr }; 
try
catch (const MediaSoupError& error)
{
MS_ERROR_STD("error creating the Channel: %s", error.what());
return1;
}
try
{
payloadChannel =
new PayloadChannel::PayloadChannelSocket(payloadConsumeChannelFd, payloadProduceChannelFd);  //创建通道    }
catch (const MediaSoupError& error)
{
MS_ERROR_STD("error creating the RTC Channel: %s", error.what());
return1;
}
// Initialize the Logger.
Logger::ClassInit(channel);  //初始化⽇志
try
{
Settings::SetConfiguration(argc, argv);
}
catch (const MediaSoupTypeError& error)
{
MS_ERROR_STD("settings error: %s", error.what());
// 42 is a custom exit code to notify "settings error" to the Node library.
return42;
}
catch (const MediaSoupError& error)
{
MS_ERROR_STD("unexpected settings error: %s", error.what());
return1;
}
MS_DEBUG_TAG(info, "starting mediasoup-worker process [version:%s]", version);
#if defined(MS_LITTLE_ENDIAN)
MS_DEBUG_TAG(info, "little-endian CPU detected");
#elif defined(MS_BIG_ENDIAN)
MS_DEBUG_TAG(info, "big-endian CPU detected");
#else
MS_WARN_TAG(info, "cannot determine whether little-endian or big-endian");
#endif
#if defined(INTPTR_MAX) && defined(INT32_MAX) && (INTPTR_MAX == INT32_MAX)
MS_DEBUG_TAG(info, "32 bits architecture detected");
#elif defined(INTPTR_MAX) && defined(INT64_MAX) && (INTPTR_MAX == INT64_MAX)
MS_DEBUG_TAG(info, "64 bits architecture detected");
#else
MS_WARN_TAG(info, "cannot determine 32 or 64 bits architecture");
#endif
Settings::PrintConfiguration();
DepLibUV::PrintVersion();
try
{
// Initialize static stuff.  初始化静态事务
DepOpenSSL::ClassInit();
DepLibSRTP::ClassInit();
DepUsrSCTP::ClassInit();
DepLibWebRTC::ClassInit();
Utils::Crypto::ClassInit();
RTC::DtlsTransport::ClassInit();
RTC::SrtpSession::ClassInit();
Channel::ChannelNotifier::ClassInit(channel);
PayloadChannel::PayloadChannelNotifier::ClassInit(payloadChannel);
#ifdef MS_EXECUTABLE
{
// Ignore some signals.
IgnoreSignals();
}
#endif
// Run the Worker.
Worker worker(channel, payloadChannel);  //运⾏具体worker
// Free static stuff.
DepLibSRTP::ClassDestroy();
Utils::Crypto::ClassDestroy();
DepLibWebRTC::ClassDestroy();
RTC::DtlsTransport::ClassDestroy();
DepUsrSCTP::ClassDestroy();
DepLibUV::ClassDestroy();
// Wait a bit so pending messages to stdout/Channel arrive to the Node
// process.
uv_sleep(200);
return0;
}
catch (const MediaSoupError& error)
{
MS_ERROR_STD("failure exit: %s", error.what());
}
}
三:进程间通信
(⼀)常见通信⽅法
管道:匿名管道进程之间必须是⽗⼦关系,有名管道可以应⽤于⾮⽗⼦关系的进程之间通信(⽐如socket.XX)
socket:远端和本地通信可以通过socket通信,那么必然进程之间可以通讯。
共享内存和信号:对应数据和事件
...
(⼆)匿名管道(半双⼯)
管道创建(含两个⽂件描述符,⽤于读、写)时机,在创建⼦进程之前创建管道:
创建子进程:子进程会拷贝父进程的文件描述符(3读,4写);如果两个进程同时写入、读取,就会造成混乱,所以需要关闭各自不需要的那部分描述符(相当于加锁)
关闭部分描述符,实现半双⼯,⽗⼦进程通信
(三)socket(全双⼯)--注意箭头
对于socket,由于是全双工,所以图中进程与缓冲区之间的通信使用双向箭头表示:
同上⾯匿名管道所说,依旧混乱,所以我们依旧需要关闭部分描述符:

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。