0%

Pion

Pion是什么

golang实现的webrtc。

webrtc peerconnection

webrtc里最基本的模块就是peerconnection,代码里经常缩写成pc。

peerconnection对外提供的接口有一部分是On开头的,用于在peerconnection内部发生某事件的回调接口。

使用默认配置新建pc

1
pc, err := webrtc.NewPeerConnection(config)

或使用自定义配置新建pc

1
peerConnection, err := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i)).NewPeerConnection(config)

NewAPI接口设计使用Function Option的编程模式,扩展性好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// webrtc可自定义的引擎和调用链,都定义为私有,只能在初始化时从外部指定
type API struct {
settingEngine *SettingEngine // 和ICE,NAT,DTLS等相关的功能,几乎都是标准流程,可定制化的内容很少
mediaEngine *MediaEngine // 和pc能力相关的参数,比如codec,extension等
interceptorRegistry *interceptor.Registry // 和RTP/RTCP包处理相关的功能,比如JitterBuffer,NACK,TWCC等

interceptor interceptor.Interceptor // Generated per PeerConnection
}
// 对外初始化接口,可灵活扩展
func NewAPI(options ...func(*API)) *API {
a := &API{interceptor: &interceptor.NoOp{}}

for _, o := range options {
o(a)
}
...
}
// 外部可自定义实例化这个成员
func WithInterceptorRegistry(interceptorRegistry *interceptor.Registry) func(a *API) {
return func(a *API) {
a.interceptorRegistry = interceptorRegistry
}
}

TURN

TURN服务器兼容了STUN服务器的功能,即打洞和中继服务器。

STUN消息类型:

  • Request 0x00
  • Indication 0x01
  • Success Response 0x02
  • Error Response 0x03

STUN消息方法:

  • Binding 0x001
  • Allocate 0x003
  • Refresh 0x004
  • Send 0x006
  • Data 0x007
  • CreatePermission 0x008
  • ChannelBind 0x009

turn server实现了一个allocation manager用于管理已授权的client信息,用作管理client的长效鉴权和分配中继信息。

请求分配中继

TURN client和server之间可选择UDP,TCP(包括TLS)传输STUN消息,TURN server和peer之间一般是UDP传输DATA。

数据交换之转发机制

使用Indication标识转发数据,Send和Data不支持长效验证,因此需要先获得permission再开始indication发送数据。一旦permission建立,信任这个address发来的数据,时效5min,且无法通过收发数据刷新时效。

webrtc使用indication做连通性测试,即Ping。

数据交换之信道机制

send/data indication添加在stun的应用层的头部,需要36字节的开销,在有些带宽资源敏感的场景,比如VoIP,就显著增加了带宽。turn提供了ChannelData消息格式做数据交换。

ChannelData不使用stun应用头部,而是使用4字节的信道号做标识。client在request消息带上未绑定的信道编号和地址信息给server,server若同意绑定,client就可以使用这个信道编号发送ChannelData消息给目的peer,server也可以通过这个信道转发数据给client。

channel时效10min,ChannelData消息或重新绑定channel到peer都可以刷新channel的时效,但是它只能过期,不能像indication通过设置lifetime=0立即失效。

保活机制

中继分配好之后,中继信息存在server的allocation map里,客户端需要发送refresh消息进行保活,refresh消息也必须带上鉴权信息。
server每次收到新的refresh请求,更新lifetime,给allocation续期。

参考

https://tools.ietf.org/id/draft-ietf-behave-turn-08.html

ICE

interceptor

处理RTP/RTCP流的框架,它定义了一套处理数据包的interface,实例化函数接口必须按照指定格式进行调用,这个接口定义为Interceptor

  • BindRTCPReaderBindRTCPWriter 处理incoming和outgoing的RTCP包。

  • BindLocalStreamUnbindLocalStream 处理outgoing的RTP包。

  • BindRemoteStreamUnbindRemoteStream 处理incoming的RTP包。

chain结构体把interceptor串行在一起,并保证interceptor的执行顺序。

streamInfo是处理媒体流的上下文,用于在interceptor之间传递信息。

对外提供registry结构体,在chain上的又一层封装,暴露Add和Build接口。

它提供了一个type NoOp struct{}的结构体,任何需要实例化interceptor的结构体都可以继承它。

NACK interceptor的实现

GeneratorInterceptor receiver的实现

GeneratorInterceptor 是NACK interceptor对外提供的接收端接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
type GeneratorInterceptor struct {
interceptor.NoOp // 继承interceptor
size uint16 // receiveLog参数
skipLastN uint16 // receiveLog参数
interval time.Duration // 发送RTCP包间隔
m sync.Mutex // 这个结构体的锁
wg sync.WaitGroup // RTCP writer loop结束才可以关闭的控制
close chan struct{} // 控制关闭
log logging.LeveledLogger

receiveLogs map[uint32]*receiveLog // 记录了接收到的sequence number, key为SSRC
receiveLogsMu sync.Mutex // receiveLog的map锁
}
状态控制

实例化了interceptor的 UnbindLocalStreamBindRemoteStream 用于处理收到的RTP包, 在收到RTP包时将seq num记录在对应SSRC的receiveLog里。
实例化了interceptor的 BindRTCPWriter 用于反馈RTCP包,这里起了一个goroutine异步处理, 按定时器间隔来检查receiveLog。

receiveLog实现

receiveLog是interceptor里实现NACK的重要结构体,它记录了一段连续的包是否收到的情况。

1
2
3
4
5
6
7
8
type receiveLog struct {
packets []uint64 // 记录RTP每个包是否收到的bitmap,使用uint64可标识64个包,因此size必须为64的整数倍
size uint16 // 可记录连续的RTP包的总数量
end uint16 // 记录收到的最后一个seq
started bool // 第一个包,初始化上下文
lastConsecutive uint16 // 记录收到的最后一个连续的seq,查找丢失包时,遍历它到end之间即可
m sync.RWMutex // 并发锁
}

使用uint64数组当bitmap,记录收到的RTP包,收到时将bit置1,删除时置0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (s *receiveLog) setReceived(seq uint16) {
pos := seq % s.size
s.packets[pos/64] |= 1 << (pos % 64)
}

func (s *receiveLog) delReceived(seq uint16) {
pos := seq % s.size
s.packets[pos/64] &^= 1 << (pos % 64)
}

func (s *receiveLog) getReceived(seq uint16) bool {
pos := seq % s.size
return (s.packets[pos/64] & (1 << (pos % 64))) != 0
}

每收到一个包,刷新连续的最后一个位置lastConsecutive和记录seq最大的那个位置end
以便在间隔时间检查需要重传包时,只需遍历从lastConsecutiveend之间的那段bit是否为0即可。

1
2
3
4
5
6
7
func (s *receiveLog) fixLastConsecutive() {
i := s.lastConsecutive + 1
for ; i != s.end+1 && s.getReceived(i); i++ {
// find all consecutive packets
}
s.lastConsecutive = i - 1
}

由于bitmap的size标识的是一段时间内需要重传的RTP包,每收到一个新的seq时,需要判断这段buffer是否翻转,并清空从end到seq之间的记录。

优点

算法简单,使用bitmap记录seq,节省内存空间并且查找高效。

缺点

定时器触发NACK请求,重传间隔受定时器间隔影响,实时性不高。
没有根据rtt评估是否需要请求重传,在乱序时导致误重传。
没有根据retries重试次数来做策略,可能会在网络拥塞严重时一直请求重传,进一步加大了网络拥塞程度。

ResponderInterceptor sender的实现

ResponderInterceptor 是NACK interceptor对外提供的发送端接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type ResponderInterceptor struct {
interceptor.NoOp // 继承interceptor
size uint16 // sendBuffer缓存包的数量
log logging.LeveledLogger
packetFactory packetFactory // 提供可定制化packet管理接口,也有默认的packetManager

streams map[uint32]*localStream // 缓存每个SSRC的包队列
streamsMu sync.Mutex // streams的map锁
}

type localStream struct {
sendBuffer *sendBuffer // 发送缓存
rtpWriter interceptor.RTPWriter // 注册用于发送重传包的回调函数
}

缓存每个SSRC的发送包,并且注册一个回调函数,用于发送重传包。
当收到RTCP NACK请求时,在缓存中查找需要重传的包,并发送给对方。

sendBuffer实现

sendBuffer是发送端缓存,存储最近发送的包,参数size指定缓存的大小。size只能为2的幂次方倍。

1
2
3
4
5
6
7
8
type sendBuffer struct {
packets []*retainablePacket
size uint16
lastAdded uint16
started bool

m sync.RWMutex
}

TWCC interceptor的实现

SR/RR interceptor的实现

ion-sfu

其他

通过设置环境变量改变pion log的日志级别,例如: export PION_LOG_TRACE=all