You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
250 lines
5.3 KiB
Go
250 lines
5.3 KiB
Go
package beehive
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"math/rand"
|
|
"net"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
const (
	// Port window used by allocatePorts when assigning deployment ports.
	// maxPort is the top of the IANA registered-port range.
	minPort = 10000
	maxPort = 49151
)

// serverWelcomeMessage is written to a worker (newline-terminated) after it
// authenticates successfully; see handleRead.
const serverWelcomeMessage = "BUZZ"
|
|
|
|
// ServerAuthentication is the first JSON message a worker sends on a new
// connection: its numeric worker ID and the password configured for that ID
// (matched against Server.WorkerConfig).
type ServerAuthentication struct {
	Worker   int
	Password string
}
|
|
|
|
// Server accepts TCP connections from workers, authenticates them, and
// dispatches tasks to them.
type Server struct {
	// Address is the host:port the server listens on.
	Address string

	// WorkerConfig maps a worker ID to that worker's password.
	WorkerConfig map[int]string

	// Clients holds the currently connected workers.
	// Reads and writes must hold ClientsLock.
	Clients     []*Client
	ClientsLock sync.Mutex
}
|
|
|
|
func NewServer(address string, workerConfig map[int]string) (*Server, error) {
|
|
s := &Server{
|
|
Address: address,
|
|
WorkerConfig: workerConfig,
|
|
}
|
|
go s.listen()
|
|
return s, nil
|
|
}
|
|
|
|
func (s *Server) listen() {
|
|
listener, err := net.Listen("tcp", s.Address)
|
|
if err != nil {
|
|
log.Fatalf("failed to listen on %s: %s", s.Address, err)
|
|
}
|
|
|
|
var conn net.Conn
|
|
for {
|
|
conn, err = listener.Accept()
|
|
if err != nil {
|
|
log.Fatalf("failed to accept connection on %s: %s", s.Address, err)
|
|
}
|
|
client := NewClient(conn)
|
|
go s.handleConnection(client)
|
|
}
|
|
}
|
|
|
|
func (s *Server) sendTestingTask(c *Client) {
|
|
t := NewTask(TaskHealth, map[string]string{
|
|
"time": fmt.Sprintf("%d", time.Now().UnixNano()),
|
|
})
|
|
s.sendTask(c.Worker, t)
|
|
|
|
ports := []int{10500, 10501, 10502, 10503, 10504, 10505, 10506, 10507, 10508, 10509}
|
|
|
|
d := &Deployment{
|
|
ID: 1,
|
|
Client: 7,
|
|
Festoon: "openttd",
|
|
UID: 7777,
|
|
Ports: ports,
|
|
}
|
|
|
|
parameters := map[string]string{
|
|
"id": fmt.Sprintf("%d", d.ID),
|
|
"client": fmt.Sprintf("%d", d.Client),
|
|
"festoon": d.Festoon,
|
|
"uid": fmt.Sprintf("%d", d.UID),
|
|
}
|
|
for i := range ports {
|
|
parameters[fmt.Sprintf("port_%d", i)] = fmt.Sprintf("%d", ports[i])
|
|
}
|
|
t = NewTask(TaskDeploy, parameters)
|
|
s.sendTask(c.Worker, t)
|
|
|
|
time.Sleep(time.Second * 10)
|
|
|
|
t = NewTask(TaskHealth, map[string]string{
|
|
"time": fmt.Sprintf("%d", time.Now().UnixNano()),
|
|
})
|
|
s.sendTask(c.Worker, t)
|
|
}
|
|
|
|
func (s *Server) handleRead(c *Client) {
|
|
var readMessage bool
|
|
scanner := bufio.NewScanner(c.Conn)
|
|
for scanner.Scan() {
|
|
log.Printf(" <- %s", scanner.Bytes())
|
|
|
|
// Authenticate.
|
|
if !readMessage {
|
|
challenge := &ServerAuthentication{}
|
|
err := json.Unmarshal(scanner.Bytes(), challenge)
|
|
if err != nil {
|
|
// TODO terminate
|
|
return
|
|
}
|
|
|
|
if challenge.Worker <= 0 {
|
|
// TODO terminate
|
|
return
|
|
}
|
|
|
|
password, ok := s.WorkerConfig[challenge.Worker]
|
|
if ok && password != "" && password == challenge.Password {
|
|
c.Conn.Write([]byte(serverWelcomeMessage + "\n"))
|
|
c.Worker = challenge.Worker
|
|
|
|
log.Printf("Worker %d connected", c.Worker)
|
|
|
|
// TODO
|
|
s.sendTestingTask(c)
|
|
|
|
readMessage = true
|
|
continue
|
|
}
|
|
// TODO terminate
|
|
return
|
|
}
|
|
|
|
result := &Result{}
|
|
err := json.Unmarshal(scanner.Bytes(), result)
|
|
if err != nil {
|
|
log.Fatalf("failed to unmarshal %s: %s", scanner.Bytes(), err)
|
|
}
|
|
|
|
log.Printf(" <- result: %+v", result)
|
|
|
|
switch result.Type {
|
|
case TaskHealth:
|
|
resultTime, err := strconv.ParseInt(result.Parameters["time"], 10, 64)
|
|
if err != nil {
|
|
// TODO disconnect worker
|
|
log.Fatal(err)
|
|
}
|
|
log.Printf("health check time: %.0fms", float64(time.Now().UnixNano()-resultTime)/1000000)
|
|
for key, value := range result.Parameters {
|
|
if strings.HasPrefix(key, "events_") {
|
|
var events []DeploymentEvent
|
|
err := json.Unmarshal([]byte(value), &events)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
log.Printf("deployment %s events:", key[7:])
|
|
for _, event := range events {
|
|
t := time.Unix(event.Time, 0)
|
|
label := DeploymentStatusLabel(event.Status)
|
|
if label == "" {
|
|
label = "(UNKNOWN)"
|
|
}
|
|
log.Printf(" %s %s", t.Format(time.DateTime), label)
|
|
}
|
|
}
|
|
}
|
|
case TaskDeploy:
|
|
switch result.Parameters["status"] {
|
|
case "ok":
|
|
log.Printf("Deployment %s completed", result.Parameters["id"])
|
|
default:
|
|
log.Fatalf("unknown deployment status: %s", result.Parameters["status"])
|
|
}
|
|
default:
|
|
log.Fatalf("unknown result type %d", result.Type)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) allocatePorts(d *Deployment) {
|
|
const allocatePortRange = 10
|
|
// TODO check if port range is already allocated, if so return that
|
|
|
|
startPort := minPort + rand.Intn(rand.Intn(maxPort)-minPort-allocatePortRange)
|
|
startPort -= startPort % allocatePortRange
|
|
|
|
ports := make([]int, allocatePortRange)
|
|
for i := range ports {
|
|
ports[i] = startPort + i
|
|
}
|
|
|
|
d.Ports = ports
|
|
}
|
|
|
|
func (s *Server) handleConnection(c *Client) {
|
|
s.ClientsLock.Lock()
|
|
|
|
// Remove existing client.
|
|
for i, client := range s.Clients {
|
|
if client.Worker == c.Worker {
|
|
client.Conn.Close()
|
|
s.Clients = append(s.Clients[:i], s.Clients[i+1:]...)
|
|
log.Printf("Dropped existing connection with worker %d", c.Worker)
|
|
break
|
|
}
|
|
}
|
|
|
|
// Add to clients list.
|
|
s.Clients = append(s.Clients, c)
|
|
|
|
s.ClientsLock.Unlock()
|
|
|
|
s.handleRead(c)
|
|
|
|
s.ClientsLock.Lock()
|
|
defer s.ClientsLock.Unlock()
|
|
|
|
for i, client := range s.Clients {
|
|
if client.Worker == c.Worker {
|
|
s.Clients = append(s.Clients[:i], s.Clients[i+1:]...)
|
|
return
|
|
}
|
|
}
|
|
|
|
// TODO remove from clients list with mutex
|
|
log.Printf("Worker %d disconnected", c.Worker)
|
|
}
|
|
|
|
func (s *Server) sendTask(workerID int, t *TaskMessage) bool {
|
|
for i := range s.Clients {
|
|
if s.Clients[i].Worker == workerID {
|
|
taskJSON, err := json.Marshal(t)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
if t.Type == TaskDeploy {
|
|
log.Printf("Deployment %s initiated", t.Parameters["id"])
|
|
}
|
|
|
|
s.Clients[i].Out <- append(taskJSON, byte('\n'))
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|