From f616c4ccf97577f8c32acc2d8d14800408398e4a Mon Sep 17 00:00:00 2001 From: shoopea Date: Mon, 21 Aug 2023 14:37:47 +0200 Subject: [PATCH] start using ssh pipes --- app.go | 1 - box.go | 63 ++++++++++++++++++++++++++++---------------- config.go | 9 ++++--- ssh.go | 77 ++++++++++++++++++++++++++++++++++++++++++------------ version.go | 8 +++--- 5 files changed, 109 insertions(+), 49 deletions(-) diff --git a/app.go b/app.go index d2ffa13..5a08c65 100644 --- a/app.go +++ b/app.go @@ -486,7 +486,6 @@ func (a *App) Transfer() error { log.WithFields(log.Fields{"app": a.name, "call": "Mkdir", "attr": dest, "error": err}).Errorf("") return err } - if err := TransferZfs(src, dest2); err != nil { log.WithFields(log.Fields{"app": a.name, "call": "TransferZfs", "src": src, "dest": dest, "error": err}).Errorf("") return err diff --git a/box.go b/box.go index 35bb7d2..a9ff95d 100644 --- a/box.go +++ b/box.go @@ -11,15 +11,16 @@ import ( ) type Box struct { - name string - addr string - user string - key string - zfs *BoxZfs - sshPool pool.Pool - created bool - online bool - mx sync.Mutex + name string + addr string + user string + key string + zfs *BoxZfs + sshPool pool.Pool + created bool + online bool + allowDirectConnect bool + mx sync.Mutex } type BoxSshPool struct { @@ -30,7 +31,7 @@ type BoxSshPool struct { mx sync.Mutex } -func (c *Config) NewBox(name, addr, user, key string) (b *Box, err error) { +func (c *Config) NewBox(name, addr, user, key string, direct bool) (b *Box, err error) { log.WithFields(log.Fields{"name": name, "addr": addr, "user": user, "key": key}).Debugf("starting") defer log.WithFields(log.Fields{"name": name, "addr": addr, "user": user, "key": key}).Debugf("done") @@ -55,9 +56,10 @@ func (c *Config) NewBox(name, addr, user, key string) (b *Box, err error) { zfs: &BoxZfs{ online: false, }, - sshPool: p, - online: false, - created: true, + sshPool: p, + online: false, + created: true, + allowDirectConnect: true, //FIXME use direct } b.zfs.box = b @@ -144,8 +146,15 @@ func TransferZfs(from, to Addr) error { var ( err error fromSnapshots, toSnapshots []*ZfsSnapshot + directTransfer bool ) + if cfg.box[from.Box()].allowDirectConnect && cfg.box[to.Box()].allowDirectConnect { + directTransfer = true + } else { + directTransfer = false + } + if fromSnapshots, err = from.ValidSnapshots(); err != nil { log.WithFields(log.Fields{"from": from, "to": to, "call": "ValidSnapshots", "attr": from, "error": err}).Errorf("") return err @@ -162,13 +171,18 @@ func TransferZfs(from, to Addr) error { if len(toSnapshots) == 0 { log.WithFields(log.Fields{"from": from, "to": to}).Debugf("initiating destination") - if _, err := to.BoxExec("ssh " + from.Box() + " zfs send " + fromSnapshots[0].String() + " | zfs recv -F " + to.Path()); err != nil { - log.WithFields(log.Fields{"from": from, "to": to, "call": "BoxExec", "error": err}).Errorf("") - return err + if directTransfer { + if _, err := to.BoxExec("ssh " + from.Box() + " zfs send " + fromSnapshots[0].String() + " | zfs recv -F " + to.Path()); err != nil { + log.WithFields(log.Fields{"from": from, "to": to, "call": "BoxExec", "error": err}).Errorf("") + return err + } + newToSnapshot := &ZfsSnapshot{name: fromSnapshots[0].name, fs: cfg.box[to.Box()].zfs.filesystems[to.Path()]} + toSnapshots = append(toSnapshots, newToSnapshot) + cfg.box[to.Box()].zfs.filesystems[to.Path()].AddSnapshot(newToSnapshot) + } else { + //handle indirect transfer } - newToSnapshot := &ZfsSnapshot{name: fromSnapshots[0].name, fs: cfg.box[to.Box()].zfs.filesystems[to.Path()]} - toSnapshots = append(toSnapshots, newToSnapshot) - cfg.box[to.Box()].zfs.filesystems[to.Path()].AddSnapshot(newToSnapshot) + } fromFromSnapshotId := -1 @@ -190,10 +204,13 @@ func TransferZfs(from, to Addr) error { if fromFromSnapshotId < len(fromSnapshots)-1 { log.WithFields(log.Fields{"from": from, "to": to}).Debugf("transfering from %s to %s", fromSnapshots[fromFromSnapshotId].name, fromSnapshots[len(fromSnapshots)-1].name) - - if _, err := to.BoxExec("ssh " + from.Box() + " zfs send -I " + fromSnapshots[fromFromSnapshotId].String() + " " + fromSnapshots[len(fromSnapshots)-1].String() + " | zfs recv -F " + to.Path()); err != nil { - log.WithFields(log.Fields{"from": from, "to": to, "call": "BoxExec", "error": err}).Errorf("") - return err + if directTransfer { + if _, err := to.BoxExec("ssh " + from.Box() + " zfs send -I " + fromSnapshots[fromFromSnapshotId].String() + " " + fromSnapshots[len(fromSnapshots)-1].String() + " | zfs recv -F " + to.Path()); err != nil { + log.WithFields(log.Fields{"from": from, "to": to, "call": "BoxExec", "error": err}).Errorf("") + return err + } + } else { + // handle indirect transfer } for _, v := range fromSnapshots[fromFromSnapshotId+1:] { diff --git a/config.go b/config.go index ca9bee7..bb314cc 100644 --- a/config.go +++ b/config.go @@ -37,9 +37,10 @@ var ( var sampleCfg []byte type BoxConfig struct { - Addr string `json:"addr"` - User string `json:"user"` - Key string `json:"key"` + Addr string `json:"addr"` + User string `json:"user"` + Key string `json:"key"` + AllowDirectConnect bool `json:"allow_direct_connect"` } type AppConfig struct { @@ -135,7 +136,7 @@ func LoadConfigByte(conf []byte) (*Config, error) { c.box = make(map[string]*Box) for k, v := range c.Box { - if b, err := c.NewBox(k, v.Addr, v.User, v.Key); err != nil { + if b, err := c.NewBox(k, v.Addr, v.User, v.Key, v.AllowDirectConnect); err != nil { log.WithFields(log.Fields{"call": "NewBox", "attr": k, "error": err}).Errorf("") return nil, err } else { diff --git a/ssh.go b/ssh.go index fa722a8..557a5e9 100644 --- a/ssh.go +++ b/ssh.go @@ -1,7 +1,7 @@ package main import ( - "bytes" + "io" "os" "time" @@ -14,10 +14,14 @@ const SshDialTimeout = time.Duration(10 * time.Second) const SshInactivityTimeout = time.Duration(time.Minute) type Ssh struct { - name string - signer ssh.Signer - config *ssh.ClientConfig - client *ssh.Client + name string + signer ssh.Signer + config *ssh.ClientConfig + client *ssh.Client + session *ssh.Session + in io.WriteCloser + out io.Reader + err io.Reader } func NewSsh(name, addr, user, key string) (*Ssh, error) { @@ -101,27 +105,66 @@ func (s *Ssh) Exec(cmd string) (string, error) { log.WithFields(log.Fields{"name": s.name, "cmd": cmd}).Debugf("starting") defer log.WithFields(log.Fields{"name": s.name, "cmd": cmd}).Debugf("done") - session, err := s.client.NewSession() - if err != nil { - log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "client.NewSession", "error": err}).Errorf("") + if err := s.ExecPipe(cmd); err != nil { + log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "ssh.ExecPipe", "error": err}).Errorf("") return "", err } - defer session.Close() + defer s.session.Close() - if session.Setenv("TZ", cfg.Timezone); err != nil { + if err := s.session.Wait(); err != nil { log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "session.Setenv", "error": err}).Errorf("") return "", err } - var bufout, buferr bytes.Buffer - session.Stdout = &bufout - session.Stderr = &buferr - - err = session.Run(cmd) + buf, err := io.ReadAll(s.out) if err != nil { - log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "session.Run", "error": err, "stderr": buferr.String()}).Errorf("") + log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "io.ReadAll", "error": err}).Errorf("") return "", err } - return bufout.String(), nil + return string(buf), nil + +} + +func (s *Ssh) ExecPipe(cmd string) error { + log.WithFields(log.Fields{"name": s.name, "cmd": cmd}).Debugf("starting") + defer log.WithFields(log.Fields{"name": s.name, "cmd": cmd}).Debugf("done") + + session, err := s.client.NewSession() + if err != nil { + log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "client.NewSession", "error": err}).Errorf("") + return err + } + s.session = session + + if s.session.Setenv("TZ", cfg.Timezone); err != nil { + log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "session.Setenv", "error": err}).Errorf("") + return err + } + + if s.in, err = s.session.StdinPipe(); err != nil { + log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "session.StdinPipe", "error": err}).Errorf("") + s.session.Close() + return err + } + + if s.out, err = s.session.StdoutPipe(); err != nil { + log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "session.StdoutPipe", "error": err}).Errorf("") + s.session.Close() + return err + } + + if s.err, err = s.session.StderrPipe(); err != nil { + log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "session.StderrPipe", "error": err}).Errorf("") + s.session.Close() + return err + } + + if err = s.session.Start(cmd); err != nil { + log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "session.Start", "error": err}).Errorf("") + s.session.Close() + return err + } + + return nil } diff --git a/version.go b/version.go index f4704b2..071afc2 100644 --- a/version.go +++ b/version.go @@ -1,7 +1,7 @@ // Code generated by version.sh (@generated) DO NOT EDIT. package main -var githash = "b07a745" +var githash = "1a1713e" var branch = "v2" -var buildstamp = "2023-08-17_17:25:15" -var commits = "82" -var version = "b07a745-b82 - 2023-08-17_17:25:15" +var buildstamp = "2023-08-21_12:35:47" +var commits = "83" +var version = "1a1713e-b83 - 2023-08-21_12:35:47"