diff --git a/job.go b/job.go index 58050ae..527fc95 100644 --- a/job.go +++ b/job.go @@ -1,537 +1,536 @@ package main import ( - "bytes" "encoding/json" "fmt" - "log" - "net/http" - "regexp" "strconv" + "strings" "time" - - tb "gopkg.in/tucnak/telebot.v2" ) -func BotHandlers(b *tb.Bot) { - b.Handle("/hello", func(m *tb.Message) { - s, err := botHello(m) - logOnError(err, "/hello") - if err == nil { - b.Send(m.Sender, s) +func createJob(jobTypeID int32, priority int32, userID64 int64, schedule time.Time, payload []byte) (int64, error) { + stmt, err := db.Prepare(`INSERT INTO obj (obj_type_id, obj_sub_type_id) + VALUES (? , ?);`) + logOnError(err, "createJob : prepare insert obj") + if err != nil { + return 0, err + } + defer stmt.Close() + + res, err := stmt.Exec(objTypeJob, jobTypeID) + s := fmt.Sprintf("createJob, insert obj(%d, %d)", objTypeJob, jobTypeID) + logOnError(err, s) + if err != nil { + return 0, err + } + + objId, err := res.LastInsertId() + logOnError(err, "createJob : get last insert Id") + if err != nil { + return 0, err + } + + stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, status, seq_nr, schedule, is_done, in_work, inserted, pulled, started, ended, payload) + VALUES (?, ?, ?, ?, NULL, ?, 0, 0, ?, NULL, NULL, NULL, ?);`) + logOnError(err, "createJob : prepare insert obj_job") + if err != nil { + return 0, err + } + defer stmt.Close() + + _, err = stmt.Exec(objId, priority, userID64, objJobStatusNew, schedule.UTC(), time.Now().UTC(), payload) + logOnError(err, "createJob : insert obj_job") + if err != nil { + return 0, err + } + + return objId, nil +} + +func setJobDone(jobId int64) error { + stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 1, j.in_work = 0, j.ended = ? WHERE j.obj_id = ?;`) + logOnError(err, "setJobDone : prepare update obj_job") + if err != nil { + return err + } + defer stmt.Close() + + _, err = stmt.Exec(time.Now().UTC(), jobId) + s := fmt.Sprintf("setJobDone, update obj_job(%d)", jobId) + logOnError(err, s) + if err != nil { + return err + } + return nil +} + +func setJobStart(jobId int64) error { + stmt, err := db.Prepare(`UPDATE obj_job j SET j.started = coalesce(j.started, ?) WHERE j.obj_id = ?;`) + logOnError(err, "setJobStart : prepare update obj_job") + if err != nil { + return err + } + defer stmt.Close() + + _, err = stmt.Exec(time.Now().UTC(), jobId) + s := fmt.Sprintf("setJobStart, update obj_job(%d)", jobId) + logOnError(err, s) + if err != nil { + return err + } + return nil +} + +func rescheduleJob(jobID64 int64, status int32, schedule time.Time) error { + stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 0, j.in_work = 0, j.schedule = ?, j.status = ? WHERE j.obj_id = ?;`) + logOnError(err, "rescheduleJob : prepare update obj_job") + if err != nil { + return err + } + defer stmt.Close() + + _, err = stmt.Exec(schedule.UTC(), status, jobID64) + s := fmt.Sprintf("rescheduleJob, update obj_job(%d)", jobID64) + logOnError(err, s) + if err != nil { + return err + } + return nil +} + +func loadCurrentJobs() ([]Job, error) { + var ( + objId int64 + jobTypeId int32 + userID64 int64 + status int32 + payload []byte + jobs []Job + ) + + t := time.Now().UTC() + r := RndInt64() + + _, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1, j.seq_nr = ? WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", t, r, t, SQLJobSliceSize) + logOnError(err, "loadCurrentJobs : update intial rows") + + stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;") + logOnError(err, "loadCurrentJobs : prepare select statement") + + rows, err := stmt.Query(r) + // rows, err := stmt.Query(time.Now()) + logOnError(err, "loadCurrentJobs : query select statement") + + for rows.Next() { + err = rows.Scan(&objId, &jobTypeId, &status, &userID64, &payload) + logOnError(err, "loadCurrentJobs : scan query rows") + job := Job{ + ID64: objId, + JobTypeID: jobTypeId, + Status: status, + UserID64: userID64, + Payload: payload, } - }) - - b.Handle("/test", botTest) - b.Handle("/test_html", botTestHTML) - b.Handle("/msg_rescan", botMsgRescan) - b.Handle("/msg_rescan_all", botMsgRescanAll) - b.Handle("/msg_dump", botMsgDump) - - b.Handle("/parse_rules", botListParsingRules) - b.Handle("/parse_rule", botListParsingRule) - - b.Handle("/timer", botTimer) - - b.Handle("/g_stock", botGStock) - - b.Handle("/backup_export", botBackupExport) - b.Handle("/backup_import", botBackupImport) - - b.Handle("/help", botHelp) - - b.Handle(tb.OnPhoto, botPhoto) - b.Handle(tb.OnChannelPost, botChannelPost) - b.Handle(tb.OnQuery, botQuery) - b.Handle(tb.OnText, botText) - b.Handle(tb.OnDocument, botDocument) - - b.Start() -} - -func botPhoto(m *tb.Message) { - fmt.Println("botPhoto :", m.Text) - // photos only -} - -func botDocument(m *tb.Message) { - fmt.Printf("botDocument : %s (%d bytes)\n", m.Document.FileName, m.Document.File.FileSize) - // documents only -} - -func botHello(m *tb.Message) (string, error) { - fmt.Println("botHello :", m.Text) - if !m.Private() { - fmt.Println("botHello : !m.Private()") - return ``, nil + jobs = append(jobs, job) } - // fmt.Println("Hello payload :", m.Payload) // - PrintText(m) - return `hello world`, nil + err = rows.Err() + logOnError(err, "loadCurrentJobs : scan end rows") + rows.Close() + + err = stmt.Close() + logOnError(err, "loadCurrentJobs : close select statement") + + return jobs, nil } -func botChannelPost(m *tb.Message) { - fmt.Println("botChannelPost :", m.Text) - PrintText(m) - // channel posts only -} +func jobRescan(j Job) { + var r JobPayloadRescanMsg -func botQuery(q *tb.Query) { - fmt.Println("botQuery") - // incoming inline queries -} + err := setJobStart(j.ID64) + logOnError(err, "jobRescan : setJobStart") -func botText(m *tb.Message) { - fmt.Println("botText :", m.Text) - PrintText(m) - // all the text messages that weren't - // captured by existing handlers -} + err = json.Unmarshal(j.Payload, &r) + logOnError(err, "jobRescan : Unmarshal payload") -func botHelp(m *tb.Message) { - if !m.Private() { - return - } - c := TGCommand{ - Type: commandReplyMsg, - Text: `/help - this help -/msg_rescan - rescan one message -/msg_rescan_all - rescan all messages -/parse_rules - list parsing rules\n -/parse_rule - detail for one rule -/timer "msg" - schedule msg for client in ETA -/g_stock - check guild's vault -/backup_export - export backup database -/backup_import - import backup database from URL`, - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return -} + start := time.Now() -func botTestHTML(m *tb.Message) { - if !m.Private() { - return - } - c := TGCommand{ - Type: commandReplyMsg, - Text: `bold, -bold, -italic, -italic, -inline URL, -inline fixed-width code, -
pre-formatted fixed-width code block
`, - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - ParseMode: cmdParseModeHTML, - } - TGCmdQueue <- c + ids := getSQLListID64(r.Query) - return -} - -func botTest(m *tb.Message) { - if !m.Private() { - return - } - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - clientSendCWMsg(m.Chat.ID, "🏅Me") - - c := TGCommand{ - Type: commandReplyMsg, - Text: "Test sent", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, + if len(ids) > 1 { + for _, id := range ids { + SQLMsgIdentifyQueue <- id } - TGCmdQueue <- c - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - } - return -} - -func botMsgRescan(m *tb.Message) { - if !m.Private() { - return - } - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return - } - - r := regexp.MustCompile("^[0-9]+$") - if r.MatchString(m.Payload) { - p := JobPayloadRescanMsg{ - Query: fmt.Sprintf("SELECT o.id FROM obj o WHERE o.id = %s AND o.obj_type_id = %d AND o.obj_sub_type_id = %d;", m.Payload, objTypeMessage, objSubTypeMessageUnknown), - MsgID64: int64(m.ID), - ChatID64: m.Chat.ID, + p := JobPayloadSetDone{ + JobID64: j.ID64, + MsgID64: r.MsgID64, + ChatID64: r.ChatID64, + Text: fmt.Sprintf("%d messages processed in %s.", len(ids), time.Since(start)), } b, _ := json.Marshal(p) - log.Printf("botMsgRescan : json : %s\n", string(b)) - _, err := createJob(objSubTypeJobRescanMsg, objJobPriorityRescanMsg, int64(m.Sender.ID), time.Now(), b) - logOnError(err, "botMsgRescan : createJob(objSubTypeJobRescanMsg)") - if err != nil { - c := TGCommand{ + _, err := createJob(objSubTypeJobSetJobDone, objJobPriorityRescanAllMsg, j.UserID64, time.Now().UTC(), b) + logOnError(err, "jobRescan : createJob(objSubTypeJobSetJobDone)") + + } else if len(ids) == 1 { + SQLMsgIdentifyQueue <- ids[0] + err = setJobDone(j.ID64) + logOnError(err, "jobRescan : setJobDone(1)") + if r.MsgID64 != 0 || r.ChatID64 != 0 { + m := TGCommand{ Type: commandReplyMsg, - Text: fmt.Sprintf("Error scheduling the rescan for msg #%s", m.Payload), - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, + Text: "One message processed.", + FromMsgID64: r.MsgID64, + FromChatID64: r.ChatID64, } - TGCmdQueue <- c - } else { - c := TGCommand{ + TGCmdQueue <- m + } + } else { + err = setJobDone(j.ID64) + logOnError(err, "jobRescan : setJobDone(0)") + if r.MsgID64 != 0 || r.ChatID64 != 0 { + m := TGCommand{ Type: commandReplyMsg, - Text: fmt.Sprintf("Rescaning msg #%s", m.Payload), - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, + Text: "No message processed.", + FromMsgID64: r.MsgID64, + FromChatID64: r.ChatID64, } - TGCmdQueue <- c + TGCmdQueue <- m } } - r = regexp.MustCompile("^all$") - if r.MatchString(m.Payload) { - botMsgRescanAll(m) - } return } -func botMsgRescanAll(m *tb.Message) { - if !m.Private() { - return - } +func jobSetDone(j Job) { + var r JobPayloadSetDone - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return - } + err := setJobStart(j.ID64) + logOnError(err, "jobSetDone : setJobStart") - p := JobPayloadRescanMsg{ - Query: fmt.Sprintf("SELECT o.id FROM obj o WHERE o.obj_type_id = %d AND o.obj_sub_type_id = %d ORDER BY id ASC;", objTypeMessage, objSubTypeMessageUnknown), - MsgID64: int64(m.ID), - ChatID64: m.Chat.ID, - } - b, _ := json.Marshal(p) - _, err := createJob(objSubTypeJobRescanMsg, objJobPriorityRescanAllMsg, int64(m.Sender.ID), time.Now(), b) - logOnError(err, "botMsgRescan : createJob(objSubTypeJobRescanMsg)") + err = json.Unmarshal(j.Payload, &r) + logOnError(err, "jobSetDone : Unmarshal payload") - if err != nil { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Error scheduling the rescan for all msg.", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Rescaning all msg scheduled.", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - } - return -} + err = setJobDone(r.JobID64) + logOnError(err, "jobSetDone : setJobDone(child)") -func botBackupExport(m *tb.Message) { - if !m.Private() { - return - } + err = setJobDone(j.ID64) + logOnError(err, "jobSetDone : setJobDone") - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return - } - - p := JobPayloadBackupExport{ - MsgID64: int64(m.ID), - ChatID64: m.Chat.ID, - } - b, _ := json.Marshal(p) - t := time.Now() - objID64, err := createJob(objSubTypeJobBackupExport, objJobPriorityBackup, int64(m.Chat.ID), t, b) - logOnError(err, "botBackupExport : createJob") - - return -} - -func botBackupImport(m *tb.Message) { - if !m.Private() { - return - } - - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return - } - - r := regexp.MustCompile("^(http|https)://[a-z0-9./]+.zip$") // https://dump.siteop.biz/20190609163137.backup.zip - if !r.MatchString(m.Payload) { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Missing URL", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return - } - - p := JobPayloadBackupExport{ - URL: m.Payload, - MsgID64: int64(m.ID), - ChatID64: m.Chat.ID, - } - b, _ := json.Marshal(p) - t := time.Now() - objID64, err := createJob(objSubTypeJobBackupImport, objJobPriorityBackup, int64(m.Chat.ID), t, b) - logOnError(err, "botBackupImport : createJob") - - return -} - -func botMsgDump(m *tb.Message) { - var res string - - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return - } - - r := regexp.MustCompile("^[0-9]+$") - if r.MatchString(m.Payload) { - objId, _ := strconv.ParseInt(m.Payload, 10, 64) - objTypeId, err := getObjTypeId(objId) - logOnError(err, "botMsgDump : getObjSubTypeId") - if err != nil { - res = `Error retrieving the message` - } else if objTypeId != objTypeMessage { - res = `This is not a message reference` - } else { - cwm, _ := getObjMsg(objId) - b, _ := json.Marshal(cwm) - res = string(b) - } - } else { - res = `/msg_dump ` - } - - c := TGCommand{ + m := TGCommand{ Type: commandReplyMsg, - Text: res, - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, + Text: r.Text, + FromMsgID64: r.MsgID64, + FromChatID64: r.ChatID64, } - TGCmdQueue <- c + TGCmdQueue <- m return } -func botListParsingRules(m *tb.Message) { - var s string = "" - if !m.Private() { - return - } +func jobPillage(j Job) { + var r JobPayloadPillage - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, + err := setJobStart(j.ID64) + logOnError(err, "jobPillage : setJobStart") + + err = json.Unmarshal(j.Payload, &r) + logOnError(err, "jobPillage : Unmarshal payload") + + // check if we have a acknoledgment of go or a timeout within 3m30 of the PillageInc from the Job + ids := getSQLListID64(` select ox.id + from obj ox + ,obj_msg omx + ,obj op + ,obj_msg omp + ,obj_job oj + where oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + ` + and omx.user_id = oj.user_id + and omx.sender_user_id = ` + strconv.Itoa(userID64ChtWrsBot) + ` + and omx.obj_id = ox.id + and ox.obj_sub_type_id in (` + strconv.Itoa(objSubTypeMessagePillageGo) + + `, ` + strconv.Itoa(objSubTypeMessagePillageTimeout) + + `, ` + strconv.Itoa(objSubTypeMessagePillageLoss) + + `, ` + strconv.Itoa(objSubTypeMessagePillageWin) + `) + and op.id = ` + strconv.FormatInt(r.ObjID64, 10) + ` + and omp.obj_id = op.id + and omx.date between omp.date and addtime(omp.date, '0 0:3:30.000000') + order by case ox.obj_sub_type_id when ` + strconv.Itoa(objSubTypeMessagePillageWin) + ` then 0 + when ` + strconv.Itoa(objSubTypeMessagePillageLoss) + ` then 1 + when ` + strconv.Itoa(objSubTypeMessagePillageTimeout) + ` then 2 + when ` + strconv.Itoa(objSubTypeMessagePillageGo) + ` then 3 + else 4 end asc + limit 1;`) + + if len(ids) > 1 { // issue there ? + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("More than one outcome for pillage #%d", r.ObjID64), + ToUserID64: j.UserID64, } - TGCmdQueue <- c - return - } - - for _, v := range msgParsingRules { - s = fmt.Sprintf("%s[%d] %s\n", s, v.ID, v.Description) - } - - c := TGCommand{ - Type: commandReplyMsg, - Text: s, - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return -} - -func botListParsingRule(m *tb.Message) { - if !m.Private() { - return - } - - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - return - } - - r := regexp.MustCompile("^[0-9]+$") - if r.MatchString(m.Payload) { - i, _ := strconv.ParseInt(m.Payload, 10, 64) - for _, v := range msgParsingRules { - if int64(v.ID) == i { - c := TGCommand{ - Type: commandReplyMsg, - Text: fmt.Sprintf("%s\n", v.Rule), - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, + TGCmdQueue <- s + } else if len(ids) == 1 { // we've got a match, job is done whether we prevented the pillage or not + m, err := getObjMsg(ids[0]) + logOnError(err, "jobPillage : getMsg(objSubTypeMessagePillageGo, objSubTypeMessagePillageTimeout)") + if err == nil { + if m.Date.Add(60 * time.Second).After(time.Now().UTC()) { + msgTypeID64, err := getObjSubTypeId(ids[0]) + logOnError(err, "jobPillage : getObjSubTypeId") + if err == nil { + if msgTypeID64 == objSubTypeMessagePillageGo { + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("We avoided a pillage (%s)", m.Date.Format(time.RFC3339)), + ToUserID64: j.UserID64, + } + TGCmdQueue <- s + } else if msgTypeID64 == objSubTypeMessagePillageWin { + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("We avoided a pillage (%s))", m.Date.Format(time.RFC3339)), + ToUserID64: j.UserID64, + } + TGCmdQueue <- s + } else if msgTypeID64 == objSubTypeMessagePillageLoss { + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("We got pillaged (%s)", m.Date.Format(time.RFC3339)), + ToUserID64: j.UserID64, + } + TGCmdQueue <- s + } else if msgTypeID64 == objSubTypeMessagePillageTimeout { + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("We got pillaged (%s)", m.Date.Format(time.RFC3339)), + ToUserID64: j.UserID64, + } + TGCmdQueue <- s + } else { + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("We don't know what happened (%s)", m.Date.Format(time.RFC3339)), + ToUserID64: j.UserID64, + } + TGCmdQueue <- s + } } - TGCmdQueue <- c - return } } - c := TGCommand{ - Type: commandReplyMsg, - Text: fmt.Sprintf("Could not find rule %s\n", m.Payload), - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: fmt.Sprintf("/parse_rule \n"), - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c + err = setJobDone(j.ID64) + logOnError(err, "jobSetDone : setJobDone") + return } + + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("No outcome for the pillage yet"), + ToUserID64: j.UserID64, + } + TGCmdQueue <- s + + //no outcome yet, have we sent a "/go" in the last 30 sec ? + ids = getSQLListID64(` select ox.id + from obj ox + ,obj_msg omx + ,obj_job oj + where oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + ` + and omx.user_id = oj.user_id + and omx.sender_user_id = oj.user_id + and omx.obj_id = ox.id + and ox.obj_sub_type_id =` + strconv.Itoa(objSubTypeMessageGo) + ` + and omx.date between addtime(oj.schedule, '-30') and oj.schedule;`) + + if len(ids) > 0 { // we did, so we reschedule the job to check the outcome and wait + m, err := getObjMsg(ids[0]) + logOnError(err, "jobPillage : getMsg(objSubTypeMessageGo)") + if err == nil { + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("We started intercepting the pillage (%s)", m.Date.Format(time.RFC3339)), + ToUserID64: j.UserID64, + } + TGCmdQueue <- s + } + err = rescheduleJob(j.ID64, j.Status+1, time.Now().Add(30*time.Second).UTC()) + logOnError(err, "jobPillage : rescheduleJob(objSubTypeMessageGo)") + } else { //no /go in the last 30 sec so we go ahead, send one and reschedule to check again in 25sec + clientSendCWMsg(j.UserID64, "/go") + err = rescheduleJob(j.ID64, j.Status+1, time.Now().Add(30*time.Second).UTC()) + logOnError(err, "jobPillage : rescheduleJob") + + } + return } -func botGStock(m *tb.Message) { - if !m.Private() { +func jobMsgRefresh(j Job) { + var p JobPayloadMsgRefresh + + // identify whether the message has been properly refreshed ? create new job ? reschedule same job ? + err := setJobStart(j.ID64) + logOnError(err, "jobMsgRefresh : setJobStart") + + err = json.Unmarshal(j.Payload, &p) + logOnError(err, "jobMsgRefresh : Unmarshal payload") + + m, err := getObjMsg(p.ObjID64) + + if err != nil && strings.Compare(err.Error(), `sql: no rows in result set`) == 0 { + err = setJobDone(j.ID64) + logOnError(err, "joMsgClient : setJobDone") + return + } else { + logOnError(err, "jobMsgRefresh : getObjMsg") + err = setJobDone(j.ID64) + logOnError(err, "joMsgClient : setJobDone") return } - clientSendCWMsg(m.Chat.ID, "/g_stock_res") - clientSendCWMsg(m.Chat.ID, "/g_stock_alch") - clientSendCWMsg(m.Chat.ID, "/g_stock_misc") - clientSendCWMsg(m.Chat.ID, "/g_stock_rec") - clientSendCWMsg(m.Chat.ID, "/g_stock_parts") - clientSendCWMsg(m.Chat.ID, "/g_stock_other") - c := TGCommand{ + err = delObj(p.ObjID64) + logOnError(err, "jobMsgRefresh : delObj") + + clientRefreshCWMsg(m.UserID64, m.ChatID64, m.ID64) + + err = setJobDone(j.ID64) + logOnError(err, "joMsgClient : setJobDone") + return +} + +func jobMsgClient(j Job) { + var p JobPayloadMsgClient + err := setJobStart(j.ID64) + logOnError(err, "jobMsgClient : setJobStart") + + err = json.Unmarshal(j.Payload, &p) + logOnError(err, "jobMsgClient : Unmarshal payload") + + if err == nil { + clientSendCWMsg(j.UserID64, p.Text) + m := TGCommand{ + Type: commandReplyMsg, + Text: "Message sent.", + FromMsgID64: p.MsgID64, + FromChatID64: p.ChatID64, + } + TGCmdQueue <- m + } + + err = setJobDone(j.ID64) + logOnError(err, "joMsgClient : setJobDone") + return +} + +func jobBackupExport(j Job) { + var p JobPayloadBackupExport + err := setJobStart(j.ID64) + logOnError(err, "jobBackupExport : setJobStart") + + err = json.Unmarshal(j.Payload, &p) + logOnError(err, "jobBackupExport : Unmarshal payload") + + bkp := DataBackup{} + start := time.Now() + milestone := time.Now() + + s := new([]ChatWarsMessage) + msgs := *s + ids := getSQLListID64(`SELECT om.obj_id id FROM obj_msg om;`) + + txt := fmt.Sprintf("Backing up %d messages.", len(ids)) + m := TGCommand{ Type: commandReplyMsg, - Text: "Stock requested", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, + Text: txt, + FromMsgID64: p.MsgID64, + FromChatID64: p.ChatID64, } - TGCmdQueue <- c - return -} + TGCmdQueue <- m -func botTimer(m *tb.Message) { - if !m.Private() { - return - } - if _, ok := clientsKeepAlive[m.Chat.ID]; ok { - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: "Client not registered", - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, + i := 0 + for _, id := range ids { + m, err := getObjMsg(id) + logOnError(err, "jobBackupExport : getMsg") + if err == nil { + msgs = append(msgs, *m) } - TGCmdQueue <- c - return - } - - r := regexp.MustCompile("^(?P([0-9]*(s|m|h))+) \"(?P(.*))\"$") - if r.MatchString(m.Payload) { - d, err := time.ParseDuration(r.ReplaceAllString(m.Payload, "${Duration}")) - if err != nil { - c := TGCommand{ + i = i + 1 + if time.Now().Add(1 * time.Minute).After(milestone) { + txt := fmt.Sprintf("Exported %d/%d messages.", i, len(ids)) + m = TGCommand{ Type: commandReplyMsg, - Text: fmt.Sprintf("%s", err), - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - } else { - p := JobPayloadMsgClient{ - Text: r.ReplaceAllString(m.Payload, "${Msg}"), - MsgID64: int64(m.ID), - ChatID64: m.Chat.ID, - } - b, _ := json.Marshal(p) - t := time.Now().Add(d) - objID64, err := createJob(objSubTypeJobMsgClient, objJobPriority, int64(m.Chat.ID), t, b) - logOnError(err, "botTimer : createJob") - if err != nil { - c := TGCommand{ - Type: commandReplyMsg, - Text: fmt.Sprintf("%s", err), - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c - } else { - c := TGCommand{ - Type: commandReplyMsg, - Text: fmt.Sprintf("Job #%d scheduled at %s", objID64, t.Format(time.RFC850)), - FromMsgID64: int64(m.ID), - FromChatID64: m.Chat.ID, - } - TGCmdQueue <- c + Text: txt, + FromMsgID64: p.MsgID64, + FromChatID64: p.ChatID64, } + TGCmdQueue <- m + milestone = time.Now() } - } + bkp.Messages = msgs + b, err := json.Marshal(bkp) + logOnError(err, "jobBackupExport : Marshal") + + m = TGCommand{ + Type: commandReplyMsg, + Text: `Compressing archive`, + FromMsgID64: p.MsgID64, + FromChatID64: p.ChatID64, + } + TGCmdQueue <- m + + zbuf := new(bytes.Buffer) + zw := zip.NewWriter(zbuf) + zf, err := zw.Create(`backup.json`) + logOnError(err, "jobBackupExport : Create") + + _, err = zf.Write(b) + logOnError(err, "jobBackupExport : Write") + + err = zw.Close() + logOnError(err, "jobBackupExport : Close") + + d := tb.Document{} + d.File = tb.FromReader(bytes.NewReader(zbuf.Bytes())) + d.FileName = fmt.Sprintf("%s.backup.zip", start.Format("20060102150405")) + d.Caption = d.FileName + d.MIME = `application/zip` + + m = TGCommand{ + Type: commandReplyMsg, + Text: `Export done.`, + FromMsgID64: p.MsgID64, + FromChatID64: p.ChatID64, + } + TGCmdQueue <- m + + m = TGCommand{ + Type: commandSendDocument, + Document: d, + ToChatID64: p.ChatID64, + } + TGCmdQueue <- m + + err = setJobDone(j.ID64) + logOnError(err, "jobBackupExport : setJobDone") + return +} + +func jobBackupImport(j Job) { + var p JobPayloadBackupImport + err := setJobStart(j.ID64) + logOnError(err, "jobBackupImport : setJobStart") + + err = json.Unmarshal(j.Payload, &p) + logOnError(err, "jobBackupImport : Unmarshal payload") + + if err == nil { + m := TGCommand{ + Type: commandReplyMsg, + Text: "Backup imported.", + FromMsgID64: p.MsgID64, + FromChatID64: p.ChatID64, + } + TGCmdQueue <- m + } + + err = setJobDone(j.ID64) + logOnError(err, "jobBackupImport : setJobDone") return }