Merge remote-tracking branch 'origin/master'

This commit is contained in:
shoopea 2019-06-14 11:06:12 +08:00
commit 6ea9dd94e8
5 changed files with 205 additions and 184 deletions

119
bot.go
View File

@ -1,11 +1,9 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"regexp"
"strconv"
"time"
@ -35,8 +33,8 @@ func BotHandlers(b *tb.Bot) {
b.Handle("/g_stock", botGStock)
b.Handle("/msg_export", botMsgExport)
b.Handle("/msg_load", botMsgLoad)
b.Handle("/backup_export", botBackupExport)
b.Handle("/backup_import", botBackupImport)
b.Handle("/help", botHelp)
@ -101,8 +99,8 @@ func botHelp(m *tb.Message) {
/parse_rule <id> - detail for one rule
/timer <ETA> "msg" - schedule msg for client in ETA
/g_stock - check guild's vault
/msg_export - export message database
/msg_load <URL> - import message database from URL`,
/backup_export - export message database
/backup_import <URL> - import message database from URL`,
FromMsgID64: int64(m.ID),
FromChatID64: m.Chat.ID,
}
@ -162,6 +160,16 @@ func botMsgRescan(m *tb.Message) {
if !m.Private() {
return
}
if _, ok := clientsKeepAlive[m.Chat.ID]; !ok && m.Chat.ID != cfg.Bot.Admin {
c := TGCommand{
Type: commandReplyMsg,
Text: "Client not registered",
FromMsgID64: int64(m.ID),
FromChatID64: m.Chat.ID,
}
TGCmdQueue <- c
return
}
r := regexp.MustCompile("^[0-9]+$")
if r.MatchString(m.Payload) {
p := JobPayloadRescanMsg{
@ -202,6 +210,16 @@ func botMsgRescanAll(m *tb.Message) {
if !m.Private() {
return
}
if _, ok := clientsKeepAlive[m.Chat.ID]; !ok && m.Chat.ID != cfg.Bot.Admin {
c := TGCommand{
Type: commandReplyMsg,
Text: "Client not registered",
FromMsgID64: int64(m.ID),
FromChatID64: m.Chat.ID,
}
TGCmdQueue <- c
return
}
p := JobPayloadRescanMsg{
Query: fmt.Sprintf("SELECT o.id FROM obj o WHERE o.obj_type_id = %d AND o.obj_sub_type_id = %d ORDER BY id ASC;", objTypeMessage, objSubTypeMessageUnknown),
MsgID64: int64(m.ID),
@ -231,103 +249,68 @@ func botMsgRescanAll(m *tb.Message) {
return
}
func botMsgExport(m *tb.Message) {
func botBackupExport(m *tb.Message) {
if !m.Private() {
return
}
c := TGCommand{
Type: commandReplyMsg,
Text: `Starting messages export`,
FromMsgID64: int64(m.ID),
FromChatID64: m.Chat.ID,
}
TGCmdQueue <- c
b, err := zipMessages()
log.Printf("botMsgExportAll : Data returned.\n")
logOnError(err, "botMsgExportAll : exportMessages")
if err != nil {
if _, ok := clientsKeepAlive[m.Chat.ID]; !ok && m.Chat.ID != cfg.Bot.Admin {
c := TGCommand{
Type: commandReplyMsg,
Text: `Error exporting messages`,
Text: "Client not registered",
FromMsgID64: int64(m.ID),
FromChatID64: m.Chat.ID,
}
TGCmdQueue <- c
return
} else {
text := fmt.Sprintf("Document size : %d bytes.", len(b))
c := TGCommand{
Type: commandReplyMsg,
Text: text,
FromMsgID64: int64(m.ID),
FromChatID64: m.Chat.ID,
}
TGCmdQueue <- c
}
d := tb.Document{}
d.File = tb.FromReader(bytes.NewReader(b))
d.FileName = fmt.Sprintf("%s.backup.zip", time.Now().Format("20060102150405"))
d.Caption = d.FileName
d.MIME = `application/zip`
c = TGCommand{
Type: commandSendDocument,
Document: d,
ToChatID64: m.Chat.ID,
p := JobPayloadBackupExport{
MsgID64: int64(m.ID),
ChatID64: m.Chat.ID,
}
TGCmdQueue <- c
b, _ := json.Marshal(p)
_, err := createJob(objSubTypeJobBackupExport, objJobPriorityBackup, int64(m.Sender.ID), time.Now(), b)
logOnError(err, "botBackupExport : createJob(objSubTypeJobBackupExport)")
return
}
func botMsgLoad(m *tb.Message) {
func botBackupImport(m *tb.Message) {
if !m.Private() {
return
}
r := regexp.MustCompile("^(http|https)://[a-z0-9./]+.zip$") // https://dump.siteop.biz/20190609163137.backup.zip
if r.MatchString(m.Payload) {
resp, err := http.Get(m.Payload)
logOnError(err, "botMsgLoad : Get")
defer resp.Body.Close()
buf := new(bytes.Buffer)
buf.ReadFrom(resp.Body)
if _, ok := clientsKeepAlive[m.Chat.ID]; !ok && m.Chat.ID != cfg.Bot.Admin {
c := TGCommand{
Type: commandReplyMsg,
Text: "File downloaded.",
Text: "Client not registered",
FromMsgID64: int64(m.ID),
FromChatID64: m.Chat.ID,
}
TGCmdQueue <- c
err = UnzipMessages(buf.Bytes())
logOnError(err, "botMsgLoad : UnzipMessages")
c = TGCommand{
Type: commandReplyMsg,
Text: "Messages injected.",
FromMsgID64: int64(m.ID),
FromChatID64: m.Chat.ID,
}
TGCmdQueue <- c
} else {
return
}
r := regexp.MustCompile(`^((http[s]?\:)\/\/)?([^\?\:\/#]+)(\:([0-9]+))?(\/[^\?\#]*)?(\?([^#]*))?(#.*)?.zip$`)
if !r.MatchString(m.Payload) {
c := TGCommand{
Type: commandReplyMsg,
Text: "No file",
Text: "URL not valid.",
FromMsgID64: int64(m.ID),
FromChatID64: m.Chat.ID,
}
TGCmdQueue <- c
return
}
p := JobPayloadBackupImport{
URL: m.Payload,
MsgID64: int64(m.ID),
ChatID64: m.Chat.ID,
}
b, _ := json.Marshal(p)
_, err := createJob(objSubTypeJobBackupImport, objJobPriorityBackup, int64(m.Sender.ID), time.Now(), b)
logOnError(err, "botBackupImport : createJob(objSubTypeJobBackupImport)")
return
}
func botMsgDump(m *tb.Message) {

1
def.go
View File

@ -381,6 +381,7 @@ const (
objJobPriority = 1
objJobPriorityRescanMsg = 2
objJobPriorityRescanAllMsg = 3
objJobPriorityBackup = 4
MQGetMsgWorkers = 12
MQCWMsgQueueSize = 100

166
job.go
View File

@ -1,11 +1,18 @@
package main
import (
"archive/zip"
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"strconv"
"strings"
"time"
tb "gopkg.in/tucnak/telebot.v2"
)
func createJob(jobTypeID int32, priority int32, userID64 int64, schedule time.Time, payload []byte) (int64, error) {
@ -425,15 +432,87 @@ func jobBackupExport(j Job) {
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobBackupExport : Unmarshal payload")
if err == nil {
m := TGCommand{
Type: commandReplyMsg,
Text: "Backup exported.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
bkp := DataBackup{}
start := time.Now()
milestone := time.Now()
s := new([]ChatWarsMessage)
msgs := *s
ids := getSQLListID64(`SELECT om.obj_id id FROM obj_msg om;`)
txt := fmt.Sprintf("Backing up %d messages.", len(ids))
m := TGCommand{
Type: commandReplyMsg,
Text: txt,
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
i := 0
for _, id := range ids {
msg, err := getObjMsg(id)
logOnError(err, "jobBackupExport : getMsg")
if err == nil {
msgs = append(msgs, *msg)
}
i = i + 1
if time.Now().After(milestone.Add(1 * time.Minute)) {
txt := fmt.Sprintf("Exported %d/%d messages.", i, len(ids))
m = TGCommand{
Type: commandReplyMsg,
Text: txt,
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
milestone = time.Now()
}
}
bkp.Messages = msgs
b, err := json.Marshal(bkp)
logOnError(err, "jobBackupExport : Marshal")
m = TGCommand{
Type: commandReplyMsg,
Text: `Compressing archive`,
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
zbuf := new(bytes.Buffer)
zw := zip.NewWriter(zbuf)
zf, err := zw.Create(`backup.json`)
logOnError(err, "jobBackupExport : Create")
_, err = zf.Write(b)
logOnError(err, "jobBackupExport : Write")
err = zw.Close()
logOnError(err, "jobBackupExport : Close")
d := tb.Document{}
d.File = tb.FromReader(bytes.NewReader(zbuf.Bytes()))
d.FileName = fmt.Sprintf("%s.backup.zip", start.Format("20060102150405"))
d.Caption = d.FileName
d.MIME = `application/zip`
m = TGCommand{
Type: commandReplyMsg,
Text: `Export done.`,
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
m = TGCommand{
Type: commandSendDocument,
Document: d,
ToChatID64: p.ChatID64,
}
TGCmdQueue <- m
err = setJobDone(j.ID64)
logOnError(err, "jobBackupExport : setJobDone")
@ -448,15 +527,70 @@ func jobBackupImport(j Job) {
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobBackupImport : Unmarshal payload")
if err == nil {
m := TGCommand{
Type: commandReplyMsg,
Text: "Backup imported.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
resp, err := http.Get(p.URL)
logOnError(err, "jobBackupImport : Get")
defer resp.Body.Close()
buf := new(bytes.Buffer)
buf.ReadFrom(resp.Body)
m := TGCommand{
Type: commandReplyMsg,
Text: "File downloaded.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
z := buf.Bytes()
r := bytes.NewReader(z)
zr, err := zip.NewReader(r, int64(len(z)))
for _, f := range zr.File {
if strings.Compare(f.Name, "backup.json") == 0 {
rc, err := f.Open()
logOnError(err, "jobBackupImport : Open")
if err != nil {
return
}
data, err := ioutil.ReadAll(rc)
logOnError(err, "jobBackupImport : ReadAll")
if err != nil {
return
}
log.Printf("jobBackupImport : %d uncompressed bytes.\n", len(data))
rc.Close()
bkp := DataBackup{}
err = json.Unmarshal(data, &bkp)
logOnError(err, "jobBackupImport : Unmarshal")
if err != nil {
return
}
for _, msg := range bkp.Messages {
MQCWMsgQueue <- msg
}
m := TGCommand{
Type: commandReplyMsg,
Text: "Backup restored.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
err = setJobDone(j.ID64)
logOnError(err, "jobBackupImport : setJobDone")
return
}
}
m = TGCommand{
Type: commandReplyMsg,
Text: "Not backup file found in archive.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
err = setJobDone(j.ID64)
logOnError(err, "jobBackupImport : setJobDone")

97
msg.go
View File

@ -1,11 +1,7 @@
package main
import (
"archive/zip"
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"regexp"
"strconv"
@ -273,96 +269,3 @@ func parseSubTypeMessagePillageInc(m *ChatWarsMessage, r *regexp.Regexp) (*ChatW
return &cwm, nil
}
func zipMessages() ([]byte, error) {
bkp := DataBackup{}
s := new([]ChatWarsMessage)
msgs := *s
ids := getSQLListID64(`SELECT om.obj_id id FROM obj_msg om;`)
i := 0
for _, id := range ids {
m, err := getObjMsg(id)
logOnError(err, "zipMessages : getMsg")
if err == nil {
msgs = append(msgs, *m)
}
i = i + 1
if i%10000 == 0 {
log.Printf("zipMessages : Exported %d messages.\n", i)
}
}
bkp.Messages = msgs
b, err := json.Marshal(bkp)
logOnError(err, "exportMessages : Marshal")
if err != nil {
return nil, err
}
zbuf := new(bytes.Buffer)
zw := zip.NewWriter(zbuf)
zf, err := zw.Create(`backup.json`)
logOnError(err, "exportMessages : Create")
if err != nil {
return nil, err
}
_, err = zf.Write(b)
logOnError(err, "exportMessages : Write")
if err != nil {
return nil, err
}
err = zw.Close()
logOnError(err, "exportMessages : Close")
if err != nil {
return nil, err
}
return zbuf.Bytes(), nil
}
func UnzipMessages(z []byte) error {
log.Printf("UnzipMessages : %d bytes.\n", len(z))
r := bytes.NewReader(z)
zr, err := zip.NewReader(r, int64(len(z)))
logOnError(err, "UnzipMessages : NewReader")
if err != nil {
return err
}
for _, f := range zr.File {
log.Printf("File : %s\n", f.Name)
if strings.Compare(f.Name, "backup.json") == 0 {
rc, err := f.Open()
logOnError(err, "UnzipMessages : Open")
if err != nil {
return err
}
data, err := ioutil.ReadAll(rc)
logOnError(err, "UnzipMessages : ReadAll")
if err != nil {
return err
}
log.Printf("UnzipMessages : %d uncompressed bytes.\n", len(data))
rc.Close()
bkp := DataBackup{}
err = json.Unmarshal(data, &bkp)
logOnError(err, "UnzipMessages : Unmarshal")
if err != nil {
return err
}
log.Printf("UnzipMessages : %d messages.\n", len(bkp.Messages))
for _, m := range bkp.Messages {
_, err = addObjMsg(m.ID64, m.ChatID64, m.UserID64, m.SenderUserID64, m.Date, m.Text)
}
return nil
}
}
log.Printf("Not backup file found.\n")
return nil
}

View File

@ -327,10 +327,10 @@ func MQTGCmdWorker(id int, cmds <-chan TGCommand) {
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)")
//log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j))
err = clientsQueue[c.FromUserID64].Channel.Publish(
"", // exchange
"", // exchange
clientsQueue[c.FromUserID64].Queue.Name, // routing key
false, // mandatory
false, // immediate
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: []byte(j),