package beehive

import (
	"bufio"
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"
)

// minPort and maxPort bound the range from which allocatePorts picks ports.
const (
	minPort = 10000
	maxPort = 49151
)

// serverWelcomeMessage is written, followed by a newline, to a worker once it
// has authenticated successfully.
const serverWelcomeMessage = "BUZZ"

// ServerAuthentication is the JSON message a connecting worker must send as
// its first line. Worker must be positive and Password must match the entry
// configured for that worker in Server.WorkerConfig.
type ServerAuthentication struct {
	Worker   int
	Password string
}

// Server accepts worker connections on Address and exchanges newline-delimited
// JSON messages with them.
//
// WorkerConfig maps a worker ID to the password that worker must present.
// Clients maps a worker ID to its currently registered connection and must
// only be accessed while holding ClientsLock.
type Server struct {
	Address      string
	WorkerConfig map[int]string
	Clients      map[int]*Client
	ClientsLock  sync.Mutex
}

// NewServer creates a Server and starts listening on address in a background
// goroutine. The returned error is currently always nil: a failure to listen
// is reported by the background goroutine, which terminates the process.
func NewServer(address string, workerConfig map[int]string) (*Server, error) {
	s := &Server{
		Address:      address,
		WorkerConfig: workerConfig,
		Clients:      make(map[int]*Client),
	}
	go s.listen()
	return s, nil
}

// listen accepts TCP connections on s.Address forever, handling each one in
// its own goroutine. Failing to listen or to accept is fatal to the process.
func (s *Server) listen() {
	listener, err := net.Listen("tcp", s.Address)
	if err != nil {
		log.Fatalf("failed to listen on %s: %s", s.Address, err)
	}

	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Fatalf("failed to accept connection on %s: %s", s.Address, err)
		}
		go s.handleConnection(NewClient(conn))
	}
}

// sendTestingTask sends a fixed sequence of tasks to c's worker: a health
// check, a deployment of a hard-coded test Deployment with ten ports, and,
// after a ten second pause, a second health check.
func (s *Server) sendTestingTask(c *Client) {
	// Health tasks carry the send time so the round trip can be measured when
	// the result comes back.
	healthTask := func() *TaskMessage {
		return NewTask(TaskHealth, map[string]string{
			"time": strconv.FormatInt(time.Now().UnixNano(), 10),
		})
	}

	s.sendTask(c.Worker, healthTask())

	ports := []int{10500, 10501, 10502, 10503, 10504, 10505, 10506, 10507, 10508, 10509}
	d := &Deployment{
		ID:      1,
		Festoon: "openttd",
	}

	parameters := map[string]string{
		"id":      fmt.Sprintf("%d", d.ID),
		"festoon": d.Festoon,
	}
	for i, port := range ports {
		parameters[fmt.Sprintf("port_%d", i)] = strconv.Itoa(port)
	}
	parameters["ports"] = strconv.Itoa(len(ports))

	s.sendTask(c.Worker, NewTask(TaskDeploy, parameters))

	time.Sleep(time.Second * 10)

	s.sendTask(c.Worker, healthTask())
}

// addClient registers c under its worker ID. If another client is already
// registered for that worker, its connection is closed before it is replaced.
func (s *Server) addClient(c *Client) {
	s.ClientsLock.Lock()
	defer s.ClientsLock.Unlock()

	// Remove existing client.
	if existing := s.Clients[c.Worker]; existing != nil {
		existing.Conn.Close()
		log.Printf("Dropped existing connection with worker %d", c.Worker)
	}

	s.Clients[c.Worker] = c
}

// handleRead reads newline-delimited messages from c until the connection
// ends or the worker misbehaves. The first message must be a valid
// ServerAuthentication; every later message is decoded as a Result.
//
// Invalid input from a worker ends only that worker's read loop; it never
// terminates the server process.
func (s *Server) handleRead(c *Client) {
	authenticated := false

	scanner := bufio.NewScanner(c.Conn)
	for scanner.Scan() {
		line := scanner.Bytes()
		log.Printf(" <- %s", line)

		if !authenticated {
			if !s.authenticate(c, line) {
				// TODO terminate
				return
			}

			// TODO
			// NOTE(review): reconstructed from a whitespace-mangled source as a
			// live call preceded by a bare TODO marker; confirm against the
			// original file that this call was not commented out.
			s.sendTestingTask(c)

			authenticated = true
			continue
		}

		result := &Result{}
		if err := json.Unmarshal(line, result); err != nil {
			log.Printf("failed to unmarshal %s: %s", line, err)
			return
		}
		log.Printf(" <- result: %+v", result)

		if err := s.processResult(result); err != nil {
			log.Printf("dropping worker %d: %s", c.Worker, err)
			return
		}
	}

	// Scan returns false on EOF as well as on errors; only the latter is
	// reported by Err.
	if err := scanner.Err(); err != nil {
		log.Printf("failed to read from worker %d: %s", c.Worker, err)
	}
}

// authenticate validates msg as a ServerAuthentication against s.WorkerConfig.
// On success it sends the welcome message, records the worker ID on c,
// registers c, and returns true. On any failure it returns false without
// registering c.
func (s *Server) authenticate(c *Client, msg []byte) bool {
	challenge := &ServerAuthentication{}
	if err := json.Unmarshal(msg, challenge); err != nil {
		return false
	}
	if challenge.Worker <= 0 {
		return false
	}

	// An empty configured password never authenticates.
	password, ok := s.WorkerConfig[challenge.Worker]
	if !ok || password == "" || password != challenge.Password {
		return false
	}

	if _, err := c.Conn.Write([]byte(serverWelcomeMessage + "\n")); err != nil {
		log.Printf("failed to welcome worker %d: %s", challenge.Worker, err)
		return false
	}

	c.Worker = challenge.Worker
	log.Printf("Worker %d connected", c.Worker)
	s.addClient(c)
	return true
}

// processResult logs the outcome described by result. It returns an error
// when the result is malformed or of an unknown type or status.
func (s *Server) processResult(result *Result) error {
	switch result.Type {
	case TaskHealth:
		sent, err := strconv.ParseInt(result.Parameters["time"], 10, 64)
		if err != nil {
			return fmt.Errorf("parsing health check time: %w", err)
		}
		log.Printf("health check time: %.0fms", float64(time.Now().UnixNano()-sent)/1000000)

		// Parameters named "events_<suffix>" hold a JSON array of
		// DeploymentEvent; the suffix is logged as the deployment.
		for key, value := range result.Parameters {
			deployment, ok := strings.CutPrefix(key, "events_")
			if !ok {
				continue
			}

			var events []DeploymentEvent
			if err := json.Unmarshal([]byte(value), &events); err != nil {
				return fmt.Errorf("parsing events of deployment %s: %w", deployment, err)
			}

			log.Printf("deployment %s events:", deployment)
			for _, event := range events {
				label := DeploymentStatusLabel(event.Status)
				if label == "" {
					label = "(UNKNOWN)"
				}
				log.Printf(" %s %s", time.Unix(event.Time, 0).Format(time.DateTime), label)
			}
		}
	case TaskDeploy:
		status := result.Parameters["status"]
		if status != "ok" {
			return fmt.Errorf("unknown deployment status: %s", status)
		}
		log.Printf("Deployment %s completed", result.Parameters["id"])
	default:
		return fmt.Errorf("unknown result type %d", result.Type)
	}
	return nil
}

// allocatePorts picks a random block of allocatePortRange consecutive ports
// within [minPort, maxPort), starting at a multiple of allocatePortRange.
//
// The chosen ports are not yet stored or returned, and d is not yet used.
func (s *Server) allocatePorts(d *Deployment) {
	const allocatePortRange = 10

	// TODO check if port range is already allocated, if so return that

	// Intn panics on a non-positive argument, so the bound must be the
	// constant width of the usable range rather than another random number.
	startPort := minPort + rand.Intn(maxPort-minPort-allocatePortRange)
	// Align down to the start of a block. minPort is itself a multiple of
	// allocatePortRange, so the result never drops below minPort.
	startPort -= startPort % allocatePortRange

	ports := make([]int, allocatePortRange)
	for i := range ports {
		ports[i] = startPort + i
	}
}

// handleConnection serves c until its read loop ends, then closes the
// connection and unregisters c if it is still the registered client for its
// worker.
func (s *Server) handleConnection(c *Client) {
	s.handleRead(c)

	// Best-effort close: the connection may already have been closed by the
	// peer or by addClient when a newer connection replaced this one.
	_ = c.Conn.Close()

	s.ClientsLock.Lock()
	defer s.ClientsLock.Unlock()

	// Only unregister this client. If the worker has reconnected, the map
	// entry belongs to the newer client and must stay; if c never
	// authenticated, it was never registered.
	if s.Clients[c.Worker] != c {
		return
	}
	delete(s.Clients, c.Worker)
	log.Printf("Worker %d disconnected", c.Worker)
}

// sendTask queues t, encoded as one line of JSON, on the outgoing channel of
// the client registered for workerID. It returns false when no such client is
// registered or t cannot be encoded.
func (s *Server) sendTask(workerID int, t *TaskMessage) bool {
	// Hold the lock only for the map lookup so that a blocking channel send
	// cannot stall addClient or handleConnection.
	s.ClientsLock.Lock()
	client := s.Clients[workerID]
	s.ClientsLock.Unlock()
	if client == nil {
		return false
	}

	taskJSON, err := json.Marshal(t)
	if err != nil {
		log.Printf("failed to marshal task for worker %d: %s", workerID, err)
		return false
	}

	if t.Type == TaskDeploy {
		log.Printf("Deployment %s initiated", t.Parameters["id"])
	}

	client.Out <- append(taskJSON, byte('\n'))
	return true
}