harmony/pkg/web/client.go

217 lines
3.9 KiB
Go

package web
import (
"encoding/json"
"log"
"strconv"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/pion/rtp"
"github.com/pion/webrtc/v2"
"github.com/pkg/errors"
)
type Client struct {
ID int
Name string
Status int
Conn *websocket.Conn
Connected bool
PeerConns map[int]*webrtc.PeerConnection
PeerConnLock *sync.Mutex
In chan *Message
Out chan *Message
AudioTracks map[int]*webrtc.Track
VoiceIn chan []int16
VoiceInActive time.Time
VoiceInNotify time.Time
VoiceInTransmitting bool
VoiceInLock *sync.Mutex
VoiceOut []chan *rtp.Packet
VoiceOutClient []int
VoiceOutActive []time.Time
VoiceOutLock *sync.Mutex
Channel *Channel
Terminated chan bool
}
func NewClient(conn *websocket.Conn) *Client {
c := Client{Conn: conn,
Name: "Anonymous",
PeerConns: make(map[int]*webrtc.PeerConnection),
PeerConnLock: new(sync.Mutex),
In: make(chan *Message, 10),
Out: make(chan *Message, 10),
AudioTracks: make(map[int]*webrtc.Track),
VoiceIn: make(chan []int16, 10),
VoiceInLock: new(sync.Mutex),
VoiceOutLock: new(sync.Mutex),
Terminated: make(chan bool)}
go c.handleRead()
go c.handleWrite()
return &c
}
func (c *Client) handleRead() {
var (
messageTypeInt int
messageType MessageType
message []byte
err error
)
for {
c.Conn.SetReadDeadline(time.Now().Add(1 * time.Minute))
messageTypeInt, message, err = c.Conn.ReadMessage()
if err != nil || c.Status == -1 {
c.Close()
return
}
messageType = MessageType(messageTypeInt)
in := Message{}
if messageType == 2 {
in.T = messageType
in.M = message
} else {
err = json.Unmarshal(message, &in)
if err != nil {
// TODO Place error behind debug/verbose var
log.Println(string(message))
log.Println()
log.Println(err)
c.Close()
return
}
}
in.S = c.ID
c.In <- &in
}
}
func (c *Client) handleWrite() {
var (
out []byte
err error
)
for msg := range c.Out {
if msg == nil {
return
}
out, err = json.Marshal(msg)
if err != nil {
c.Close()
return
}
c.Conn.WriteMessage(1, out)
}
}
func (c *Client) Close() {
if c.Status == -1 {
return
}
c.Status = -1
c.CloseAudio()
if c.Conn != nil {
c.Conn.Close()
}
c.In <- nil
c.Out <- nil
// TODO Place behind debug/verbose flag
log.Printf("%+v", errors.New("Closing client #"+strconv.Itoa(c.ID)))
go func() {
c.Terminated <- true
}()
}
func (c *Client) InitAudio() {
c.VoiceOutLock.Lock()
defer c.VoiceOutLock.Unlock()
if len(c.VoiceOut) > 0 {
return
}
for i := 0; i < 3; i++ {
c.VoiceOut = append(c.VoiceOut, make(chan *rtp.Packet, 10))
c.VoiceOutClient = append(c.VoiceOutClient, 0)
c.VoiceOutActive = append(c.VoiceOutActive, time.Time{})
}
}
func (c *Client) WriteAudio(p *rtp.Packet, source int) {
c.VoiceOutLock.Lock()
for i := range c.VoiceOut {
if c.VoiceOutClient[i] == 0 || c.VoiceOutClient[i] == source || time.Since(c.VoiceOutActive[i]) >= 50*time.Millisecond {
select {
case c.VoiceOut[i] <- p:
default:
log.Printf("client %d warning: filled voice out buffer when writing from %d", c.ID, source)
continue
}
c.VoiceOutActive[i] = time.Now()
c.VoiceOutClient[i] = source
c.VoiceOutLock.Unlock()
return
}
}
c.VoiceOutLock.Unlock()
}
func (c *Client) CloseAudio() {
c.ClosePeerConns()
c.PeerConns = make(map[int]*webrtc.PeerConnection)
c.VoiceOut = nil
c.VoiceOutClient = nil
c.VoiceOutActive = nil
}
func (c *Client) ClosePeerConns() {
for id := range c.PeerConns {
c.ClosePeerConn(id)
}
}
func (c *Client) ClosePeerConn(id int) {
c.VoiceOutLock.Lock()
defer c.VoiceOutLock.Unlock()
pc := c.PeerConns[id]
if pc == nil {
return
}
select {
case c.VoiceOut[id] <- nil:
default:
log.Println("failed to close channel for client " + strconv.Itoa(c.ID))
}
pc.Close()
pc = nil
c.AudioTracks[id] = nil
}