From 10efc654b32c4bcce3ab67bb5b1c0bdd2ef3f5af Mon Sep 17 00:00:00 2001 From: Trevor Slocum Date: Tue, 2 May 2023 22:36:43 -0700 Subject: [PATCH] Reconnect automatically to queen bee --- cmd/workerbee/config.go | 32 ++++------------- cmd/workerbee/main.go | 24 +++---------- deployment.go | 26 +++++++++----- server.go | 19 ++++++++++ worker.go | 77 ++++++++++++++++++++++++++++++++++------- 5 files changed, 111 insertions(+), 67 deletions(-) diff --git a/cmd/workerbee/config.go b/cmd/workerbee/config.go index 37d2b84..6012ab4 100644 --- a/cmd/workerbee/config.go +++ b/cmd/workerbee/config.go @@ -7,27 +7,7 @@ import ( "code.rocketnine.space/tslocum/beehive" ) -type Config struct { - // Path to festoons. - Festoons string - - // Path to deployments. - Deployments string - - // Address to connect to queen bee. - Queen string - - // Worker IP address. - IP string - - // Unique ID. - ID int - - // Password. - Password string -} - -var config = &Config{} +var config = &beehive.WorkerConfig{} func parseConfig(configPath string) { if strings.TrimSpace(configPath) == "" { @@ -51,13 +31,13 @@ func parseConfig(configPath string) { log.Fatal("password is required") } - if strings.TrimSpace(config.Deployments) == "" { + if strings.TrimSpace(config.DeploymentsDir) == "" { log.Fatal("deployments path is required") } - if strings.TrimSpace(config.Festoons) == "" { - log.Fatal("festoons path is required") - } else if strings.HasSuffix(config.Festoons, "/") { - config.Festoons = strings.TrimSuffix(config.Festoons, "/") + if strings.TrimSpace(config.FestoonsDir) == "" { + log.Fatal("FestoonsDir path is required") + } else if strings.HasSuffix(config.FestoonsDir, "/") { + config.FestoonsDir = strings.TrimSuffix(config.FestoonsDir, "/") } } diff --git a/cmd/workerbee/main.go b/cmd/workerbee/main.go index 085ec18..1711f2f 100644 --- a/cmd/workerbee/main.go +++ b/cmd/workerbee/main.go @@ -3,7 +3,6 @@ package main import ( "flag" "log" - "net" "os" "time" @@ -24,9 +23,9 @@ func main() { parseConfig(configPath) // Create deployments directory if it doesn't exist. - _, err := os.Stat(config.Deployments) + _, err := os.Stat(config.DeploymentsDir) if os.IsNotExist(err) { - err = os.MkdirAll(config.Deployments, 0700) + err = os.MkdirAll(config.DeploymentsDir, 0700) if err != nil { log.Fatal(err) } @@ -44,23 +43,8 @@ func main() { } log.Println("Connected to Docker successfully") - worker = beehive.NewWorker(config.ID, config.IP, config.Festoons, config.Deployments) + worker = beehive.NewWorker(config) - var conn net.Conn - for { - conn, err = net.Dial("tcp", config.Queen) - if err != nil { - log.Printf("Failed to connect to queen: %s", err) - time.Sleep(retryDelay) - continue - } - break - } - - client := beehive.NewClient(conn) - client.Authenticate(config.ID, config.Password) - go worker.HandleRead(client) - - _ = client + _ = worker select {} } diff --git a/deployment.go b/deployment.go index 1faf12e..4181ca7 100644 --- a/deployment.go +++ b/deployment.go @@ -74,6 +74,15 @@ var DeploymentStatusLabels = map[string]DeploymentStatus{ "update": StatusUpdate, } +func DeploymentStatusLabel(status DeploymentStatus) string { + for str, i := range DeploymentStatusLabels { + if i == status { + return str + } + } + return "" +} + var RecordedDeploymentStatuses = []DeploymentStatus{ StatusCreate, StatusDestroy, @@ -162,10 +171,9 @@ func (d *Deployment) Interpolate(filePath string, customValues map[string]string } replacements := map[string]string{ - "BEEHIVE_VAR_ID": fmt.Sprintf("%s-%d", d.Festoon, d.ID), - "BEEHIVE_VAR_IP": d.Worker.IP, - "BEEHIVE_VAR_QUOTED_NAME": defaultName, - "BEEHIVE_VAR_NAME": defaultName, + "ID": fmt.Sprintf("%s-%d", d.Festoon, d.ID), + "IP": d.Worker.IP, + "NAME": defaultName, } // TODO passwords, remain the same between redeploys (stored in postgres, sent as custom replacement) for customKey, customValue := range customValues { @@ -175,16 +183,16 @@ func (d *Deployment) Interpolate(filePath string, customValues map[string]string // Add quoted variables. newReplacements := make(map[string]string) for original, replacement := range replacements { - newReplacements[original] = replacement + newReplacements[original] = strings.ReplaceAll(replacement, "BEEHIVE_VAR_", "BEEHIVE-VAR-") - quotedLabel := "BEEHIVE_VAR_QUOTED_" + original[10:] + quotedLabel := "QUOTED_" + original quotedValue := fmt.Sprintf(`"%s"`, strings.ReplaceAll(replacements[original], `"`, `\"`)) replacements[quotedLabel] = quotedValue } + // Apply substitutions. for original, replacement := range replacements { - replacement = strings.ReplaceAll(replacement, "BEEHIVE_VAR_", "BEEHIVE-VAR-") - buf = bytes.ReplaceAll(buf, []byte(original), []byte(replacement)) + buf = bytes.ReplaceAll(buf, []byte("BEEHIVE_VAR_"+original), []byte(replacement)) } configPorts := make(map[string]bool) @@ -207,6 +215,8 @@ func (d *Deployment) Interpolate(filePath string, customValues map[string]string return i }) + buf = bytes.ReplaceAll(buf, []byte("BEEHIVE-VAR-"), []byte("BEEHIVE_VAR_")) + return buf, nil } diff --git a/server.go b/server.go index b3ec1fc..3089e87 100644 --- a/server.go +++ b/server.go @@ -8,6 +8,7 @@ import ( "math/rand" "net" "strconv" + "strings" "sync" "time" ) @@ -148,6 +149,24 @@ func (s *Server) handleRead(c *Client) { log.Fatal(err) } log.Printf("health check time: %.0fms", float64(time.Now().UnixNano()-resultTime)/1000000) + for key, value := range result.Parameters { + if strings.HasPrefix(key, "events_") { + var events []DeploymentEvent + err := json.Unmarshal([]byte(value), &events) + if err != nil { + log.Fatal(err) + } + log.Printf("deployment %s events:", key[7:]) + for _, event := range events { + t := time.Unix(event.Time, 0) + label := DeploymentStatusLabel(event.Status) + if label == "" { + label = "(UNKNOWN)" + } + log.Printf(" %s %s", t.Format(time.DateTime), label) + } + } + } case TaskDeploy: switch result.Parameters["status"] { case "ok": diff --git a/worker.go b/worker.go index 58143ea..1bf8edd 100644 --- a/worker.go +++ b/worker.go @@ -6,48 +6,66 @@ import ( "encoding/json" "fmt" "log" + "net" "os" "path" "strconv" "time" ) -type Worker struct { - ID int +const retryDelay = time.Second * 2 +type WorkerConfig struct { + // Path to festoons. + FestoonsDir string + + // Path to deployments. + DeploymentsDir string + + // Address to connect to queen bee. + Queen string + + // Worker IP address. IP string - FestoonsDir string - DeploymentsDir string + // Unique ID. + ID int + + // Password. + Password string +} + +type Worker struct { + WorkerConfig Deployments []*Deployment TaskQueue chan *TaskMessage requestPortsFunc func(d *Deployment) []int + + reconnect chan struct{} } -func NewWorker(id int, ip string, festoonsDir string, deploymentsDir string) *Worker { +func NewWorker(config *WorkerConfig) *Worker { w := &Worker{ - ID: id, - IP: ip, - FestoonsDir: festoonsDir, - DeploymentsDir: deploymentsDir, - TaskQueue: make(chan *TaskMessage), + WorkerConfig: *config, + TaskQueue: make(chan *TaskMessage), + reconnect: make(chan struct{}), } go w.handleTaskQueue() log.Println("Loading deployments...") // TODO list directories in deployments dir, no db needed - dirEntries, err := os.ReadDir(deploymentsDir) + dirEntries, err := os.ReadDir(config.DeploymentsDir) if err != nil { log.Fatal(err) } for _, dirEntry := range dirEntries { if dirEntry.IsDir() { log.Println(dirEntry.Name()) - d, err := LoadDeployment(path.Join(deploymentsDir, dirEntry.Name())) + d, err := LoadDeployment(path.Join(config.DeploymentsDir, dirEntry.Name())) if err != nil { log.Fatalf("failed to load deployment %s: %s", dirEntry.Name(), err) } @@ -57,12 +75,41 @@ func NewWorker(id int, ip string, festoonsDir string, deploymentsDir string) *Wo w.Deployments = append(w.Deployments, d) } } - log.Println("Finished loading deployments") + go w.handleConnect() + return w } +func (w *Worker) handleConnect() { + log.Printf("Connecting to queen bee at %s...", w.Queen) + for { + var conn net.Conn + var err error + for { + conn, err = net.Dial("tcp", w.Queen) + if err != nil { + log.Printf("Failed to connect to queen: %s", err) + log.Println("Retrying in 2 seconds...") + time.Sleep(retryDelay) + continue + } + break + } + + log.Println("Connected") + + client := NewClient(conn) + client.Authenticate(w.ID, w.Password) + go w.HandleRead(client) + + <-w.reconnect + + log.Println("Reconnecting...") + } +} + func (w *Worker) handleTaskQueue() { for t := range w.TaskQueue { w.ExecuteTask(t) @@ -189,4 +236,8 @@ func (w *Worker) HandleRead(c *Client) { log.Fatalf("unknown task type %d", task.Type) } } + + log.Println("Connection lost") + + w.reconnect <- struct{}{} }