Reconnect automatically to queen bee

This commit is contained in:
Trevor Slocum 2023-05-02 22:36:43 -07:00
parent fbc69bfb5d
commit 10efc654b3
5 changed files with 111 additions and 67 deletions

View File

@ -7,27 +7,7 @@ import (
"code.rocketnine.space/tslocum/beehive"
)
type Config struct {
// Path to festoons.
Festoons string
// Path to deployments.
Deployments string
// Address to connect to queen bee.
Queen string
// Worker IP address.
IP string
// Unique ID.
ID int
// Password.
Password string
}
var config = &Config{}
var config = &beehive.WorkerConfig{}
func parseConfig(configPath string) {
if strings.TrimSpace(configPath) == "" {
@ -51,13 +31,13 @@ func parseConfig(configPath string) {
log.Fatal("password is required")
}
if strings.TrimSpace(config.Deployments) == "" {
if strings.TrimSpace(config.DeploymentsDir) == "" {
log.Fatal("deployments path is required")
}
if strings.TrimSpace(config.Festoons) == "" {
log.Fatal("festoons path is required")
} else if strings.HasSuffix(config.Festoons, "/") {
config.Festoons = strings.TrimSuffix(config.Festoons, "/")
if strings.TrimSpace(config.FestoonsDir) == "" {
log.Fatal("FestoonsDir path is required")
} else if strings.HasSuffix(config.FestoonsDir, "/") {
config.FestoonsDir = strings.TrimSuffix(config.FestoonsDir, "/")
}
}

View File

@ -3,7 +3,6 @@ package main
import (
"flag"
"log"
"net"
"os"
"time"
@ -24,9 +23,9 @@ func main() {
parseConfig(configPath)
// Create deployments directory if it doesn't exist.
_, err := os.Stat(config.Deployments)
_, err := os.Stat(config.DeploymentsDir)
if os.IsNotExist(err) {
err = os.MkdirAll(config.Deployments, 0700)
err = os.MkdirAll(config.DeploymentsDir, 0700)
if err != nil {
log.Fatal(err)
}
@ -44,23 +43,8 @@ func main() {
}
log.Println("Connected to Docker successfully")
worker = beehive.NewWorker(config.ID, config.IP, config.Festoons, config.Deployments)
worker = beehive.NewWorker(config)
var conn net.Conn
for {
conn, err = net.Dial("tcp", config.Queen)
if err != nil {
log.Printf("Failed to connect to queen: %s", err)
time.Sleep(retryDelay)
continue
}
break
}
client := beehive.NewClient(conn)
client.Authenticate(config.ID, config.Password)
go worker.HandleRead(client)
_ = client
_ = worker
select {}
}

View File

@ -74,6 +74,15 @@ var DeploymentStatusLabels = map[string]DeploymentStatus{
"update": StatusUpdate,
}
func DeploymentStatusLabel(status DeploymentStatus) string {
for str, i := range DeploymentStatusLabels {
if i == status {
return str
}
}
return ""
}
var RecordedDeploymentStatuses = []DeploymentStatus{
StatusCreate,
StatusDestroy,
@ -162,10 +171,9 @@ func (d *Deployment) Interpolate(filePath string, customValues map[string]string
}
replacements := map[string]string{
"BEEHIVE_VAR_ID": fmt.Sprintf("%s-%d", d.Festoon, d.ID),
"BEEHIVE_VAR_IP": d.Worker.IP,
"BEEHIVE_VAR_QUOTED_NAME": defaultName,
"BEEHIVE_VAR_NAME": defaultName,
"ID": fmt.Sprintf("%s-%d", d.Festoon, d.ID),
"IP": d.Worker.IP,
"NAME": defaultName,
}
// TODO passwords, remain the same between redeploys (stored in postgres, sent as custom replacement)
for customKey, customValue := range customValues {
@ -175,16 +183,16 @@ func (d *Deployment) Interpolate(filePath string, customValues map[string]string
// Add quoted variables.
newReplacements := make(map[string]string)
for original, replacement := range replacements {
newReplacements[original] = replacement
newReplacements[original] = strings.ReplaceAll(replacement, "BEEHIVE_VAR_", "BEEHIVE-VAR-")
quotedLabel := "BEEHIVE_VAR_QUOTED_" + original[10:]
quotedLabel := "QUOTED_" + original
quotedValue := fmt.Sprintf(`"%s"`, strings.ReplaceAll(replacements[original], `"`, `\"`))
replacements[quotedLabel] = quotedValue
}
// Apply substitutions.
for original, replacement := range replacements {
replacement = strings.ReplaceAll(replacement, "BEEHIVE_VAR_", "BEEHIVE-VAR-")
buf = bytes.ReplaceAll(buf, []byte(original), []byte(replacement))
buf = bytes.ReplaceAll(buf, []byte("BEEHIVE_VAR_"+original), []byte(replacement))
}
configPorts := make(map[string]bool)
@ -207,6 +215,8 @@ func (d *Deployment) Interpolate(filePath string, customValues map[string]string
return i
})
buf = bytes.ReplaceAll(buf, []byte("BEEHIVE-VAR-"), []byte("BEEHIVE_VAR_"))
return buf, nil
}

View File

@ -8,6 +8,7 @@ import (
"math/rand"
"net"
"strconv"
"strings"
"sync"
"time"
)
@ -148,6 +149,24 @@ func (s *Server) handleRead(c *Client) {
log.Fatal(err)
}
log.Printf("health check time: %.0fms", float64(time.Now().UnixNano()-resultTime)/1000000)
for key, value := range result.Parameters {
if strings.HasPrefix(key, "events_") {
var events []DeploymentEvent
err := json.Unmarshal([]byte(value), &events)
if err != nil {
log.Fatal(err)
}
log.Printf("deployment %s events:", key[7:])
for _, event := range events {
t := time.Unix(event.Time, 0)
label := DeploymentStatusLabel(event.Status)
if label == "" {
label = "(UNKNOWN)"
}
log.Printf(" %s %s", t.Format(time.DateTime), label)
}
}
}
case TaskDeploy:
switch result.Parameters["status"] {
case "ok":

View File

@ -6,48 +6,66 @@ import (
"encoding/json"
"fmt"
"log"
"net"
"os"
"path"
"strconv"
"time"
)
type Worker struct {
ID int
const retryDelay = time.Second * 2
type WorkerConfig struct {
// Path to festoons.
FestoonsDir string
// Path to deployments.
DeploymentsDir string
// Address to connect to queen bee.
Queen string
// Worker IP address.
IP string
FestoonsDir string
DeploymentsDir string
// Unique ID.
ID int
// Password.
Password string
}
type Worker struct {
WorkerConfig
Deployments []*Deployment
TaskQueue chan *TaskMessage
requestPortsFunc func(d *Deployment) []int
reconnect chan struct{}
}
func NewWorker(id int, ip string, festoonsDir string, deploymentsDir string) *Worker {
func NewWorker(config *WorkerConfig) *Worker {
w := &Worker{
ID: id,
IP: ip,
FestoonsDir: festoonsDir,
DeploymentsDir: deploymentsDir,
TaskQueue: make(chan *TaskMessage),
WorkerConfig: *config,
TaskQueue: make(chan *TaskMessage),
reconnect: make(chan struct{}),
}
go w.handleTaskQueue()
log.Println("Loading deployments...")
// TODO list directories in deployments dir, no db needed
dirEntries, err := os.ReadDir(deploymentsDir)
dirEntries, err := os.ReadDir(config.DeploymentsDir)
if err != nil {
log.Fatal(err)
}
for _, dirEntry := range dirEntries {
if dirEntry.IsDir() {
log.Println(dirEntry.Name())
d, err := LoadDeployment(path.Join(deploymentsDir, dirEntry.Name()))
d, err := LoadDeployment(path.Join(config.DeploymentsDir, dirEntry.Name()))
if err != nil {
log.Fatalf("failed to load deployment %s: %s", dirEntry.Name(), err)
}
@ -57,12 +75,41 @@ func NewWorker(id int, ip string, festoonsDir string, deploymentsDir string) *Wo
w.Deployments = append(w.Deployments, d)
}
}
log.Println("Finished loading deployments")
go w.handleConnect()
return w
}
func (w *Worker) handleConnect() {
log.Printf("Connecting to queen bee at %s...", w.Queen)
for {
var conn net.Conn
var err error
for {
conn, err = net.Dial("tcp", w.Queen)
if err != nil {
log.Printf("Failed to connect to queen: %s", err)
log.Println("Retrying in 2 seconds...")
time.Sleep(retryDelay)
continue
}
break
}
log.Println("Connected")
client := NewClient(conn)
client.Authenticate(w.ID, w.Password)
go w.HandleRead(client)
<-w.reconnect
log.Println("Reconnecting...")
}
}
func (w *Worker) handleTaskQueue() {
for t := range w.TaskQueue {
w.ExecuteTask(t)
@ -189,4 +236,8 @@ func (w *Worker) HandleRead(c *Client) {
log.Fatalf("unknown task type %d", task.Type)
}
}
log.Println("Connection lost")
w.reconnect <- struct{}{}
}