You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

250 lines
5.3 KiB
Go

package beehive
import (
"bufio"
"encoding/json"
"fmt"
"log"
"math/rand"
"net"
"strconv"
"strings"
"sync"
"time"
)
const (
minPort = 10000
maxPort = 49151
)
const serverWelcomeMessage = "BUZZ"
type ServerAuthentication struct {
Worker int
Password string
}
type Server struct {
Address string
WorkerConfig map[int]string
Clients []*Client
ClientsLock sync.Mutex
}
func NewServer(address string, workerConfig map[int]string) (*Server, error) {
s := &Server{
Address: address,
WorkerConfig: workerConfig,
}
go s.listen()
return s, nil
}
func (s *Server) listen() {
listener, err := net.Listen("tcp", s.Address)
if err != nil {
log.Fatalf("failed to listen on %s: %s", s.Address, err)
}
var conn net.Conn
for {
conn, err = listener.Accept()
if err != nil {
log.Fatalf("failed to accept connection on %s: %s", s.Address, err)
}
client := NewClient(conn)
go s.handleConnection(client)
}
}
func (s *Server) sendTestingTask(c *Client) {
t := NewTask(TaskHealth, map[string]string{
"time": fmt.Sprintf("%d", time.Now().UnixNano()),
})
s.sendTask(c.Worker, t)
ports := []int{10500, 10501, 10502, 10503, 10504, 10505, 10506, 10507, 10508, 10509}
d := &Deployment{
ID: 1,
Client: 7,
Festoon: "openttd",
UID: 7777,
Ports: ports,
}
parameters := map[string]string{
"id": fmt.Sprintf("%d", d.ID),
"client": fmt.Sprintf("%d", d.Client),
"festoon": d.Festoon,
"uid": fmt.Sprintf("%d", d.UID),
}
for i := range ports {
parameters[fmt.Sprintf("port_%d", i)] = fmt.Sprintf("%d", ports[i])
}
t = NewTask(TaskDeploy, parameters)
s.sendTask(c.Worker, t)
time.Sleep(time.Second * 10)
t = NewTask(TaskHealth, map[string]string{
"time": fmt.Sprintf("%d", time.Now().UnixNano()),
})
s.sendTask(c.Worker, t)
}
func (s *Server) handleRead(c *Client) {
var readMessage bool
scanner := bufio.NewScanner(c.Conn)
for scanner.Scan() {
log.Printf(" <- %s", scanner.Bytes())
// Authenticate.
if !readMessage {
challenge := &ServerAuthentication{}
err := json.Unmarshal(scanner.Bytes(), challenge)
if err != nil {
// TODO terminate
return
}
if challenge.Worker <= 0 {
// TODO terminate
return
}
password, ok := s.WorkerConfig[challenge.Worker]
if ok && password != "" && password == challenge.Password {
c.Conn.Write([]byte(serverWelcomeMessage + "\n"))
c.Worker = challenge.Worker
log.Printf("Worker %d connected", c.Worker)
// TODO
s.sendTestingTask(c)
readMessage = true
continue
}
// TODO terminate
return
}
result := &Result{}
err := json.Unmarshal(scanner.Bytes(), result)
if err != nil {
log.Fatalf("failed to unmarshal %s: %s", scanner.Bytes(), err)
}
log.Printf(" <- result: %+v", result)
switch result.Type {
case TaskHealth:
resultTime, err := strconv.ParseInt(result.Parameters["time"], 10, 64)
if err != nil {
// TODO disconnect worker
log.Fatal(err)
}
log.Printf("health check time: %.0fms", float64(time.Now().UnixNano()-resultTime)/1000000)
for key, value := range result.Parameters {
if strings.HasPrefix(key, "events_") {
var events []DeploymentEvent
err := json.Unmarshal([]byte(value), &events)
if err != nil {
log.Fatal(err)
}
log.Printf("deployment %s events:", key[7:])
for _, event := range events {
t := time.Unix(event.Time, 0)
label := DeploymentStatusLabel(event.Status)
if label == "" {
label = "(UNKNOWN)"
}
log.Printf(" %s %s", t.Format(time.DateTime), label)
}
}
}
case TaskDeploy:
switch result.Parameters["status"] {
case "ok":
log.Printf("Deployment %s completed", result.Parameters["id"])
default:
log.Fatalf("unknown deployment status: %s", result.Parameters["status"])
}
default:
log.Fatalf("unknown result type %d", result.Type)
}
}
}
func (s *Server) allocatePorts(d *Deployment) {
const allocatePortRange = 10
// TODO check if port range is already allocated, if so return that
startPort := minPort + rand.Intn(rand.Intn(maxPort)-minPort-allocatePortRange)
startPort -= startPort % allocatePortRange
ports := make([]int, allocatePortRange)
for i := range ports {
ports[i] = startPort + i
}
d.Ports = ports
}
func (s *Server) handleConnection(c *Client) {
s.ClientsLock.Lock()
// Remove existing client.
for i, client := range s.Clients {
if client.Worker == c.Worker {
client.Conn.Close()
s.Clients = append(s.Clients[:i], s.Clients[i+1:]...)
log.Printf("Dropped existing connection with worker %d", c.Worker)
break
}
}
// Add to clients list.
s.Clients = append(s.Clients, c)
s.ClientsLock.Unlock()
s.handleRead(c)
s.ClientsLock.Lock()
defer s.ClientsLock.Unlock()
for i, client := range s.Clients {
if client.Worker == c.Worker {
s.Clients = append(s.Clients[:i], s.Clients[i+1:]...)
return
}
}
// TODO remove from clients list with mutex
log.Printf("Worker %d disconnected", c.Worker)
}
func (s *Server) sendTask(workerID int, t *TaskMessage) bool {
for i := range s.Clients {
if s.Clients[i].Worker == workerID {
taskJSON, err := json.Marshal(t)
if err != nil {
log.Fatal(err)
}
if t.Type == TaskDeploy {
log.Printf("Deployment %s initiated", t.Parameters["id"])
}
s.Clients[i].Out <- append(taskJSON, byte('\n'))
return true
}
}
return false
}