From 38806b62d1b24e22d566c40c9fa131fd748b2c45 Mon Sep 17 00:00:00 2001 From: shoopea Date: Tue, 11 Jun 2019 22:50:01 +0800 Subject: [PATCH 01/11] testtest --- bot.go | 4 +- def.go | 1 + job.go | 877 +++++++++++++++++++++++++++++------------------------ msg.go | 93 ------ workers.go | 6 +- 5 files changed, 481 insertions(+), 500 deletions(-) diff --git a/bot.go b/bot.go index 473bdf1..1ef1249 100644 --- a/bot.go +++ b/bot.go @@ -35,8 +35,8 @@ func BotHandlers(b *tb.Bot) { b.Handle("/g_stock", botGStock) - b.Handle("/msg_export", botMsgExport) - b.Handle("/msg_load", botMsgLoad) + b.Handle("/backup_export", botBackupExport) + b.Handle("/backup_import", botBackupImport) b.Handle("/help", botHelp) diff --git a/def.go b/def.go index e9ac189..5f9abc0 100644 --- a/def.go +++ b/def.go @@ -381,6 +381,7 @@ const ( objJobPriority = 1 objJobPriorityRescanMsg = 2 objJobPriorityRescanAllMsg = 3 + objJobPriorityBackup = 4 MQGetMsgWorkers = 12 MQCWMsgQueueSize = 100 diff --git a/job.go b/job.go index e6ab6d7..58050ae 100644 --- a/job.go +++ b/job.go @@ -1,464 +1,537 @@ package main import ( + "bytes" "encoding/json" "fmt" + "log" + "net/http" + "regexp" "strconv" - "strings" "time" + + tb "gopkg.in/tucnak/telebot.v2" ) -func createJob(jobTypeID int32, priority int32, userID64 int64, schedule time.Time, payload []byte) (int64, error) { - stmt, err := db.Prepare(`INSERT INTO obj (obj_type_id, obj_sub_type_id) - VALUES (? , ?);`) - logOnError(err, "createJob : prepare insert obj") - if err != nil { - return 0, err - } - defer stmt.Close() - - res, err := stmt.Exec(objTypeJob, jobTypeID) - s := fmt.Sprintf("createJob, insert obj(%d, %d)", objTypeJob, jobTypeID) - logOnError(err, s) - if err != nil { - return 0, err - } - - objId, err := res.LastInsertId() - logOnError(err, "createJob : get last insert Id") - if err != nil { - return 0, err - } - - stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, status, seq_nr, schedule, is_done, in_work, inserted, pulled, started, ended, payload) - VALUES (?, ?, ?, ?, NULL, ?, 0, 0, ?, NULL, NULL, NULL, ?);`) - logOnError(err, "createJob : prepare insert obj_job") - if err != nil { - return 0, err - } - defer stmt.Close() - - _, err = stmt.Exec(objId, priority, userID64, objJobStatusNew, schedule.UTC(), time.Now().UTC(), payload) - logOnError(err, "createJob : insert obj_job") - if err != nil { - return 0, err - } - - return objId, nil -} - -func setJobDone(jobId int64) error { - stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 1, j.in_work = 0, j.ended = ? WHERE j.obj_id = ?;`) - logOnError(err, "setJobDone : prepare update obj_job") - if err != nil { - return err - } - defer stmt.Close() - - _, err = stmt.Exec(time.Now().UTC(), jobId) - s := fmt.Sprintf("setJobDone, update obj_job(%d)", jobId) - logOnError(err, s) - if err != nil { - return err - } - return nil -} - -func setJobStart(jobId int64) error { - stmt, err := db.Prepare(`UPDATE obj_job j SET j.started = coalesce(j.started, ?) WHERE j.obj_id = ?;`) - logOnError(err, "setJobStart : prepare update obj_job") - if err != nil { - return err - } - defer stmt.Close() - - _, err = stmt.Exec(time.Now().UTC(), jobId) - s := fmt.Sprintf("setJobStart, update obj_job(%d)", jobId) - logOnError(err, s) - if err != nil { - return err - } - return nil -} - -func rescheduleJob(jobID64 int64, status int32, schedule time.Time) error { - stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 0, j.in_work = 0, j.schedule = ?, j.status = ? WHERE j.obj_id = ?;`) - logOnError(err, "rescheduleJob : prepare update obj_job") - if err != nil { - return err - } - defer stmt.Close() - - _, err = stmt.Exec(schedule.UTC(), status, jobID64) - s := fmt.Sprintf("rescheduleJob, update obj_job(%d)", jobID64) - logOnError(err, s) - if err != nil { - return err - } - return nil -} - -func loadCurrentJobs() ([]Job, error) { - var ( - objId int64 - jobTypeId int32 - userID64 int64 - status int32 - payload []byte - jobs []Job - ) - - t := time.Now().UTC() - r := RndInt64() - - _, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1, j.seq_nr = ? WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", t, r, t, SQLJobSliceSize) - logOnError(err, "loadCurrentJobs : update intial rows") - - stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;") - logOnError(err, "loadCurrentJobs : prepare select statement") - - rows, err := stmt.Query(r) - // rows, err := stmt.Query(time.Now()) - logOnError(err, "loadCurrentJobs : query select statement") - - for rows.Next() { - err = rows.Scan(&objId, &jobTypeId, &status, &userID64, &payload) - logOnError(err, "loadCurrentJobs : scan query rows") - job := Job{ - ID64: objId, - JobTypeID: jobTypeId, - Status: status, - UserID64: userID64, - Payload: payload, +func BotHandlers(b *tb.Bot) { + b.Handle("/hello", func(m *tb.Message) { + s, err := botHello(m) + logOnError(err, "/hello") + if err == nil { + b.Send(m.Sender, s) } - jobs = append(jobs, job) - } - err = rows.Err() - logOnError(err, "loadCurrentJobs : scan end rows") - rows.Close() + }) - err = stmt.Close() - logOnError(err, "loadCurrentJobs : close select statement") + b.Handle("/test", botTest) + b.Handle("/test_html", botTestHTML) + b.Handle("/msg_rescan", botMsgRescan) + b.Handle("/msg_rescan_all", botMsgRescanAll) + b.Handle("/msg_dump", botMsgDump) - return jobs, nil + b.Handle("/parse_rules", botListParsingRules) + b.Handle("/parse_rule", botListParsingRule) + + b.Handle("/timer", botTimer) + + b.Handle("/g_stock", botGStock) + + b.Handle("/backup_export", botBackupExport) + b.Handle("/backup_import", botBackupImport) + + b.Handle("/help", botHelp) + + b.Handle(tb.OnPhoto, botPhoto) + b.Handle(tb.OnChannelPost, botChannelPost) + b.Handle(tb.OnQuery, botQuery) + b.Handle(tb.OnText, botText) + b.Handle(tb.OnDocument, botDocument) + + b.Start() } -func jobRescan(j Job) { - var r JobPayloadRescanMsg +func botPhoto(m *tb.Message) { + fmt.Println("botPhoto :", m.Text) + // photos only +} - err := setJobStart(j.ID64) - logOnError(err, "jobRescan : setJobStart") +func botDocument(m *tb.Message) { + fmt.Printf("botDocument : %s (%d bytes)\n", m.Document.FileName, m.Document.File.FileSize) + // documents only +} - err = json.Unmarshal(j.Payload, &r) - logOnError(err, "jobRescan : Unmarshal payload") +func botHello(m *tb.Message) (string, error) { + fmt.Println("botHello :", m.Text) + if !m.Private() { + fmt.Println("botHello : !m.Private()") + return ``, nil + } + // fmt.Println("Hello payload :", m.Payload) // + PrintText(m) + return `hello world`, nil +} - start := time.Now() +func botChannelPost(m *tb.Message) { + fmt.Println("botChannelPost :", m.Text) + PrintText(m) + // channel posts only +} - ids := getSQLListID64(r.Query) +func botQuery(q *tb.Query) { + fmt.Println("botQuery") + // incoming inline queries +} - if len(ids) > 1 { - for _, id := range ids { - SQLMsgIdentifyQueue <- id +func botText(m *tb.Message) { + fmt.Println("botText :", m.Text) + PrintText(m) + // all the text messages that weren't + // captured by existing handlers +} + +func botHelp(m *tb.Message) { + if !m.Private() { + return + } + c := TGCommand{ + Type: commandReplyMsg, + Text: `/help - this help +/msg_rescan - rescan one message +/msg_rescan_all - rescan all messages +/parse_rules - list parsing rules\n +/parse_rule - detail for one rule +/timer "msg" - schedule msg for client in ETA +/g_stock - check guild's vault +/backup_export - export backup database +/backup_import - import backup database from URL`, + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + return +} + +func botTestHTML(m *tb.Message) { + if !m.Private() { + return + } + c := TGCommand{ + Type: commandReplyMsg, + Text: `bold, +bold, +italic, +italic, +inline URL, +inline fixed-width code, +
pre-formatted fixed-width code block
`, + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + ParseMode: cmdParseModeHTML, + } + TGCmdQueue <- c + + return +} + +func botTest(m *tb.Message) { + if !m.Private() { + return + } + if _, ok := clientsKeepAlive[m.Chat.ID]; ok { + clientSendCWMsg(m.Chat.ID, "🏅Me") + + c := TGCommand{ + Type: commandReplyMsg, + Text: "Test sent", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, } - p := JobPayloadSetDone{ - JobID64: j.ID64, - MsgID64: r.MsgID64, - ChatID64: r.ChatID64, - Text: fmt.Sprintf("%d messages processed in %s.", len(ids), time.Since(start)), + TGCmdQueue <- c + } else { + c := TGCommand{ + Type: commandReplyMsg, + Text: "Client not registered", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + } + return +} + +func botMsgRescan(m *tb.Message) { + if !m.Private() { + return + } + if _, ok := clientsKeepAlive[m.Chat.ID]; ok { + } else { + c := TGCommand{ + Type: commandReplyMsg, + Text: "Client not registered", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + return + } + + r := regexp.MustCompile("^[0-9]+$") + if r.MatchString(m.Payload) { + p := JobPayloadRescanMsg{ + Query: fmt.Sprintf("SELECT o.id FROM obj o WHERE o.id = %s AND o.obj_type_id = %d AND o.obj_sub_type_id = %d;", m.Payload, objTypeMessage, objSubTypeMessageUnknown), + MsgID64: int64(m.ID), + ChatID64: m.Chat.ID, } b, _ := json.Marshal(p) - _, err := createJob(objSubTypeJobSetJobDone, objJobPriorityRescanAllMsg, j.UserID64, time.Now().UTC(), b) - logOnError(err, "jobRescan : createJob(objSubTypeJobSetJobDone)") - - } else if len(ids) == 1 { - SQLMsgIdentifyQueue <- ids[0] - err = setJobDone(j.ID64) - logOnError(err, "jobRescan : setJobDone(1)") - if r.MsgID64 != 0 || r.ChatID64 != 0 { - m := TGCommand{ + log.Printf("botMsgRescan : json : %s\n", string(b)) + _, err := createJob(objSubTypeJobRescanMsg, objJobPriorityRescanMsg, int64(m.Sender.ID), time.Now(), b) + logOnError(err, "botMsgRescan : createJob(objSubTypeJobRescanMsg)") + if err != nil { + c := TGCommand{ Type: commandReplyMsg, - Text: "One message processed.", - FromMsgID64: r.MsgID64, - FromChatID64: r.ChatID64, + Text: fmt.Sprintf("Error scheduling the rescan for msg #%s", m.Payload), + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, } - TGCmdQueue <- m - } - } else { - err = setJobDone(j.ID64) - logOnError(err, "jobRescan : setJobDone(0)") - if r.MsgID64 != 0 || r.ChatID64 != 0 { - m := TGCommand{ + TGCmdQueue <- c + } else { + c := TGCommand{ Type: commandReplyMsg, - Text: "No message processed.", - FromMsgID64: r.MsgID64, - FromChatID64: r.ChatID64, + Text: fmt.Sprintf("Rescaning msg #%s", m.Payload), + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, } - TGCmdQueue <- m + TGCmdQueue <- c } } + r = regexp.MustCompile("^all$") + if r.MatchString(m.Payload) { + botMsgRescanAll(m) + } return } -func jobSetDone(j Job) { - var r JobPayloadSetDone +func botMsgRescanAll(m *tb.Message) { + if !m.Private() { + return + } - err := setJobStart(j.ID64) - logOnError(err, "jobSetDone : setJobStart") + if _, ok := clientsKeepAlive[m.Chat.ID]; ok { + } else { + c := TGCommand{ + Type: commandReplyMsg, + Text: "Client not registered", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + return + } - err = json.Unmarshal(j.Payload, &r) - logOnError(err, "jobSetDone : Unmarshal payload") + p := JobPayloadRescanMsg{ + Query: fmt.Sprintf("SELECT o.id FROM obj o WHERE o.obj_type_id = %d AND o.obj_sub_type_id = %d ORDER BY id ASC;", objTypeMessage, objSubTypeMessageUnknown), + MsgID64: int64(m.ID), + ChatID64: m.Chat.ID, + } + b, _ := json.Marshal(p) + _, err := createJob(objSubTypeJobRescanMsg, objJobPriorityRescanAllMsg, int64(m.Sender.ID), time.Now(), b) + logOnError(err, "botMsgRescan : createJob(objSubTypeJobRescanMsg)") - err = setJobDone(r.JobID64) - logOnError(err, "jobSetDone : setJobDone(child)") + if err != nil { + c := TGCommand{ + Type: commandReplyMsg, + Text: "Error scheduling the rescan for all msg.", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + } else { + c := TGCommand{ + Type: commandReplyMsg, + Text: "Rescaning all msg scheduled.", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + } + return +} - err = setJobDone(j.ID64) - logOnError(err, "jobSetDone : setJobDone") +func botBackupExport(m *tb.Message) { + if !m.Private() { + return + } - m := TGCommand{ + if _, ok := clientsKeepAlive[m.Chat.ID]; ok { + } else { + c := TGCommand{ + Type: commandReplyMsg, + Text: "Client not registered", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + return + } + + p := JobPayloadBackupExport{ + MsgID64: int64(m.ID), + ChatID64: m.Chat.ID, + } + b, _ := json.Marshal(p) + t := time.Now() + objID64, err := createJob(objSubTypeJobBackupExport, objJobPriorityBackup, int64(m.Chat.ID), t, b) + logOnError(err, "botBackupExport : createJob") + + return +} + +func botBackupImport(m *tb.Message) { + if !m.Private() { + return + } + + if _, ok := clientsKeepAlive[m.Chat.ID]; ok { + } else { + c := TGCommand{ + Type: commandReplyMsg, + Text: "Client not registered", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + return + } + + r := regexp.MustCompile("^(http|https)://[a-z0-9./]+.zip$") // https://dump.siteop.biz/20190609163137.backup.zip + if !r.MatchString(m.Payload) { + c := TGCommand{ + Type: commandReplyMsg, + Text: "Missing URL", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + return + } + + p := JobPayloadBackupExport{ + URL: m.Payload, + MsgID64: int64(m.ID), + ChatID64: m.Chat.ID, + } + b, _ := json.Marshal(p) + t := time.Now() + objID64, err := createJob(objSubTypeJobBackupImport, objJobPriorityBackup, int64(m.Chat.ID), t, b) + logOnError(err, "botBackupImport : createJob") + + return +} + +func botMsgDump(m *tb.Message) { + var res string + + if _, ok := clientsKeepAlive[m.Chat.ID]; ok { + } else { + c := TGCommand{ + Type: commandReplyMsg, + Text: "Client not registered", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + return + } + + r := regexp.MustCompile("^[0-9]+$") + if r.MatchString(m.Payload) { + objId, _ := strconv.ParseInt(m.Payload, 10, 64) + objTypeId, err := getObjTypeId(objId) + logOnError(err, "botMsgDump : getObjSubTypeId") + if err != nil { + res = `Error retrieving the message` + } else if objTypeId != objTypeMessage { + res = `This is not a message reference` + } else { + cwm, _ := getObjMsg(objId) + b, _ := json.Marshal(cwm) + res = string(b) + } + } else { + res = `/msg_dump ` + } + + c := TGCommand{ Type: commandReplyMsg, - Text: r.Text, - FromMsgID64: r.MsgID64, - FromChatID64: r.ChatID64, + Text: res, + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, } - TGCmdQueue <- m + TGCmdQueue <- c return } -func jobPillage(j Job) { - var r JobPayloadPillage - - err := setJobStart(j.ID64) - logOnError(err, "jobPillage : setJobStart") - - err = json.Unmarshal(j.Payload, &r) - logOnError(err, "jobPillage : Unmarshal payload") - - // check if we have a acknoledgment of go or a timeout within 3m30 of the PillageInc from the Job - ids := getSQLListID64(` select ox.id - from obj ox - ,obj_msg omx - ,obj op - ,obj_msg omp - ,obj_job oj - where oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + ` - and omx.user_id = oj.user_id - and omx.sender_user_id = ` + strconv.Itoa(userID64ChtWrsBot) + ` - and omx.obj_id = ox.id - and ox.obj_sub_type_id in (` + strconv.Itoa(objSubTypeMessagePillageGo) + - `, ` + strconv.Itoa(objSubTypeMessagePillageTimeout) + - `, ` + strconv.Itoa(objSubTypeMessagePillageLoss) + - `, ` + strconv.Itoa(objSubTypeMessagePillageWin) + `) - and op.id = ` + strconv.FormatInt(r.ObjID64, 10) + ` - and omp.obj_id = op.id - and omx.date between omp.date and addtime(omp.date, '0 0:3:30.000000') - order by case ox.obj_sub_type_id when ` + strconv.Itoa(objSubTypeMessagePillageWin) + ` then 0 - when ` + strconv.Itoa(objSubTypeMessagePillageLoss) + ` then 1 - when ` + strconv.Itoa(objSubTypeMessagePillageTimeout) + ` then 2 - when ` + strconv.Itoa(objSubTypeMessagePillageGo) + ` then 3 - else 4 end asc - limit 1;`) - - if len(ids) > 1 { // issue there ? - s := TGCommand{ - Type: commandSendMsg, - Text: fmt.Sprintf("More than one outcome for pillage #%d", r.ObjID64), - ToUserID64: j.UserID64, - } - TGCmdQueue <- s - } else if len(ids) == 1 { // we've got a match, job is done whether we prevented the pillage or not - m, err := getObjMsg(ids[0]) - logOnError(err, "jobPillage : getMsg(objSubTypeMessagePillageGo, objSubTypeMessagePillageTimeout)") - if err == nil { - if m.Date.Add(60 * time.Second).After(time.Now().UTC()) { - msgTypeID64, err := getObjSubTypeId(ids[0]) - logOnError(err, "jobPillage : getObjSubTypeId") - if err == nil { - if msgTypeID64 == objSubTypeMessagePillageGo { - s := TGCommand{ - Type: commandSendMsg, - Text: fmt.Sprintf("We avoided a pillage (%s)", m.Date.Format(time.RFC3339)), - ToUserID64: j.UserID64, - } - TGCmdQueue <- s - } else if msgTypeID64 == objSubTypeMessagePillageWin { - s := TGCommand{ - Type: commandSendMsg, - Text: fmt.Sprintf("We avoided a pillage (%s))", m.Date.Format(time.RFC3339)), - ToUserID64: j.UserID64, - } - TGCmdQueue <- s - } else if msgTypeID64 == objSubTypeMessagePillageLoss { - s := TGCommand{ - Type: commandSendMsg, - Text: fmt.Sprintf("We got pillaged (%s)", m.Date.Format(time.RFC3339)), - ToUserID64: j.UserID64, - } - TGCmdQueue <- s - } else if msgTypeID64 == objSubTypeMessagePillageTimeout { - s := TGCommand{ - Type: commandSendMsg, - Text: fmt.Sprintf("We got pillaged (%s)", m.Date.Format(time.RFC3339)), - ToUserID64: j.UserID64, - } - TGCmdQueue <- s - } else { - s := TGCommand{ - Type: commandSendMsg, - Text: fmt.Sprintf("We don't know what happened (%s)", m.Date.Format(time.RFC3339)), - ToUserID64: j.UserID64, - } - TGCmdQueue <- s - } - } - } - } - err = setJobDone(j.ID64) - logOnError(err, "jobSetDone : setJobDone") +func botListParsingRules(m *tb.Message) { + var s string = "" + if !m.Private() { return } - s := TGCommand{ - Type: commandSendMsg, - Text: fmt.Sprintf("No outcome for the pillage yet"), - ToUserID64: j.UserID64, - } - TGCmdQueue <- s - - //no outcome yet, have we sent a "/go" in the last 30 sec ? - ids = getSQLListID64(` select ox.id - from obj ox - ,obj_msg omx - ,obj_job oj - where oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + ` - and omx.user_id = oj.user_id - and omx.sender_user_id = oj.user_id - and omx.obj_id = ox.id - and ox.obj_sub_type_id =` + strconv.Itoa(objSubTypeMessageGo) + ` - and omx.date between addtime(oj.schedule, '-30') and oj.schedule;`) - - if len(ids) > 0 { // we did, so we reschedule the job to check the outcome and wait - m, err := getObjMsg(ids[0]) - logOnError(err, "jobPillage : getMsg(objSubTypeMessageGo)") - if err == nil { - s := TGCommand{ - Type: commandSendMsg, - Text: fmt.Sprintf("We started intercepting the pillage (%s)", m.Date.Format(time.RFC3339)), - ToUserID64: j.UserID64, - } - TGCmdQueue <- s - } - err = rescheduleJob(j.ID64, j.Status+1, time.Now().Add(30*time.Second).UTC()) - logOnError(err, "jobPillage : rescheduleJob(objSubTypeMessageGo)") - } else { //no /go in the last 30 sec so we go ahead, send one and reschedule to check again in 25sec - clientSendCWMsg(j.UserID64, "/go") - err = rescheduleJob(j.ID64, j.Status+1, time.Now().Add(30*time.Second).UTC()) - logOnError(err, "jobPillage : rescheduleJob") - - } - - return -} - -func jobMsgRefresh(j Job) { - var p JobPayloadMsgRefresh - - // identify whether the message has been properly refreshed ? create new job ? reschedule same job ? - err := setJobStart(j.ID64) - logOnError(err, "jobMsgRefresh : setJobStart") - - err = json.Unmarshal(j.Payload, &p) - logOnError(err, "jobMsgRefresh : Unmarshal payload") - - m, err := getObjMsg(p.ObjID64) - - if err != nil && strings.Compare(err.Error(), `sql: no rows in result set`) == 0 { - err = setJobDone(j.ID64) - logOnError(err, "joMsgClient : setJobDone") - return + if _, ok := clientsKeepAlive[m.Chat.ID]; ok { } else { - logOnError(err, "jobMsgRefresh : getObjMsg") - err = setJobDone(j.ID64) - logOnError(err, "joMsgClient : setJobDone") + c := TGCommand{ + Type: commandReplyMsg, + Text: "Client not registered", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c return } - err = delObj(p.ObjID64) - logOnError(err, "jobMsgRefresh : delObj") - - clientRefreshCWMsg(m.UserID64, m.ChatID64, m.ID64) - - err = setJobDone(j.ID64) - logOnError(err, "joMsgClient : setJobDone") - return -} - -func jobMsgClient(j Job) { - var p JobPayloadMsgClient - err := setJobStart(j.ID64) - logOnError(err, "jobMsgClient : setJobStart") - - err = json.Unmarshal(j.Payload, &p) - logOnError(err, "jobMsgClient : Unmarshal payload") - - if err == nil { - clientSendCWMsg(j.UserID64, p.Text) - m := TGCommand{ - Type: commandReplyMsg, - Text: "Message sent.", - FromMsgID64: p.MsgID64, - FromChatID64: p.ChatID64, - } - TGCmdQueue <- m + for _, v := range msgParsingRules { + s = fmt.Sprintf("%s[%d] %s\n", s, v.ID, v.Description) } - err = setJobDone(j.ID64) - logOnError(err, "joMsgClient : setJobDone") + c := TGCommand{ + Type: commandReplyMsg, + Text: s, + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c return } -func jobBackupExport(j Job) { - var p JobPayloadBackupExport - err := setJobStart(j.ID64) - logOnError(err, "jobBackupExport : setJobStart") - - err = json.Unmarshal(j.Payload, &p) - logOnError(err, "jobBackupExport : Unmarshal payload") - - if err == nil { - m := TGCommand{ - Type: commandReplyMsg, - Text: "Backup exported.", - FromMsgID64: p.MsgID64, - FromChatID64: p.ChatID64, - } - TGCmdQueue <- m +func botListParsingRule(m *tb.Message) { + if !m.Private() { + return } - err = setJobDone(j.ID64) - logOnError(err, "jobBackupExport : setJobDone") - return -} - -func jobBackupImport(j Job) { - var p JobPayloadBackupImport - err := setJobStart(j.ID64) - logOnError(err, "jobBackupImport : setJobStart") - - err = json.Unmarshal(j.Payload, &p) - logOnError(err, "jobBackupImport : Unmarshal payload") - - if err == nil { - m := TGCommand{ + if _, ok := clientsKeepAlive[m.Chat.ID]; ok { + } else { + c := TGCommand{ Type: commandReplyMsg, - Text: "Backup imported.", - FromMsgID64: p.MsgID64, - FromChatID64: p.ChatID64, + Text: "Client not registered", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, } - TGCmdQueue <- m + TGCmdQueue <- c + return + } + + r := regexp.MustCompile("^[0-9]+$") + if r.MatchString(m.Payload) { + i, _ := strconv.ParseInt(m.Payload, 10, 64) + for _, v := range msgParsingRules { + if int64(v.ID) == i { + c := TGCommand{ + Type: commandReplyMsg, + Text: fmt.Sprintf("%s\n", v.Rule), + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + return + } + } + c := TGCommand{ + Type: commandReplyMsg, + Text: fmt.Sprintf("Could not find rule %s\n", m.Payload), + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + } else { + c := TGCommand{ + Type: commandReplyMsg, + Text: fmt.Sprintf("/parse_rule \n"), + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + } + return +} + +func botGStock(m *tb.Message) { + if !m.Private() { + return + } + clientSendCWMsg(m.Chat.ID, "/g_stock_res") + clientSendCWMsg(m.Chat.ID, "/g_stock_alch") + clientSendCWMsg(m.Chat.ID, "/g_stock_misc") + clientSendCWMsg(m.Chat.ID, "/g_stock_rec") + clientSendCWMsg(m.Chat.ID, "/g_stock_parts") + clientSendCWMsg(m.Chat.ID, "/g_stock_other") + + c := TGCommand{ + Type: commandReplyMsg, + Text: "Stock requested", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + return +} + +func botTimer(m *tb.Message) { + if !m.Private() { + return + } + if _, ok := clientsKeepAlive[m.Chat.ID]; ok { + } else { + c := TGCommand{ + Type: commandReplyMsg, + Text: "Client not registered", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + return + } + + r := regexp.MustCompile("^(?P([0-9]*(s|m|h))+) \"(?P(.*))\"$") + if r.MatchString(m.Payload) { + d, err := time.ParseDuration(r.ReplaceAllString(m.Payload, "${Duration}")) + if err != nil { + c := TGCommand{ + Type: commandReplyMsg, + Text: fmt.Sprintf("%s", err), + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + } else { + p := JobPayloadMsgClient{ + Text: r.ReplaceAllString(m.Payload, "${Msg}"), + MsgID64: int64(m.ID), + ChatID64: m.Chat.ID, + } + b, _ := json.Marshal(p) + t := time.Now().Add(d) + objID64, err := createJob(objSubTypeJobMsgClient, objJobPriority, int64(m.Chat.ID), t, b) + logOnError(err, "botTimer : createJob") + if err != nil { + c := TGCommand{ + Type: commandReplyMsg, + Text: fmt.Sprintf("%s", err), + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + } else { + c := TGCommand{ + Type: commandReplyMsg, + Text: fmt.Sprintf("Job #%d scheduled at %s", objID64, t.Format(time.RFC850)), + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + } + } + } - err = setJobDone(j.ID64) - logOnError(err, "jobBackupImport : setJobDone") return } diff --git a/msg.go b/msg.go index bba3816..8a8b1c7 100644 --- a/msg.go +++ b/msg.go @@ -273,96 +273,3 @@ func parseSubTypeMessagePillageInc(m *ChatWarsMessage, r *regexp.Regexp) (*ChatW return &cwm, nil } - -func zipMessages() ([]byte, error) { - bkp := DataBackup{} - s := new([]ChatWarsMessage) - msgs := *s - ids := getSQLListID64(`SELECT om.obj_id id FROM obj_msg om;`) - i := 0 - for _, id := range ids { - m, err := getObjMsg(id) - logOnError(err, "zipMessages : getMsg") - if err == nil { - msgs = append(msgs, *m) - } - i = i + 1 - if i%10000 == 0 { - log.Printf("zipMessages : Exported %d messages.\n", i) - } - - } - - bkp.Messages = msgs - b, err := json.Marshal(bkp) - logOnError(err, "exportMessages : Marshal") - if err != nil { - return nil, err - } - - zbuf := new(bytes.Buffer) - zw := zip.NewWriter(zbuf) - zf, err := zw.Create(`backup.json`) - logOnError(err, "exportMessages : Create") - if err != nil { - return nil, err - } - - _, err = zf.Write(b) - logOnError(err, "exportMessages : Write") - if err != nil { - return nil, err - } - - err = zw.Close() - logOnError(err, "exportMessages : Close") - if err != nil { - return nil, err - } - - return zbuf.Bytes(), nil - -} - -func UnzipMessages(z []byte) error { - log.Printf("UnzipMessages : %d bytes.\n", len(z)) - r := bytes.NewReader(z) - zr, err := zip.NewReader(r, int64(len(z))) - logOnError(err, "UnzipMessages : NewReader") - if err != nil { - return err - } - - for _, f := range zr.File { - log.Printf("File : %s\n", f.Name) - if strings.Compare(f.Name, "backup.json") == 0 { - rc, err := f.Open() - logOnError(err, "UnzipMessages : Open") - if err != nil { - return err - } - data, err := ioutil.ReadAll(rc) - logOnError(err, "UnzipMessages : ReadAll") - if err != nil { - return err - } - log.Printf("UnzipMessages : %d uncompressed bytes.\n", len(data)) - rc.Close() - bkp := DataBackup{} - err = json.Unmarshal(data, &bkp) - logOnError(err, "UnzipMessages : Unmarshal") - if err != nil { - return err - } - log.Printf("UnzipMessages : %d messages.\n", len(bkp.Messages)) - for _, m := range bkp.Messages { - _, err = addObjMsg(m.ID64, m.ChatID64, m.UserID64, m.SenderUserID64, m.Date, m.Text) - } - return nil - } - } - - log.Printf("Not backup file found.\n") - - return nil -} diff --git a/workers.go b/workers.go index 326a63f..9c3c1a7 100644 --- a/workers.go +++ b/workers.go @@ -327,10 +327,10 @@ func MQTGCmdWorker(id int, cmds <-chan TGCommand) { logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") //log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j)) err = clientsQueue[c.FromUserID64].Channel.Publish( - "", // exchange + "", // exchange clientsQueue[c.FromUserID64].Queue.Name, // routing key - false, // mandatory - false, // immediate + false, // mandatory + false, // immediate amqp.Publishing{ ContentType: "application/json", Body: []byte(j), From bc9e58588d8141858dae99f1e7633b5f1950d4a8 Mon Sep 17 00:00:00 2001 From: shoopea Date: Tue, 11 Jun 2019 22:56:48 +0800 Subject: [PATCH 02/11] test --- job.go | 945 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 472 insertions(+), 473 deletions(-) diff --git a/job.go b/job.go index 58050ae..527fc95 100644 --- a/job.go +++ b/job.go @@ -1,537 +1,536 @@ package main import ( - "bytes" "encoding/json" "fmt" - "log" - "net/http" - "regexp" "strconv" + "strings" "time" - - tb "gopkg.in/tucnak/telebot.v2" ) -func BotHandlers(b *tb.Bot) { - b.Handle("/hello", func(m *tb.Message) { - s, err := botHello(m) - logOnError(err, "/hello") - if err == nil { - b.Send(m.Sender, s) +func createJob(jobTypeID int32, priority int32, userID64 int64, schedule time.Time, payload []byte) (int64, error) { + stmt, err := db.Prepare(`INSERT INTO obj (obj_type_id, obj_sub_type_id) + VALUES (? , ?);`) + logOnError(err, "createJob : prepare insert obj") + if err != nil { + return 0, err + } + defer stmt.Close() + + res, err := stmt.Exec(objTypeJob, jobTypeID) + s := fmt.Sprintf("createJob, insert obj(%d, %d)", objTypeJob, jobTypeID) + logOnError(err, s) + if err != nil { + return 0, err + } + + objId, err := res.LastInsertId() + logOnError(err, "createJob : get last insert Id") + if err != nil { + return 0, err + } + + stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, status, seq_nr, schedule, is_done, in_work, inserted, pulled, started, ended, payload) + VALUES (?, ?, ?, ?, NULL, ?, 0, 0, ?, NULL, NULL, NULL, ?);`) + logOnError(err, "createJob : prepare insert obj_job") + if err != nil { + return 0, err + } + defer stmt.Close() + + _, err = stmt.Exec(objId, priority, userID64, objJobStatusNew, schedule.UTC(), time.Now().UTC(), payload) + logOnError(err, "createJob : insert obj_job") + if err != nil { + return 0, err + } + + return objId, nil +} + +func setJobDone(jobId int64) error { + stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 1, j.in_work = 0, j.ended = ? WHERE j.obj_id = ?;`) + logOnError(err, "setJobDone : prepare update obj_job") + if err != nil { + return err + } + defer stmt.Close() + + _, err = stmt.Exec(time.Now().UTC(), jobId) + s := fmt.Sprintf("setJobDone, update obj_job(%d)", jobId) + logOnError(err, s) + if err != nil { + return err + } + return nil +} + +func setJobStart(jobId int64) error { + stmt, err := db.Prepare(`UPDATE obj_job j SET j.started = coalesce(j.started, ?) WHERE j.obj_id = ?;`) + logOnError(err, "setJobStart : prepare update obj_job") + if err != nil { + return err + } + defer stmt.Close() + + _, err = stmt.Exec(time.Now().UTC(), jobId) + s := fmt.Sprintf("setJobStart, update obj_job(%d)", jobId) + logOnError(err, s) + if err != nil { + return err + } + return nil +} + +func rescheduleJob(jobID64 int64, status int32, schedule time.Time) error { + stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 0, j.in_work = 0, j.schedule = ?, j.status = ? WHERE j.obj_id = ?;`) + logOnError(err, "rescheduleJob : prepare update obj_job") + if err != nil { + return err + } + defer stmt.Close() + + _, err = stmt.Exec(schedule.UTC(), status, jobID64) + s := fmt.Sprintf("rescheduleJob, update obj_job(%d)", jobID64) + logOnError(err, s) + if err != nil { + return err + } + return nil +} + +func loadCurrentJobs() ([]Job, error) { + var ( + objId int64 + jobTypeId int32 + userID64 int64 + status int32 + payload []byte + jobs []Job + ) + + t := time.Now().UTC() + r := RndInt64() + + _, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1, j.seq_nr = ? WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", t, r, t, SQLJobSliceSize) + logOnError(err, "loadCurrentJobs : update intial rows") + + stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;") + logOnError(err, "loadCurrentJobs : prepare select statement") + + rows, err := stmt.Query(r) + // rows, err := stmt.Query(time.Now()) + logOnError(err, "loadCurrentJobs : query select statement") + + for rows.Next() { + err = rows.Scan(&objId, &jobTypeId, &status, &userID64, &payload) + logOnError(err, "loadCurrentJobs : scan query rows") + job := Job{ + ID64: objId, + JobTypeID: jobTypeId, + Status: status, + UserID64: userID64, + Payload: payload, } - }) - - b.Handle("/test", botTest) - b.Handle("/test_html", botTestHTML) - b.Handle("/msg_rescan", botMsgRescan) - b.Handle("/msg_rescan_all", botMsgRescanAll) - b.Handle("/msg_dump", botMsgDump) - - b.Handle("/parse_rules", botListParsingRules) - b.Handle("/parse_rule", botListParsingRule) - - b.Handle("/timer", botTimer) - - b.Handle("/g_stock", botGStock) - - b.Handle("/backup_export", botBackupExport) - b.Handle("/backup_import", botBackupImport) - - b.Handle("/help", botHelp) - - b.Handle(tb.OnPhoto, botPhoto) - b.Handle(tb.OnChannelPost, botChannelPost) - b.Handle(tb.OnQuery, botQuery) - b.Handle(tb.OnText, botText) - b.Handle(tb.OnDocument, botDocument) - - b.Start() -} - -func botPhoto(m *tb.Message) { - fmt.Println("botPhoto :", m.Text) - // photos only -} - -func botDocument(m *tb.Message) { - fmt.Printf("botDocument : %s (%d bytes)\n", m.Document.FileName, m.Document.File.FileSize) - // documents only -} - -func botHello(m *tb.Message) (string, error) { - fmt.Println("botHello :", m.Text) - if !m.Private() { - fmt.Println("botHello : !m.Private()") - return ``, nil + jobs = append(jobs, job) } - // fmt.Println("Hello payload :", m.Payload) // - PrintText(m) - return `hello world`, nil + err = rows.Err() + logOnError(err, "loadCurrentJobs : scan end rows") + rows.Close() + + err = stmt.Close() + logOnError(err, "loadCurrentJobs : close select statement") + + return jobs, nil } -func botChannelPost(m *tb.Message) { - fmt.Println("botChannelPost :", m.Text) - PrintText(m) - // channel posts only -} +func jobRescan(j Job) { + var r JobPayloadRescanMsg -func botQuery(q *tb.Query) { - fmt.Println("botQuery") - // incoming inline queries -} + err := setJobStart(j.ID64) + logOnError(err, "jobRescan : setJobStart") -func botText(m *tb.Message) { - fmt.Println("botText :", m.Text) - PrintText(m) - // all the text messages that weren't - // captured by existing handlers -} + err = json.Unmarshal(j.Payload, &r) + logOnError(err, "jobRescan : Unmarshal payload") -func botHelp(m *tb.Message) { - if !m.Private() { - return - } - c := TGCommand{ - Type: commandReplyMsg, - Text: `/help - this help -/msg_rescan - rescan one message -/msg_rescan_all - rescan all messages -/parse_rules - list parsing rules\n -/parse_rule - detail for one rule -/timer "msg" - schedule msg for client in ETA -/g_stock - check guild's vault -/backup_export - export backup database -/backup_import - import backup database from URL`, - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return -} + start := time.Now() -func botTestHTML(m *tb.Message) { - if !m.Private() { - return - } - c := TGCommand{ - Type: commandReplyMsg, - Text: `bold, -bold, -italic, -italic, -inline URL, -inline fixed-width code, -
pre-formatted fixed-width code block
`, - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - ParseMode: cmdParseModeHTML, - } - TGCmdQueue <- c + ids := getSQLListID64(r.Query) - return -} - -func botTest(m *tb.Message) { - if !m.Private() { - return - } - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - clientSendCWMsg(m.Chat.ID, "🏅Me") - - c := TGCommand{ - Type: commandReplyMsg, - Text: "Test sent", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, + if len(ids) > 1 { + for _, id := range ids { + SQLMsgIdentifyQueue <- id } - TGCmdQueue <- c - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - } - return -} - -func botMsgRescan(m *tb.Message) { - if !m.Private() { - return - } - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return - } - - r := regexp.MustCompile("^[0-9]+$") - if r.MatchString(m.Payload) { - p := JobPayloadRescanMsg{ - Query: fmt.Sprintf("SELECT o.id FROM obj o WHERE o.id = %s AND o.obj_type_id = %d AND o.obj_sub_type_id = %d;", m.Payload, objTypeMessage, objSubTypeMessageUnknown), - MsgID64: int64(m.ID), - ChatID64: m.Chat.ID, + p := JobPayloadSetDone{ + JobID64: j.ID64, + MsgID64: r.MsgID64, + ChatID64: r.ChatID64, + Text: fmt.Sprintf("%d messages processed in %s.", len(ids), time.Since(start)), } b, _ := json.Marshal(p) - log.Printf("botMsgRescan : json : %s\n", string(b)) - _, err := createJob(objSubTypeJobRescanMsg, objJobPriorityRescanMsg, int64(m.Sender.ID), time.Now(), b) - logOnError(err, "botMsgRescan : createJob(objSubTypeJobRescanMsg)") - if err != nil { - c := TGCommand{ + _, err := createJob(objSubTypeJobSetJobDone, objJobPriorityRescanAllMsg, j.UserID64, time.Now().UTC(), b) + logOnError(err, "jobRescan : createJob(objSubTypeJobSetJobDone)") + + } else if len(ids) == 1 { + SQLMsgIdentifyQueue <- ids[0] + err = setJobDone(j.ID64) + logOnError(err, "jobRescan : setJobDone(1)") + if r.MsgID64 != 0 || r.ChatID64 != 0 { + m := TGCommand{ Type: commandReplyMsg, - Text: fmt.Sprintf("Error scheduling the rescan for msg #%s", m.Payload), - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, + Text: "One message processed.", + FromMsgID64: r.MsgID64, + FromChatID64: r.ChatID64, } - TGCmdQueue <- c - } else { - c := TGCommand{ + TGCmdQueue <- m + } + } else { + err = setJobDone(j.ID64) + logOnError(err, "jobRescan : setJobDone(0)") + if r.MsgID64 != 0 || r.ChatID64 != 0 { + m := TGCommand{ Type: commandReplyMsg, - Text: fmt.Sprintf("Rescaning msg #%s", m.Payload), - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, + Text: "No message processed.", + FromMsgID64: r.MsgID64, + FromChatID64: r.ChatID64, } - TGCmdQueue <- c + TGCmdQueue <- m } } - r = regexp.MustCompile("^all$") - if r.MatchString(m.Payload) { - botMsgRescanAll(m) - } return } -func botMsgRescanAll(m *tb.Message) { - if !m.Private() { - return - } +func jobSetDone(j Job) { + var r JobPayloadSetDone - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return - } + err := setJobStart(j.ID64) + logOnError(err, "jobSetDone : setJobStart") - p := JobPayloadRescanMsg{ - Query: fmt.Sprintf("SELECT o.id FROM obj o WHERE o.obj_type_id = %d AND o.obj_sub_type_id = %d ORDER BY id ASC;", objTypeMessage, objSubTypeMessageUnknown), - MsgID64: int64(m.ID), - ChatID64: m.Chat.ID, - } - b, _ := json.Marshal(p) - _, err := createJob(objSubTypeJobRescanMsg, objJobPriorityRescanAllMsg, int64(m.Sender.ID), time.Now(), b) - logOnError(err, "botMsgRescan : createJob(objSubTypeJobRescanMsg)") + err = json.Unmarshal(j.Payload, &r) + logOnError(err, "jobSetDone : Unmarshal payload") - if err != nil { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Error scheduling the rescan for all msg.", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Rescaning all msg scheduled.", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - } - return -} + err = setJobDone(r.JobID64) + logOnError(err, "jobSetDone : setJobDone(child)") -func botBackupExport(m *tb.Message) { - if !m.Private() { - return - } + err = setJobDone(j.ID64) + logOnError(err, "jobSetDone : setJobDone") - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return - } - - p := JobPayloadBackupExport{ - MsgID64: int64(m.ID), - ChatID64: m.Chat.ID, - } - b, _ := json.Marshal(p) - t := time.Now() - objID64, err := createJob(objSubTypeJobBackupExport, objJobPriorityBackup, int64(m.Chat.ID), t, b) - logOnError(err, "botBackupExport : createJob") - - return -} - -func botBackupImport(m *tb.Message) { - if !m.Private() { - return - } - - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return - } - - r := regexp.MustCompile("^(http|https)://[a-z0-9./]+.zip$") // https://dump.siteop.biz/20190609163137.backup.zip - if !r.MatchString(m.Payload) { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Missing URL", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return - } - - p := JobPayloadBackupExport{ - URL: m.Payload, - MsgID64: int64(m.ID), - ChatID64: m.Chat.ID, - } - b, _ := json.Marshal(p) - t := time.Now() - objID64, err := createJob(objSubTypeJobBackupImport, objJobPriorityBackup, int64(m.Chat.ID), t, b) - logOnError(err, "botBackupImport : createJob") - - return -} - -func botMsgDump(m *tb.Message) { - var res string - - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return - } - - r := regexp.MustCompile("^[0-9]+$") - if r.MatchString(m.Payload) { - objId, _ := strconv.ParseInt(m.Payload, 10, 64) - objTypeId, err := getObjTypeId(objId) - logOnError(err, "botMsgDump : getObjSubTypeId") - if err != nil { - res = `Error retrieving the message` - } else if objTypeId != objTypeMessage { - res = `This is not a message reference` - } else { - cwm, _ := getObjMsg(objId) - b, _ := json.Marshal(cwm) - res = string(b) - } - } else { - res = `/msg_dump ` - } - - c := TGCommand{ + m := TGCommand{ Type: commandReplyMsg, - Text: res, - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, + Text: r.Text, + FromMsgID64: r.MsgID64, + FromChatID64: r.ChatID64, } - TGCmdQueue <- c + TGCmdQueue <- m return } -func botListParsingRules(m *tb.Message) { - var s string = "" - if !m.Private() { - return - } +func jobPillage(j Job) { + var r JobPayloadPillage - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, + err := setJobStart(j.ID64) + logOnError(err, "jobPillage : setJobStart") + + err = json.Unmarshal(j.Payload, &r) + logOnError(err, "jobPillage : Unmarshal payload") + + // check if we have a acknoledgment of go or a timeout within 3m30 of the PillageInc from the Job + ids := getSQLListID64(` select ox.id + from obj ox + ,obj_msg omx + ,obj op + ,obj_msg omp + ,obj_job oj + where oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + ` + and omx.user_id = oj.user_id + and omx.sender_user_id = ` + strconv.Itoa(userID64ChtWrsBot) + ` + and omx.obj_id = ox.id + and ox.obj_sub_type_id in (` + strconv.Itoa(objSubTypeMessagePillageGo) + + `, ` + strconv.Itoa(objSubTypeMessagePillageTimeout) + + `, ` + strconv.Itoa(objSubTypeMessagePillageLoss) + + `, ` + strconv.Itoa(objSubTypeMessagePillageWin) + `) + and op.id = ` + strconv.FormatInt(r.ObjID64, 10) + ` + and omp.obj_id = op.id + and omx.date between omp.date and addtime(omp.date, '0 0:3:30.000000') + order by case ox.obj_sub_type_id when ` + strconv.Itoa(objSubTypeMessagePillageWin) + ` then 0 + when ` + strconv.Itoa(objSubTypeMessagePillageLoss) + ` then 1 + when ` + strconv.Itoa(objSubTypeMessagePillageTimeout) + ` then 2 + when ` + strconv.Itoa(objSubTypeMessagePillageGo) + ` then 3 + else 4 end asc + limit 1;`) + + if len(ids) > 1 { // issue there ? + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("More than one outcome for pillage #%d", r.ObjID64), + ToUserID64: j.UserID64, } - TGCmdQueue <- c - return - } - - for _, v := range msgParsingRules { - s = fmt.Sprintf("%s[%d] %s\n", s, v.ID, v.Description) - } - - c := TGCommand{ - Type: commandReplyMsg, - Text: s, - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return -} - -func botListParsingRule(m *tb.Message) { - if !m.Private() { - return - } - - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return - } - - r := regexp.MustCompile("^[0-9]+$") - if r.MatchString(m.Payload) { - i, _ := strconv.ParseInt(m.Payload, 10, 64) - for _, v := range msgParsingRules { - if int64(v.ID) == i { - c := TGCommand{ - Type: commandReplyMsg, - Text: fmt.Sprintf("%s\n", v.Rule), - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, + TGCmdQueue <- s + } else if len(ids) == 1 { // we've got a match, job is done whether we prevented the pillage or not + m, err := getObjMsg(ids[0]) + logOnError(err, "jobPillage : getMsg(objSubTypeMessagePillageGo, objSubTypeMessagePillageTimeout)") + if err == nil { + if m.Date.Add(60 * time.Second).After(time.Now().UTC()) { + msgTypeID64, err := getObjSubTypeId(ids[0]) + logOnError(err, "jobPillage : getObjSubTypeId") + if err == nil { + if msgTypeID64 == objSubTypeMessagePillageGo { + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("We avoided a pillage (%s)", m.Date.Format(time.RFC3339)), + ToUserID64: j.UserID64, + } + TGCmdQueue <- s + } else if msgTypeID64 == objSubTypeMessagePillageWin { + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("We avoided a pillage (%s))", m.Date.Format(time.RFC3339)), + ToUserID64: j.UserID64, + } + TGCmdQueue <- s + } else if msgTypeID64 == objSubTypeMessagePillageLoss { + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("We got pillaged (%s)", m.Date.Format(time.RFC3339)), + ToUserID64: j.UserID64, + } + TGCmdQueue <- s + } else if msgTypeID64 == objSubTypeMessagePillageTimeout { + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("We got pillaged (%s)", m.Date.Format(time.RFC3339)), + ToUserID64: j.UserID64, + } + TGCmdQueue <- s + } else { + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("We don't know what happened (%s)", m.Date.Format(time.RFC3339)), + ToUserID64: j.UserID64, + } + TGCmdQueue <- s + } } - TGCmdQueue <- c - return } } - c := TGCommand{ - Type: commandReplyMsg, - Text: fmt.Sprintf("Could not find rule %s\n", m.Payload), - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: fmt.Sprintf("/parse_rule \n"), - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c + err = setJobDone(j.ID64) + logOnError(err, "jobSetDone : setJobDone") + return } + + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("No outcome for the pillage yet"), + ToUserID64: j.UserID64, + } + TGCmdQueue <- s + + //no outcome yet, have we sent a "/go" in the last 30 sec ? + ids = getSQLListID64(` select ox.id + from obj ox + ,obj_msg omx + ,obj_job oj + where oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + ` + and omx.user_id = oj.user_id + and omx.sender_user_id = oj.user_id + and omx.obj_id = ox.id + and ox.obj_sub_type_id =` + strconv.Itoa(objSubTypeMessageGo) + ` + and omx.date between addtime(oj.schedule, '-30') and oj.schedule;`) + + if len(ids) > 0 { // we did, so we reschedule the job to check the outcome and wait + m, err := getObjMsg(ids[0]) + logOnError(err, "jobPillage : getMsg(objSubTypeMessageGo)") + if err == nil { + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("We started intercepting the pillage (%s)", m.Date.Format(time.RFC3339)), + ToUserID64: j.UserID64, + } + TGCmdQueue <- s + } + err = rescheduleJob(j.ID64, j.Status+1, time.Now().Add(30*time.Second).UTC()) + logOnError(err, "jobPillage : rescheduleJob(objSubTypeMessageGo)") + } else { //no /go in the last 30 sec so we go ahead, send one and reschedule to check again in 25sec + clientSendCWMsg(j.UserID64, "/go") + err = rescheduleJob(j.ID64, j.Status+1, time.Now().Add(30*time.Second).UTC()) + logOnError(err, "jobPillage : rescheduleJob") + + } + return } -func botGStock(m *tb.Message) { - if !m.Private() { +func jobMsgRefresh(j Job) { + var p JobPayloadMsgRefresh + + // identify whether the message has been properly refreshed ? create new job ? reschedule same job ? + err := setJobStart(j.ID64) + logOnError(err, "jobMsgRefresh : setJobStart") + + err = json.Unmarshal(j.Payload, &p) + logOnError(err, "jobMsgRefresh : Unmarshal payload") + + m, err := getObjMsg(p.ObjID64) + + if err != nil && strings.Compare(err.Error(), `sql: no rows in result set`) == 0 { + err = setJobDone(j.ID64) + logOnError(err, "joMsgClient : setJobDone") + return + } else { + logOnError(err, "jobMsgRefresh : getObjMsg") + err = setJobDone(j.ID64) + logOnError(err, "joMsgClient : setJobDone") return } - clientSendCWMsg(m.Chat.ID, "/g_stock_res") - clientSendCWMsg(m.Chat.ID, "/g_stock_alch") - clientSendCWMsg(m.Chat.ID, "/g_stock_misc") - clientSendCWMsg(m.Chat.ID, "/g_stock_rec") - clientSendCWMsg(m.Chat.ID, "/g_stock_parts") - clientSendCWMsg(m.Chat.ID, "/g_stock_other") - c := TGCommand{ + err = delObj(p.ObjID64) + logOnError(err, "jobMsgRefresh : delObj") + + clientRefreshCWMsg(m.UserID64, m.ChatID64, m.ID64) + + err = setJobDone(j.ID64) + logOnError(err, "joMsgClient : setJobDone") + return +} + +func jobMsgClient(j Job) { + var p JobPayloadMsgClient + err := setJobStart(j.ID64) + logOnError(err, "jobMsgClient : setJobStart") + + err = json.Unmarshal(j.Payload, &p) + logOnError(err, "jobMsgClient : Unmarshal payload") + + if err == nil { + clientSendCWMsg(j.UserID64, p.Text) + m := TGCommand{ + Type: commandReplyMsg, + Text: "Message sent.", + FromMsgID64: p.MsgID64, + FromChatID64: p.ChatID64, + } + TGCmdQueue <- m + } + + err = setJobDone(j.ID64) + logOnError(err, "joMsgClient : setJobDone") + return +} + +func jobBackupExport(j Job) { + var p JobPayloadBackupExport + err := setJobStart(j.ID64) + logOnError(err, "jobBackupExport : setJobStart") + + err = json.Unmarshal(j.Payload, &p) + logOnError(err, "jobBackupExport : Unmarshal payload") + + bkp := DataBackup{} + start := time.Now() + milestone := time.Now() + + s := new([]ChatWarsMessage) + msgs := *s + ids := getSQLListID64(`SELECT om.obj_id id FROM obj_msg om;`) + + txt := fmt.Sprintf("Backing up %d messages.", len(ids)) + m := TGCommand{ Type: commandReplyMsg, - Text: "Stock requested", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, + Text: txt, + FromMsgID64: p.MsgID64, + FromChatID64: p.ChatID64, } - TGCmdQueue <- c - return -} + TGCmdQueue <- m -func botTimer(m *tb.Message) { - if !m.Private() { - return - } - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, + i := 0 + for _, id := range ids { + m, err := getObjMsg(id) + logOnError(err, "jobBackupExport : getMsg") + if err == nil { + msgs = append(msgs, *m) } - TGCmdQueue <- c - return - } - - r := regexp.MustCompile("^(?P([0-9]*(s|m|h))+) \"(?P(.*))\"$") - if r.MatchString(m.Payload) { - d, err := time.ParseDuration(r.ReplaceAllString(m.Payload, "${Duration}")) - if err != nil { - c := TGCommand{ + i = i + 1 + if time.Now().Add(1 * time.Minute).After(milestone) { + txt := fmt.Sprintf("Exported %d/%d messages.", i, len(ids)) + m = TGCommand{ Type: commandReplyMsg, - Text: fmt.Sprintf("%s", err), - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - } else { - p := JobPayloadMsgClient{ - Text: r.ReplaceAllString(m.Payload, "${Msg}"), - MsgID64: int64(m.ID), - ChatID64: m.Chat.ID, - } - b, _ := json.Marshal(p) - t := time.Now().Add(d) - objID64, err := createJob(objSubTypeJobMsgClient, objJobPriority, int64(m.Chat.ID), t, b) - logOnError(err, "botTimer : createJob") - if err != nil { - c := TGCommand{ - Type: commandReplyMsg, - Text: fmt.Sprintf("%s", err), - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: fmt.Sprintf("Job #%d scheduled at %s", objID64, t.Format(time.RFC850)), - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c + Text: txt, + FromMsgID64: p.MsgID64, + FromChatID64: p.ChatID64, } + TGCmdQueue <- m + milestone = time.Now() } - } + bkp.Messages = msgs + b, err := json.Marshal(bkp) + logOnError(err, "jobBackupExport : Marshal") + + m = TGCommand{ + Type: commandReplyMsg, + Text: `Compressing archive`, + FromMsgID64: p.MsgID64, + FromChatID64: p.ChatID64, + } + TGCmdQueue <- m + + zbuf := new(bytes.Buffer) + zw := zip.NewWriter(zbuf) + zf, err := zw.Create(`backup.json`) + logOnError(err, "jobBackupExport : Create") + + _, err = zf.Write(b) + logOnError(err, "jobBackupExport : Write") + + err = zw.Close() + logOnError(err, "jobBackupExport : Close") + + d := tb.Document{} + d.File = tb.FromReader(bytes.NewReader(zbuf.Bytes())) + d.FileName = fmt.Sprintf("%s.backup.zip", start.Format("20060102150405")) + d.Caption = d.FileName + d.MIME = `application/zip` + + m = TGCommand{ + Type: commandReplyMsg, + Text: `Export done.`, + FromMsgID64: p.MsgID64, + FromChatID64: p.ChatID64, + } + TGCmdQueue <- m + + m = TGCommand{ + Type: commandSendDocument, + Document: d, + ToChatID64: p.ChatID64, + } + TGCmdQueue <- m + + err = setJobDone(j.ID64) + logOnError(err, "jobBackupExport : setJobDone") + return +} + +func jobBackupImport(j Job) { + var p JobPayloadBackupImport + err := setJobStart(j.ID64) + logOnError(err, "jobBackupImport : setJobStart") + + err = json.Unmarshal(j.Payload, &p) + logOnError(err, "jobBackupImport : Unmarshal payload") + + if err == nil { + m := TGCommand{ + Type: commandReplyMsg, + Text: "Backup imported.", + FromMsgID64: p.MsgID64, + FromChatID64: p.ChatID64, + } + TGCmdQueue <- m + } + + err = setJobDone(j.ID64) + logOnError(err, "jobBackupImport : setJobDone") return } From 7d4ae4b6685f2db9030a332ceedbfebe632dc314 Mon Sep 17 00:00:00 2001 From: shoopea Date: Tue, 11 Jun 2019 23:38:12 +0800 Subject: [PATCH 03/11] test --- job.go | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 63 insertions(+), 8 deletions(-) diff --git a/job.go b/job.go index 527fc95..9532007 100644 --- a/job.go +++ b/job.go @@ -520,15 +520,70 @@ func jobBackupImport(j Job) { err = json.Unmarshal(j.Payload, &p) logOnError(err, "jobBackupImport : Unmarshal payload") - if err == nil { - m := TGCommand{ - Type: commandReplyMsg, - Text: "Backup imported.", - FromMsgID64: p.MsgID64, - FromChatID64: p.ChatID64, - } - TGCmdQueue <- m + resp, err := http.Get(p.URL) + logOnError(err, "jobBackupImport : Get") + defer resp.Body.Close() + + buf := new(bytes.Buffer) + buf.ReadFrom(resp.Body) + + m := TGCommand{ + Type: commandReplyMsg, + Text: "File downloaded.", + FromMsgID64: p.MsgID64, + FromChatID64: p.ChatID64, } + TGCmdQueue <- m + + z := buf.Bytes() + r := bytes.NewReader(z) + zr, err := zip.NewReader(r, int64(len(z))) + + for _, f := range zr.File { + if strings.Compare(f.Name, "backup.json") == 0 { + rc, err := f.Open() + logOnError(err, "jobBackupImport : Open") + if err != nil { + return err + } + data, err := ioutil.ReadAll(rc) + logOnError(err, "jobBackupImport : ReadAll") + if err != nil { + return err + } + log.Printf("jobBackupImport : %d uncompressed bytes.\n", len(data)) + rc.Close() + bkp := DataBackup{} + err = json.Unmarshal(data, &bkp) + logOnError(err, "jobBackupImport : Unmarshal") + if err != nil { + return err + } + for _, m := range bkp.Messages { + _, err = addObjMsg(m.ID64, m.ChatID64, m.UserID64, m.SenderUserID64, m.Date, m.Text) + } + + m := TGCommand{ + Type: commandReplyMsg, + Text: "Backup restored.", + FromMsgID64: p.MsgID64, + FromChatID64: p.ChatID64, + } + TGCmdQueue <- m + + err = setJobDone(j.ID64) + logOnError(err, "jobBackupImport : setJobDone") + return + } + } + + m := TGCommand{ + Type: commandReplyMsg, + Text: "Not backup file found in archive.", + FromMsgID64: p.MsgID64, + FromChatID64: p.ChatID64, + } + TGCmdQueue <- m err = setJobDone(j.ID64) logOnError(err, "jobBackupImport : setJobDone") From 2a53fa5d22e4ebbb25638e11468652359c868ab8 Mon Sep 17 00:00:00 2001 From: shoopea Date: Tue, 11 Jun 2019 23:39:04 +0800 Subject: [PATCH 04/11] test --- bot.go | 109 +++++++++++++++++++++++++-------------------------------- job.go | 3 ++ msg.go | 4 --- 3 files changed, 50 insertions(+), 66 deletions(-) diff --git a/bot.go b/bot.go index 1ef1249..3ffb076 100644 --- a/bot.go +++ b/bot.go @@ -162,6 +162,16 @@ func botMsgRescan(m *tb.Message) { if !m.Private() { return } + if _, ok := clientsKeepAlive[m.Chat.ID]; !ok && m.Chat.ID != cfg.Bot.Admin { + c := TGCommand{ + Type: commandReplyMsg, + Text: "Client not registered", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + return + } r := regexp.MustCompile("^[0-9]+$") if r.MatchString(m.Payload) { p := JobPayloadRescanMsg{ @@ -202,6 +212,16 @@ func botMsgRescanAll(m *tb.Message) { if !m.Private() { return } + if _, ok := clientsKeepAlive[m.Chat.ID]; !ok && m.Chat.ID != cfg.Bot.Admin { + c := TGCommand{ + Type: commandReplyMsg, + Text: "Client not registered", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + return + } p := JobPayloadRescanMsg{ Query: fmt.Sprintf("SELECT o.id FROM obj o WHERE o.obj_type_id = %d AND o.obj_sub_type_id = %d ORDER BY id ASC;", objTypeMessage, objSubTypeMessageUnknown), MsgID64: int64(m.ID), @@ -231,103 +251,68 @@ func botMsgRescanAll(m *tb.Message) { return } -func botMsgExport(m *tb.Message) { +func botBackupExport(m *tb.Message) { if !m.Private() { return } - - c := TGCommand{ - Type: commandReplyMsg, - Text: `Starting messages export`, - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - - b, err := zipMessages() - log.Printf("botMsgExportAll : Data returned.\n") - logOnError(err, "botMsgExportAll : exportMessages") - if err != nil { + if _, ok := clientsKeepAlive[m.Chat.ID]; !ok && m.Chat.ID != cfg.Bot.Admin { c := TGCommand{ Type: commandReplyMsg, - Text: `Error exporting messages`, + Text: "Client not registered", FromMsgID64: int64(m.ID), FromChatID64: m.Chat.ID, } TGCmdQueue <- c - return - } else { - text := fmt.Sprintf("Document size : %d bytes.", len(b)) - c := TGCommand{ - Type: commandReplyMsg, - Text: text, - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c } - d := tb.Document{} - d.File = tb.FromReader(bytes.NewReader(b)) - d.FileName = fmt.Sprintf("%s.backup.zip", time.Now().Format("20060102150405")) - d.Caption = d.FileName - d.MIME = `application/zip` - - c = TGCommand{ - Type: commandSendDocument, - Document: d, - ToChatID64: m.Chat.ID, + p := JobPayloadBackupExport{ + MsgID64: int64(m.ID), + ChatID64: m.Chat.ID, } - TGCmdQueue <- c + b, _ := json.Marshal(p) + _, err := createJob(objSubTypeJobBackupExport, objJobPriorityBackup, int64(m.Sender.ID), time.Now(), b) + logOnError(err, "botBackupExport : createJob(objSubTypeJobBackupExport)") return } -func botMsgLoad(m *tb.Message) { +func botBackupImport(m *tb.Message) { if !m.Private() { return } - r := regexp.MustCompile("^(http|https)://[a-z0-9./]+.zip$") // https://dump.siteop.biz/20190609163137.backup.zip - if r.MatchString(m.Payload) { - - resp, err := http.Get(m.Payload) - logOnError(err, "botMsgLoad : Get") - defer resp.Body.Close() - - buf := new(bytes.Buffer) - buf.ReadFrom(resp.Body) - + if _, ok := clientsKeepAlive[m.Chat.ID]; !ok && m.Chat.ID != cfg.Bot.Admin { c := TGCommand{ Type: commandReplyMsg, - Text: "File downloaded.", + Text: "Client not registered", FromMsgID64: int64(m.ID), FromChatID64: m.Chat.ID, } TGCmdQueue <- c - - err = UnzipMessages(buf.Bytes()) - logOnError(err, "botMsgLoad : UnzipMessages") - - c = TGCommand{ - Type: commandReplyMsg, - Text: "Messages injected.", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - - } else { + return + } + r := regexp.MustCompile(`^((http[s]?\:)\/\/)?([^\?\:\/#]+)(\:([0-9]+))?(\/[^\?\#]*)?(\?([^#]*))?(#.*)?.zip$`) + if !r.MatchString(m.Payload) { c := TGCommand{ Type: commandReplyMsg, - Text: "No file", + Text: "URL not valid.", FromMsgID64: int64(m.ID), FromChatID64: m.Chat.ID, } TGCmdQueue <- c + return } + p := JobPayloadBackupExport{ + URL: m.Payload, + MsgID64: int64(m.ID), + ChatID64: m.Chat.ID, + } + b, _ := json.Marshal(p) + _, err := createJob(objSubTypeJobBackupImport, objJobPriorityBackup, int64(m.Sender.ID), time.Now(), b) + logOnError(err, "botBackupImport : createJob(objSubTypeJobBackupImport)") return + } func botMsgDump(m *tb.Message) { diff --git a/job.go b/job.go index 9532007..beb8b80 100644 --- a/job.go +++ b/job.go @@ -1,8 +1,11 @@ package main import ( + "archive/zip" + "bytes" "encoding/json" "fmt" + "io/ioutil" "strconv" "strings" "time" diff --git a/msg.go b/msg.go index 8a8b1c7..4d49ece 100644 --- a/msg.go +++ b/msg.go @@ -1,11 +1,7 @@ package main import ( - "archive/zip" - "bytes" - "encoding/json" "fmt" - "io/ioutil" "log" "regexp" "strconv" From a91b1bba168ad10f440ca609dd53e6e677079023 Mon Sep 17 00:00:00 2001 From: shoopea Date: Tue, 11 Jun 2019 23:40:24 +0800 Subject: [PATCH 05/11] test --- bot.go | 2 -- job.go | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/bot.go b/bot.go index 3ffb076..788c1ed 100644 --- a/bot.go +++ b/bot.go @@ -1,11 +1,9 @@ package main import ( - "bytes" "encoding/json" "fmt" "log" - "net/http" "regexp" "strconv" "time" diff --git a/job.go b/job.go index beb8b80..c714380 100644 --- a/job.go +++ b/job.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net/http" "strconv" "strings" "time" From 82800b4a0a729b3a61eea82fbdda0a6b72c2b95a Mon Sep 17 00:00:00 2001 From: shoopea Date: Tue, 11 Jun 2019 23:41:31 +0800 Subject: [PATCH 06/11] test --- job.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/job.go b/job.go index c714380..b67d28c 100644 --- a/job.go +++ b/job.go @@ -448,10 +448,10 @@ func jobBackupExport(j Job) { i := 0 for _, id := range ids { - m, err := getObjMsg(id) + msg, err := getObjMsg(id) logOnError(err, "jobBackupExport : getMsg") if err == nil { - msgs = append(msgs, *m) + msgs = append(msgs, *msg) } i = i + 1 if time.Now().Add(1 * time.Minute).After(milestone) { From 6da33a53e4d1ebd7dbef15ffa4f98fbc9ddf57be Mon Sep 17 00:00:00 2001 From: shoopea Date: Tue, 11 Jun 2019 23:53:29 +0800 Subject: [PATCH 07/11] test --- job.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/job.go b/job.go index b67d28c..4756c78 100644 --- a/job.go +++ b/job.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "log" "net/http" "strconv" "strings" @@ -548,12 +549,12 @@ func jobBackupImport(j Job) { rc, err := f.Open() logOnError(err, "jobBackupImport : Open") if err != nil { - return err + return } data, err := ioutil.ReadAll(rc) logOnError(err, "jobBackupImport : ReadAll") if err != nil { - return err + return } log.Printf("jobBackupImport : %d uncompressed bytes.\n", len(data)) rc.Close() @@ -561,7 +562,7 @@ func jobBackupImport(j Job) { err = json.Unmarshal(data, &bkp) logOnError(err, "jobBackupImport : Unmarshal") if err != nil { - return err + return } for _, m := range bkp.Messages { _, err = addObjMsg(m.ID64, m.ChatID64, m.UserID64, m.SenderUserID64, m.Date, m.Text) From fc63ef8fbcac6d49a8e3cf7b7f1478a2b109efe4 Mon Sep 17 00:00:00 2001 From: shoopea Date: Tue, 11 Jun 2019 23:55:31 +0800 Subject: [PATCH 08/11] test --- bot.go | 2 +- job.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/bot.go b/bot.go index 788c1ed..2cb11e8 100644 --- a/bot.go +++ b/bot.go @@ -301,7 +301,7 @@ func botBackupImport(m *tb.Message) { return } - p := JobPayloadBackupExport{ + p := JobPayloadBackupImport{ URL: m.Payload, MsgID64: int64(m.ID), ChatID64: m.Chat.ID, diff --git a/job.go b/job.go index 4756c78..2aab1f2 100644 --- a/job.go +++ b/job.go @@ -11,6 +11,8 @@ import ( "strconv" "strings" "time" + + tb "gopkg.in/tucnak/telebot.v2" ) func createJob(jobTypeID int32, priority int32, userID64 int64, schedule time.Time, payload []byte) (int64, error) { @@ -582,7 +584,7 @@ func jobBackupImport(j Job) { } } - m := TGCommand{ + m = TGCommand{ Type: commandReplyMsg, Text: "Not backup file found in archive.", FromMsgID64: p.MsgID64, From c2d5a3d0597f4b4a14a35bef764d851f6be8db18 Mon Sep 17 00:00:00 2001 From: shoopea Date: Tue, 11 Jun 2019 23:58:19 +0800 Subject: [PATCH 09/11] test --- job.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/job.go b/job.go index 2aab1f2..bb6467a 100644 --- a/job.go +++ b/job.go @@ -566,8 +566,8 @@ func jobBackupImport(j Job) { if err != nil { return } - for _, m := range bkp.Messages { - _, err = addObjMsg(m.ID64, m.ChatID64, m.UserID64, m.SenderUserID64, m.Date, m.Text) + for _, msg := range bkp.Messages { + MQCWMsgQueue <- msg } m := TGCommand{ From 06f8be55b425181ae28b39ff9f7cc919006744fb Mon Sep 17 00:00:00 2001 From: shoopea Date: Tue, 11 Jun 2019 23:59:09 +0800 Subject: [PATCH 10/11] test --- bot.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bot.go b/bot.go index 2cb11e8..87c320b 100644 --- a/bot.go +++ b/bot.go @@ -99,8 +99,8 @@ func botHelp(m *tb.Message) { /parse_rule - detail for one rule /timer "msg" - schedule msg for client in ETA /g_stock - check guild's vault -/msg_export - export message database -/msg_load - import message database from URL`, +/backup_export - export message database +/backup_import - import message database from URL`, FromMsgID64: int64(m.ID), FromChatID64: m.Chat.ID, } From de06d9c590485c46c217907fc11ae871803b5202 Mon Sep 17 00:00:00 2001 From: shoopea Date: Wed, 12 Jun 2019 00:00:20 +0800 Subject: [PATCH 11/11] test --- job.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/job.go b/job.go index bb6467a..f72123b 100644 --- a/job.go +++ b/job.go @@ -457,7 +457,7 @@ func jobBackupExport(j Job) { msgs = append(msgs, *msg) } i = i + 1 - if time.Now().Add(1 * time.Minute).After(milestone) { + if time.Now().After(milestone.Add(1 * time.Minute)) { txt := fmt.Sprintf("Exported %d/%d messages.", i, len(ids)) m = TGCommand{ Type: commandReplyMsg,