beehive/worker.go

150 lines
3.1 KiB
Go

package beehive
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"log"
"strconv"
"time"
)
// Worker holds the state of a single worker: its identity, the directories it
// was configured with, the deployments it has created, and the queue of tasks
// waiting to be executed.
type Worker struct {
	// ID and IP are copied verbatim from the arguments given to NewWorker.
	ID int
	IP string

	// FestoonsDir and DeploymentsDir are the directory paths given to
	// NewWorker; this file stores them but does not read them.
	FestoonsDir, DeploymentsDir string

	// Deployments lists the deployments created by HandleRead; one entry is
	// appended after each successful TaskDeploy.
	Deployments []*Deployment

	// TaskQueue is drained by handleTaskQueue, which passes every received
	// task to ExecuteTask.
	TaskQueue chan *Task

	// requestPortsFunc is neither assigned nor called in this file.
	// NOTE(review): presumably a hook for allocating ports for a deployment —
	// confirm against the rest of the package.
	requestPortsFunc func(d *Deployment) []int
}
// NewWorker returns a Worker populated with the given id, ip and directories.
// It creates the worker's unbuffered TaskQueue and starts a goroutine running
// handleTaskQueue to drain it before returning.
func NewWorker(id int, ip string, festoonsDir string, deploymentsDir string) *Worker {
	worker := new(Worker)
	worker.ID = id
	worker.IP = ip
	worker.FestoonsDir = festoonsDir
	worker.DeploymentsDir = deploymentsDir
	worker.TaskQueue = make(chan *Task)

	// The queue consumer runs for as long as TaskQueue stays open.
	go worker.handleTaskQueue()

	return worker
}
// handleTaskQueue executes the tasks received on w.TaskQueue one at a time and
// returns once the channel is closed. NewWorker starts it in its own goroutine.
func (w *Worker) handleTaskQueue() {
	for t := range w.TaskQueue {
		// Report failures instead of silently dropping them. The loop keeps
		// going so that one failing task does not block the tasks queued
		// behind it.
		if err := w.ExecuteTask(t); err != nil {
			log.Printf("failed to execute task %+v: %s", t, err)
		}
	}
}
// ExecuteTask is currently a stub: it ignores t and always returns nil.
// handleTaskQueue calls it for every task received on w.TaskQueue.
func (w *Worker) ExecuteTask(t *Task) error {
	return nil
}
// HandleRead processes the newline-delimited messages read from c.Conn and
// pushes the replies onto c.Out. It returns when the scanner stops, which
// happens at end of input or on a read error; a read error is logged.
//
// The first line must equal serverWelcomeMessage. Every later line is decoded
// as a JSON-encoded Task and dispatched on task.Type:
//
//   - TaskHealth is answered by handleHealthTask.
//   - TaskDeploy is answered by handleDeployTask.
//
// An unexpected welcome line, an undecodable task, an unknown task type, or a
// failure inside one of the handlers terminates the whole process through
// log.Fatal / log.Fatalf.
func (w *Worker) HandleRead(c *Client) {
	scanner := bufio.NewScanner(c.Conn)
	welcomed := false

	for scanner.Scan() {
		// line is only valid until the next call to Scan; everything below
		// consumes it within the same iteration.
		line := scanner.Bytes()
		log.Printf(" <- %s", line)

		if !welcomed {
			if !bytes.Equal(line, []byte(serverWelcomeMessage)) {
				log.Fatalf("unexpected server reply: %s", line)
			}
			welcomed = true
			continue
		}

		task := &Task{}
		if err := json.Unmarshal(line, task); err != nil {
			log.Fatalf("failed to unmarshal %s: %s", line, err)
		}
		log.Printf(" <- task: %+v", task)

		switch task.Type {
		case TaskHealth:
			w.handleHealthTask(c, task)
		case TaskDeploy:
			w.handleDeployTask(c, task)
		default:
			log.Fatalf("unknown task type %d", task.Type)
		}
	}

	// Scan reports false both at EOF and on failure (an I/O error, or a line
	// exceeding the scanner's buffer). Only Err tells the two apart, so check
	// it rather than letting the read loop end without a trace.
	if err := scanner.Err(); err != nil {
		log.Printf("failed to read from server: %s", err)
	}
}

// handleHealthTask replies to a TaskHealth request. The result echoes the
// task's "time" parameter and carries, for every deployment in w.Deployments,
// its JSON-encoded events under the key "events_<deployment ID>". Each
// deployment's event list is truncated to length zero once it has been encoded.
func (w *Worker) handleHealthTask(c *Client, task *Task) {
	result := NewResult(TaskHealth, map[string]string{
		"time": task.Parameters["time"],
	})

	for _, d := range w.Deployments {
		eventsJSON, err := json.Marshal(d.Events)
		if err != nil {
			log.Fatal(err)
		}
		result.Parameters[fmt.Sprintf("events_%d", d.ID)] = string(eventsJSON)

		// Truncate in place, keeping the backing array for reuse.
		// TODO deployment mutex
		// NOTE(review): d.Events is presumably appended to by the goroutine
		// running d.handleEvents, which would make this an unsynchronized
		// access — confirm and guard it once Deployment has a mutex.
		d.Events = d.Events[:0]
	}

	w.sendResult(c, result)
}

// handleDeployTask replies to a TaskDeploy request. It parses the "id",
// "client" and "port_0".."port_9" parameters, builds a Deployment for the
// requested "festoon", starts its event handler, deploys it, records it in
// w.Deployments and finally sends a result with status "ok".
func (w *Worker) handleDeployTask(c *Client, task *Task) {
	// TODO: the number of ports per deployment is fixed for now.
	const portCount = 10

	id, err := strconv.Atoi(task.Parameters["id"])
	if err != nil {
		log.Fatalf("failed to parse id: %s", err)
	}

	client, err := strconv.Atoi(task.Parameters["client"])
	if err != nil {
		log.Fatalf("failed to parse client: %s", err)
	}

	ports := make([]int, portCount)
	for i := range ports {
		ports[i], err = strconv.Atoi(task.Parameters[fmt.Sprintf("port_%d", i)])
		if err != nil {
			log.Fatalf("failed to parse port %d: %s", i, err)
		}
	}

	d := &Deployment{
		ID:      id,
		Client:  client,
		Festoon: task.Parameters["festoon"],
		UID:     7777,
		Ports:   ports,
		Worker:  w,
	}

	go d.handleEvents()
	time.Sleep(10 * time.Millisecond) // Give events handler some time to attach.

	if err := d.deploy(); err != nil {
		log.Fatalf("failed to deploy %+v: %s", d, err)
	}
	w.Deployments = append(w.Deployments, d)

	// Send result
	w.sendResult(c, NewResult(TaskDeploy, map[string]string{
		"id":     task.Parameters["id"],
		"status": "ok",
	}))
}

// sendResult JSON-encodes result, appends a newline as the message delimiter
// and queues the bytes on c.Out. A marshalling failure terminates the process.
func (w *Worker) sendResult(c *Client, result interface{}) {
	resultJSON, err := json.Marshal(result)
	if err != nil {
		log.Fatalf("failed to marshal result %+v: %s", result, err)
	}
	c.Out <- append(resultJSON, '\n')
}