start using ssh pipes
This commit is contained in:
parent
1a1713eb14
commit
f616c4ccf9
1
app.go
1
app.go
@ -486,7 +486,6 @@ func (a *App) Transfer() error {
|
|||||||
log.WithFields(log.Fields{"app": a.name, "call": "Mkdir", "attr": dest, "error": err}).Errorf("")
|
log.WithFields(log.Fields{"app": a.name, "call": "Mkdir", "attr": dest, "error": err}).Errorf("")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := TransferZfs(src, dest2); err != nil {
|
if err := TransferZfs(src, dest2); err != nil {
|
||||||
log.WithFields(log.Fields{"app": a.name, "call": "TransferZfs", "src": src, "dest": dest, "error": err}).Errorf("")
|
log.WithFields(log.Fields{"app": a.name, "call": "TransferZfs", "src": src, "dest": dest, "error": err}).Errorf("")
|
||||||
return err
|
return err
|
||||||
|
21
box.go
21
box.go
@ -19,6 +19,7 @@ type Box struct {
|
|||||||
sshPool pool.Pool
|
sshPool pool.Pool
|
||||||
created bool
|
created bool
|
||||||
online bool
|
online bool
|
||||||
|
allowDirectConnect bool
|
||||||
mx sync.Mutex
|
mx sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -30,7 +31,7 @@ type BoxSshPool struct {
|
|||||||
mx sync.Mutex
|
mx sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Config) NewBox(name, addr, user, key string) (b *Box, err error) {
|
func (c *Config) NewBox(name, addr, user, key string, direct bool) (b *Box, err error) {
|
||||||
log.WithFields(log.Fields{"name": name, "addr": addr, "user": user, "key": key}).Debugf("starting")
|
log.WithFields(log.Fields{"name": name, "addr": addr, "user": user, "key": key}).Debugf("starting")
|
||||||
defer log.WithFields(log.Fields{"name": name, "addr": addr, "user": user, "key": key}).Debugf("done")
|
defer log.WithFields(log.Fields{"name": name, "addr": addr, "user": user, "key": key}).Debugf("done")
|
||||||
|
|
||||||
@ -58,6 +59,7 @@ func (c *Config) NewBox(name, addr, user, key string) (b *Box, err error) {
|
|||||||
sshPool: p,
|
sshPool: p,
|
||||||
online: false,
|
online: false,
|
||||||
created: true,
|
created: true,
|
||||||
|
allowDirectConnect: true, //FIXME use direct
|
||||||
}
|
}
|
||||||
|
|
||||||
b.zfs.box = b
|
b.zfs.box = b
|
||||||
@ -144,8 +146,15 @@ func TransferZfs(from, to Addr) error {
|
|||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
fromSnapshots, toSnapshots []*ZfsSnapshot
|
fromSnapshots, toSnapshots []*ZfsSnapshot
|
||||||
|
directTransfer bool
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if cfg.box[from.Box()].allowDirectConnect && cfg.box[to.Box()].allowDirectConnect {
|
||||||
|
directTransfer = true
|
||||||
|
} else {
|
||||||
|
directTransfer = false
|
||||||
|
}
|
||||||
|
|
||||||
if fromSnapshots, err = from.ValidSnapshots(); err != nil {
|
if fromSnapshots, err = from.ValidSnapshots(); err != nil {
|
||||||
log.WithFields(log.Fields{"from": from, "to": to, "call": "ValidSnapshots", "attr": from, "error": err}).Errorf("")
|
log.WithFields(log.Fields{"from": from, "to": to, "call": "ValidSnapshots", "attr": from, "error": err}).Errorf("")
|
||||||
return err
|
return err
|
||||||
@ -162,6 +171,7 @@ func TransferZfs(from, to Addr) error {
|
|||||||
|
|
||||||
if len(toSnapshots) == 0 {
|
if len(toSnapshots) == 0 {
|
||||||
log.WithFields(log.Fields{"from": from, "to": to}).Debugf("initiating destination")
|
log.WithFields(log.Fields{"from": from, "to": to}).Debugf("initiating destination")
|
||||||
|
if directTransfer {
|
||||||
if _, err := to.BoxExec("ssh " + from.Box() + " zfs send " + fromSnapshots[0].String() + " | zfs recv -F " + to.Path()); err != nil {
|
if _, err := to.BoxExec("ssh " + from.Box() + " zfs send " + fromSnapshots[0].String() + " | zfs recv -F " + to.Path()); err != nil {
|
||||||
log.WithFields(log.Fields{"from": from, "to": to, "call": "BoxExec", "error": err}).Errorf("")
|
log.WithFields(log.Fields{"from": from, "to": to, "call": "BoxExec", "error": err}).Errorf("")
|
||||||
return err
|
return err
|
||||||
@ -169,6 +179,10 @@ func TransferZfs(from, to Addr) error {
|
|||||||
newToSnapshot := &ZfsSnapshot{name: fromSnapshots[0].name, fs: cfg.box[to.Box()].zfs.filesystems[to.Path()]}
|
newToSnapshot := &ZfsSnapshot{name: fromSnapshots[0].name, fs: cfg.box[to.Box()].zfs.filesystems[to.Path()]}
|
||||||
toSnapshots = append(toSnapshots, newToSnapshot)
|
toSnapshots = append(toSnapshots, newToSnapshot)
|
||||||
cfg.box[to.Box()].zfs.filesystems[to.Path()].AddSnapshot(newToSnapshot)
|
cfg.box[to.Box()].zfs.filesystems[to.Path()].AddSnapshot(newToSnapshot)
|
||||||
|
} else {
|
||||||
|
//handle indirect transfer
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fromFromSnapshotId := -1
|
fromFromSnapshotId := -1
|
||||||
@ -190,11 +204,14 @@ func TransferZfs(from, to Addr) error {
|
|||||||
|
|
||||||
if fromFromSnapshotId < len(fromSnapshots)-1 {
|
if fromFromSnapshotId < len(fromSnapshots)-1 {
|
||||||
log.WithFields(log.Fields{"from": from, "to": to}).Debugf("transfering from %s to %s", fromSnapshots[fromFromSnapshotId].name, fromSnapshots[len(fromSnapshots)-1].name)
|
log.WithFields(log.Fields{"from": from, "to": to}).Debugf("transfering from %s to %s", fromSnapshots[fromFromSnapshotId].name, fromSnapshots[len(fromSnapshots)-1].name)
|
||||||
|
if directTransfer {
|
||||||
if _, err := to.BoxExec("ssh " + from.Box() + " zfs send -I " + fromSnapshots[fromFromSnapshotId].String() + " " + fromSnapshots[len(fromSnapshots)-1].String() + " | zfs recv -F " + to.Path()); err != nil {
|
if _, err := to.BoxExec("ssh " + from.Box() + " zfs send -I " + fromSnapshots[fromFromSnapshotId].String() + " " + fromSnapshots[len(fromSnapshots)-1].String() + " | zfs recv -F " + to.Path()); err != nil {
|
||||||
log.WithFields(log.Fields{"from": from, "to": to, "call": "BoxExec", "error": err}).Errorf("")
|
log.WithFields(log.Fields{"from": from, "to": to, "call": "BoxExec", "error": err}).Errorf("")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// handle indirect transfer
|
||||||
|
}
|
||||||
|
|
||||||
for _, v := range fromSnapshots[fromFromSnapshotId+1:] {
|
for _, v := range fromSnapshots[fromFromSnapshotId+1:] {
|
||||||
cfg.box[to.Box()].zfs.filesystems[to.Path()].AddSnapshot(&ZfsSnapshot{name: v.name, fs: cfg.box[to.Box()].zfs.filesystems[to.Path()]})
|
cfg.box[to.Box()].zfs.filesystems[to.Path()].AddSnapshot(&ZfsSnapshot{name: v.name, fs: cfg.box[to.Box()].zfs.filesystems[to.Path()]})
|
||||||
|
@ -40,6 +40,7 @@ type BoxConfig struct {
|
|||||||
Addr string `json:"addr"`
|
Addr string `json:"addr"`
|
||||||
User string `json:"user"`
|
User string `json:"user"`
|
||||||
Key string `json:"key"`
|
Key string `json:"key"`
|
||||||
|
AllowDirectConnect bool `json:"allow_direct_connect"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type AppConfig struct {
|
type AppConfig struct {
|
||||||
@ -135,7 +136,7 @@ func LoadConfigByte(conf []byte) (*Config, error) {
|
|||||||
|
|
||||||
c.box = make(map[string]*Box)
|
c.box = make(map[string]*Box)
|
||||||
for k, v := range c.Box {
|
for k, v := range c.Box {
|
||||||
if b, err := c.NewBox(k, v.Addr, v.User, v.Key); err != nil {
|
if b, err := c.NewBox(k, v.Addr, v.User, v.Key, v.AllowDirectConnect); err != nil {
|
||||||
log.WithFields(log.Fields{"call": "NewBox", "attr": k, "error": err}).Errorf("")
|
log.WithFields(log.Fields{"call": "NewBox", "attr": k, "error": err}).Errorf("")
|
||||||
return nil, err
|
return nil, err
|
||||||
} else {
|
} else {
|
||||||
|
69
ssh.go
69
ssh.go
@ -1,7 +1,7 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -18,6 +18,10 @@ type Ssh struct {
|
|||||||
signer ssh.Signer
|
signer ssh.Signer
|
||||||
config *ssh.ClientConfig
|
config *ssh.ClientConfig
|
||||||
client *ssh.Client
|
client *ssh.Client
|
||||||
|
session *ssh.Session
|
||||||
|
in io.WriteCloser
|
||||||
|
out io.Reader
|
||||||
|
err io.Reader
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSsh(name, addr, user, key string) (*Ssh, error) {
|
func NewSsh(name, addr, user, key string) (*Ssh, error) {
|
||||||
@ -101,27 +105,66 @@ func (s *Ssh) Exec(cmd string) (string, error) {
|
|||||||
log.WithFields(log.Fields{"name": s.name, "cmd": cmd}).Debugf("starting")
|
log.WithFields(log.Fields{"name": s.name, "cmd": cmd}).Debugf("starting")
|
||||||
defer log.WithFields(log.Fields{"name": s.name, "cmd": cmd}).Debugf("done")
|
defer log.WithFields(log.Fields{"name": s.name, "cmd": cmd}).Debugf("done")
|
||||||
|
|
||||||
session, err := s.client.NewSession()
|
if err := s.ExecPipe(cmd); err != nil {
|
||||||
if err != nil {
|
log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "ssh.ExecPipe", "error": err}).Errorf("")
|
||||||
log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "client.NewSession", "error": err}).Errorf("")
|
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
defer session.Close()
|
defer s.session.Close()
|
||||||
|
|
||||||
if session.Setenv("TZ", cfg.Timezone); err != nil {
|
if err := s.session.Wait(); err != nil {
|
||||||
log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "session.Setenv", "error": err}).Errorf("")
|
log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "session.Setenv", "error": err}).Errorf("")
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
var bufout, buferr bytes.Buffer
|
buf, err := io.ReadAll(s.out)
|
||||||
session.Stdout = &bufout
|
|
||||||
session.Stderr = &buferr
|
|
||||||
|
|
||||||
err = session.Run(cmd)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "session.Run", "error": err, "stderr": buferr.String()}).Errorf("")
|
log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "io.ReadAll", "error": err}).Errorf("")
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
return bufout.String(), nil
|
return string(buf), nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Ssh) ExecPipe(cmd string) error {
|
||||||
|
log.WithFields(log.Fields{"name": s.name, "cmd": cmd}).Debugf("starting")
|
||||||
|
defer log.WithFields(log.Fields{"name": s.name, "cmd": cmd}).Debugf("done")
|
||||||
|
|
||||||
|
session, err := s.client.NewSession()
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "client.NewSession", "error": err}).Errorf("")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.session = session
|
||||||
|
|
||||||
|
if s.session.Setenv("TZ", cfg.Timezone); err != nil {
|
||||||
|
log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "session.Setenv", "error": err}).Errorf("")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.in, err = s.session.StdinPipe(); err != nil {
|
||||||
|
log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "session.StdinPipe", "error": err}).Errorf("")
|
||||||
|
s.session.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.out, err = s.session.StdoutPipe(); err != nil {
|
||||||
|
log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "session.StdoutPipe", "error": err}).Errorf("")
|
||||||
|
s.session.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.err, err = s.session.StderrPipe(); err != nil {
|
||||||
|
log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "session.StderrPipe", "error": err}).Errorf("")
|
||||||
|
s.session.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = s.session.Start(cmd); err != nil {
|
||||||
|
log.WithFields(log.Fields{"name": s.name, "cmd": cmd, "call": "session.Start", "error": err}).Errorf("")
|
||||||
|
s.session.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
// Code generated by version.sh (@generated) DO NOT EDIT.
|
// Code generated by version.sh (@generated) DO NOT EDIT.
|
||||||
package main
|
package main
|
||||||
var githash = "b07a745"
|
var githash = "1a1713e"
|
||||||
var branch = "v2"
|
var branch = "v2"
|
||||||
var buildstamp = "2023-08-17_17:25:15"
|
var buildstamp = "2023-08-21_12:35:47"
|
||||||
var commits = "82"
|
var commits = "83"
|
||||||
var version = "b07a745-b82 - 2023-08-17_17:25:15"
|
var version = "1a1713e-b83 - 2023-08-21_12:35:47"
|
||||||
|
Loading…
Reference in New Issue
Block a user