Add client and server architecture

This commit is contained in:
Trevor Slocum 2023-04-09 21:03:48 -07:00
parent 3ee33ad8fe
commit 4f91af6302
14 changed files with 781 additions and 148 deletions

51
client.go Normal file
View File

@ -0,0 +1,51 @@
package beehive
import (
"encoding/json"
"log"
"net"
)
type Client struct {
Conn net.Conn
Out chan []byte
Worker int
}
func NewClient(conn net.Conn) *Client {
c := &Client{
Conn: conn,
Out: make(chan []byte),
}
go c.handleWrite()
return c
}
func (c *Client) handleWrite() {
for out := range c.Out {
_, err := c.Conn.Write(out)
if err != nil {
// TODO terminate client
continue
}
}
}
func (c *Client) Authenticate(id int, password string) {
challenge := &ServerAuthentication{
Worker: id,
Password: password,
}
challengeJSON, err := json.Marshal(challenge)
if err != nil {
log.Fatal(err)
}
_, err = c.Conn.Write(append(challengeJSON, byte('\n')))
if err != nil {
log.Fatalf("Write to server failed: %s", err)
}
}

39
cmd/queenbee/config.go Normal file
View File

@ -0,0 +1,39 @@
package main
import (
"log"
"code.rocketnine.space/tslocum/beehive"
)
type Config struct {
// Address to listen on.
Listen string
// Path to festoons.
FestoonsDir string
// map[ID]Password
Workers map[int]string
}
var config = &Config{}
func parseConfig(configPath string) {
err := beehive.Deserialize(config, configPath)
if err != nil {
log.Fatal(err)
}
if config.Listen == "" {
log.Fatal("listen address is required")
}
if config.FestoonsDir == "" {
log.Fatal("festoons path is required")
}
if len(config.Workers) == 0 {
log.Fatal("at least one worker must be defined")
}
}

30
cmd/queenbee/main.go Normal file
View File

@ -0,0 +1,30 @@
package main
import (
"flag"
"log"
"code.rocketnine.space/tslocum/beehive"
)
func main() {
var (
configPath string
)
flag.StringVar(&configPath, "config", "", "path to configuration file")
flag.Parse()
parseConfig(configPath)
s, err := beehive.NewServer(config.Listen, config.Workers)
if err != nil {
log.Fatal(err)
}
// TODO submit a task to a worker
// TODO port range available is 10000-49151
_ = s
select {}
}

63
cmd/workerbee/config.go Normal file
View File

@ -0,0 +1,63 @@
package main
import (
"log"
"strings"
"code.rocketnine.space/tslocum/beehive"
)
type Config struct {
// Path to festoons.
Festoons string
// Path to deployments.
Deployments string
// Address to connect to queen bee.
Queen string
// Worker IP address.
IP string
// Unique ID.
ID int
// Password.
Password string
}
var config = &Config{}
func parseConfig(configPath string) {
if strings.TrimSpace(configPath) == "" {
log.Fatal("path to configuration file is required")
}
err := beehive.Deserialize(config, configPath)
if err != nil {
log.Fatal(err)
}
if config.ID == 0 {
log.Fatal("worker ID is required")
}
if strings.TrimSpace(config.Queen) == "" {
log.Fatal("queen bee address is required")
}
if strings.TrimSpace(config.Password) == "" {
log.Fatal("password is required")
}
if strings.TrimSpace(config.Deployments) == "" {
log.Fatal("deployments path is required")
}
if strings.TrimSpace(config.Festoons) == "" {
log.Fatal("festoons path is required")
} else if strings.HasSuffix(config.Festoons, "/") {
config.Festoons = strings.TrimSuffix(config.Festoons, "/")
}
}

View File

@ -1,51 +0,0 @@
package main
import (
"bytes"
"fmt"
"log"
"os"
"regexp"
"strconv"
"code.rocketnine.space/tslocum/beehive"
)
var replacementPort = regexp.MustCompile(`(HOSTALGIA_PORT_[A-Z])`)
var replacementPassword = regexp.MustCompile(`(HOSTALGIA_PASSWORD_[A-Z])`)
func interpolate(filePath string, customValues map[string]string, deployment *beehive.Deployment) ([]byte, error) {
buf, err := os.ReadFile(filePath)
if err != nil {
return nil, err
}
replacements := map[string]string{
"HOSTALGIA_ID": fmt.Sprintf("%d-%s", deployment.ID, deployment.Festoon),
"HOSTALGIA_IP": deployment.Worker.IP,
"HOSTALGIA_NAME": "hostalgia.net dedicated server",
}
for customKey, customValue := range customValues {
replacements[customKey] = customValue
}
for original, replacement := range replacements {
buf = bytes.ReplaceAll(buf, []byte(original), []byte(replacement))
}
buf = replacementPort.ReplaceAllFunc(buf, func(i []byte) []byte {
index := int(i[len(i)-1:][0] - 'A')
return []byte(strconv.Itoa(requestPort(index)))
})
buf = replacementPassword.ReplaceAllFunc(buf, func(i []byte) []byte {
log.Println("password" + string(i))
return i
})
return buf, nil
}
func requestPort(index int) int {
return 2000 + index
}

View File

@ -2,115 +2,45 @@ package main
import (
"flag"
"io/fs"
"log"
"net"
"os"
"path"
"path/filepath"
"regexp"
"strings"
"code.rocketnine.space/tslocum/beehive"
)
var worker *beehive.Worker
func main() {
var (
festoonsDir string
festoon string
outDir string
configPath string
)
flag.StringVar(&festoonsDir, "festoons", "", "path to festoons directory")
flag.StringVar(&festoon, "festoon", "", "name of festoon")
flag.StringVar(&outDir, "out", "", "path to output directory")
flag.StringVar(&configPath, "config", "", "path to configuration file")
flag.Parse()
if strings.TrimSpace(festoon) == "" {
log.Fatal("festoon is required")
}
parseConfig(configPath)
match, err := regexp.MatchString(`^[a-zA-Z0-9]+$`, festoon)
if err != nil {
log.Fatal(err)
} else if !match {
log.Fatalf("invalid festoon: %s", festoon)
}
if strings.HasSuffix(festoonsDir, "/") {
festoonsDir = strings.TrimSuffix(festoonsDir, "/")
}
festoonPath := path.Join(festoonsDir, festoon)
copyDataDir := path.Join(festoonPath, "data")
fileInfo, err := os.Stat(outDir)
if err != nil {
if !os.IsNotExist(err) {
log.Fatal(err)
}
err = os.MkdirAll(outDir, 0700)
if err != nil {
log.Fatal(err)
}
} else if !fileInfo.IsDir() {
log.Fatalf("invalid output directory: %s", outDir)
}
// TODO
deployment := &beehive.Deployment{
ID: 1,
UID: 108,
Festoon: festoon,
Worker: &beehive.Worker{
IP: "127.0.0.1",
},
}
err = interpolateAndCopy(path.Join(festoonPath, "docker-compose.yml"), path.Join(outDir, "docker-compose.yml"), deployment)
if err != nil {
log.Fatal(err)
}
outDataPath := path.Join(outDir, "data")
fileInfo, err = os.Stat(copyDataDir)
if err != nil {
if !os.IsNotExist(err) {
log.Fatal(err)
}
} else if fileInfo.IsDir() {
err = filepath.WalkDir(copyDataDir, func(filePath string, d fs.DirEntry, err error) error {
relativePath := strings.TrimPrefix(filePath, copyDataDir)
if relativePath == "" {
return err
} else if !strings.HasPrefix(relativePath, "/") {
log.Fatalf("unexpected file path: %s", relativePath)
}
relativePath = relativePath[1:]
outPath := path.Join(outDataPath, relativePath)
if d.IsDir() {
err = os.MkdirAll(outPath, 0700)
if err != nil {
log.Fatal(err)
}
return err
}
return interpolateAndCopy(filePath, outPath, deployment)
})
// Create deployments directory if it doesn't exist.
_, err := os.Stat(config.Deployments)
if os.IsNotExist(err) {
err = os.MkdirAll(config.Deployments, 0700)
if err != nil {
log.Fatal(err)
}
}
}
worker = beehive.NewWorker(config.ID, config.IP, config.Festoons, config.Deployments)
func interpolateAndCopy(inFile string, outFile string, d *beehive.Deployment) error {
data, err := interpolate(inFile, nil, d)
conn, err := net.Dial("tcp", config.Queen)
if err != nil {
return err
println("Dial failed:", err.Error())
os.Exit(1)
}
return os.WriteFile(outFile, data, 0600)
client := beehive.NewClient(conn)
client.Authenticate(config.ID, config.Password)
go worker.HandleRead(client)
_ = client
select {}
}

View File

@ -1,17 +1,165 @@
package beehive
import (
"bytes"
"fmt"
"io/fs"
"log"
"os"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
)
type Deployment struct {
ID int
// Client this deployment belongs to.
Client int
Festoon string
// User ID for file permissions.
UID int
// Ports in use.
Ports []int
Festoon string
Worker *Worker
}
var replacementPort = regexp.MustCompile(`(HOSTALGIA_PORT_[A-Z])`)
var replacementPassword = regexp.MustCompile(`(HOSTALGIA_PASSWORD_[A-Z])`)
const defaultName = "hostalgia.net dedicated server"
func (d *Deployment) Interpolate(filePath string, customValues map[string]string) ([]byte, error) {
buf, err := os.ReadFile(filePath)
if err != nil {
return nil, err
}
replacements := map[string]string{
"HOSTALGIA_ID": fmt.Sprintf("%s-%d", d.Festoon, d.ID),
"HOSTALGIA_IP": d.Worker.IP,
"HOSTALGIA_QUOTED_NAME": defaultName,
"HOSTALGIA_NAME": defaultName,
}
// TODO passwords
for customKey, customValue := range customValues {
replacements[customKey] = customValue
}
replacements["HOSTALGIA_QUOTED_NAME"] = strings.ReplaceAll(replacements["HOSTALGIA_QUOTED_NAME"], `"`, `\"`)
for original, replacement := range replacements {
buf = bytes.ReplaceAll(buf, []byte(original), []byte(replacement))
}
configPorts := make(map[string]bool)
matches := replacementPort.FindAll(buf, -1)
for _, match := range matches {
configPorts[string(match)] = true
}
buf = replacementPort.ReplaceAllFunc(buf, func(i []byte) []byte {
index := int(i[len(i)-1:][0] - 'A')
if index > len(d.Ports) {
log.Fatalf("failed to Interpolate %s: insufficient ports", i)
}
return []byte(strconv.Itoa(d.Ports[index]))
})
buf = replacementPassword.ReplaceAllFunc(buf, func(i []byte) []byte {
// TODO
return i
})
return buf, nil
}
func (d *Deployment) interpolateAndCopy(inFile string, outFile string) error {
data, err := d.Interpolate(inFile, nil)
if err != nil {
return err
}
return os.WriteFile(outFile, data, 0600)
}
func (d *Deployment) deploy() error {
if strings.TrimSpace(d.Festoon) == "" {
return fmt.Errorf("unknown festoon: %s", d.Festoon)
}
match, err := regexp.MatchString(`^[a-zA-Z0-9]+$`, d.Festoon)
if err != nil {
return err
} else if !match {
return fmt.Errorf("unknown festoon: %s", d.Festoon)
}
festoonPath := path.Join(d.Worker.FestoonsDir, d.Festoon)
copyDataDir := path.Join(festoonPath, "data")
outDir := path.Join(d.Worker.DeploymentsDir, d.Label())
fileInfo, err := os.Stat(outDir)
if err != nil {
if !os.IsNotExist(err) {
return err
}
err = os.MkdirAll(outDir, 0700)
if err != nil {
return err
}
} else if !fileInfo.IsDir() {
return fmt.Errorf("invalid output directory: %s", outDir)
}
err = d.interpolateAndCopy(path.Join(festoonPath, "docker-compose.yml"), path.Join(outDir, "docker-compose.yml"))
if err != nil {
return err
}
outDataPath := path.Join(outDir, "data")
fileInfo, err = os.Stat(copyDataDir)
if err != nil {
if !os.IsNotExist(err) {
return err
}
} else if fileInfo.IsDir() {
err = filepath.WalkDir(copyDataDir, func(filePath string, dirEntry fs.DirEntry, err error) error {
relativePath := strings.TrimPrefix(filePath, copyDataDir)
if relativePath == "" {
return err
} else if !strings.HasPrefix(relativePath, "/") {
log.Fatalf("unexpected file path: %s", relativePath)
}
relativePath = relativePath[1:]
outPath := path.Join(outDataPath, relativePath)
if dirEntry.IsDir() {
err = os.MkdirAll(outPath, 0700)
if err != nil {
log.Fatal(err)
}
return err
}
return d.interpolateAndCopy(filePath, outPath)
})
if err != nil {
return err
}
}
return nil
}
func (d *Deployment) Label() string {
return fmt.Sprintf("%s-%d", d.Festoon, d.ID)
}

4
go.mod
View File

@ -1,3 +1,7 @@
module code.rocketnine.space/tslocum/beehive
go 1.19
require sigs.k8s.io/yaml v1.3.0
require gopkg.in/yaml.v2 v2.4.0 // indirect

8
go.sum Normal file
View File

@ -0,0 +1,8 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=

24
result.go Normal file
View File

@ -0,0 +1,24 @@
package beehive
type ResultType int
// Note: Result types must only be appended to preserve values.
const (
ResultHealth ResultType = iota + 1
ResultDeploy
ResultStart
ResultRestart
ResultStop
)
type Result struct {
Type ResultType
Parameters map[string]string
}
func NewResult(t ResultType, parameters map[string]string) *Result {
return &Result{
Type: t,
Parameters: parameters,
}
}

193
server.go Normal file
View File

@ -0,0 +1,193 @@
package beehive
import (
"bufio"
"encoding/json"
"fmt"
"log"
"math/rand"
"net"
"strconv"
"time"
)
const (
minPort = 10000
maxPort = 49151
)
const serverWelcomeMessage = "BUZZ"
type ServerAuthentication struct {
Worker int
Password string
}
type Server struct {
Address string
WorkerConfig map[int]string
Clients []*Client
}
func NewServer(address string, workerConfig map[int]string) (*Server, error) {
s := &Server{
Address: address,
WorkerConfig: workerConfig,
}
go s.listen()
return s, nil
}
func (s *Server) listen() {
listener, err := net.Listen("tcp", s.Address)
if err != nil {
log.Fatalf("failed to listen on %s: %s", s.Address, err)
}
var conn net.Conn
for {
conn, err = listener.Accept()
if err != nil {
log.Fatalf("failed to accept connection on %s: %s", s.Address, err)
}
client := NewClient(conn)
s.Clients = append(s.Clients, client)
go s.handleConnection(client)
}
}
func (s *Server) sendTestingTask(c *Client) {
t := NewTask(TaskHealth, map[string]string{
"time": fmt.Sprintf("%d", time.Now().UnixNano()),
})
s.sendTask(c.Worker, t)
ports := []int{10500, 10501, 10502, 10503, 10504, 10505, 10506, 10507, 10508, 10509}
d := &Deployment{
ID: 1,
Client: 7,
Festoon: "openttd",
UID: 7777,
Ports: ports,
}
parameters := map[string]string{
"id": fmt.Sprintf("%d", d.ID),
"client": fmt.Sprintf("%d", d.Client),
"festoon": d.Festoon,
"uid": fmt.Sprintf("%d", d.UID),
}
for i := range ports {
parameters[fmt.Sprintf("port_%d", i)] = fmt.Sprintf("%d", ports[i])
}
t = NewTask(TaskDeploy, parameters)
s.sendTask(c.Worker, t)
}
func (s *Server) handleRead(c *Client) {
var readMessage bool
scanner := bufio.NewScanner(c.Conn)
for scanner.Scan() {
// Authenticate.
if !readMessage {
challenge := &ServerAuthentication{}
err := json.Unmarshal(scanner.Bytes(), challenge)
if err != nil {
// TODO terminate
return
}
if challenge.Worker <= 0 {
// TODO terminate
return
}
password, ok := s.WorkerConfig[challenge.Worker]
if ok && password != "" && password == challenge.Password {
c.Conn.Write([]byte(serverWelcomeMessage + "\n"))
c.Worker = challenge.Worker
log.Printf("Worker %d connected", c.Worker)
// TODO
s.sendTestingTask(c)
readMessage = true
continue
}
// TODO terminate
return
}
result := &Result{}
err := json.Unmarshal(scanner.Bytes(), result)
if err != nil {
log.Fatalf("failed to unmarshal %s: %s", scanner.Bytes(), err)
}
log.Printf(" <- result: %+v", result)
switch result.Type {
case ResultHealth:
resultTime, err := strconv.ParseInt(result.Parameters["time"], 10, 64)
if err != nil {
// TODO disconnect worker
log.Fatal(err)
}
log.Printf("health check time: %.0fms", float64(time.Now().UnixNano()-resultTime)/1000000)
case ResultDeploy:
switch result.Parameters["status"] {
case "ok":
log.Printf("Deployment %s completed", result.Parameters["id"])
default:
log.Fatalf("unknown deployment status: %s", result.Parameters["status"])
}
default:
log.Fatalf("unknown result type %d", result.Type)
}
}
}
func (s *Server) allocatePorts(d *Deployment) {
const allocatePortRange = 10
// TODO check if port range is already allocated, if so return that
startPort := minPort + rand.Intn(rand.Intn(maxPort)-minPort-allocatePortRange)
startPort -= startPort % allocatePortRange
ports := make([]int, allocatePortRange)
for i := range ports {
ports[i] = startPort + i
}
d.Ports = ports
}
func (s *Server) handleConnection(c *Client) {
s.handleRead(c)
// TODO remove from clients list with mutex
log.Printf("Worker %d disconnected", c.Worker)
}
func (s *Server) sendTask(workerID int, t *Task) bool {
for i := range s.Clients {
if s.Clients[i].Worker == workerID {
taskJSON, err := json.Marshal(t)
if err != nil {
log.Fatal(err)
}
if t.Type == TaskDeploy {
log.Printf("Deployment %s initiated", t.Parameters["id"])
}
s.Clients[i].Out <- append(taskJSON, byte('\n'))
return true
}
}
return false
}

24
task.go Normal file
View File

@ -0,0 +1,24 @@
package beehive
type TaskType int
// Note: Task types must only be appended to preserve values.
const (
TaskHealth TaskType = iota + 1
TaskDeploy
TaskStart
TaskRestart
TaskStop
)
type Task struct {
Type TaskType
Parameters map[string]string
}
func NewTask(t TaskType, parameters map[string]string) *Task {
return &Task{
Type: t,
Parameters: parameters,
}
}

56
util.go Normal file
View File

@ -0,0 +1,56 @@
package beehive
import (
"errors"
"fmt"
"io/ioutil"
"os"
"path"
"sigs.k8s.io/yaml"
)
// Serialize stores data in YAML format at the specified path.
func Serialize(object interface{}, p string) error {
if p == "" {
return errors.New("failed to serialize: no path specified")
}
out, err := yaml.Marshal(object)
if err != nil {
return fmt.Errorf("failed to marshal configuration: %s", err)
}
os.MkdirAll(path.Dir(p), 0)
err = ioutil.WriteFile(p, out, 0600)
if err != nil {
return fmt.Errorf("failed to write to %s: %s", p, err)
}
return nil
}
// Deserialize loads data from the specified path. If a file does not exist at
// the specified path, no error is returned.
func Deserialize(object interface{}, path string) error {
if path == "" {
return errors.New("failed to deserialize: no path specified")
}
_, err := os.Stat(path)
if os.IsNotExist(err) {
return nil
}
configData, err := ioutil.ReadFile(path)
if err != nil {
return fmt.Errorf("failed to read file: %s", err)
}
err = yaml.Unmarshal(configData, object)
if err != nil {
return fmt.Errorf("failed to parse file: %s", err)
}
return nil
}

122
worker.go
View File

@ -1,14 +1,128 @@
package beehive
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"log"
"strconv"
)
type Worker struct {
ID int
// IP address.
IP string
// Range of ports available for deployments.
StartPort int
EndPort int
FestoonsDir string
DeploymentsDir string
Deployments []*Deployment
TaskQueue chan *Task
requestPortsFunc func(d *Deployment) []int
}
func NewWorker(id int, ip string, festoonsDir string, deploymentsDir string) *Worker {
w := &Worker{
ID: id,
IP: ip,
FestoonsDir: festoonsDir,
DeploymentsDir: deploymentsDir,
TaskQueue: make(chan *Task),
}
go w.handleTaskQueue()
return w
}
func (w *Worker) handleTaskQueue() {
for t := range w.TaskQueue {
w.ExecuteTask(t)
}
}
func (w *Worker) ExecuteTask(t *Task) error {
return nil
}
func (w *Worker) HandleRead(c *Client) {
var readFirst bool
scanner := bufio.NewScanner(c.Conn)
for scanner.Scan() {
if !readFirst {
if !bytes.Equal(scanner.Bytes(), []byte(serverWelcomeMessage)) {
log.Fatalf("unexpected server reply: %s", scanner.Bytes())
}
readFirst = true
continue
}
task := &Task{}
err := json.Unmarshal(scanner.Bytes(), task)
if err != nil {
log.Fatalf("failed to unmarshal %s: %s", scanner.Bytes(), err)
}
log.Printf(" <- task: %+v", task)
switch task.Type {
case TaskHealth:
result := NewResult(ResultHealth, map[string]string{
"time": task.Parameters["time"],
})
resultJson, err := json.Marshal(result)
if err != nil {
log.Fatalf("failed to marshal result %+v: %s", result, err)
}
c.Out <- append(resultJson, '\n')
case TaskDeploy:
id, err := strconv.Atoi(task.Parameters["id"])
if err != nil {
log.Fatalf("failed to parse id: %s", err)
}
client, err := strconv.Atoi(task.Parameters["client"])
if err != nil {
log.Fatalf("failed to parse client: %s", err)
}
// TODO
ports := make([]int, 10)
for i := range ports {
ports[i], err = strconv.Atoi(task.Parameters[fmt.Sprintf("port_%d", i)])
if err != nil {
log.Fatalf("failed to parse port %d: %s", i, err)
}
}
d := &Deployment{
ID: id,
Client: client,
Festoon: task.Parameters["festoon"],
UID: 7777,
Ports: ports,
Worker: w,
}
err = d.deploy()
if err != nil {
log.Fatalf("failed to deploy %+v: %s", d, err)
}
// Send result
result := NewResult(ResultDeploy, map[string]string{
"id": task.Parameters["id"],
"status": "ok",
})
resultJson, err := json.Marshal(result)
if err != nil {
log.Fatalf("failed to marshal result %+v: %s", result, err)
}
c.Out <- append(resultJson, '\n')
default:
log.Fatalf("unknown task type %d", task.Type)
}
}
}