harmony/pkg/web/web.go

485 lines
10 KiB
Go

package web
import "C"
import (
"bytes"
"encoding/json"
"log"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
"git.sr.ht/~tslocum/harmony/pkg/audio"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/pion/rtp"
"github.com/pion/webrtc/v2"
"github.com/pion/webrtc/v2/pkg/media"
"github.com/pkg/errors"
"gitlab.com/golang-commonmark/markdown"
)
var peerConnectionConfig = webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
}
var incomingClients = make(chan *Client, 10)
var markdownRenderer = markdown.New(markdown.Typographer(false), markdown.Breaks(true), markdown.Quotes([]string{`"`, `"`, `'`, `'`}))
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
EnableCompression: true,
}
type Message struct {
S int // Source
N string // Source nickname
PC int // PeerConn
T MessageType // Type
M []byte // Message
}
type WebInterface struct {
Clients map[int]*Client
ClientsLock *sync.Mutex
}
func NewWebInterface(address string, path string) *WebInterface {
if assets == nil {
panic("web assets not found")
}
w := WebInterface{Clients: make(map[int]*Client), ClientsLock: new(sync.Mutex)}
r := mux.NewRouter()
r.HandleFunc(path+"w", w.webSocketHandler)
r.PathPrefix(path).Handler(http.StripPrefix(path, http.FileServer(assets)))
go w.handleIncomingClients()
go w.handleExpireTransmit()
addressSplit := strings.Split(address, ",")
for _, add := range addressSplit {
add := add // Capture
go func() {
if err := http.ListenAndServe(add, r); err != nil {
log.Fatal("failed to listen on address ", add, ":", err)
}
}()
}
return &w
}
func (w *WebInterface) handleIncomingClients() {
for c := range incomingClients {
c := c // Capture
w.ClientsLock.Lock()
id := w.nextClientID()
c.ID = id
w.Clients[id] = c
go func(c *Client) {
time.Sleep(500 * time.Millisecond)
c.Connected = true
w.updateUserList()
w.ClientsLock.Lock()
for _, wc := range w.Clients {
wc.Out <- &Message{T: MessageConnect, N: c.Name, M: []byte(c.Name)}
}
w.ClientsLock.Unlock()
}(c)
w.ClientsLock.Unlock()
go w.handleRead(c)
}
}
func (w *WebInterface) handleExpireTransmit() {
t := time.NewTicker(250 * time.Millisecond)
for range t.C {
w.ClientsLock.Lock()
for _, wc := range w.Clients {
wc.VoiceInLock.Lock()
if wc.VoiceInTransmitting && time.Since(wc.VoiceInLastActive) >= 100*time.Millisecond {
wc.VoiceInTransmitting = false
for _, wcc := range w.Clients {
if len(wcc.AudioTracks) > 0 {
wcc.Out <- &Message{T: MessageTransmitStop, S: wc.ID}
}
}
}
wc.VoiceInLock.Unlock()
}
w.ClientsLock.Unlock()
}
}
func (w *WebInterface) handleRead(c *Client) {
for msg := range c.In {
if msg == nil {
return
}
if msg.T != MessagePing {
log.Printf("%d -> %s %d", msg.S, msg.T, len(msg.M))
}
switch msg.T {
case MessageBinary:
// TODO Binary message
continue
case MessagePing:
c.Out <- &Message{T: MessagePong, M: msg.M}
case MessageCall:
answer, err := w.answerRTC(c, msg.PC, msg.M)
if err != nil {
log.Printf("failed to answer call: %s", err)
continue
}
c.Out <- &Message{T: MessageAnswer, PC: msg.PC, M: answer}
case MessageChat:
if bytes.HasPrefix(bytes.ToLower(msg.M), []byte("/nick ")) {
go func(mm []byte) {
c.In <- &Message{S: c.ID, T: MessageNick, M: mm}
}(msg.M[6:])
continue
}
log.Printf("<%s> %s", c.Name, msg.M)
msg.M = bytes.TrimSpace([]byte(markdownRenderer.RenderToString(msg.M)))
if bytes.Count(msg.M, []byte("<p>")) == 1 && bytes.Count(msg.M, []byte("</p>")) == 1 && bytes.HasPrefix(msg.M, []byte("<p>")) && bytes.HasSuffix(msg.M, []byte("</p>")) {
msg.M = msg.M[3 : len(msg.M)-4]
}
msg.M = bytes.ReplaceAll(msg.M, []byte(`<a href="`), []byte(`<a target="_blank" href="`))
w.ClientsLock.Lock()
for _, wc := range w.Clients {
wc.Out <- &Message{S: c.ID, N: c.Name, T: MessageChat, M: msg.M}
}
w.ClientsLock.Unlock()
case MessageNick:
w.ClientsLock.Lock()
oldNick := c.Name
c.Name = Nickname(string(msg.M))
if c.Connected {
msg := &Message{S: c.ID, N: oldNick, T: MessageNick, M: []byte(c.Name)}
for _, wc := range w.Clients {
wc.Out <- msg
}
}
w.ClientsLock.Unlock()
w.updateUserList()
case MessageConnect, MessageJoin, MessageQuit, MessageDisconnect:
w.ClientsLock.Lock()
if msg.T == MessageQuit || msg.T == MessageDisconnect {
c.CloseAudio()
if msg.T == MessageDisconnect {
c.Close()
}
}
msg.N = c.Name
for _, wc := range w.Clients {
if (msg.T == MessageJoin || msg.T == MessageQuit) && len(wc.AudioTracks) == 0 && wc.ID != c.ID {
continue
}
wc.Out <- msg
}
w.ClientsLock.Unlock()
w.updateUserList()
default:
log.Printf("Unhandled message %d %s", msg.T, msg.M)
}
}
}
func (w *WebInterface) nextClientID() int {
id := 1
for {
if _, ok := w.Clients[id]; !ok {
break
}
id++
}
return id
}
func (w *WebInterface) webSocketHandler(wr http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(wr, r, nil)
if err != nil {
return
}
c := NewClient(conn)
incomingClients <- c
<-c.Terminated
w.ClientsLock.Lock()
for id := range w.Clients {
if w.Clients[id].Status != -1 {
continue
}
name := w.Clients[id].Name
delete(w.Clients, id)
for _, wc := range w.Clients {
wc.Out <- &Message{T: MessageDisconnect, N: name, M: []byte(name)}
}
}
w.ClientsLock.Unlock()
}
func (w *WebInterface) updateUserList() {
w.ClientsLock.Lock()
msg := &Message{T: MessageUsers}
var userList UserList
for _, wc := range w.Clients {
c := 0
if len(wc.AudioTracks) > 0 {
c = 1
}
userList = append(userList, &User{ID: wc.ID, N: wc.Name, C: c})
}
sort.Sort(userList)
var err error
msg.M, err = json.Marshal(userList)
if err != nil {
log.Fatal("failed to marshal user list: ", err)
}
for _, wc := range w.Clients {
wc.Out <- msg
}
w.ClientsLock.Unlock()
}
func (w *WebInterface) answerRTC(c *Client, peerConnID int, offerSDP []byte) ([]byte, error) {
c.PeerConnLock.Lock()
defer c.PeerConnLock.Unlock()
/*if c.NextPeerConn >= 3 {
return nil, errors.New("already have 3 peerconns") // TODO configurable
}*/
if c.PeerConns[peerConnID] != nil {
return nil, errors.New("already have next peerconn")
}
offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(offerSDP)}
m := webrtc.MediaEngine{}
err := m.PopulateFromSDP(offer)
if err != nil {
panic(err)
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(m))
audioCodecs := m.GetCodecsByKind(webrtc.RTPCodecTypeAudio)
if len(audioCodecs) == 0 {
panic("Offer contained no audio codecs")
}
pc, err := api.NewPeerConnection(peerConnectionConfig)
if err != nil {
panic(err)
}
if len(c.PeerConns) == 0 {
c.InitAudio()
}
c.PeerConns[peerConnID] = pc
err = pc.SetRemoteDescription(offer)
if err != nil {
panic(err)
}
var payloadType uint8
for i := range audioCodecs {
if audioCodecs[i].Name == webrtc.Opus && audioCodecs[i].ClockRate == audio.ClockRate*1000 && audioCodecs[i].Channels == audio.Channels {
payloadType = audioCodecs[i].PayloadType
break
}
}
if payloadType == 0 {
c.ClosePeerConn(peerConnID)
return nil, errors.New("no payloadType")
}
name := "harmony-audio-" + strconv.Itoa(peerConnID)
c.AudioTracks[peerConnID], err = pc.NewTrack(payloadType, 1000+uint32(peerConnID), name, name)
if err != nil {
panic(err)
}
direction := webrtc.RTPTransceiverDirectionSendonly
if peerConnID == 0 {
direction = webrtc.RTPTransceiverDirectionSendrecv
}
_, err = pc.AddTransceiverFromTrack(c.AudioTracks[peerConnID], webrtc.RtpTransceiverInit{Direction: direction})
if err != nil {
panic(err)
}
go func() {
for p := range c.VoiceOut[peerConnID] {
if p == nil {
return
}
err = c.AudioTracks[peerConnID].WriteSample(media.Sample{Data: p.Payload, Samples: audio.Samples})
if err != nil {
panic(err)
}
}
}()
pc.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) {
log.Printf("client %d ontrack", c.ID)
var p *rtp.Packet
for {
p, err = remoteTrack.ReadRTP()
if err != nil {
c.ClosePeerConn(peerConnID)
log.Printf("failed to read RTP from %d", c.ID)
return
}
w.ClientsLock.Lock()
c.VoiceInLock.Lock()
if !c.VoiceInTransmitting {
c.VoiceInTransmitting = true
for _, wc := range w.Clients {
if len(wc.AudioTracks) > 0 {
wc.Out <- &Message{T: MessageTransmitStart, S: c.ID}
}
}
}
c.VoiceInLastActive = time.Now()
c.VoiceInLock.Unlock()
// TODO trim initial x ms transmitting to remove noise (configurable)
for ci, wc := range w.Clients {
if ci == c.ID {
continue
}
wc.WriteAudio(p, c.ID)
}
w.ClientsLock.Unlock()
}
})
pc.OnConnectionStateChange(func(connectionState webrtc.PeerConnectionState) {
log.Printf("%d conn state -> %s\n", c.ID, connectionState)
if peerConnID != 0 {
return // Process events from first PeerConn only
}
if connectionState == webrtc.PeerConnectionStateConnected || connectionState == webrtc.PeerConnectionStateDisconnected {
w.ClientsLock.Lock()
if connectionState == webrtc.PeerConnectionStateDisconnected {
c.ClosePeerConn(peerConnID)
log.Printf("closing peerconn, peer disconnected %d", c.ID)
}
for _, wc := range w.Clients {
if len(wc.AudioTracks) == 0 && wc.ID != c.ID {
continue
}
if connectionState == webrtc.PeerConnectionStateConnected {
wc.Out <- &Message{T: MessageJoin, N: c.Name, M: []byte(c.Name)}
} else {
wc.Out <- &Message{T: MessageQuit, N: c.Name, M: []byte(c.Name)}
}
}
w.ClientsLock.Unlock()
w.updateUserList()
}
})
pc.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
log.Printf("%d ice state -> %s\n", c.ID, connectionState)
})
answerOptions := &webrtc.AnswerOptions{OfferAnswerOptions: webrtc.OfferAnswerOptions{VoiceActivityDetection: false}}
// TODO webrtc does not yet support AnswerOptions
answerOptions = nil
answer, err := pc.CreateAnswer(answerOptions)
if err != nil {
panic(err)
}
err = pc.SetLocalDescription(answer)
if err != nil {
panic(err)
}
return []byte(answer.SDP), nil
}