package main import ( "archive/zip" "bytes" "encoding/json" "fmt" "io/ioutil" "strconv" "strings" "time" ) func createJob(jobTypeID int32, priority int32, userID64 int64, schedule time.Time, payload []byte) (int64, error) { stmt, err := db.Prepare(`INSERT INTO obj (obj_type_id, obj_sub_type_id) VALUES (? , ?);`) logOnError(err, "createJob : prepare insert obj") if err != nil { return 0, err } defer stmt.Close() res, err := stmt.Exec(objTypeJob, jobTypeID) s := fmt.Sprintf("createJob, insert obj(%d, %d)", objTypeJob, jobTypeID) logOnError(err, s) if err != nil { return 0, err } objId, err := res.LastInsertId() logOnError(err, "createJob : get last insert Id") if err != nil { return 0, err } stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, status, seq_nr, schedule, is_done, in_work, inserted, pulled, started, ended, payload) VALUES (?, ?, ?, ?, NULL, ?, 0, 0, ?, NULL, NULL, NULL, ?);`) logOnError(err, "createJob : prepare insert obj_job") if err != nil { return 0, err } defer stmt.Close() _, err = stmt.Exec(objId, priority, userID64, objJobStatusNew, schedule.UTC(), time.Now().UTC(), payload) logOnError(err, "createJob : insert obj_job") if err != nil { return 0, err } return objId, nil } func setJobDone(jobId int64) error { stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 1, j.in_work = 0, j.ended = ? WHERE j.obj_id = ?;`) logOnError(err, "setJobDone : prepare update obj_job") if err != nil { return err } defer stmt.Close() _, err = stmt.Exec(time.Now().UTC(), jobId) s := fmt.Sprintf("setJobDone, update obj_job(%d)", jobId) logOnError(err, s) if err != nil { return err } return nil } func setJobStart(jobId int64) error { stmt, err := db.Prepare(`UPDATE obj_job j SET j.started = coalesce(j.started, ?) WHERE j.obj_id = ?;`) logOnError(err, "setJobStart : prepare update obj_job") if err != nil { return err } defer stmt.Close() _, err = stmt.Exec(time.Now().UTC(), jobId) s := fmt.Sprintf("setJobStart, update obj_job(%d)", jobId) logOnError(err, s) if err != nil { return err } return nil } func rescheduleJob(jobID64 int64, status int32, schedule time.Time) error { stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 0, j.in_work = 0, j.schedule = ?, j.status = ? WHERE j.obj_id = ?;`) logOnError(err, "rescheduleJob : prepare update obj_job") if err != nil { return err } defer stmt.Close() _, err = stmt.Exec(schedule.UTC(), status, jobID64) s := fmt.Sprintf("rescheduleJob, update obj_job(%d)", jobID64) logOnError(err, s) if err != nil { return err } return nil } func loadCurrentJobs() ([]Job, error) { var ( objId int64 jobTypeId int32 userID64 int64 status int32 payload []byte jobs []Job ) t := time.Now().UTC() r := RndInt64() _, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1, j.seq_nr = ? WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", t, r, t, SQLJobSliceSize) logOnError(err, "loadCurrentJobs : update intial rows") stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;") logOnError(err, "loadCurrentJobs : prepare select statement") rows, err := stmt.Query(r) // rows, err := stmt.Query(time.Now()) logOnError(err, "loadCurrentJobs : query select statement") for rows.Next() { err = rows.Scan(&objId, &jobTypeId, &status, &userID64, &payload) logOnError(err, "loadCurrentJobs : scan query rows") job := Job{ ID64: objId, JobTypeID: jobTypeId, Status: status, UserID64: userID64, Payload: payload, } jobs = append(jobs, job) } err = rows.Err() logOnError(err, "loadCurrentJobs : scan end rows") rows.Close() err = stmt.Close() logOnError(err, "loadCurrentJobs : close select statement") return jobs, nil } func jobRescan(j Job) { var r JobPayloadRescanMsg err := setJobStart(j.ID64) logOnError(err, "jobRescan : setJobStart") err = json.Unmarshal(j.Payload, &r) logOnError(err, "jobRescan : Unmarshal payload") start := time.Now() ids := getSQLListID64(r.Query) if len(ids) > 1 { for _, id := range ids { SQLMsgIdentifyQueue <- id } p := JobPayloadSetDone{ JobID64: j.ID64, MsgID64: r.MsgID64, ChatID64: r.ChatID64, Text: fmt.Sprintf("%d messages processed in %s.", len(ids), time.Since(start)), } b, _ := json.Marshal(p) _, err := createJob(objSubTypeJobSetJobDone, objJobPriorityRescanAllMsg, j.UserID64, time.Now().UTC(), b) logOnError(err, "jobRescan : createJob(objSubTypeJobSetJobDone)") } else if len(ids) == 1 { SQLMsgIdentifyQueue <- ids[0] err = setJobDone(j.ID64) logOnError(err, "jobRescan : setJobDone(1)") if r.MsgID64 != 0 || r.ChatID64 != 0 { m := TGCommand{ Type: commandReplyMsg, Text: "One message processed.", FromMsgID64: r.MsgID64, FromChatID64: r.ChatID64, } TGCmdQueue <- m } } else { err = setJobDone(j.ID64) logOnError(err, "jobRescan : setJobDone(0)") if r.MsgID64 != 0 || r.ChatID64 != 0 { m := TGCommand{ Type: commandReplyMsg, Text: "No message processed.", FromMsgID64: r.MsgID64, FromChatID64: r.ChatID64, } TGCmdQueue <- m } } return } func jobSetDone(j Job) { var r JobPayloadSetDone err := setJobStart(j.ID64) logOnError(err, "jobSetDone : setJobStart") err = json.Unmarshal(j.Payload, &r) logOnError(err, "jobSetDone : Unmarshal payload") err = setJobDone(r.JobID64) logOnError(err, "jobSetDone : setJobDone(child)") err = setJobDone(j.ID64) logOnError(err, "jobSetDone : setJobDone") m := TGCommand{ Type: commandReplyMsg, Text: r.Text, FromMsgID64: r.MsgID64, FromChatID64: r.ChatID64, } TGCmdQueue <- m return } func jobPillage(j Job) { var r JobPayloadPillage err := setJobStart(j.ID64) logOnError(err, "jobPillage : setJobStart") err = json.Unmarshal(j.Payload, &r) logOnError(err, "jobPillage : Unmarshal payload") // check if we have a acknoledgment of go or a timeout within 3m30 of the PillageInc from the Job ids := getSQLListID64(` select ox.id from obj ox ,obj_msg omx ,obj op ,obj_msg omp ,obj_job oj where oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + ` and omx.user_id = oj.user_id and omx.sender_user_id = ` + strconv.Itoa(userID64ChtWrsBot) + ` and omx.obj_id = ox.id and ox.obj_sub_type_id in (` + strconv.Itoa(objSubTypeMessagePillageGo) + `, ` + strconv.Itoa(objSubTypeMessagePillageTimeout) + `, ` + strconv.Itoa(objSubTypeMessagePillageLoss) + `, ` + strconv.Itoa(objSubTypeMessagePillageWin) + `) and op.id = ` + strconv.FormatInt(r.ObjID64, 10) + ` and omp.obj_id = op.id and omx.date between omp.date and addtime(omp.date, '0 0:3:30.000000') order by case ox.obj_sub_type_id when ` + strconv.Itoa(objSubTypeMessagePillageWin) + ` then 0 when ` + strconv.Itoa(objSubTypeMessagePillageLoss) + ` then 1 when ` + strconv.Itoa(objSubTypeMessagePillageTimeout) + ` then 2 when ` + strconv.Itoa(objSubTypeMessagePillageGo) + ` then 3 else 4 end asc limit 1;`) if len(ids) > 1 { // issue there ? s := TGCommand{ Type: commandSendMsg, Text: fmt.Sprintf("More than one outcome for pillage #%d", r.ObjID64), ToUserID64: j.UserID64, } TGCmdQueue <- s } else if len(ids) == 1 { // we've got a match, job is done whether we prevented the pillage or not m, err := getObjMsg(ids[0]) logOnError(err, "jobPillage : getMsg(objSubTypeMessagePillageGo, objSubTypeMessagePillageTimeout)") if err == nil { if m.Date.Add(60 * time.Second).After(time.Now().UTC()) { msgTypeID64, err := getObjSubTypeId(ids[0]) logOnError(err, "jobPillage : getObjSubTypeId") if err == nil { if msgTypeID64 == objSubTypeMessagePillageGo { s := TGCommand{ Type: commandSendMsg, Text: fmt.Sprintf("We avoided a pillage (%s)", m.Date.Format(time.RFC3339)), ToUserID64: j.UserID64, } TGCmdQueue <- s } else if msgTypeID64 == objSubTypeMessagePillageWin { s := TGCommand{ Type: commandSendMsg, Text: fmt.Sprintf("We avoided a pillage (%s))", m.Date.Format(time.RFC3339)), ToUserID64: j.UserID64, } TGCmdQueue <- s } else if msgTypeID64 == objSubTypeMessagePillageLoss { s := TGCommand{ Type: commandSendMsg, Text: fmt.Sprintf("We got pillaged (%s)", m.Date.Format(time.RFC3339)), ToUserID64: j.UserID64, } TGCmdQueue <- s } else if msgTypeID64 == objSubTypeMessagePillageTimeout { s := TGCommand{ Type: commandSendMsg, Text: fmt.Sprintf("We got pillaged (%s)", m.Date.Format(time.RFC3339)), ToUserID64: j.UserID64, } TGCmdQueue <- s } else { s := TGCommand{ Type: commandSendMsg, Text: fmt.Sprintf("We don't know what happened (%s)", m.Date.Format(time.RFC3339)), ToUserID64: j.UserID64, } TGCmdQueue <- s } } } } err = setJobDone(j.ID64) logOnError(err, "jobSetDone : setJobDone") return } s := TGCommand{ Type: commandSendMsg, Text: fmt.Sprintf("No outcome for the pillage yet"), ToUserID64: j.UserID64, } TGCmdQueue <- s //no outcome yet, have we sent a "/go" in the last 30 sec ? ids = getSQLListID64(` select ox.id from obj ox ,obj_msg omx ,obj_job oj where oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + ` and omx.user_id = oj.user_id and omx.sender_user_id = oj.user_id and omx.obj_id = ox.id and ox.obj_sub_type_id =` + strconv.Itoa(objSubTypeMessageGo) + ` and omx.date between addtime(oj.schedule, '-30') and oj.schedule;`) if len(ids) > 0 { // we did, so we reschedule the job to check the outcome and wait m, err := getObjMsg(ids[0]) logOnError(err, "jobPillage : getMsg(objSubTypeMessageGo)") if err == nil { s := TGCommand{ Type: commandSendMsg, Text: fmt.Sprintf("We started intercepting the pillage (%s)", m.Date.Format(time.RFC3339)), ToUserID64: j.UserID64, } TGCmdQueue <- s } err = rescheduleJob(j.ID64, j.Status+1, time.Now().Add(30*time.Second).UTC()) logOnError(err, "jobPillage : rescheduleJob(objSubTypeMessageGo)") } else { //no /go in the last 30 sec so we go ahead, send one and reschedule to check again in 25sec clientSendCWMsg(j.UserID64, "/go") err = rescheduleJob(j.ID64, j.Status+1, time.Now().Add(30*time.Second).UTC()) logOnError(err, "jobPillage : rescheduleJob") } return } func jobMsgRefresh(j Job) { var p JobPayloadMsgRefresh // identify whether the message has been properly refreshed ? create new job ? reschedule same job ? err := setJobStart(j.ID64) logOnError(err, "jobMsgRefresh : setJobStart") err = json.Unmarshal(j.Payload, &p) logOnError(err, "jobMsgRefresh : Unmarshal payload") m, err := getObjMsg(p.ObjID64) if err != nil && strings.Compare(err.Error(), `sql: no rows in result set`) == 0 { err = setJobDone(j.ID64) logOnError(err, "joMsgClient : setJobDone") return } else { logOnError(err, "jobMsgRefresh : getObjMsg") err = setJobDone(j.ID64) logOnError(err, "joMsgClient : setJobDone") return } err = delObj(p.ObjID64) logOnError(err, "jobMsgRefresh : delObj") clientRefreshCWMsg(m.UserID64, m.ChatID64, m.ID64) err = setJobDone(j.ID64) logOnError(err, "joMsgClient : setJobDone") return } func jobMsgClient(j Job) { var p JobPayloadMsgClient err := setJobStart(j.ID64) logOnError(err, "jobMsgClient : setJobStart") err = json.Unmarshal(j.Payload, &p) logOnError(err, "jobMsgClient : Unmarshal payload") if err == nil { clientSendCWMsg(j.UserID64, p.Text) m := TGCommand{ Type: commandReplyMsg, Text: "Message sent.", FromMsgID64: p.MsgID64, FromChatID64: p.ChatID64, } TGCmdQueue <- m } err = setJobDone(j.ID64) logOnError(err, "joMsgClient : setJobDone") return } func jobBackupExport(j Job) { var p JobPayloadBackupExport err := setJobStart(j.ID64) logOnError(err, "jobBackupExport : setJobStart") err = json.Unmarshal(j.Payload, &p) logOnError(err, "jobBackupExport : Unmarshal payload") bkp := DataBackup{} start := time.Now() milestone := time.Now() s := new([]ChatWarsMessage) msgs := *s ids := getSQLListID64(`SELECT om.obj_id id FROM obj_msg om;`) txt := fmt.Sprintf("Backing up %d messages.", len(ids)) m := TGCommand{ Type: commandReplyMsg, Text: txt, FromMsgID64: p.MsgID64, FromChatID64: p.ChatID64, } TGCmdQueue <- m i := 0 for _, id := range ids { m, err := getObjMsg(id) logOnError(err, "jobBackupExport : getMsg") if err == nil { msgs = append(msgs, *m) } i = i + 1 if time.Now().Add(1 * time.Minute).After(milestone) { txt := fmt.Sprintf("Exported %d/%d messages.", i, len(ids)) m = TGCommand{ Type: commandReplyMsg, Text: txt, FromMsgID64: p.MsgID64, FromChatID64: p.ChatID64, } TGCmdQueue <- m milestone = time.Now() } } bkp.Messages = msgs b, err := json.Marshal(bkp) logOnError(err, "jobBackupExport : Marshal") m = TGCommand{ Type: commandReplyMsg, Text: `Compressing archive`, FromMsgID64: p.MsgID64, FromChatID64: p.ChatID64, } TGCmdQueue <- m zbuf := new(bytes.Buffer) zw := zip.NewWriter(zbuf) zf, err := zw.Create(`backup.json`) logOnError(err, "jobBackupExport : Create") _, err = zf.Write(b) logOnError(err, "jobBackupExport : Write") err = zw.Close() logOnError(err, "jobBackupExport : Close") d := tb.Document{} d.File = tb.FromReader(bytes.NewReader(zbuf.Bytes())) d.FileName = fmt.Sprintf("%s.backup.zip", start.Format("20060102150405")) d.Caption = d.FileName d.MIME = `application/zip` m = TGCommand{ Type: commandReplyMsg, Text: `Export done.`, FromMsgID64: p.MsgID64, FromChatID64: p.ChatID64, } TGCmdQueue <- m m = TGCommand{ Type: commandSendDocument, Document: d, ToChatID64: p.ChatID64, } TGCmdQueue <- m err = setJobDone(j.ID64) logOnError(err, "jobBackupExport : setJobDone") return } func jobBackupImport(j Job) { var p JobPayloadBackupImport err := setJobStart(j.ID64) logOnError(err, "jobBackupImport : setJobStart") err = json.Unmarshal(j.Payload, &p) logOnError(err, "jobBackupImport : Unmarshal payload") resp, err := http.Get(p.URL) logOnError(err, "jobBackupImport : Get") defer resp.Body.Close() buf := new(bytes.Buffer) buf.ReadFrom(resp.Body) m := TGCommand{ Type: commandReplyMsg, Text: "File downloaded.", FromMsgID64: p.MsgID64, FromChatID64: p.ChatID64, } TGCmdQueue <- m z := buf.Bytes() r := bytes.NewReader(z) zr, err := zip.NewReader(r, int64(len(z))) for _, f := range zr.File { if strings.Compare(f.Name, "backup.json") == 0 { rc, err := f.Open() logOnError(err, "jobBackupImport : Open") if err != nil { return err } data, err := ioutil.ReadAll(rc) logOnError(err, "jobBackupImport : ReadAll") if err != nil { return err } log.Printf("jobBackupImport : %d uncompressed bytes.\n", len(data)) rc.Close() bkp := DataBackup{} err = json.Unmarshal(data, &bkp) logOnError(err, "jobBackupImport : Unmarshal") if err != nil { return err } for _, m := range bkp.Messages { _, err = addObjMsg(m.ID64, m.ChatID64, m.UserID64, m.SenderUserID64, m.Date, m.Text) } m := TGCommand{ Type: commandReplyMsg, Text: "Backup restored.", FromMsgID64: p.MsgID64, FromChatID64: p.ChatID64, } TGCmdQueue <- m err = setJobDone(j.ID64) logOnError(err, "jobBackupImport : setJobDone") return } } m := TGCommand{ Type: commandReplyMsg, Text: "Not backup file found in archive.", FromMsgID64: p.MsgID64, FromChatID64: p.ChatID64, } TGCmdQueue <- m err = setJobDone(j.ID64) logOnError(err, "jobBackupImport : setJobDone") return }