backup/box.go

237 lines
6.3 KiB
Go

package main
import (
"errors"
"regexp"
"sync"
"github.com/silenceper/pool"
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)
// Box describes one host that commands are executed on through a pool of
// SSH connections, together with the ZFS state tracked for it.
type Box struct {
	// name, addr, user and key are stored verbatim from the NewBox arguments.
	name, addr, user, key string
	// zfs is the ZFS state attached to this box; NewBox points zfs.box back
	// at the owning Box.
	zfs *BoxZfs
	// sshPool supplies the *Ssh connections that Exec borrows per command.
	sshPool pool.Pool
	// created is set by NewBox and checked by Exec; online is toggled by Open
	// and Close; allowDirectConnect is consulted by TransferZfs when deciding
	// whether a direct transfer may be used.
	created, online, allowDirectConnect bool
	// mx serializes Open and Close.
	mx sync.Mutex
}
// BoxSshPool bundles an SSH signer, client configuration and client
// connection with a mutex.
// NOTE(review): no code in this part of the file reads or writes these
// fields; the notes below are inferred from names and types only — confirm
// against the code that constructs and uses this type.
type BoxSshPool struct {
signer ssh.Signer
config *ssh.ClientConfig
client *ssh.Client
// presumably records whether the client has already logged in — TODO confirm
logged bool
// presumably guards client/logged — TODO confirm
mx sync.Mutex
}
// NewBox validates name against boxNamePattern, builds the SSH connection
// pool for the host and returns a Box that is created but not yet online.
//
// name, addr, user and key are forwarded to NewSshPool and stored on the box.
// direct is currently ignored: direct connections are always allowed (see the
// FIXME below).
//
// It returns a nil box and an error when name does not match boxNamePattern
// or when NewSshPool fails.
func (c *Config) NewBox(name, addr, user, key string, direct bool) (b *Box, err error) {
	log.WithFields(log.Fields{"name": name, "addr": addr, "user": user, "key": key}).Debugf("starting")
	defer log.WithFields(log.Fields{"name": name, "addr": addr, "user": user, "key": key}).Debugf("done")

	re := regexp.MustCompile(boxNamePattern)
	if !re.MatchString(name) {
		err = errors.New("invalid name")
		// b is still nil at this point, so log the name argument; reading
		// b.name here would panic with a nil pointer dereference.
		log.WithFields(log.Fields{"name": name, "error": err}).Errorf("")
		return nil, err
	}

	p, err := NewSshPool(name, addr, user, key)
	if err != nil {
		// Same as above: b has not been allocated yet.
		log.WithFields(log.Fields{"name": name, "call": "NewSshPool", "error": err}).Errorf("")
		return nil, err
	}

	b = &Box{
		name: name,
		addr: addr,
		user: user,
		key:  key,
		zfs: &BoxZfs{
			online: false,
		},
		sshPool:            p,
		online:             false,
		created:            true,
		allowDirectConnect: true, //FIXME use direct
	}
	// Give the ZFS state a back-reference to its owning box.
	b.zfs.box = b
	return b, nil
}
// Open brings the box online: it runs "hostname" on the box through Exec to
// check that commands can be executed, marks the box online and opens its
// ZFS state. Calling Open on a box that is already online is a no-op.
//
// It returns the error from Exec or from zfs.Open; in both cases the box is
// left offline so that a later Open retries the whole sequence.
func (b *Box) Open() error {
	log.WithFields(log.Fields{"name": b.name}).Debugf("starting")
	defer log.WithFields(log.Fields{"name": b.name}).Debugf("done")

	b.mx.Lock()
	defer b.mx.Unlock()

	if b.online {
		return nil
	}

	hostname, err := b.Exec("hostname")
	if err != nil {
		log.WithFields(log.Fields{"name": b.name, "call": "Exec", "attr": "hostname", "error": err}).Errorf("")
		return err
	}
	log.WithFields(log.Fields{"name": b.name}).Debugf("hostname : %s", hostname)

	// The flag is raised before zfs.Open, as in the original ordering
	// (zfs.Open may rely on it — not visible from here).
	b.online = true
	if err := b.zfs.Open(); err != nil {
		// Roll back: otherwise the box would stay marked online with its ZFS
		// state unopened, and every later Open would return nil without
		// doing anything.
		b.online = false
		log.WithFields(log.Fields{"name": b.name, "call": "zfs.Open", "error": err}).Errorf("")
		return err
	}
	return nil
}
// Close takes the box offline. When the box is online it closes the ZFS
// state first and clears the online flag only if that succeeded; when the
// box is already offline it does nothing. The error from zfs.Close is
// returned unchanged.
func (b *Box) Close() error {
	trace := log.WithFields(log.Fields{"name": b.name})
	trace.Debugf("starting")
	defer trace.Debugf("done")

	b.mx.Lock()
	defer b.mx.Unlock()

	if b.online {
		closeErr := b.zfs.Close()
		if closeErr != nil {
			log.WithFields(log.Fields{"name": b.name, "call": "zfs.Close", "error": closeErr}).Errorf("")
			return closeErr
		}
		b.online = false
	}
	return nil
}
// Exec runs cmd on the box using a connection borrowed from the SSH pool and
// returns whatever the connection's Exec returns.
//
// It returns an error when the box was not built by NewBox, when no
// connection can be obtained from the pool, when the pool hands back an item
// that is not an *Ssh, or when the command itself fails. The borrowed item is
// always handed back to the pool; a failure to do so is logged but does not
// change the result of the command.
func (b *Box) Exec(cmd string) (r string, err error) {
	log.WithFields(log.Fields{"name": b.name, "cmd": cmd}).Debugf("starting")
	defer log.WithFields(log.Fields{"name": b.name, "cmd": cmd}).Debugf("done")

	if !b.created {
		err = errors.New("box not initialized")
		log.WithFields(log.Fields{"name": b.name, "error": err}).Errorf("")
		return "", err
	}

	v, err := b.sshPool.Get()
	if err != nil {
		log.WithFields(log.Fields{"name": b.name, "error": err, "call": "SshPool.Get"}).Errorf("")
		return "", err
	}
	defer func() {
		// Put reports an error (e.g. when the pool rejects the item); surface
		// it in the log instead of dropping it silently.
		if putErr := b.sshPool.Put(v); putErr != nil {
			log.WithFields(log.Fields{"name": b.name, "error": putErr, "call": "SshPool.Put"}).Errorf("")
		}
	}()

	// Checked assertion: an unexpected pool item becomes an error rather
	// than a panic.
	s, ok := v.(*Ssh)
	if !ok {
		err = errors.New("unexpected ssh pool item type")
		log.WithFields(log.Fields{"name": b.name, "error": err}).Errorf("")
		return "", err
	}
	return s.Exec(cmd)
}
// TransferZfs replicates the dataset at from to every destination in to that
// can be reached with a direct transfer, i.e. where both the source box and
// the destination box have allowDirectConnect set.
//
// It returns the number of destinations transferred and the first error
// encountered; processing stops at that error. Destinations that do not
// allow a direct transfer are not transferred by this function: they are
// skipped and reported with a warning. A box name that is not present in
// cfg.box is reported as an error.
func TransferZfs(from Addr, to []Addr) (int, error) {
	log.WithFields(log.Fields{"from": from, "to": to}).Debugf("starting")
	defer log.WithFields(log.Fields{"from": from, "to": to}).Debugf("done")

	count := 0
	if len(to) == 0 {
		// Nothing to do; the source box is deliberately not looked up here so
		// an empty destination list never fails.
		return count, nil
	}

	// The source box is the same for every destination: resolve it once.
	srcBox, ok := cfg.box[from.Box()]
	if !ok {
		err := errors.New("unknown source box")
		log.WithFields(log.Fields{"from": from, "to": to, "attr": from.Box(), "error": err}).Errorf("")
		return count, err
	}

	for _, dest := range to {
		dstBox, ok := cfg.box[dest.Box()]
		if !ok {
			err := errors.New("unknown destination box")
			log.WithFields(log.Fields{"from": from, "to": to, "attr": dest, "error": err}).Errorf("")
			return count, err
		}

		if !srcBox.allowDirectConnect || !dstBox.allowDirectConnect {
			// Only direct transfers are handled here; make the skip visible
			// instead of dropping the destination silently.
			log.WithFields(log.Fields{"from": from, "to": to, "attr": dest}).Warnf("destination skipped: direct connect not allowed")
			continue
		}

		if err := TransferDirectZfs(from, dest); err != nil {
			log.WithFields(log.Fields{"from": from, "to": to, "call": "TransferDirectZfs", "attr": dest, "error": err}).Errorf("")
			return count, err
		}
		count++
	}
	return count, nil
}
// TransferDirectZfs brings the dataset at to up to date with the dataset at
// from by running "ssh <from box> zfs send ... | zfs recv -F <to path>" on
// the destination box.
//
// Steps:
//  1. List the valid snapshots on both sides; with no source snapshot there
//     is nothing to do.
//  2. If the destination has no valid snapshot, send the first source
//     snapshot in full and record it on the destination filesystem.
//  3. Walk the source snapshots from the last one backwards to find the most
//     recent one whose name also exists on the destination.
//  4. If that common snapshot is not the last source snapshot, send the
//     incremental stream (-I) from it to the last source snapshot and record
//     every snapshot after the common one on the destination filesystem.
//
// It returns the error from ValidSnapshots or BoxExec, or "zfs snapshot
// unsync" when the two sides share no snapshot name.
func TransferDirectZfs(from, to Addr) error {
	log.WithFields(log.Fields{"from": from, "to": to}).Debugf("starting")
	defer log.WithFields(log.Fields{"from": from, "to": to}).Debugf("done")

	var (
		srcSnaps, dstSnaps []*ZfsSnapshot
		err                error
	)

	srcSnaps, err = from.ValidSnapshots()
	if err != nil {
		log.WithFields(log.Fields{"from": from, "to": to, "call": "ValidSnapshots", "attr": from, "error": err}).Errorf("")
		return err
	}
	if len(srcSnaps) == 0 {
		return nil
	}

	dstSnaps, err = to.ValidSnapshots()
	if err != nil {
		log.WithFields(log.Fields{"from": from, "to": to, "call": "ValidSnapshots", "attr": to, "error": err}).Errorf("")
		return err
	}

	// Empty destination: seed it with a full send of the first source snapshot.
	if len(dstSnaps) == 0 {
		log.WithFields(log.Fields{"from": from, "to": to}).Debugf("initiating destination")
		seedCmd := "ssh " + from.Box() + " zfs send " + srcSnaps[0].String() + " | zfs recv -F " + to.Path()
		if _, execErr := to.BoxExec(seedCmd); execErr != nil {
			log.WithFields(log.Fields{"from": from, "to": to, "call": "BoxExec", "error": execErr}).Errorf("")
			return execErr
		}
		dstFs := cfg.box[to.Box()].zfs.filesystems[to.Path()]
		seeded := &ZfsSnapshot{name: srcSnaps[0].name, fs: dstFs}
		dstSnaps = append(dstSnaps, seeded)
		dstFs.AddSnapshot(seeded)
	}

	// Find the index of the most recent source snapshot that also exists,
	// by name, on the destination.
	latest := len(srcSnaps) - 1
	common := -1
search:
	for i := latest; i >= 0; i-- {
		for j := len(dstSnaps) - 1; j >= 0; j-- {
			if srcSnaps[i].name == dstSnaps[j].name {
				common = i
				break search
			}
		}
	}
	if common == -1 {
		unsyncErr := errors.New("zfs snapshot unsync")
		log.WithFields(log.Fields{"from": from, "to": to, "error": unsyncErr}).Errorf("")
		return unsyncErr
	}

	// Destination already holds the last source snapshot: nothing to send.
	if common >= latest {
		return nil
	}

	log.WithFields(log.Fields{"from": from, "to": to}).Debugf("transfering from %s to %s", srcSnaps[common].name, srcSnaps[latest].name)
	incrCmd := "ssh " + from.Box() + " zfs send -I " + srcSnaps[common].String() + " " + srcSnaps[latest].String() + " | zfs recv -F " + to.Path()
	if _, execErr := to.BoxExec(incrCmd); execErr != nil {
		log.WithFields(log.Fields{"from": from, "to": to, "call": "BoxExec", "error": execErr}).Errorf("")
		return execErr
	}

	// Mirror the newly received snapshots in the destination's bookkeeping.
	dstFs := cfg.box[to.Box()].zfs.filesystems[to.Path()]
	for _, snap := range srcSnaps[common+1:] {
		dstFs.AddSnapshot(&ZfsSnapshot{name: snap.name, fs: dstFs})
	}
	return nil
}