harmony/pkg/web/web.go

328 lines
6.4 KiB
Go

package web
import (
"io"
"log"
"math/rand"
"net/http"
"sync"
rice "github.com/GeertJohan/go.rice"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/pion/rtp"
"github.com/pion/webrtc/v2"
"github.com/pkg/errors"
)
// peerConnectionConfig is the WebRTC configuration handed to every peer
// connection created by answerRTC. It lists a single public STUN server
// and no other ICE servers.
var peerConnectionConfig = webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
}
// incomingClients hands newly upgraded WebSocket clients from
// webSocketHandler to handleIncomingClients. It is buffered for 10 clients;
// once the buffer is full, further sends block until a client is received.
// The channel is package-level, so it is shared by every WebInterface
// created in the process.
var incomingClients = make(chan *Client, 10)
// upgrader turns incoming HTTP requests into WebSocket connections using
// 1 KiB read and write buffers with compression enabled.
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// NOTE(review): every Origin is accepted, so any web page can open a
// socket to this server. Confirm this is intentional before exposing the
// service beyond trusted networks.
CheckOrigin: func(r *http.Request) bool {
return true
},
EnableCompression: true,
}
// Message is the unit exchanged with clients over their In and Out channels.
// The field names are single letters; presumably this keeps the encoded form
// small — confirm against the client serialization code.
type Message struct {
S int // Source: ID of the originating client (set when relaying chat messages)
T MessageType // Type: selects how the message is handled
M []byte // Message: payload bytes (chat text, client name or SDP, depending on T)
}
// WebInterface tracks the clients connected through the web front end.
type WebInterface struct {
// Clients maps a client ID to its connected client.
Clients map[int]*Client
// ClientsLock is held while Clients is iterated or modified.
ClientsLock *sync.Mutex
}
// NewWebInterface creates a WebInterface and starts serving it in the
// background.
//
// The WebSocket endpoint is registered at path+"w"; every other request under
// path is answered from the embedded "public" box with the path prefix
// stripped. Two goroutines are started: one that registers incoming clients
// and one that runs the HTTP server on address. If the HTTP server stops with
// an error the process exits through log.Fatal.
func NewWebInterface(address string, path string) *WebInterface {
	w := &WebInterface{
		Clients:     make(map[int]*Client),
		ClientsLock: &sync.Mutex{},
	}

	// Static assets embedded with go.rice.
	assets := http.FileServer(rice.MustFindBox("public").HTTPBox())

	router := mux.NewRouter()
	router.HandleFunc(path+"w", w.webSocketHandler)
	router.PathPrefix(path).Handler(http.StripPrefix(path, assets))

	go w.handleIncomingClients()
	go func() {
		err := http.ListenAndServe(address, router)
		if err != nil {
			log.Fatal("Web server error: ", err)
		}
	}()

	return w
}
// handleIncomingClients registers every client received on incomingClients.
//
// While holding ClientsLock it assigns the client the next free ID, stores it
// in w.Clients and sends a MessageConnect carrying the new client's name to
// every registered client, the new one included. After releasing the lock it
// starts a handleRead goroutine for the client. The loop ends only when
// incomingClients is closed.
func (w *WebInterface) handleIncomingClients() {
	for client := range incomingClients {
		w.ClientsLock.Lock()

		client.ID = w.nextClientID()
		w.Clients[client.ID] = client

		for _, peer := range w.Clients {
			peer.Out <- &Message{T: MessageConnect, M: []byte(client.Name)}
		}

		w.ClientsLock.Unlock()

		go w.handleRead(client)
	}
}
// handleRead processes messages arriving on c.In until the channel is closed
// or a nil message is received.
//
// Every message except pings is logged. Calls are answered with an SDP answer
// sent back to c, chat messages are relayed to all clients, and
// connect/join/nick/quit/disconnect notifications are rebroadcast with the
// payload replaced by c's name.
func (w *WebInterface) handleRead(c *Client) {
	for msg := range c.In {
		if msg == nil {
			return
		}

		kind := msg.T
		if kind != MessagePing {
			log.Printf("%d -> %s %d", msg.S, kind, len(msg.M))
		}

		switch kind {
		case MessageBinary:
			// TODO Binary message
		case MessagePing:
			// TODO Handle ping
		case MessageCall:
			answer, err := w.answerRTC(c, msg.M)
			if err != nil {
				log.Printf("failed to answer call: %s", err)
				break
			}
			c.Out <- &Message{T: MessageAnswer, M: answer}
		case MessageChat:
			w.ClientsLock.Lock()
			for _, peer := range w.Clients {
				peer.Out <- &Message{S: c.ID, T: MessageChat, M: msg.M}
			}
			w.ClientsLock.Unlock()
			log.Printf("<%s> %s", c.Name, msg.M)
		case MessageConnect, MessageJoin, MessageNick, MessageQuit, MessageDisconnect:
			// leaving: the client is dropping its audio session.
			// voiceOnly: the notification only goes to clients that have an
			// audio track, plus the sender itself.
			leaving := kind == MessageQuit || kind == MessageDisconnect
			voiceOnly := kind == MessageJoin || kind == MessageQuit

			w.ClientsLock.Lock()
			if leaving {
				c.AudioTrack = nil
				if c.PeerConn != nil {
					c.ClosePeerConn()
					log.Printf("closing peerconn, peer sent %s %d", kind, c.ID)
				}
				if kind == MessageDisconnect {
					c.Close()
				}
			}

			// The rebroadcast always carries the sender's name as payload.
			msg.M = []byte(c.Name)
			for _, peer := range w.Clients {
				if !voiceOnly || peer.AudioTrack != nil || peer.ID == c.ID {
					peer.Out <- msg
				}
			}
			w.ClientsLock.Unlock()
		default:
			log.Printf("Unhandled message %d %s", kind, msg.M)
		}
	}
}
// nextClientID returns the smallest positive integer that is not currently a
// key of w.Clients. It reads the map without locking; handleIncomingClients
// calls it while holding ClientsLock.
func (w *WebInterface) nextClientID() int {
	for candidate := 1; ; candidate++ {
		if _, taken := w.Clients[candidate]; !taken {
			return candidate
		}
	}
}
// webSocketHandler upgrades the request to a WebSocket, queues the resulting
// client for registration and then blocks until the client's Terminated
// channel yields.
//
// Afterwards it removes every client whose Status is -1 from w.Clients (not
// only the one served by this call) and sends a MessageDisconnect carrying
// the removed client's name to the clients that remain. If the upgrade fails
// the handler returns without doing anything further.
func (w *WebInterface) webSocketHandler(wr http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(wr, r, nil)
	if err != nil {
		return
	}

	client := NewClient(conn)
	incomingClients <- client
	<-client.Terminated

	w.ClientsLock.Lock()
	for id, stale := range w.Clients {
		// Status -1 presumably marks a terminated client — confirm in Client.
		if stale.Status != -1 {
			continue
		}

		name := stale.Name
		delete(w.Clients, id)

		for _, peer := range w.Clients {
			peer.Out <- &Message{T: MessageDisconnect, M: []byte(name)}
		}
	}
	w.ClientsLock.Unlock()
}
// answerRTC sets up a WebRTC peer connection for c from the SDP offer in sdp
// and returns the SDP of the generated answer.
//
// The offer comes from a remote client, so every failure while processing it
// is returned as an error instead of panicking; the caller logs the error and
// keeps serving other clients. If a step fails after the peer connection has
// been stored on c, the client's audio track is cleared and c.ClosePeerConn is
// called before returning.
//
// An error is returned when c already has a peer connection, when the offer
// cannot be parsed or applied, when it contains no audio codec or no Opus
// codec, or when the track or the answer cannot be created.
//
// On success c.PeerConn and c.AudioTrack are set and handlers are installed
// that relay incoming RTP packets to every other client with an audio track
// and that broadcast join/quit notifications on connection state changes.
func (w *WebInterface) answerRTC(c *Client, sdp []byte) ([]byte, error) {
	if c.PeerConn != nil {
		return nil, errors.New("already have peerconn")
	}

	offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(sdp)}

	m := webrtc.MediaEngine{}
	if err := m.PopulateFromSDP(offer); err != nil {
		return nil, errors.Wrap(err, "populating media engine from offer")
	}

	api := webrtc.NewAPI(webrtc.WithMediaEngine(m))

	audioCodecs := m.GetCodecsByKind(webrtc.RTPCodecTypeAudio)
	if len(audioCodecs) == 0 {
		return nil, errors.New("offer contained no audio codecs")
	}

	pc, err := api.NewPeerConnection(peerConnectionConfig)
	if err != nil {
		return nil, errors.Wrap(err, "creating peer connection")
	}
	c.PeerConn = pc

	// abort tears down the partially configured connection so that a failed
	// call does not leave the client holding a half-built session.
	abort := func(cause error) ([]byte, error) {
		c.AudioTrack = nil
		c.ClosePeerConn()
		return nil, cause
	}

	if err = pc.SetRemoteDescription(offer); err != nil {
		return abort(errors.Wrap(err, "setting remote description"))
	}

	// Only Opus is relayed; a payload type of 0 means the offer had no Opus
	// codec.
	var payloadType uint8
	for _, codec := range audioCodecs {
		if codec.Name == webrtc.Opus {
			payloadType = codec.PayloadType
			break
		}
	}
	if payloadType == 0 {
		return abort(errors.New("no payloadType"))
	}

	t, err := pc.NewTrack(payloadType, rand.Uint32(), "audio", "harmony")
	if err != nil {
		return abort(errors.Wrap(err, "creating audio track"))
	}
	if _, err = pc.AddTrack(t); err != nil {
		return abort(errors.Wrap(err, "adding audio track"))
	}
	c.AudioTrack = t

	// Relay every RTP packet received from this client to all other clients
	// that currently have an audio track.
	pc.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) {
		var (
			p   *rtp.Packet
			err error
		)
		for {
			p, err = remoteTrack.ReadRTP()
			if err != nil {
				c.ClosePeerConn()
				log.Printf("failed to read RTP from %d", c.ID)
				return
			}

			w.ClientsLock.Lock()
			for _, wc := range w.Clients {
				if wc.ID == c.ID || wc.AudioTrack == nil {
					continue
				}

				// Rewrite the header so the packet fits the receiving
				// client's outgoing track and sequence.
				p.SSRC = wc.AudioTrack.SSRC()
				p.PayloadType = wc.AudioTrack.PayloadType()
				p.SequenceNumber = wc.SequenceNumber

				if err = wc.AudioTrack.WriteRTP(p); err != nil && err != io.ErrClosedPipe {
					// A failing receiver must not take the server down or
					// leave ClientsLock held; skip this packet for it.
					log.Printf("failed to write RTP to %d: %s", wc.ID, err)
					continue
				}
				wc.SequenceNumber++
			}
			w.ClientsLock.Unlock()
		}
	})

	// Announce joins and quits to every client with an audio track and to
	// this client itself.
	pc.OnConnectionStateChange(func(connectionState webrtc.PeerConnectionState) {
		log.Printf("%d conn state -> %s\n", c.ID, connectionState)

		connected := connectionState == webrtc.PeerConnectionStateConnected
		disconnected := connectionState == webrtc.PeerConnectionStateDisconnected
		if !connected && !disconnected {
			return
		}

		w.ClientsLock.Lock()
		if disconnected {
			c.ClosePeerConn()
			log.Printf("closing peerconn, peer disconnected %d", c.ID)
		}
		for _, wc := range w.Clients {
			if wc.AudioTrack == nil && wc.ID != c.ID {
				continue
			}
			if connected {
				wc.Out <- &Message{T: MessageJoin, M: []byte(c.Name)}
			} else {
				wc.Out <- &Message{T: MessageQuit, M: []byte(c.Name)}
			}
		}
		w.ClientsLock.Unlock()
	})

	pc.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
		log.Printf("%d ice state -> %s\n", c.ID, connectionState)
	})

	answer, err := pc.CreateAnswer(nil)
	if err != nil {
		return abort(errors.Wrap(err, "creating answer"))
	}
	if err = pc.SetLocalDescription(answer); err != nil {
		return abort(errors.Wrap(err, "setting local description"))
	}

	return []byte(answer.SDP), nil
}