You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
123 lines
2.6 KiB
Go
123 lines
2.6 KiB
Go
package beehive
|
|
|
|
import (
	"database/sql"
	"fmt"
	"log"
)
|
|
|
|
// initSchema is the SQL script NewDatabase executes when the deployment table
// does not exist yet. It creates the worker, deployment and task tables, the
// indexes on task, and a trigger that issues NOTIFY on the "task" channel
// after an INSERT on task.
const initSchema = `
CREATE TABLE worker (
	id serial PRIMARY KEY
);

CREATE TABLE deployment (
	id serial PRIMARY KEY,
	created timestamptz,
	modified timestamptz,
	worker_id integer REFERENCES worker,
	festoon text
);

CREATE TABLE task (
	id serial PRIMARY KEY,
	type integer,
	created timestamptz,
	started timestamptz,
	completed timestamptz,
	deployment_id integer REFERENCES deployment
);

CREATE INDEX ON task (started);
CREATE INDEX ON task (completed);
CREATE INDEX ON task (deployment_id);

CREATE FUNCTION task_notify() RETURNS trigger AS $$
BEGIN
	NOTIFY task;
	-- A PL/pgSQL trigger function must end with RETURN; the value is ignored
	-- for AFTER triggers, so NULL is returned.
	RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER task_notify_trigger
	AFTER INSERT ON task
	EXECUTE FUNCTION task_notify();
`
|
|
|
|
// DB is a package-level *Database handle. Nothing in this part of the file
// assigns it, so it stays nil until code elsewhere sets it.
// NOTE(review): presumably assigned once at startup from NewDatabase — confirm.
var DB *Database
|
|
|
|
type Database struct {
|
|
db *sql.DB
|
|
}
|
|
|
|
func NewDatabase(driverName string, dataSource string) (*Database, error) {
|
|
db, err := sql.Open(driverName, dataSource)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Test connection.
|
|
_, err = db.Exec("SET search_path TO queenbee")
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
// Initialize database.
|
|
var result int
|
|
err = db.QueryRow("SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'queenbee' AND table_name = 'deployment'").Scan(&result)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
} else if result == 0 {
|
|
_, err = db.Exec(initSchema)
|
|
if err != nil {
|
|
log.Fatalf("failed to initialize database: %s", err)
|
|
}
|
|
log.Println("Initialized database schema")
|
|
}
|
|
|
|
d := &Database{
|
|
db: db,
|
|
}
|
|
return d, nil
|
|
}
|
|
|
|
func (d *Database) StartTransaction() error {
|
|
_, err := d.db.Exec("BEGIN")
|
|
return err
|
|
}
|
|
|
|
func (d *Database) CancelTransaction() error {
|
|
_, err := d.db.Exec("ROLLBACK")
|
|
return err
|
|
}
|
|
|
|
func (d *Database) CommitTransaction() error {
|
|
_, err := d.db.Exec("COMMIT")
|
|
return err
|
|
}
|
|
|
|
func (d *Database) PendingTasks() ([]*Task, error) {
|
|
var tasks []*Task
|
|
rows, err := d.db.Query("SELECT id, created, started, completed, deployment_id from task where started = 0 ORDER BY id ASC")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for rows.Next() {
|
|
task := &Task{}
|
|
err = rows.Scan(task.ID, task.Created, task.Started, task.Completed, task.DeploymentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tasks = append(tasks, task)
|
|
}
|
|
return tasks, nil
|
|
}
|
|
|
|
func (d *Database) AddTask(t *Task) error {
|
|
_, err := d.db.Exec("INSERT INTO task (created, started, completed, deployment_id) VALUES (null, ?, ?, ?, ?)", t.Created, t.Started, t.Completed, t.DeploymentID)
|
|
return err
|
|
}
|
|
|
|
func (d *Database) UpdateTask(t *Task) error {
|
|
_, err := d.db.Exec("UPDATE task SET started=?, completed=?, deployment_id=? WHERE id=?", t.Started, t.Completed, t.DeploymentID, t.ID)
|
|
return err
|
|
}
|