0%

Pion是什么

golang实现的webrtc。

webrtc peerconnection

webrtc里最基本的模块就是peerconnection,代码里经常缩写成pc。

peerconnection对外提供的接口有一部分是On开头的,用于在peerconnection内部发生某事件的回调接口。

使用默认配置新建pc

1
pc, err := webrtc.NewPeerConnection(config)

或使用自定义配置新建pc

1
peerConnection, err := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i)).NewPeerConnection(config)

NewAPI接口设计使用Function Option的编程模式,扩展性好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// webrtc可自定义的引擎和调用链,都定义为私有,只能在初始化时从外部指定
type API struct {
settingEngine *SettingEngine // 和ICE,NAT,DTLS等相关的功能,几乎都是标准流程,可定制化的内容很少
mediaEngine *MediaEngine // 和pc能力相关的参数,比如codec,extension等
interceptorRegistry *interceptor.Registry // 和RTP/RTCP包处理相关的功能,比如JitterBuffer,NACK,TWCC等

interceptor interceptor.Interceptor // Generated per PeerConnection
}
// 对外初始化接口,可灵活扩展
func NewAPI(options ...func(*API)) *API {
a := &API{interceptor: &interceptor.NoOp{}}

for _, o := range options {
o(a)
}
...
}
// 外部可自定义实例化这个成员
func WithInterceptorRegistry(interceptorRegistry *interceptor.Registry) func(a *API) {
return func(a *API) {
a.interceptorRegistry = interceptorRegistry
}
}

TURN

TURN服务器兼容了STUN服务器的功能,即打洞和中继服务器。

STUN消息类型:

  • Request 0x00
  • Indication 0x01
  • Success Response 0x02
  • Error Response 0x03

STUN消息方法:

  • Binding 0x001
  • Allocate 0x003
  • Refresh 0x004
  • Send 0x006
  • Data 0x007
  • CreatePermission 0x008
  • ChannelBind 0x009

turn server实现了一个allocation manager用于管理已授权的client信息,用作管理client的长效鉴权和分配中继信息。

请求分配中继

TURN client和server之间可选择UDP,TCP(包括TLS)传输STUN消息,TURN server和peer之间一般是UDP传输DATA。

数据交换之转发机制

使用Indication标识转发数据,Send和Data不支持长效验证,因此需要先获得permission再开始indication发送数据。一旦permission建立,信任这个address发来的数据,时效5min,且无法通过收发数据刷新时效。

webrtc使用indication做连通性测试,即Ping。

数据交换之信道机制

send/data indication添加在stun的应用层的头部,需要36字节的开销,在有些带宽资源敏感的场景,比如VoIP,就显著增加了带宽。turn提供了ChannelData消息格式做数据交换。

ChannelData不使用stun应用头部,而是使用4字节的信道号做标识。client在request消息带上未绑定的信道编号和地址信息给server,server若同意绑定,client就可以使用这个信道编号发送ChannelData消息给目的peer,server也可以通过这个信道转发数据给client。

channel时效10min,ChannelData消息或重新绑定channel到peer都可以刷新channel的时效,但是它只能过期,不能像indication通过设置lifetime=0立即失效。

保活机制

中继分配好之后,中继信息存在server的allocation map里,客户端需要发送refresh消息进行保活,refresh消息也必须带上鉴权信息。
server每次收到新的refresh请求,更新lifetime,给allocation续期。

参考

https://tools.ietf.org/id/draft-ietf-behave-turn-08.html

ICE

interceptor

处理RTP/RTCP流的框架,它定义了一套处理数据包的interface,实例化函数接口必须按照指定格式进行调用,这个接口定义为Interceptor

  • BindRTCPReaderBindRTCPWriter 处理incoming和outgoing的RTCP包。

  • BindLocalStreamUnbindLocalStream 处理outgoing的RTP包。

  • BindRemoteStreamUnbindRemoteStream 处理incoming的RTP包。

chain结构体把interceptor串行在一起,并保证interceptor的执行顺序。

streamInfo是处理媒体流的上下文,用于在interceptor之间传递信息。

对外提供registry结构体,在chain上的又一层封装,暴露Add和Build接口。

它提供了一个type NoOp struct{}的结构体,任何需要实例化interceptor的结构体都可以继承它。

NACK interceptor的实现

GeneratorInterceptor receiver的实现

GeneratorInterceptor 是NACK interceptor对外提供的接收端接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
type GeneratorInterceptor struct {
interceptor.NoOp // 继承interceptor
size uint16 // receiveLog参数
skipLastN uint16 // receiveLog参数
interval time.Duration // 发送RTCP包间隔
m sync.Mutex // 这个结构体的锁
wg sync.WaitGroup // RTCP writer loop结束才可以关闭的控制
close chan struct{} // 控制关闭
log logging.LeveledLogger

receiveLogs map[uint32]*receiveLog // 记录了接收到的sequence number, key为SSRC
receiveLogsMu sync.Mutex // receiveLog的map锁
}
状态控制

实例化了interceptor的 UnbindLocalStreamBindRemoteStream 用于处理收到的RTP包, 在收到RTP包时将seq num记录在对应SSRC的receiveLog里。
实例化了interceptor的 BindRTCPWriter 用于反馈RTCP包,这里起了一个goroutine异步处理, 按定时器间隔来检查receiveLog。

receiveLog实现

receiveLog是interceptor里实现NACK的重要结构体,它记录了一段连续的包是否收到的情况。

1
2
3
4
5
6
7
8
type receiveLog struct {
packets []uint64 // 记录RTP每个包是否收到的bitmap,使用uint64可标识64个包,因此size必须为64的整数倍
size uint16 // 可记录连续的RTP包的总数量
end uint16 // 记录收到的最后一个seq
started bool // 第一个包,初始化上下文
lastConsecutive uint16 // 记录收到的最后一个连续的seq,查找丢失包时,遍历它到end之间即可
m sync.RWMutex // 并发锁
}

使用uint64数组当bitmap,记录收到的RTP包,收到时将bit置1,删除时置0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (s *receiveLog) setReceived(seq uint16) {
pos := seq % s.size
s.packets[pos/64] |= 1 << (pos % 64)
}

func (s *receiveLog) delReceived(seq uint16) {
pos := seq % s.size
s.packets[pos/64] &^= 1 << (pos % 64)
}

func (s *receiveLog) getReceived(seq uint16) bool {
pos := seq % s.size
return (s.packets[pos/64] & (1 << (pos % 64))) != 0
}

每收到一个包,刷新连续的最后一个位置lastConsecutive和记录seq最大的那个位置end
以便在间隔时间检查需要重传包时,只需遍历从lastConsecutiveend之间的那段bit是否为0即可。

1
2
3
4
5
6
7
func (s *receiveLog) fixLastConsecutive() {
i := s.lastConsecutive + 1
for ; i != s.end+1 && s.getReceived(i); i++ {
// find all consecutive packets
}
s.lastConsecutive = i - 1
}

由于bitmap的size标识的是一段时间内需要重传的RTP包,每收到一个新的seq时,需要判断这段buffer是否翻转,并清空从end到seq之间的记录。

优点

算法简单,使用bitmap记录seq,节省内存空间并且查找高效。

缺点

定时器触发NACK请求,重传间隔受定时器间隔影响,实时性不高。
没有根据rtt评估是否需要请求重传,在乱序时导致误重传。
没有根据retries重试次数来做策略,可能会在网络拥塞严重时一直请求重传,进一步加大了网络拥塞程度。

ResponderInterceptor sender的实现

ResponderInterceptor 是NACK interceptor对外提供的发送端接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type ResponderInterceptor struct {
interceptor.NoOp // 继承interceptor
size uint16 // sendBuffer缓存包的数量
log logging.LeveledLogger
packetFactory packetFactory // 提供可定制化packet管理接口,也有默认的packetManager

streams map[uint32]*localStream // 缓存每个SSRC的包队列
streamsMu sync.Mutex // streams的map锁
}

type localStream struct {
sendBuffer *sendBuffer // 发送缓存
rtpWriter interceptor.RTPWriter // 注册用于发送重传包的回调函数
}

缓存每个SSRC的发送包,并且注册一个回调函数,用于发送重传包。
当收到RTCP NACK请求时,在缓存中查找需要重传的包,并发送给对方。

sendBuffer实现

sendBuffer是发送端缓存,存储最近发送的包,参数size指定缓存的大小。size只能为2的幂次方倍。

1
2
3
4
5
6
7
8
type sendBuffer struct {
packets []*retainablePacket
size uint16
lastAdded uint16
started bool

m sync.RWMutex
}

TWCC interceptor的实现

SR/RR interceptor的实现

ion-sfu

其他

通过设置环境变量改变pion log的日志级别,例如: export PION_LOG_TRACE=all

HTTP推送技术 Push technology

传统的HTTP请求都是client主动发起到server的请求,若server有通知需要即时通知client时,需要推送技术。

常规轮询 Regular Polling

每隔一段时间pull/get消息。

  • 没有消息时浪费资源。
  • 有消息时不能即时推送。

长轮询 Long Polling

  1. 请求

    client发起请求并把keepalive时间设的较长,服务器收到消息并不立即返回,而是hold一段时间。

  2. 响应

    当有消息推送时,server立即返回消息。或server的connection hang timeout时,返回消息。server返回消息后,将连接关闭。

  3. 循环

    client收到response后,应立即再次发起请求。

Long Polling vs Websocket

Long Polling并不是一种全新的协议,而是基于XMLHttpRequest实现了一种推送机制。最大的优点是过渡期的兼容性好,client无需新增协议,容易适配。
同个client若发起多个request连接,消息只返回一次,应通过本地缓存共享推送消息。

websocket协议相对较晚出现,需要client和server端都支持websocket协议,尽管它的握手还是基于HTTP协议,但它在HTTP协议上实现了全双工通信。

websocket几乎解决了Long Polling的所有缺点,只是被断开后不能自恢复。

Apollo在Long polling的实践

Apollo是携程开源的配置中心,提供了带缓存和不带缓存的pull接口,和long polling即时变更推送接口。

为了避免long polling机制的缺陷,使用了版本号等方案。

  • Long polling获取的消息时,需要带上本地缓存的notificationId,server端判断client带过来的notificationId是否为最新,若需要更新则立即返回消息,否则等配置有更新才会返回,返回消息携带notificationId,client端更新缓存。这样做的好处是,即使中间断开连接,恢复连接后也可以对比本地是否需要更新配置信息。

  • client使用pull接口获取配置时,可填参数IP表示client的唯一Id,当需要灰度推送时,根据client携带的IP判断是否给它灰度配置。这样做的好处是,使用逻辑上的Id(IP),相同IP的client可以获取到相同的配置。

  • client使用pull接口获取配置时,可填参数releaseKey。Apollo每次发布配置时,会为新版本的配置生成releaseKey,使用常规轮询时server端无需每次都查表,可根据releaseKey判断是否需要读取配置。client也无需每次都更新本地缓存的配置。

  • 流程图不需要考虑很严谨的逻辑,因为有版本号,即使中间有异常,下次轮询的时候也可以恢复。

  • 做好降级策略,定期轮询,防止路由策略等导致的long polling服务不可用。

refer

https://ably.com/blog/websockets-vs-long-polling

mediasoup

基本概念

v3版本源码实现了SFU的基本转发功能,由C++部分的worker和TS部分的信令组成。这两部分之间用Unix domain socket通信,是进程之间的全双工通信方式,基于文件系统,不需要走协议栈,因此必须同机部署。

peer

逻辑层抽象,代表了通话的成员。

room

逻辑层通常和router绑定,代表了通话的房间。

router

模型层代表了媒体流转发的单元,一个router内包含了transport, producer, consumer这些模型的实例。
记录了transport和流的包含关系,还有producer和consumer的映射关系,主要负责从transport中获取数据包和producer/consumer的流媒体转发。

transport

数据层抽象,代表了一路socket层面的数据流,可以承载多个RTP stream。通常一个典型的通话里,上行和下行需要分别建立两个socket连接,就是两个transport。客户端与mediasoup之间的transport通常是webrtcTransport。mediasoup之间的routers也可以建立transport实现级联关系,通常是pipeTransport。发给GStreamer和ffmepg的为plainTransport

基于socket的webrtcTransport的ICE只支持Lite模式,一方面是因为作者并非将它设计为商业用途,另一方面mediasoup的STUN把自己也当成一个host,简单实现了协议功能。

producer

媒体流的生产者。

consumer

媒体流的消费者。

源码结构

mediasoup

实现协议

WebRTC实现连接的ICE和能力协商的SDP都属于描述性协议,并不严格规定具体的实现。因此信令层面由mediasoup官方提供的democlient-cpp,client-js等实现具体基于webrtc的调用,这部分代码主要在demo。数据层则是实现了RTP/RTCP协议的基本传输和反馈能力,这部分代码主要在worker。client-cpp提供了调用webrtc-native的API,client-js则是调用支持webrtc的浏览器端提供的API。client和demo/worker通过私有信令和标准协议通信实现了基本的RTC功能。

动手实现一个Mediasoup client

使用pion提供的webrtc库实现一个可以对接mediasoup的client小工具,使用的语言主要为golang。

建立媒体传输通道

因为mediasoup把SDP拆成了几条信令在websocket上协商,因此需要把SDP中的字段拆解成所需要的字段封装在client端和给mediasoup server。

ICE

从webrtc transport信令中拿到candidate列表,因为mediasoup是lite的ICE,client端是controlling,mediasoup是controlled,只需要从client去建立连接即可。

1
2
3
4
5
6
7
8
9
10
11
12

// 创建ice agent
iceAgent, err := ice.NewAgent(&ice.AgentConfig{
NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4},
})

// 把candidate加入ice的列表里
iceAgent.AddRemoteCandidate(candidate)

// 建立连接
iceConn, err := iceAgent.Dial(context.Background(), rsp.Answer.IceParameters.UsernameFragment, rsp.Answer.IceParameters.Password)

至此,和webrtctransport的ice连接就建立了一半,接下来在ice的基础上做dtls密钥交换。

DTLS

mediasoup支持双向的dtls连接,dtls之后将密钥导出到ice的connection,对rtp的payload进行加密,媒体数据走的是srtp标准协议。

生成dtls证书和指纹

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

// 不使用pem证书文件,自签名生成证书,对证书计算摘要
var tlsCerts []tls.Certificate
certificate, err := selfsign.GenerateSelfSigned()
if err != nil {
return mediasoupFPs, tlsCerts, err
}
x509cert, err := x509.ParseCertificate(certificate.Certificate[0])
if err != nil {
return mediasoupFPs, tlsCerts, err
}
actualSHA256, err := fingerprint.Fingerprint(x509cert, crypto.SHA256)
if err != nil {
return mediasoupFPs, tlsCerts, err
}

将dtls指纹参数通过信令发送给mediasoup,然后进行dtls连接即可,dtls可使用server也可以使用client,通过信令参数均可调整。

1
2
3
4
5
6
7
8
9
10
11
12
13

config := &dtls.Config{
Certificates: tlsCerts,
ExtendedMasterSecret: dtls.RequireExtendedMasterSecret,
// Create timeout context for accepted connection.
ConnectContextMaker: func() (context.Context, func()) {
return context.WithTimeout(context.Background(), 30*time.Second)
},
SRTPProtectionProfiles: []dtls.SRTPProtectionProfile{dtls.SRTP_AES128_CM_HMAC_SHA1_80},
}

dtlsConn, err := dtls.Server(iceConn, config) // or Client

至此,媒体通道webrtc transport已经建立。

媒体传输

srtp不需要对整个数据包进行加密,因此收发数据仍然是在ice的connection上进行,但是将dtls的密钥导出用于加密每一个rtp的payload。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

// 导出dtls密钥
srtpConfig := &srtp.Config{
Profile: srtp.ProtectionProfileAes128CmHmacSha1_80,
}

connState := dtlsConn.ConnectionState()
if err := srtpConfig.ExtractSessionKeysFromDTLS(&connState, false); err != nil {
return nil, fmt.Errorf("errDtlsKeyExtractionFailed: %v", err)
}

// 建立srtp session
srtpSession, err := srtp.NewSessionSRTP(iceConn, srtpConfig)

// 建立srtp读写handler
rtpWriteStream, err := srtpSession.OpenWriteStream()

// 建立srtcp session
srtcpSession, err := srtp.NewSessionSRTCP(iceConn, srtpConfig)

// 建立srtcp读写handler
rtcpReadStream, err := srtcpSession.OpenReadStream(opt.SSRC)

将RTP包的header和payload分别填入handler即可在之前建立的媒体通道上进行收发媒体。

参考资料

源码实现

官网

官方讨论组

golang实现信令层

RTC

RTC Server

流媒体服务器

媒体数据的传输,实现实时传输的最基本功能。

信令服务器

管理多路用户的逻辑,将多个通信用户组建房间进行权限控制和资源管理。

打洞服务器

为媒体传输提供更丰富的传输通道。

其他

CDN

提供就近接入和媒体缓存。

专线

复杂网络情况中提升各运营商之间传输质量。

RTC

NAT

NAT解决了IPv4资源不足问题,将端口和IP隐藏在内网,从而提高安全性

  1. 完全锥型
  2. IP限制型
  3. 端口限制型
  4. 对称型

STUN

组网的网关就像一道墙,阻隔着外部host和内网host直接访问。身处两个NAT后面的host不得不借助于打洞服务器来探测和协助两个host相互访问。因此,以上4种类型的组合情况有12种。

完全锥形比较容易理解,就是一个四元组:

 {
   内网IP,
   内网端口,
   映射的外网IP,
   映射的外网端口
 }
 

这个映射四元组就是墙上的“洞”,NAT通过查找这个映射表来转发数据。完全锥形没有任何限制,内网的任何host都可以通过这个“洞”和外界通信。

IP限制型,多加了一个元组:

被访问主机的IP

即限制了内网其他IP的host不能通过这个洞。

端口限制型,又多加了一个元组:

被访问主机的端口

即限制了内网同个host的其他端口不能通过这个洞。

最复杂和多变的对称型,对于同一个外网host访问内网host1和host2,它映射的IP和端口都是变化的,因此:对称型—对称型对称型—端口限制型这两种情况在STUN中是无法穿越的。

NAT穿越为媒体传输通道提供了直连的选择,同一局域网应该首选直连通道。其他情况可根据业务需求和质量情况来判断选择哪种通道。

多对多通信

Mesh方案

思想基于一对一方案,将每个host之间都建立一个连接,且每个host之间都能打通直连通道。

局限性:

  1. 现实场景中,不一定每个host之间都能建立直连通道
  2. 共享媒体流的时候,将给每个对端发送一份数据,上行带宽很大

MCU方案

Multipoint Conferencing Unit

服务器进行多路混流和重新编码,视频会议场景等,对服务器硬件要求高。
优势很明显,场景体验好,节省带宽资源。
局限性也很明显,对服务器要求高,混流过程有一定的延时。

SFU方案

Selective Forwarding Unit

服务器对多路流根据实际场景进行转发。

传输质量

QoS

流媒体传输协议

传统直播推拉流架构的主要协议:HLS和RTMP协议。

由于WebRTC传输数据基于UDP协议,RTMP基于TCP协议,HLS基于HTTP协议的特点,传输实时性:WebRTC(RTCP/RTP) > RTMP > HLS

  • WebRTC多用于实时音视频通信或互动场景
  • RTMP多用于推拉流场景
  • HLS多用于拉流端和点播场景

HLS

算法设计

在实时传输中,一般使用UDP协议,因此需要对抗丢包,乱序,抖动等质量问题。在实时和可靠之间做权衡,一切都是trade-off。

NACK

NACK receiver

目标: 既要做到及时快速的重传,又要避免过多过久的重传,减少带宽的使用。

思路1: 收到包时记录seq,定时检查最近1000个包里丢失了哪些,发送RTCP NACK请求。
问题:定时器间隔导致丢包不能快速请求。

思路2: 思路1+每次收到包都触发一次快速请求重传。
问题:乱序时误重传,浪费带宽。

思路3: 思路2+为避免即时和定时频繁发送NACK请求的问题,根据rtt推算是否在定时器触发时需要再次请求NACK,并记录包的重传次数,超过一定次数则停止再发重传请求。

弱网环境下的补充:若重传队列过长,可清理至上一个keyframe处的seq。若队列还是过长,可直接清空重传队列,并发送RTCP PLI请求。

NACK sender

sfu的sender应尽量减少包拷贝次数,多个发送端使用统一发包队列,只记录每个sender的索引。

RTC端到端

发送端

前处理 包括缩放、图像增强、美颜。
编码 包括码控、预测、变换、量化、熵编码。
在 RTP 打包的时候是以 Slice 为单位 打包 的,而不是以帧为单位打包的。Slice 其实是为了并行编码设计的。将一帧图像划分成几个 Slice,并且 Slice 之间相互独立、互不依赖、独立编码。

在RTC视频通话场景下最好选择 CBR 的码控算法,从而保证输出码率能够比较好地贴合预估带宽。
如果编码器输出码率差网络带宽太多,也会导致 PacedSender 缓冲太多数据包,从而引起延时太长。码控和 PacedSender 都很重要,它们是一起协作来减少卡顿的。

接收端

Jitter Buffer工作在接收端,主要功能就是在接收端收到包之后进行组帧,并判断帧的完整性、可解码性、发送丢包重传请求、发送关键帧请求以及估算网络抖动的。

判断帧完整性

结合发送端的打包特性:

  • 在 RTP 打包的时候是以 Slice 为单位打包的,一帧是有可能有多个 Slice 的。
  • 在 RTP 包里面,RTP 头有一个标志位 M,表示是一帧的结尾。因此只要收到这个标志位为 1 的包就代表收到了这一帧的最后一个包。
  • 同一帧 RTP 序列号一直连续。
  • 同一帧 RTP 时间戳相同。
  • slice_header 的 first_mb_in_slice 字段为 0,就代表是帧的第一个 Slice 了。

方法一:
first_mb_in_slice 为0,序号连续,找到 M 为1的包。

方法二:
收包队列从 M 为1向前找序号连续的,直到找到RTP时间戳跳变为止为帧开头。

两种方法相同的都是用M标志位判断帧结尾,区别是如何找帧的开头。第一种方法更加灵活。

卡顿

一般的通话视频来说,帧之间间隔超过200ms会造成视觉卡顿。

发送端排查

  • 帧率不够5fps,提高帧率。
  • 机器性能不够,导致前处理或编码耗时过长。可以在高分辨率的时候尽量使用 GPU 做前处理,并使用硬件编码或者将软件编码设置为快速档加快处理的速度(压缩率会下降)。
  • 编码器输出码率超过实际网络带宽。如果使用 VBR 码控算法,编码器的输出码率会随着画面的复杂程度变化,那就会有很大的概率因为画面复杂而出现输出码率超过预估带宽的情况,从而导致对端出现严重的卡顿。而 CBR 码控算法是你设置多少目标码率,编码器的输出码率就会接近于目标码率。
  • 复杂帧编码后过大或者 I 帧比较大。为了能够减小这种大帧带来的瞬时网络波动,我们可以在编码打包之后、发送之前,加一个平滑发送的模块来平滑地发送视频包。这个模块在 WebRTC 中叫做 PacedSender(节奏发送器)。PacedSender 主要的工作原理就是编码输出的码流打包之后先放到它的缓冲区中,而不是直接发送。之后它再按照预估带宽大小对应的发送速度,将缓冲区中的数据发送到网络当中。

接收端排查

  • 丢包重传策略应对网络丢包。
  • 丢包重传仍然没有恢复,FIR请求,收到IDR帧强制关键帧(立即刷新帧)才可恢复。

花屏

基本上都是接收端的问题:

  • 帧不完整。如果帧出现了丢包就送去解码的话,若能解码成功,那肯定会出现解码花屏的问题。
  • 参考帧不完整。
  • 渲染的时候 YUV 格式弄错了。
  • Stride 问题会造成花屏。我们解码后渲染前一定要处理好 YUV 的 Stride 问题,不要和宽度弄混了。

后两者花屏的图像比较有辨识度。

outline

支持判等类型: 布尔,数字,字符串,指针,接口,通道,结构体,包含上述类型的数组
不支持判等类型: slice, map, function

context

全双工协议

简而言之,许多应用场景需要全双工的web应用,例如:服务器主动推送和实时交互的应用。这些应用不适合在HTTP协议上实现。最好的方式是使用HTTP建立握手,之后基于TCP做长连接。

与HTTP的关系是:使用了其握手机制和端口号。

Relationship to TCP and HTTP

The WebSocket Protocol is an independent TCP-based protocol. Its
only relationship to HTTP is that its handshake is interpreted by
HTTP servers as an Upgrade request.

By default, the WebSocket Protocol uses port 80 for regular WebSocket
connections and port 443 for WebSocket connections tunneled over
Transport Layer Security (TLS) [RFC2818].

支持WebSocket的浏览器

握手

WebSocket通过HTTP/1.1协议的101状态码握手。

客户端发起握手请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
GET ws://localhost:6060/ws HTTP/1.1
Host: localhost:6060
Connection: Upgrade # 连接升级
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36
Upgrade: websocket # 升级到WebSocket
Origin: http://localhost:6060
Sec-WebSocket-Version: 13 # 表示支持的WebSocket版本
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,ja;q=0.6,zh-TW;q=0.5
Cookie: wp-settings-time-1=1566626634
Sec-WebSocket-Key: Omt7iN7CaEUxYdm+wlVEaA==
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits

服务器回应

1
2
3
4
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: ggdcU6xoySObbFCh62I2J3eRxu0=
  • Sec-WebSocket-Key是随机的字符串,服务器端会用这些数据来构造出一个SHA-1的信息摘要。把Sec-WebSocket-Key加上一个特殊字符串(固定):258EAFA5-E914-47DA-95CA-C5AB0DC85B11,然后计算SHA-1摘要,之后进行BASE-64编码,将结果做为Sec-WebSocket-Accept头的值,返回给客户端。如此操作,可以尽量避免普通HTTP请求被误认为Websocket协议。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 固定字符串
var keyGUID = []byte("258EAFA5-E914-47DA-95CA-C5AB0DC85B11")

// 服务器根据客户端请求的随机key计算Sec-WebSocket-Accept字段的值
func computeAcceptKey(challengeKey string) string {
h := sha1.New()
h.Write([]byte(challengeKey))
h.Write(keyGUID)
return base64.StdEncoding.EncodeToString(h.Sum(nil))
}

// 客户端生成随机key
func generateChallengeKey() (string, error) {
p := make([]byte, 16)
if _, err := io.ReadFull(rand.Reader, p); err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(p), nil
}

  • Origin字段是可选的,通常用来表示在浏览器中发起此Websocket连接所在的页面。

协议格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
  • FIN为true表示是最后一个分片。
  • RSV一般情况都为0。
  • opcode 定义了payload data的类型。
  • mask为true表示payload是编码处理的。
  • masking-key编码key。
  • payload length表示payload的长度。

extend为payload过长时的扩展字段。

客户端发给服务器的包,必须masking:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

const wordSize = int(unsafe.Sizeof(uintptr(0)))

// 随机生成masking-key
func newMaskKey() [4]byte {
n := rand.Uint32()
return [4]byte{byte(n), byte(n >> 8), byte(n >> 16), byte(n >> 24)}
}

func maskBytes(key [4]byte, pos int, b []byte) int {
// Mask one byte at a time for small buffers.
if len(b) < 2*wordSize {
for i := range b {
b[i] ^= key[pos&3]
pos++
}
return pos & 3
}

// Mask one byte at a time to word boundary.
if n := int(uintptr(unsafe.Pointer(&b[0]))) % wordSize; n != 0 {
n = wordSize - n
for i := range b[:n] {
b[i] ^= key[pos&3]
pos++
}
b = b[n:]
}

// Create aligned word size key.
var k [wordSize]byte
for i := range k {
k[i] = key[(pos+i)&3]
}
kw := *(*uintptr)(unsafe.Pointer(&k))

// Mask one word at a time.
n := (len(b) / wordSize) * wordSize
for i := 0; i < n; i += wordSize {
*(*uintptr)(unsafe.Pointer(uintptr(unsafe.Pointer(&b[0])) + uintptr(i))) ^= kw
}

// Mask one byte at a time for remaining bytes.
b = b[n:]
for i := range b {
b[i] ^= key[pos&3]
pos++
}

return pos & 3
}

控制消息

WebSocket定义了3种类型的控制消息(control message):close, ping and pong。

close

opcode为0x8

客户端发起的close,websocket.payload.close.status_code置为1000,表示normal closure。若不设置,则默认为CloseNoStatusReceived=1005。

服务器收到关闭请求,响应包也发送1000。

客户端收到响应后,执行TCP四次挥手过长,双方关闭TCP连接。

在实际捕获websocket的close code时,应注意以下细节:

  • 1006属于错误码,并不是可从SetCloseHandler中捕获到的,因为它没有真正的发送close消息。
  • 若websocket由Nginx代理,应考虑Nginx可能会主动发送close的情况,因此应该从读取message的地方去获取断开原因,因为SetCloseHandler有可能会接收到Nginx发来的close状态。

ping

opcode为0x9

发送方 -> 接收方

WebSocket为了保持客户端、服务端的实时双向通信,需要确保客户端、服务端之间的TCP通道保持连接没有断开。然而,对于长时间没有数据往来的连接,如果依旧长时间保持着,可能会浪费包括的连接资源。但不排除有些场景,客户端、服务端虽然长时间没有数据往来,但仍需要保持连接。这个时候,可以采用心跳来实现。

如果客户端支持ping,最好由客户端发起ping,然后服务器记录时间,超时断开即可。浏览器中没有相关api发送ping给服务器,只能由服务器发ping给浏览器。

pong

opcode为0xA

接收方 -> 发送方

参考

https://tools.ietf.org/html/rfc6455

https://zh.wikipedia.org/wiki/WebSocket

https://godoc.org/github.com/gorilla/websocket

https://github.com/gorilla/websocket

websocket ping/pong timeout

Job scheduling

作业调度

场景

多个作业进程(或服务器,后称为process)可处理相同的任务,并具有不同的处理能力。

假设

进程(服务器) 处理能力
A 4
B 3
C 2

在一轮任务(Job)的分配中,均衡的分配给每一个process。

处理能力可认为是分配权重weight。

算法

加权轮询调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Supposing that there is a server set S = {S0, S1, …, Sn-1};
W(Si) indicates the weight of Si;
i indicates the server selected last time, and i is initialized with -1;
cw is the current weight in scheduling, and cw is initialized with zero;
max(S) is the maximum weight of all the servers in S;
gcd(S) is the greatest common divisor of all server weights in S;

while (true) {
i = (i + 1) mod n;
if (i == 0) {
cw = cw - gcd(S);
if (cw <= 0) {
cw = max(S);
if (cw == 0)
return NULL;
}
}
if (W(Si) >= cw)
return Si;
}

输出的结果是AABABCABC

这个算法的两个技巧:

  1. gcd最大公约数:相当于算法的步长。比如权重为1000,100,10的三个process,分配时可以认为它们是权重为100,10,1.最大公约数为10,循环执行10次的效果是一样的。
  2. 当W(Si) >= cw时,才返回Si。这样做的效果是,不同process中优先调用高于其他weight的部分,当所有process剩下的weight相等时,按顺序循环调用。

实现代码

python实现

更为随机的加权轮询调度

设想A,B,C的权值分别为5,1,1的情景下,以上的调度算法输出为AAAAABC

定义:

  1. 每个process的动态当前权值current weight为cw
  2. 每个process的固有权值weight为w
  3. 参加轮询的process的weight之和为sw

算法:

  1. 每个cw初始值为0
  2. 每个cw = cw + w
  3. 被选中的process为cw最大的,被选中后cw的值被惩罚为该process的cw减去sw,而其他process的cw保持不变。即max(cw) - sw
  4. 若所有process的cw都为0,则一轮结束。否则,进行下一步
  5. 跳到第2步

运算:

奇数行根据最大的current weight选择process

被选中的cw被高亮标出

A B C
0 0 0
5 1 1
-2 1 1
3 2 2
-4 2 2
1 3 3
1 -4 3
6 -3 4
-1 -3 4
4 -2 5
4 -2 -2
9 -1 -1
2 -1 -1
7 0 0
0 0 0

技巧:

  1. 算法里循环执行的cw = cw + wmax(cw)-sw两个步骤确保了cw之和等于sw,循环次数为sw
  2. 和上一个算法相比,该算法每次让被选出来的process尽可能的排在最后面,确保了不会有连续的被选中

参考资料

加权轮询调度算法:

http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling