package web import ( "bytes" "encoding/json" "log" "net/http" "sort" "strconv" "strings" "sync" "time" "github.com/pion/interceptor" "code.rocketnine.space/tslocum/harmony/pkg/agent" "github.com/gorilla/mux" "github.com/gorilla/websocket" "github.com/pion/rtp" "github.com/pion/webrtc/v3" "github.com/pkg/errors" "gitlab.com/golang-commonmark/markdown" ) var peerConnectionConfig = webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:stun.l.google.com:19302"}, }, }, } var assets http.FileSystem var incomingClients = make(chan *agent.Client, 10) var markdownRenderer = markdown.New(markdown.Typographer(false), markdown.Breaks(true), markdown.Quotes([]string{`"`, `"`, `'`, `'`})) var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool { return true }, EnableCompression: true, } type WebInterface struct { Clients map[int]*agent.Client ClientsLock *sync.Mutex Channels map[int]*agent.Channel ChannelsLock *sync.Mutex } func NewWebInterface(address string, path string) *WebInterface { if assets == nil { panic("failed to load web assets") } w := WebInterface{Clients: make(map[int]*agent.Client), ClientsLock: new(sync.Mutex), Channels: make(map[int]*agent.Channel), ChannelsLock: new(sync.Mutex)} w.createChannels() go w.handleIncomingClients() go w.handleExpireTransmit() r := mux.NewRouter() r.HandleFunc(path+"w", w.webSocketHandler) r.PathPrefix(path).Handler(http.StripPrefix(path, http.FileServer(assets))) addressSplit := strings.Split(address, ",") for _, add := range addressSplit { add := add // Capture go func() { if err := http.ListenAndServe(add, r); err != nil { log.Fatal("failed to listen on address ", add, ":", err) } }() } return &w } func (w *WebInterface) nextChannelID() int { id := 0 for cid := range w.Channels { if cid > id { id = cid } } return id + 1 } func (w *WebInterface) createChannels() { w.ChannelsLock.Lock() defer w.ChannelsLock.Unlock() // TODO Load channels from database } func (w *WebInterface) AddChannel(t agent.ChannelType, name string, topic string) { w.ChannelsLock.Lock() defer w.ChannelsLock.Unlock() ch := agent.NewChannel(w.nextChannelID(), t) ch.Name = name ch.Topic = topic w.Channels[ch.ID] = ch } func (w *WebInterface) handleIncomingClients() { for c := range incomingClients { c := c // Capture w.ClientsLock.Lock() id := w.nextClientID() c.ID = id w.Clients[id] = c go func(c *agent.Client) { time.Sleep(500 * time.Millisecond) c.Connected = true w.updateUserList() w.ClientsLock.Lock() for _, wc := range w.Clients { wc.Out <- &agent.Message{T: agent.MessageConnect, N: c.Name, M: []byte(c.Name)} } w.ClientsLock.Unlock() }(c) w.ClientsLock.Unlock() w.sendChannelList(c) go w.handleRead(c) } } func (w *WebInterface) handleExpireTransmit() { t := time.NewTicker(250 * time.Millisecond) for range t.C { w.ClientsLock.Lock() for _, wc := range w.Clients { if wc.AudioIn.ExpireTransmit() { for _, wcc := range wc.Channel.Clients { if len(wcc.AudioOut.Tracks) > 0 { wcc.Out <- &agent.Message{T: agent.MessageTransmitStop, S: wc.ID} } } } } w.ClientsLock.Unlock() } } func (w *WebInterface) handleRead(c *agent.Client) { for msg := range c.In { if msg == nil { return } if msg.T != agent.MessagePing { log.Printf("%d -> %s %d", msg.S, msg.T, len(msg.M)) } switch msg.T { case agent.MessageBinary: // TODO Binary message continue case agent.MessagePing: c.Out <- &agent.Message{T: agent.MessagePong, M: msg.M} case agent.MessageCall: answer, err := w.answerRTC(c, msg.PC, msg.M) if err != nil { log.Printf("failed to answer call: %s", err) continue } c.Out <- &agent.Message{T: agent.MessageAnswer, PC: msg.PC, M: answer} case agent.MessageChat: if bytes.HasPrefix(bytes.ToLower(msg.M), []byte("/nick ")) { go func(mm []byte) { c.In <- &agent.Message{S: c.ID, T: agent.MessageNick, M: mm} }(msg.M[6:]) continue } log.Printf("<%s> %s", c.Name, msg.M) msg.M = bytes.TrimSpace([]byte(markdownRenderer.RenderToString(msg.M))) if bytes.Count(msg.M, []byte("

")) == 1 && bytes.Count(msg.M, []byte("

")) == 1 && bytes.HasPrefix(msg.M, []byte("

")) && bytes.HasSuffix(msg.M, []byte("

")) { msg.M = msg.M[3 : len(msg.M)-4] } msg.M = bytes.TrimSpace(bytes.ReplaceAll(msg.M, []byte(`= 3 { return nil, errors.New("already have 3 peerconns") // TODO configurable }*/ if c.PeerConns[peerConnID] != nil { return nil, errors.New("already have next peerconn") } offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(offerSDP)} codecParameters := webrtc.RTPCodecParameters{ RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 1, SDPFmtpLine: "", RTCPFeedback: nil}, } m := &webrtc.MediaEngine{} err := m.RegisterCodec(codecParameters, webrtc.RTPCodecTypeAudio) if err != nil { panic(err) } i := &interceptor.Registry{} if err = webrtc.RegisterDefaultInterceptors(m, i); err != nil { panic(err) } api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i)) pc, err := api.NewPeerConnection(peerConnectionConfig) if err != nil { panic(err) } c.PeerConns[peerConnID] = pc if _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil { panic(err) } err = pc.SetRemoteDescription(offer) if err != nil { panic(err) } name := "harmony-audio-" + strconv.Itoa(peerConnID) track, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, name, name) if err != nil { panic(err) } /*direction := webrtc.RTPTransceiverDirectionSendonly if peerConnID == 0 { direction = webrtc.RTPTransceiverDirectionSendrecv } _, err = pc.AddTransceiverFromTrack(track, webrtc.RTPTransceiverInit{Direction: direction}) if err != nil { panic(err) }*/ c.AudioOut.AddTrack(track) _, err = pc.AddTrack(track) if err != nil { c.ClosePeerConn(peerConnID) log.Printf("failed to add track to %d: %s", c.ID, err) return nil, err } pc.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { log.Println("ON REMOTE TRACK") var p *rtp.Packet for { p, _, err = remoteTrack.ReadRTP() if err != nil { c.ClosePeerConn(peerConnID) log.Printf("failed to read RTP from %d", c.ID) return } // TODO log.Println("READ RTP!!!") w.ClientsLock.Lock() if c.Channel == nil { continue } // TODO trim initial x ms transmitting to remove noise (configurable) if c.AudioIn.StartTransmit() { for _, wc := range c.Channel.Clients { if len(wc.AudioOut.Tracks) > 0 { wc.Out <- &agent.Message{T: agent.MessageTransmitStart, S: c.ID} } } } for ci, wc := range c.Channel.Clients { if ci == c.ID { continue } wc.AudioOut.Write(p, c.ID) } w.ClientsLock.Unlock() } }) pc.OnConnectionStateChange(func(connectionState webrtc.PeerConnectionState) { log.Printf("%d conn state -> %s\n", c.ID, connectionState) if peerConnID != 0 { return // Process events from first PeerConn only } if connectionState == webrtc.PeerConnectionStateDisconnected { w.quitChannel(c) c.CloseAudio() w.updateUserList() } }) /*pc.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { log.Printf("%d ice state -> %s\n", c.ID, connectionState) })*/ answerOptions := &webrtc.AnswerOptions{OfferAnswerOptions: webrtc.OfferAnswerOptions{VoiceActivityDetection: false}} // TODO webrtc does not yet support AnswerOptions answerOptions = nil answer, err := pc.CreateAnswer(answerOptions) if err != nil { panic(err) } err = pc.SetLocalDescription(answer) if err != nil { panic(err) } return []byte(answer.SDP), nil }