This commit is contained in:
shoopea
2023-06-29 22:58:24 +02:00
parent fb02d525d2
commit 35e234533c
18 changed files with 1646 additions and 1441 deletions

528
box.go
View File

@@ -1,423 +1,205 @@
package main
import (
"bytes"
"encoding/csv"
"fmt"
"log"
"strings"
"errors"
"regexp"
"sync"
"github.com/silenceper/pool"
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)
type Box struct {
Addr string `json:"addr"`
User string `json:"user"`
Key string `json:"key"`
Name string `json:"-"`
ssh *SSHConfig
zfs *ZFSConfig
online bool
name string
addr string
user string
key string
zfs *BoxZfs
sshPool pool.Pool
created bool
online bool
mx sync.Mutex
}
func (b *Box) ZFSTakeSnapshot(schedule, path string) (err error) {
if *debugFlag {
log.Printf("Box.ZFSTakeSnapshot : %s : Taking snapshot on %s for %s", b.Name, path, schedule)
}
if !b.online {
err = fmt.Errorf("box offline")
return
}
err = b.SnapshotInitialize()
if err != nil {
return
}
b.zfs.M.Lock()
defer b.zfs.M.Unlock()
timestamp := cfg.Now.Format("2006-01-02_15.04.05")
name := fmt.Sprintf("%s-%s--%s", schedule, timestamp, cfg.Zfsnap[schedule])
_, err = b.ssh.exec("zfs snapshot " + path + "@" + name)
if err != nil {
return
}
b.zfs.SnapshotAdded = true
b.zfs.SnapshotList = append(b.zfs.SnapshotList, Snapshot(path+"@"+name))
return
type BoxSshPool struct {
signer ssh.Signer
config *ssh.ClientConfig
client *ssh.Client
logged bool
mx sync.Mutex
}
func (b *Box) ZFSGetLastSnapshot(path string) (last Snapshot, err error) {
if *debugFlag {
log.Printf("Box.ZFSGetLastSnapshot : %s : Start %s (%d snapshots)", b.Name, path, len(b.zfs.SnapshotList))
func (c *Config) NewBox(name, addr, user, key string) (b *Box, err error) {
log.WithFields(log.Fields{"name": name, "addr": addr, "user": user, "key": key}).Debugf("starting")
defer log.WithFields(log.Fields{"name": name, "addr": addr, "user": user, "key": key}).Debugf("done")
re := regexp.MustCompile(boxNamePattern)
if !re.MatchString(name) {
err := errors.New("invalid name")
log.WithFields(log.Fields{"name": b.name, "error": err}).Errorf("")
return nil, err
}
if !b.online {
err = fmt.Errorf("box offline")
return
}
err = b.SnapshotInitialize()
p, err := NewSshPool(name, addr, user, key)
if err != nil {
return last, err
log.WithFields(log.Fields{"name": b.name, "call": "NewSshPool", "error": err}).Errorf("")
return nil, err
}
b.zfs.M.Lock()
defer b.zfs.M.Unlock()
b = &Box{
name: name,
addr: addr,
user: user,
key: key,
zfs: &BoxZfs{
online: false,
},
sshPool: p,
online: false,
created: true,
}
for _, v := range b.zfs.SnapshotList {
if v.Path() == path {
last = v
}
}
if len(string(last)) == 0 {
err = fmt.Errorf("no snapshot")
}
return
b.zfs.box = b
return b, nil
}
func (b *Box) ZFSIsLastSnapshot(src Snapshot) (is bool, err error) {
if *debugFlag {
log.Printf("Box.ZFSIsLastSnapshot : %s : Start %s", b.Name, string(src))
}
func (b *Box) Open() error {
log.WithFields(log.Fields{"name": b.name}).Debugf("starting")
defer log.WithFields(log.Fields{"name": b.name}).Debugf("done")
if !b.online {
err = fmt.Errorf("box offline")
return
}
b.mx.Lock()
defer b.mx.Unlock()
err = b.SnapshotInitialize()
if err != nil {
return
}
_, err = b.ZFSGetNextSnapshot(src)
if err != nil {
if err.Error() == "no snapshot" {
is = true
err = nil
}
} else {
is = false
}
return
}
func (b *Box) ZFSGetFirstSnapshot(path string) (first Snapshot, err error) {
if *debugFlag {
log.Printf("Box.ZFSGetFirstSnapshot : %s : Start %s", b.Name, path)
}
if !b.online {
err = fmt.Errorf("box offline")
return
}
err = b.SnapshotInitialize()
if err != nil {
return
}
b.zfs.M.Lock()
defer b.zfs.M.Unlock()
for _, v := range b.zfs.SnapshotList {
if v.Path() == path {
first = v
return
}
}
err = fmt.Errorf("no snapshot")
return
}
func (b *Box) ZFSGetNextSnapshot(src Snapshot) (next Snapshot, err error) {
if *debugFlag {
log.Printf("Box.ZFSGetNextSnapshot : %s : Start %s", b.Name, string(src))
}
if !b.online {
err = fmt.Errorf("box offline")
return
}
err = b.SnapshotInitialize()
if err != nil {
return
}
b.zfs.M.Lock()
defer b.zfs.M.Unlock()
for id, v := range b.zfs.SnapshotList {
if v == src {
if len(b.zfs.SnapshotList) > id+1 {
next = b.zfs.SnapshotList[id+1]
if next.Path() == src.Path() {
return
} else {
err = fmt.Errorf("no snapshot")
return
}
} else {
err = fmt.Errorf("no snapshot")
return
}
}
}
err = fmt.Errorf("no snapshot")
return
}
func (b *Box) ZFSUpdateSnapshotList() (err error) {
if *debugFlag {
log.Printf("Box.ZFSUpdateSnapshotList : %s : Start", b.Name)
}
if !b.online {
err = fmt.Errorf("box offline")
return
}
b.zfs.M.Lock()
if b.zfs.SnapshotDeleted || b.zfs.SnapshotAdded {
b.zfs.SnapshotInitialized = false
}
b.zfs.M.Unlock()
err = b.SnapshotInitialize()
return
}
func (b *Box) ZFSGetSnapshotList() (snaps []Snapshot, err error) {
if *debugFlag {
log.Printf("Box.ZFSGetSnapshotList : %s : Start", b.Name)
}
if !b.online {
err = fmt.Errorf("box offline")
return
}
err = b.SnapshotInitialize()
if err != nil {
return
}
b.zfs.M.Lock()
defer b.zfs.M.Unlock()
snaps = b.zfs.SnapshotList
return
}
func (b *Box) SnapshotInitialize() (err error) {
if *debugFlag {
log.Printf("Box.SnapshotInitialize : %s : Start", b.Name)
}
if !b.online {
err = fmt.Errorf("box offline")
return
}
b.zfs.M.Lock()
defer b.zfs.M.Unlock()
if b.zfs.SnapshotInitialized {
if b.online {
return nil
}
if *debugFlag {
log.Printf("Box.SnapshotInitialize : %s : Start", b.Name)
}
b.zfs.SnapshotList = make([]Snapshot, 0)
var buf *bytes.Buffer
buf, err = b.SSHExec("zfs list -H -t snapshot -o name")
csvReader := csv.NewReader(buf)
csvReader.Comma = '\t'
csvReader.FieldsPerRecord = 1
csvData, err := csvReader.ReadAll()
hostname, err := b.Exec("hostname")
if err != nil {
if *debugFlag {
log.Printf("Box.SnapshotInitialize : %s : csvReader.ReadAll() : %s", b.Name, err)
}
log.WithFields(log.Fields{"name": b.name, "call": "Exec", "attr": "hostname", "error": err}).Errorf("")
return err
}
for _, rec := range csvData {
b.zfs.SnapshotList = append(b.zfs.SnapshotList, Snapshot(rec[0]))
}
log.WithFields(log.Fields{"name": b.name}).Debugf("hostname : %s", hostname)
if *debugFlag {
log.Printf("Box.SnapshotInitialize : %s : read %d zfs snapshots", b.Name, len(b.zfs.SnapshotList))
}
b.online = true
b.zfs.SnapshotInitialized = true
b.zfs.SnapshotAdded = false
b.zfs.SnapshotDeleted = false
if err := b.zfs.Open(); err != nil {
log.WithFields(log.Fields{"name": b.name, "call": "zfs.Open", "error": err}).Errorf("")
return err
}
return nil
}
func (b *Box) ZFSUpdateList() (err error) {
if *debugFlag {
log.Printf("Box.ZFSUpdateList : %s : Start", b.Name)
}
func (b *Box) Close() error {
log.WithFields(log.Fields{"name": b.name}).Debugf("starting")
defer log.WithFields(log.Fields{"name": b.name}).Debugf("done")
b.mx.Lock()
defer b.mx.Unlock()
if !b.online {
err = fmt.Errorf("box offline")
return
}
b.zfs.M.Lock()
if b.zfs.ZFSDeleted || b.zfs.ZFSAdded {
b.zfs.ZFSInitialized = false
}
b.zfs.M.Unlock()
err = b.ZFSInitialize()
return
}
func (b *Box) ZFSIsZFS(path string) bool {
if *debugFlag {
log.Printf("Box.ZFSIsZFS : %s : Start %s", b.Name, path)
}
if !b.online {
return false
}
err := b.ZFSInitialize()
if err != nil {
return false
}
b.zfs.M.Lock()
defer b.zfs.M.Unlock()
if _, ok := b.zfs.ZFSMap[path]; ok {
return true
}
return false
}
func (b *Box) ZFSCreateZFS(path string) (err error) {
if *debugFlag {
log.Printf("Box.ZFSCreateZFS : %s : Start %s", b.Name, path)
}
if !b.online {
err = fmt.Errorf("box offline")
return
}
err = b.ZFSInitialize()
if err != nil {
return
}
b.zfs.M.Lock()
defer b.zfs.M.Unlock()
p := strings.Split(path, `/`)
var base string
for _, d := range p {
if base == "" {
base = d
} else {
base = base + `/` + d
}
if _, ok := b.zfs.ZFSMap[base]; !ok {
if *debugFlag {
log.Printf("Box.ZFSCreateZFS : Creating %s:%s", b.Name, base)
}
_, err = b.SSHExec("zfs create -o mountpoint=none " + base)
if err != nil {
if *debugFlag {
log.Printf("Box.ZFSCreateZFS : %s : SSHExec : %s", b.Name, err)
}
return
}
b.zfs.ZFSMap[base] = "none"
b.zfs.ZFSAdded = true
}
}
return
}
func (b *Box) ZFSInitialize() (err error) {
b.zfs.M.Lock()
defer b.zfs.M.Unlock()
if *debugFlag {
log.Printf("Box.ZFSInitialize : %s : Start", b.Name)
}
if !b.online {
err = fmt.Errorf("box offline")
return
}
if b.zfs.ZFSInitialized {
return nil
}
if *debugFlag {
log.Printf("Box.ZFSInitialize : %s : Start", b.Name)
}
b.zfs.ZFSMap = make(map[string]string)
var buf *bytes.Buffer
buf, err = b.SSHExec("zfs list -H -o name,mountpoint")
csvReader := csv.NewReader(buf)
csvReader.Comma = '\t'
csvReader.FieldsPerRecord = 2
csvData, err := csvReader.ReadAll()
if err != nil {
if *debugFlag {
log.Printf("Box.ZFSInitialize : %s : csvReader.ReadAll() : %s", b.Name, err)
}
if err := b.zfs.Close(); err != nil {
log.WithFields(log.Fields{"name": b.name, "call": "zfs.Close", "error": err}).Errorf("")
return err
}
for _, rec := range csvData {
b.zfs.ZFSMap[rec[0]] = rec[1]
}
b.zfs.ZFSInitialized = true
b.zfs.ZFSAdded = false
b.zfs.ZFSDeleted = false
b.online = false
return nil
}
func (b *Box) SSHExec(cmd string) (buf *bytes.Buffer, err error) {
if !b.online {
err = fmt.Errorf("box offline")
return
func (b *Box) Exec(cmd string) (r string, err error) {
log.WithFields(log.Fields{"name": b.name, "cmd": cmd}).Debugf("starting")
defer log.WithFields(log.Fields{"name": b.name, "cmd": cmd}).Debugf("done")
if !b.created {
err := errors.New("box not initialized")
log.WithFields(log.Fields{"name": b.name, "error": err}).Errorf("")
return "", err
}
buf, err = b.ssh.exec(cmd)
return
v, err := b.sshPool.Get()
if err != nil {
log.WithFields(log.Fields{"name": b.name, "error": err, "call": "SshPool.Get"}).Errorf("")
return "", err
}
defer b.sshPool.Put(v)
s := v.(*Ssh)
return s.Exec(cmd)
}
func (b *Box) Host() string {
s := strings.Split(string(b.Addr), `:`)
return s[0]
func TransferZfs(from, to Addr) error {
log.WithFields(log.Fields{"from": from, "to": to}).Debugf("starting")
defer log.WithFields(log.Fields{"from": from, "to": to}).Debugf("done")
var (
err error
fromSnapshots, toSnapshots []*ZfsSnapshot
)
if fromSnapshots, err = from.ValidSnapshots(); err != nil {
log.WithFields(log.Fields{"from": from, "to": to, "call": "ValidSnapshots", "attr": from, "error": err}).Errorf("")
return err
}
if len(fromSnapshots) == 0 {
return nil
}
if toSnapshots, err = to.ValidSnapshots(); err != nil {
log.WithFields(log.Fields{"from": from, "to": to, "call": "ValidSnapshots", "attr": to, "error": err}).Errorf("")
return err
}
if len(toSnapshots) == 0 {
log.WithFields(log.Fields{"from": from, "to": to}).Debugf("initiating destination")
if _, err := to.BoxExec("ssh " + from.Box() + " zfs send " + fromSnapshots[0].String() + " | zfs recv -F " + to.Path()); err != nil {
log.WithFields(log.Fields{"from": from, "to": to, "call": "BoxExec", "error": err}).Errorf("")
return err
}
newToSnapshot := &ZfsSnapshot{name: fromSnapshots[0].name, fs: cfg.box[to.Box()].zfs.filesystems[to.Path()]}
toSnapshots = append(toSnapshots, newToSnapshot)
cfg.box[to.Box()].zfs.filesystems[to.Path()].AddSnapshot(newToSnapshot)
}
fromFromSnapshotId := -1
lastToSnapshot := toSnapshots[len(toSnapshots)-1]
log.WithFields(log.Fields{"from": from, "to": to}).Debugf("searching last snapshot %s", lastToSnapshot.String())
for id, v := range fromSnapshots {
if v.name == lastToSnapshot.name {
fromFromSnapshotId = id
log.WithFields(log.Fields{"from": from, "to": to}).Debugf("found %s", v.String())
break
}
}
if fromFromSnapshotId == -1 {
err := errors.New("zfs snapshot unsync")
log.WithFields(log.Fields{"from": from, "to": to, "error": err}).Errorf("")
return err
}
if fromSnapshots[fromFromSnapshotId].name != lastToSnapshot.name {
log.WithFields(log.Fields{"from": from, "to": to}).Debugf("transfering from %s to %s", fromSnapshots[fromFromSnapshotId].name, fromSnapshots[len(fromSnapshots)-1].name)
if _, err := to.BoxExec("ssh " + from.Box() + " zfs send -I " + fromSnapshots[fromFromSnapshotId].String() + " " + fromSnapshots[len(fromSnapshots)-1].String() + " | zfs recv -F " + to.Path()); err != nil {
log.WithFields(log.Fields{"from": from, "to": to, "call": "BoxExec", "error": err}).Errorf("")
return err
}
for _, v := range fromSnapshots[fromFromSnapshotId:] {
cfg.box[to.Box()].zfs.filesystems[to.Path()].AddSnapshot(&ZfsSnapshot{name: v.name, fs: cfg.box[to.Box()].zfs.filesystems[to.Path()]})
}
}
return nil
}