package main import ( "errors" "regexp" "sync" "github.com/silenceper/pool" log "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" ) type Box struct { name string addr string user string key string zfs *BoxZfs sshPool pool.Pool created bool online bool allowDirectConnect bool mx sync.Mutex } type BoxSshPool struct { signer ssh.Signer config *ssh.ClientConfig client *ssh.Client logged bool mx sync.Mutex } func (c *Config) NewBox(name, addr, user, key string, direct bool) (b *Box, err error) { log.WithFields(log.Fields{"name": name, "addr": addr, "user": user, "key": key}).Debugf("starting") defer log.WithFields(log.Fields{"name": name, "addr": addr, "user": user, "key": key}).Debugf("done") re := regexp.MustCompile(boxNamePattern) if !re.MatchString(name) { err := errors.New("invalid name") log.WithFields(log.Fields{"name": b.name, "error": err}).Errorf("") return nil, err } p, err := NewSshPool(name, addr, user, key) if err != nil { log.WithFields(log.Fields{"name": b.name, "call": "NewSshPool", "error": err}).Errorf("") return nil, err } b = &Box{ name: name, addr: addr, user: user, key: key, zfs: &BoxZfs{ online: false, }, sshPool: p, online: false, created: true, allowDirectConnect: true, //FIXME use direct } b.zfs.box = b return b, nil } func (b *Box) Open() error { log.WithFields(log.Fields{"name": b.name}).Debugf("starting") defer log.WithFields(log.Fields{"name": b.name}).Debugf("done") b.mx.Lock() defer b.mx.Unlock() if b.online { return nil } hostname, err := b.Exec("hostname") if err != nil { log.WithFields(log.Fields{"name": b.name, "call": "Exec", "attr": "hostname", "error": err}).Errorf("") return err } log.WithFields(log.Fields{"name": b.name}).Debugf("hostname : %s", hostname) b.online = true if err := b.zfs.Open(); err != nil { log.WithFields(log.Fields{"name": b.name, "call": "zfs.Open", "error": err}).Errorf("") return err } return nil } func (b *Box) Close() error { log.WithFields(log.Fields{"name": b.name}).Debugf("starting") defer log.WithFields(log.Fields{"name": b.name}).Debugf("done") b.mx.Lock() defer b.mx.Unlock() if !b.online { return nil } if err := b.zfs.Close(); err != nil { log.WithFields(log.Fields{"name": b.name, "call": "zfs.Close", "error": err}).Errorf("") return err } b.online = false return nil } func (b *Box) Exec(cmd string) (r string, err error) { log.WithFields(log.Fields{"name": b.name, "cmd": cmd}).Debugf("starting") defer log.WithFields(log.Fields{"name": b.name, "cmd": cmd}).Debugf("done") if !b.created { err := errors.New("box not initialized") log.WithFields(log.Fields{"name": b.name, "error": err}).Errorf("") return "", err } v, err := b.sshPool.Get() if err != nil { log.WithFields(log.Fields{"name": b.name, "error": err, "call": "SshPool.Get"}).Errorf("") return "", err } defer b.sshPool.Put(v) s := v.(*Ssh) return s.Exec(cmd) } func TransferZfs(from Addr, to []Addr) (int, error) { log.WithFields(log.Fields{"from": from, "to": to}).Debugf("starting") defer log.WithFields(log.Fields{"from": from, "to": to}).Debugf("done") count := 0 dests := make([]Addr, 0) for _, dest := range to { if cfg.box[from.Box()].allowDirectConnect && cfg.box[dest.Box()].allowDirectConnect { if err := TransferDirectZfs(from, dest); err != nil { log.WithFields(log.Fields{"from": from, "to": to, "call": "TransferDirectZfs", "attr": dest, "error": err}).Errorf("") return count, err } else { count++ } } else { dests = append(dests, dest) } } return count, nil } func TransferDirectZfs(from, to Addr) error { log.WithFields(log.Fields{"from": from, "to": to}).Debugf("starting") defer log.WithFields(log.Fields{"from": from, "to": to}).Debugf("done") var ( err error fromSnapshots, toSnapshots []*ZfsSnapshot ) if fromSnapshots, err = from.ValidSnapshots(); err != nil { log.WithFields(log.Fields{"from": from, "to": to, "call": "ValidSnapshots", "attr": from, "error": err}).Errorf("") return err } if len(fromSnapshots) == 0 { return nil } if toSnapshots, err = to.ValidSnapshots(); err != nil { log.WithFields(log.Fields{"from": from, "to": to, "call": "ValidSnapshots", "attr": to, "error": err}).Errorf("") return err } if len(toSnapshots) == 0 { log.WithFields(log.Fields{"from": from, "to": to}).Debugf("initiating destination") if _, err := to.BoxExec("ssh " + from.Box() + " zfs send " + fromSnapshots[0].String() + " | zfs recv -F " + to.Path()); err != nil { log.WithFields(log.Fields{"from": from, "to": to, "call": "BoxExec", "error": err}).Errorf("") return err } newToSnapshot := &ZfsSnapshot{name: fromSnapshots[0].name, fs: cfg.box[to.Box()].zfs.filesystems[to.Path()]} toSnapshots = append(toSnapshots, newToSnapshot) cfg.box[to.Box()].zfs.filesystems[to.Path()].AddSnapshot(newToSnapshot) } fromFromSnapshotId := -1 lastToSnapshot := toSnapshots[len(toSnapshots)-1] log.WithFields(log.Fields{"from": from, "to": to}).Debugf("searching last snapshot %s", lastToSnapshot.String()) for id, v := range fromSnapshots { if v.name == lastToSnapshot.name { fromFromSnapshotId = id log.WithFields(log.Fields{"from": from, "to": to}).Debugf("found %s", v.String()) break } } if fromFromSnapshotId == -1 { err := errors.New("zfs snapshot unsync") log.WithFields(log.Fields{"from": from, "to": to, "error": err}).Errorf("") return err } if fromFromSnapshotId < len(fromSnapshots)-1 { log.WithFields(log.Fields{"from": from, "to": to}).Debugf("transfering from %s to %s", fromSnapshots[fromFromSnapshotId].name, fromSnapshots[len(fromSnapshots)-1].name) if _, err := to.BoxExec("ssh " + from.Box() + " zfs send -I " + fromSnapshots[fromFromSnapshotId].String() + " " + fromSnapshots[len(fromSnapshots)-1].String() + " | zfs recv -F " + to.Path()); err != nil { log.WithFields(log.Fields{"from": from, "to": to, "call": "BoxExec", "error": err}).Errorf("") return err } for _, v := range fromSnapshots[fromFromSnapshotId+1:] { cfg.box[to.Box()].zfs.filesystems[to.Path()].AddSnapshot(&ZfsSnapshot{name: v.name, fs: cfg.box[to.Box()].zfs.filesystems[to.Path()]}) } } return nil }