You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

244 lines
4.8 KiB
Go

package beehive
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"log"
"net"
"os"
"path"
"strconv"
"time"
)
// retryDelay is how long handleConnect sleeps between failed attempts to dial
// the queen bee.
const retryDelay = 2 * time.Second
// WorkerConfig holds the settings a Worker is created with.
type WorkerConfig struct {
	// FestoonsDir is the path to the directory holding festoons.
	FestoonsDir string

	// DeploymentsDir is the path to the directory holding deployments.
	// NewWorker loads one deployment from each subdirectory found here.
	DeploymentsDir string

	// Queen is the address of the queen bee; it is dialed over TCP.
	Queen string

	// IP is the worker's IP address.
	IP string

	// ID is the worker's unique ID, sent when authenticating with the queen.
	ID int

	// Password is sent together with ID when authenticating with the queen.
	Password string
}
// Worker is a node that keeps a set of deployments and serves tasks sent by
// the queen bee. Create one with NewWorker.
type Worker struct {
	// WorkerConfig is a copy of the configuration passed to NewWorker.
	WorkerConfig

	// Deployments lists the deployments known to this worker: those loaded
	// from DeploymentsDir at startup plus any created by deploy tasks.
	Deployments []*Deployment

	// TaskQueue feeds tasks to the handleTaskQueue goroutine. It is
	// unbuffered, so a send blocks until the task is picked up.
	TaskQueue chan *TaskMessage

	// requestPortsFunc is not set or called in the code visible here;
	// presumably a hook for obtaining ports for a deployment — TODO confirm.
	requestPortsFunc func(d *Deployment) []int

	// reconnect is signalled by HandleRead when the read loop ends, telling
	// handleConnect to dial the queen again.
	reconnect chan struct{}
}
// NewWorker builds a Worker from config, loads every deployment found in
// config.DeploymentsDir, and starts the worker's background goroutines.
//
// The task-queue goroutine is started before deployments are loaded; the
// connection goroutine is started only after loading has finished. Each
// subdirectory of DeploymentsDir is passed to LoadDeployment; regular files
// are skipped. If the directory cannot be read or any deployment fails to
// load, the process exits via log.Fatal.
func NewWorker(config *WorkerConfig) *Worker {
	worker := new(Worker)
	worker.WorkerConfig = *config
	worker.TaskQueue = make(chan *TaskMessage)
	worker.reconnect = make(chan struct{})
	go worker.handleTaskQueue()

	log.Println("Loading deployments...")
	entries, readErr := os.ReadDir(config.DeploymentsDir)
	if readErr != nil {
		log.Fatal(readErr)
	}
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		name := entry.Name()
		log.Println(name)
		deployment, loadErr := LoadDeployment(path.Join(config.DeploymentsDir, name))
		if loadErr != nil {
			log.Fatalf("failed to load deployment %s: %s", name, loadErr)
		}
		// Link the deployment back to its owning worker before registering it.
		deployment.Worker = worker
		worker.Deployments = append(worker.Deployments, deployment)
	}
	log.Println("Finished loading deployments")

	go worker.handleConnect()
	return worker
}
// handleConnect keeps the worker connected to the queen bee. It never returns
// and is meant to run in its own goroutine.
//
// Each cycle dials w.Queen over TCP (retrying every retryDelay until it
// succeeds), wraps the connection in a Client, authenticates with the worker's
// ID and password, starts HandleRead on it, and then blocks until HandleRead
// signals w.reconnect. The finished connection is closed before dialing again
// so that sockets do not accumulate across reconnects.
func (w *Worker) handleConnect() {
	log.Printf("Connecting to queen bee at %s...", w.Queen)
	for {
		var conn net.Conn
		var err error
		for {
			conn, err = net.Dial("tcp", w.Queen)
			if err == nil {
				break
			}
			log.Printf("Failed to connect to queen: %s", err)
			// The message text mirrors retryDelay; keep the two in sync.
			log.Println("Retrying in 2 seconds...")
			time.Sleep(retryDelay)
		}
		log.Println("Connected")

		client := NewClient(conn)
		client.Authenticate(w.ID, w.Password)
		go w.HandleRead(client)

		// Wait for the read loop to report that the connection is gone.
		<-w.reconnect

		// Release the socket. The error is deliberately ignored: the
		// connection is already unusable, and closing an already-closed
		// connection only returns an error.
		_ = conn.Close()
		log.Println("Reconnecting...")
	}
}
// handleTaskQueue executes tasks received on w.TaskQueue one at a time, in
// arrival order, until the channel is closed. It is meant to run in its own
// goroutine. A failing task is logged and does not stop the loop.
func (w *Worker) handleTaskQueue() {
	for t := range w.TaskQueue {
		if err := w.ExecuteTask(t); err != nil {
			log.Printf("failed to execute task %+v: %s", t, err)
		}
	}
}
// ExecuteTask runs a single queued task. It is currently a stub: t is ignored
// and nil is always returned.
func (w *Worker) ExecuteTask(t *TaskMessage) error {
	// Nothing is executed yet.
	return nil
}
// HandleRead reads newline-delimited messages from c.Conn until the stream
// ends, then signals w.reconnect so that handleConnect dials again.
//
// The first line must equal serverWelcomeMessage. Every later line is decoded
// as a JSON TaskMessage and dispatched on its Type. An unexpected welcome
// line, malformed JSON, an unknown task type, or a failure while handling a
// task terminates the process via log.Fatal/log.Fatalf.
func (w *Worker) HandleRead(c *Client) {
	welcomed := false
	scanner := bufio.NewScanner(c.Conn)
	for scanner.Scan() {
		// line is only valid until the next call to Scan.
		line := scanner.Bytes()
		log.Printf(" <- %s", line)

		if !welcomed {
			if !bytes.Equal(line, []byte(serverWelcomeMessage)) {
				log.Fatalf("unexpected server reply: %s", line)
			}
			welcomed = true
			continue
		}

		task := &TaskMessage{}
		if err := json.Unmarshal(line, task); err != nil {
			log.Fatalf("failed to unmarshal %s: %s", line, err)
		}
		log.Printf(" <- task: %+v", task)

		switch task.Type {
		case TaskHealth:
			w.handleHealthTask(c, task)
		case TaskDeploy:
			w.handleDeployTask(c, task)
		default:
			log.Fatalf("unknown task type %d", task.Type)
		}
	}

	// Scan stops on EOF as well as on read errors and over-long lines; Err is
	// nil only for EOF, so report anything else before reconnecting.
	if err := scanner.Err(); err != nil {
		log.Printf("Read error: %s", err)
	}
	log.Println("Connection lost")
	w.reconnect <- struct{}{}
}

// handleHealthTask answers a health task. The reply echoes the task's "time"
// parameter and carries, per deployment, its pending events as JSON under the
// key "events_<ID>". Each deployment's event list is emptied after it has been
// serialized.
func (w *Worker) handleHealthTask(c *Client, task *TaskMessage) {
	result := NewResult(TaskHealth, map[string]string{
		"time": task.Parameters["time"],
	})
	for _, d := range w.Deployments {
		eventsJSON, err := json.Marshal(d.Events)
		if err != nil {
			log.Fatal(err)
		}
		result.Parameters[fmt.Sprintf("events_%d", d.ID)] = string(eventsJSON)
		// Truncate in place, keeping the backing array for reuse.
		d.Events = d.Events[:0]
		// TODO deployment mutex
	}
	resultJSON, err := json.Marshal(result)
	if err != nil {
		log.Fatalf("failed to marshal result %+v: %s", result, err)
	}
	c.Out <- append(resultJSON, '\n')
}

// handleDeployTask performs a deploy task. It parses the "id", "client" and
// "port_0".."port_9" parameters, updates the deployment with that ID or
// creates a new one, deploys it, and replies with status "ok". A new
// deployment is added to w.Deployments only after deploy has succeeded.
func (w *Worker) handleDeployTask(c *Client, task *TaskMessage) {
	id, err := strconv.Atoi(task.Parameters["id"])
	if err != nil {
		log.Fatalf("failed to parse id: %s", err)
	}
	client, err := strconv.Atoi(task.Parameters["client"])
	if err != nil {
		log.Fatalf("failed to parse client: %s", err)
	}
	// TODO
	ports := make([]int, 10)
	for i := range ports {
		ports[i], err = strconv.Atoi(task.Parameters[fmt.Sprintf("port_%d", i)])
		if err != nil {
			log.Fatalf("failed to parse port %d: %s", i, err)
		}
	}
	festoon := task.Parameters["festoon"]

	// Reuse the existing deployment with this ID, if there is one.
	var d *Deployment
	for _, existing := range w.Deployments {
		if existing.ID == id {
			d = existing
			d.Client = client
			d.Festoon = festoon
			d.Ports = ports
			break
		}
	}

	newDeployment := d == nil
	if newDeployment {
		d = &Deployment{
			ID:      id,
			Client:  client,
			Festoon: festoon,
			UID:     7777,
			Ports:   ports,
			Worker:  w,
		}
		go d.handleEvents()
		time.Sleep(10 * time.Millisecond) // Give events handler some time to attach.
	}

	if err := d.deploy(); err != nil {
		log.Fatalf("failed to deploy %+v: %s", d, err)
	}
	if newDeployment {
		w.Deployments = append(w.Deployments, d)
	}

	// Send result
	result := NewResult(TaskDeploy, map[string]string{
		"id":     task.Parameters["id"],
		"status": "ok",
	})
	resultJSON, err := json.Marshal(result)
	if err != nil {
		log.Fatalf("failed to marshal result %+v: %s", result, err)
	}
	c.Out <- append(resultJSON, '\n')
}