237 lines
6.3 KiB
Go
237 lines
6.3 KiB
Go
package main
|
|
|
|
import (
|
|
"errors"
|
|
"regexp"
|
|
"sync"
|
|
|
|
"github.com/silenceper/pool"
|
|
log "github.com/sirupsen/logrus"
|
|
"golang.org/x/crypto/ssh"
|
|
)
|
|
|
|
type Box struct {
|
|
name string
|
|
addr string
|
|
user string
|
|
key string
|
|
zfs *BoxZfs
|
|
sshPool pool.Pool
|
|
created bool
|
|
online bool
|
|
allowDirectConnect bool
|
|
mx sync.Mutex
|
|
}
|
|
|
|
type BoxSshPool struct {
|
|
signer ssh.Signer
|
|
config *ssh.ClientConfig
|
|
client *ssh.Client
|
|
logged bool
|
|
mx sync.Mutex
|
|
}
|
|
|
|
func (c *Config) NewBox(name, addr, user, key string, direct bool) (b *Box, err error) {
|
|
log.WithFields(log.Fields{"name": name, "addr": addr, "user": user, "key": key}).Debugf("starting")
|
|
defer log.WithFields(log.Fields{"name": name, "addr": addr, "user": user, "key": key}).Debugf("done")
|
|
|
|
re := regexp.MustCompile(boxNamePattern)
|
|
if !re.MatchString(name) {
|
|
err := errors.New("invalid name")
|
|
log.WithFields(log.Fields{"name": b.name, "error": err}).Errorf("")
|
|
return nil, err
|
|
}
|
|
|
|
p, err := NewSshPool(name, addr, user, key)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{"name": b.name, "call": "NewSshPool", "error": err}).Errorf("")
|
|
return nil, err
|
|
}
|
|
|
|
b = &Box{
|
|
name: name,
|
|
addr: addr,
|
|
user: user,
|
|
key: key,
|
|
zfs: &BoxZfs{
|
|
online: false,
|
|
},
|
|
sshPool: p,
|
|
online: false,
|
|
created: true,
|
|
allowDirectConnect: true, //FIXME use direct
|
|
}
|
|
|
|
b.zfs.box = b
|
|
|
|
return b, nil
|
|
}
|
|
|
|
func (b *Box) Open() error {
|
|
log.WithFields(log.Fields{"name": b.name}).Debugf("starting")
|
|
defer log.WithFields(log.Fields{"name": b.name}).Debugf("done")
|
|
|
|
b.mx.Lock()
|
|
defer b.mx.Unlock()
|
|
|
|
if b.online {
|
|
return nil
|
|
}
|
|
|
|
hostname, err := b.Exec("hostname")
|
|
if err != nil {
|
|
log.WithFields(log.Fields{"name": b.name, "call": "Exec", "attr": "hostname", "error": err}).Errorf("")
|
|
return err
|
|
}
|
|
|
|
log.WithFields(log.Fields{"name": b.name}).Debugf("hostname : %s", hostname)
|
|
|
|
b.online = true
|
|
|
|
if err := b.zfs.Open(); err != nil {
|
|
log.WithFields(log.Fields{"name": b.name, "call": "zfs.Open", "error": err}).Errorf("")
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *Box) Close() error {
|
|
log.WithFields(log.Fields{"name": b.name}).Debugf("starting")
|
|
defer log.WithFields(log.Fields{"name": b.name}).Debugf("done")
|
|
|
|
b.mx.Lock()
|
|
defer b.mx.Unlock()
|
|
|
|
if !b.online {
|
|
return nil
|
|
}
|
|
|
|
if err := b.zfs.Close(); err != nil {
|
|
log.WithFields(log.Fields{"name": b.name, "call": "zfs.Close", "error": err}).Errorf("")
|
|
return err
|
|
}
|
|
|
|
b.online = false
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *Box) Exec(cmd string) (r string, err error) {
|
|
log.WithFields(log.Fields{"name": b.name, "cmd": cmd}).Debugf("starting")
|
|
defer log.WithFields(log.Fields{"name": b.name, "cmd": cmd}).Debugf("done")
|
|
|
|
if !b.created {
|
|
err := errors.New("box not initialized")
|
|
log.WithFields(log.Fields{"name": b.name, "error": err}).Errorf("")
|
|
return "", err
|
|
}
|
|
|
|
v, err := b.sshPool.Get()
|
|
if err != nil {
|
|
log.WithFields(log.Fields{"name": b.name, "error": err, "call": "SshPool.Get"}).Errorf("")
|
|
return "", err
|
|
}
|
|
|
|
defer b.sshPool.Put(v)
|
|
s := v.(*Ssh)
|
|
|
|
return s.Exec(cmd)
|
|
}
|
|
|
|
func TransferZfs(from Addr, to []Addr) (int, error) {
|
|
log.WithFields(log.Fields{"from": from, "to": to}).Debugf("starting")
|
|
defer log.WithFields(log.Fields{"from": from, "to": to}).Debugf("done")
|
|
|
|
count := 0
|
|
|
|
dests := make([]Addr, 0)
|
|
for _, dest := range to {
|
|
if cfg.box[from.Box()].allowDirectConnect && cfg.box[dest.Box()].allowDirectConnect {
|
|
if err := TransferDirectZfs(from, dest); err != nil {
|
|
log.WithFields(log.Fields{"from": from, "to": to, "call": "TransferDirectZfs", "attr": dest, "error": err}).Errorf("")
|
|
return count, err
|
|
} else {
|
|
count++
|
|
}
|
|
} else {
|
|
dests = append(dests, dest)
|
|
}
|
|
}
|
|
|
|
return count, nil
|
|
|
|
}
|
|
|
|
func TransferDirectZfs(from, to Addr) error {
|
|
log.WithFields(log.Fields{"from": from, "to": to}).Debugf("starting")
|
|
defer log.WithFields(log.Fields{"from": from, "to": to}).Debugf("done")
|
|
|
|
var (
|
|
err error
|
|
fromSnapshots, toSnapshots []*ZfsSnapshot
|
|
)
|
|
|
|
if fromSnapshots, err = from.ValidSnapshots(); err != nil {
|
|
log.WithFields(log.Fields{"from": from, "to": to, "call": "ValidSnapshots", "attr": from, "error": err}).Errorf("")
|
|
return err
|
|
}
|
|
|
|
if len(fromSnapshots) == 0 {
|
|
return nil
|
|
}
|
|
|
|
if toSnapshots, err = to.ValidSnapshots(); err != nil {
|
|
log.WithFields(log.Fields{"from": from, "to": to, "call": "ValidSnapshots", "attr": to, "error": err}).Errorf("")
|
|
return err
|
|
}
|
|
|
|
if len(toSnapshots) == 0 {
|
|
log.WithFields(log.Fields{"from": from, "to": to}).Debugf("initiating destination")
|
|
if _, err := to.BoxExec("ssh " + from.Box() + " zfs send " + fromSnapshots[0].String() + " | zfs recv -F " + to.Path()); err != nil {
|
|
log.WithFields(log.Fields{"from": from, "to": to, "call": "BoxExec", "error": err}).Errorf("")
|
|
return err
|
|
}
|
|
newToSnapshot := &ZfsSnapshot{name: fromSnapshots[0].name, fs: cfg.box[to.Box()].zfs.filesystems[to.Path()]}
|
|
toSnapshots = append(toSnapshots, newToSnapshot)
|
|
cfg.box[to.Box()].zfs.filesystems[to.Path()].AddSnapshot(newToSnapshot)
|
|
|
|
}
|
|
|
|
fromFromSnapshotId := len(fromSnapshots) - 1
|
|
fromToSnapshotId := -1
|
|
for fromFromSnapshotId >= 0 {
|
|
fromToSnapshotId = len(toSnapshots) - 1
|
|
for fromToSnapshotId >= 0 {
|
|
if fromSnapshots[fromFromSnapshotId].name == toSnapshots[fromToSnapshotId].name {
|
|
break
|
|
}
|
|
fromToSnapshotId = fromToSnapshotId - 1
|
|
}
|
|
if fromToSnapshotId >= 0 {
|
|
break
|
|
}
|
|
fromFromSnapshotId = fromFromSnapshotId - 1
|
|
}
|
|
|
|
if fromFromSnapshotId == -1 {
|
|
err := errors.New("zfs snapshot unsync")
|
|
log.WithFields(log.Fields{"from": from, "to": to, "error": err}).Errorf("")
|
|
return err
|
|
}
|
|
|
|
if fromFromSnapshotId < len(fromSnapshots)-1 {
|
|
log.WithFields(log.Fields{"from": from, "to": to}).Debugf("transfering from %s to %s", fromSnapshots[fromFromSnapshotId].name, fromSnapshots[len(fromSnapshots)-1].name)
|
|
if _, err := to.BoxExec("ssh " + from.Box() + " zfs send -I " + fromSnapshots[fromFromSnapshotId].String() + " " + fromSnapshots[len(fromSnapshots)-1].String() + " | zfs recv -F " + to.Path()); err != nil {
|
|
log.WithFields(log.Fields{"from": from, "to": to, "call": "BoxExec", "error": err}).Errorf("")
|
|
return err
|
|
}
|
|
|
|
for _, v := range fromSnapshots[fromFromSnapshotId+1:] {
|
|
cfg.box[to.Box()].zfs.filesystems[to.Path()].AddSnapshot(&ZfsSnapshot{name: v.name, fs: cfg.box[to.Box()].zfs.filesystems[to.Path()]})
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|