harmony/pkg/web/web.go

550 lines
11 KiB
Go

package web
import "C"
import (
"bytes"
"encoding/json"
"log"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
"git.sr.ht/~tslocum/harmony/pkg/agent"
"git.sr.ht/~tslocum/harmony/pkg/audio"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/pion/rtp"
"github.com/pion/webrtc/v2"
"github.com/pkg/errors"
"gitlab.com/golang-commonmark/markdown"
)
var peerConnectionConfig = webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
}
var assets http.FileSystem
var incomingClients = make(chan *agent.Client, 10)
var markdownRenderer = markdown.New(markdown.Typographer(false), markdown.Breaks(true), markdown.Quotes([]string{`"`, `"`, `'`, `'`}))
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
EnableCompression: true,
}
type WebInterface struct {
Clients map[int]*agent.Client
ClientsLock *sync.Mutex
Channels map[int]*agent.Channel
ChannelsLock *sync.Mutex
}
func NewWebInterface(address string, path string) *WebInterface {
if assets == nil {
panic("failed to load web assets")
}
w := WebInterface{Clients: make(map[int]*agent.Client), ClientsLock: new(sync.Mutex), Channels: make(map[int]*agent.Channel), ChannelsLock: new(sync.Mutex)}
w.createChannels()
go w.handleIncomingClients()
go w.handleExpireTransmit()
r := mux.NewRouter()
r.HandleFunc(path+"w", w.webSocketHandler)
r.PathPrefix(path).Handler(http.StripPrefix(path, http.FileServer(assets)))
addressSplit := strings.Split(address, ",")
for _, add := range addressSplit {
add := add // Capture
go func() {
if err := http.ListenAndServe(add, r); err != nil {
log.Fatal("failed to listen on address ", add, ":", err)
}
}()
}
return &w
}
func (w *WebInterface) nextChannelID() int {
id := 0
for cid := range w.Channels {
if cid > id {
id = cid
}
}
return id + 1
}
func (w *WebInterface) createChannels() {
w.ChannelsLock.Lock()
defer w.ChannelsLock.Unlock()
// TODO Load channels from database
}
func (w *WebInterface) AddChannel(t agent.ChannelType, name string, topic string) {
w.ChannelsLock.Lock()
defer w.ChannelsLock.Unlock()
ch := agent.NewChannel(w.nextChannelID(), t)
ch.Name = name
ch.Topic = topic
w.Channels[ch.ID] = ch
}
func (w *WebInterface) handleIncomingClients() {
for c := range incomingClients {
c := c // Capture
w.ClientsLock.Lock()
id := w.nextClientID()
c.ID = id
w.Clients[id] = c
go func(c *agent.Client) {
time.Sleep(500 * time.Millisecond)
c.Connected = true
w.updateUserList()
w.ClientsLock.Lock()
for _, wc := range w.Clients {
wc.Out <- &agent.Message{T: agent.MessageConnect, N: c.Name, M: []byte(c.Name)}
}
w.ClientsLock.Unlock()
}(c)
w.ClientsLock.Unlock()
w.sendChannelList(c)
go w.handleRead(c)
}
}
func (w *WebInterface) handleExpireTransmit() {
t := time.NewTicker(250 * time.Millisecond)
for range t.C {
w.ClientsLock.Lock()
for _, wc := range w.Clients {
if wc.AudioIn.ExpireTransmit() {
for _, wcc := range wc.Channel.Clients {
if len(wcc.AudioOut.Tracks) > 0 {
wcc.Out <- &agent.Message{T: agent.MessageTransmitStop, S: wc.ID}
}
}
}
}
w.ClientsLock.Unlock()
}
}
func (w *WebInterface) handleRead(c *agent.Client) {
for msg := range c.In {
if msg == nil {
return
}
if msg.T != agent.MessagePing {
log.Printf("%d -> %s %d", msg.S, msg.T, len(msg.M))
}
switch msg.T {
case agent.MessageBinary:
// TODO Binary message
continue
case agent.MessagePing:
c.Out <- &agent.Message{T: agent.MessagePong, M: msg.M}
case agent.MessageCall:
answer, err := w.answerRTC(c, msg.PC, msg.M)
if err != nil {
log.Printf("failed to answer call: %s", err)
continue
}
c.Out <- &agent.Message{T: agent.MessageAnswer, PC: msg.PC, M: answer}
case agent.MessageChat:
if bytes.HasPrefix(bytes.ToLower(msg.M), []byte("/nick ")) {
go func(mm []byte) {
c.In <- &agent.Message{S: c.ID, T: agent.MessageNick, M: mm}
}(msg.M[6:])
continue
}
log.Printf("<%s> %s", c.Name, msg.M)
msg.M = bytes.TrimSpace([]byte(markdownRenderer.RenderToString(msg.M)))
if bytes.Count(msg.M, []byte("<p>")) == 1 && bytes.Count(msg.M, []byte("</p>")) == 1 && bytes.HasPrefix(msg.M, []byte("<p>")) && bytes.HasSuffix(msg.M, []byte("</p>")) {
msg.M = msg.M[3 : len(msg.M)-4]
}
msg.M = bytes.TrimSpace(bytes.ReplaceAll(msg.M, []byte(`<a href="`), []byte(`<a target="_blank" href="`)))
w.ClientsLock.Lock()
for _, wc := range w.Clients {
wc.Out <- &agent.Message{S: c.ID, C: msg.C, N: c.Name, T: agent.MessageChat, M: msg.M}
}
w.ClientsLock.Unlock()
case agent.MessageNick:
w.ClientsLock.Lock()
oldNick := c.Name
c.Name = agent.Nickname(string(msg.M))
if c.Connected {
msg := &agent.Message{S: c.ID, N: oldNick, T: agent.MessageNick, M: []byte(c.Name)}
for _, wc := range w.Clients {
wc.Out <- msg
}
}
w.ClientsLock.Unlock()
w.updateUserList()
case agent.MessageJoin, agent.MessageQuit:
if msg.T == agent.MessageJoin {
w.joinChannel(c, w.Channels[msg.C])
} else { // MessageQuit
w.quitChannel(c)
c.CloseAudio()
}
w.updateUserList()
case agent.MessageConnect, agent.MessageDisconnect:
w.ClientsLock.Lock()
if msg.T == agent.MessageDisconnect {
w.quitChannel(c)
c.Close()
}
msg.N = c.Name
for _, wc := range w.Clients {
wc.Out <- msg
}
w.ClientsLock.Unlock()
w.updateUserList()
default:
log.Printf("Unhandled message %d %s", msg.T, msg.M)
}
}
}
func (w *WebInterface) joinChannel(c *agent.Client, ch *agent.Channel) {
if ch == nil || (c.Channel != nil && c.Channel.ID == ch.ID) {
return
}
w.quitChannel(c)
w.ClientsLock.Lock()
ch.Lock()
ch.Clients[c.ID] = c
c.Channel = ch
for _, wc := range ch.Clients {
if len(wc.AudioOut.Tracks) == 0 && wc.ID != c.ID {
continue
}
wc.Out <- &agent.Message{T: agent.MessageJoin, N: c.Name, C: ch.ID}
}
ch.Unlock()
w.ClientsLock.Unlock()
w.updateUserList()
}
func (w *WebInterface) quitChannel(c *agent.Client) {
if c.Channel == nil {
return
}
ch := c.Channel
w.ClientsLock.Lock()
ch.Lock()
for _, wc := range ch.Clients {
if len(wc.AudioOut.Tracks) == 0 && wc.ID != c.ID {
continue
}
wc.Out <- &agent.Message{T: agent.MessageQuit, N: c.Name, C: ch.ID}
}
delete(ch.Clients, c.ID)
c.Channel = nil
ch.Unlock()
w.ClientsLock.Unlock()
w.updateUserList()
}
func (w *WebInterface) nextClientID() int {
id := 1
for {
if _, ok := w.Clients[id]; !ok {
break
}
id++
}
return id
}
func (w *WebInterface) webSocketHandler(wr http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(wr, r, nil)
if err != nil {
return
}
c := agent.NewClient(conn)
incomingClients <- c
<-c.Terminated
w.quitChannel(c)
w.ClientsLock.Lock()
for id := range w.Clients {
if w.Clients[id].Status != -1 {
continue
}
name := w.Clients[id].Name
delete(w.Clients, id)
for _, wc := range w.Clients {
wc.Out <- &agent.Message{T: agent.MessageDisconnect, N: name, M: []byte(name)}
}
}
w.ClientsLock.Unlock()
}
func (w *WebInterface) updateUserList() {
w.ClientsLock.Lock()
msg := &agent.Message{T: agent.MessageUsers}
var userList agent.UserList
for _, wc := range w.Clients {
c := 0
if wc.Channel != nil {
c = wc.Channel.ID
}
userList = append(userList, &agent.User{ID: wc.ID, N: wc.Name, C: c})
}
sort.Sort(userList)
var err error
msg.M, err = json.Marshal(userList)
if err != nil {
log.Fatal("failed to marshal user list: ", err)
}
for _, wc := range w.Clients {
wc.Out <- msg
}
w.ClientsLock.Unlock()
}
func (w *WebInterface) sendChannelList(c *agent.Client) {
var channelList agent.ChannelList
for _, ch := range w.Channels {
channelList = append(channelList, &agent.ChannelListing{ID: ch.ID, Type: ch.Type, Name: ch.Name, Topic: ch.Topic})
}
sort.Sort(channelList)
msg := agent.Message{T: agent.MessageChannels}
var err error
msg.M, err = json.Marshal(channelList)
if err != nil {
log.Fatal("failed to marshal channel list: ", err)
}
c.Out <- &msg
}
func (w *WebInterface) answerRTC(c *agent.Client, peerConnID int, offerSDP []byte) ([]byte, error) {
c.PeerConnLock.Lock()
defer c.PeerConnLock.Unlock()
/*if c.NextPeerConn >= 3 {
return nil, errors.New("already have 3 peerconns") // TODO configurable
}*/
if c.PeerConns[peerConnID] != nil {
return nil, errors.New("already have next peerconn")
}
offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(offerSDP)}
m := webrtc.MediaEngine{}
err := m.PopulateFromSDP(offer)
if err != nil {
panic(err)
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(m))
audioCodecs := m.GetCodecsByKind(webrtc.RTPCodecTypeAudio)
if len(audioCodecs) == 0 {
panic("Offer contained no audio codecs")
}
pc, err := api.NewPeerConnection(peerConnectionConfig)
if err != nil {
panic(err)
}
c.PeerConns[peerConnID] = pc
err = pc.SetRemoteDescription(offer)
if err != nil {
panic(err)
}
var payloadType uint8
for i := range audioCodecs {
if audioCodecs[i].Name == webrtc.Opus && audioCodecs[i].ClockRate == audio.ClockRate*1000 && audioCodecs[i].Channels == audio.Channels {
payloadType = audioCodecs[i].PayloadType
break
}
}
if payloadType == 0 {
c.ClosePeerConn(peerConnID)
return nil, errors.New("no payloadType")
}
name := "harmony-audio-" + strconv.Itoa(peerConnID)
track, err := pc.NewTrack(payloadType, 1000+uint32(peerConnID), name, name)
if err != nil {
panic(err)
}
direction := webrtc.RTPTransceiverDirectionSendonly
if peerConnID == 0 {
direction = webrtc.RTPTransceiverDirectionSendrecv
}
_, err = pc.AddTransceiverFromTrack(track, webrtc.RtpTransceiverInit{Direction: direction})
if err != nil {
panic(err)
}
c.AudioOut.AddTrack(track)
pc.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) {
var p *rtp.Packet
for {
p, err = remoteTrack.ReadRTP()
if err != nil {
c.ClosePeerConn(peerConnID)
log.Printf("failed to read RTP from %d", c.ID)
return
}
w.ClientsLock.Lock()
if c.Channel == nil {
continue
}
// TODO trim initial x ms transmitting to remove noise (configurable)
if c.AudioIn.StartTransmit() {
for _, wc := range c.Channel.Clients {
if len(wc.AudioOut.Tracks) > 0 {
wc.Out <- &agent.Message{T: agent.MessageTransmitStart, S: c.ID}
}
}
}
for ci, wc := range c.Channel.Clients {
if ci == c.ID {
continue
}
wc.AudioOut.Write(p, c.ID)
}
w.ClientsLock.Unlock()
}
})
pc.OnConnectionStateChange(func(connectionState webrtc.PeerConnectionState) {
log.Printf("%d conn state -> %s\n", c.ID, connectionState)
if peerConnID != 0 {
return // Process events from first PeerConn only
}
if connectionState == webrtc.PeerConnectionStateDisconnected {
w.quitChannel(c)
c.CloseAudio()
w.updateUserList()
}
})
/*pc.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
log.Printf("%d ice state -> %s\n", c.ID, connectionState)
})*/
answerOptions := &webrtc.AnswerOptions{OfferAnswerOptions: webrtc.OfferAnswerOptions{VoiceActivityDetection: false}}
// TODO webrtc does not yet support AnswerOptions
answerOptions = nil
answer, err := pc.CreateAnswer(answerOptions)
if err != nil {
panic(err)
}
err = pc.SetLocalDescription(answer)
if err != nil {
panic(err)
}
return []byte(answer.SDP), nil
}