package main import ( "encoding/json" "fmt" "strconv" "time" ) func createJob(jobTypeID int32, priority int32, userID64 int64, schedule time.Time, payload []byte) (int64, error) { stmt, err := db.Prepare(`INSERT INTO obj (obj_type_id, obj_sub_type_id) VALUES (? , ?);`) logOnError(err, "createJob : prepare insert obj") if err != nil { return 0, err } defer stmt.Close() res, err := stmt.Exec(objTypeJob, jobTypeID) s := fmt.Sprintf("createJob, insert obj(%d, %d)", objTypeJob, jobTypeID) logOnError(err, s) if err != nil { return 0, err } objId, err := res.LastInsertId() logOnError(err, "createJob : get last insert Id") if err != nil { return 0, err } stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, status, seq_nr, schedule, is_done, in_work, inserted, pulled, started, ended, payload) VALUES (?, ?, ?, ?, NULL, ?, 0, 0, ?, NULL, NULL, NULL, ?);`) logOnError(err, "createJob : prepare insert obj_job") if err != nil { return 0, err } defer stmt.Close() _, err = stmt.Exec(objId, priority, userID64, objJobStatusNew, schedule.UTC(), time.Now().UTC(), payload) logOnError(err, "createJob : insert obj_job") if err != nil { return 0, err } return objId, nil } func setJobDone(jobId int64) error { stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 1, j.in_work = 0, j.ended = ? WHERE j.obj_id = ?;`) logOnError(err, "setJobDone : prepare update obj_job") if err != nil { return err } defer stmt.Close() _, err = stmt.Exec(time.Now().UTC(), jobId) s := fmt.Sprintf("setJobDone, update obj_job(%d)", jobId) logOnError(err, s) if err != nil { return err } return nil } func setJobStart(jobId int64) error { stmt, err := db.Prepare(`UPDATE obj_job j SET j.started = coalesce(j.started, ?) WHERE j.obj_id = ?;`) logOnError(err, "setJobStart : prepare update obj_job") if err != nil { return err } defer stmt.Close() _, err = stmt.Exec(time.Now().UTC(), jobId) s := fmt.Sprintf("setJobStart, update obj_job(%d)", jobId) logOnError(err, s) if err != nil { return err } return nil } func rescheduleJob(jobID64 int64, status int32, schedule time.Time) error { stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 0, j.in_work = 0, j.schedule = ?, j.status = ? WHERE j.obj_id = ?;`) logOnError(err, "rescheduleJob : prepare update obj_job") if err != nil { return err } defer stmt.Close() _, err = stmt.Exec(schedule.UTC(), status, jobID64) s := fmt.Sprintf("rescheduleJob, update obj_job(%d)", jobID64) logOnError(err, s) if err != nil { return err } return nil } func loadCurrentJobs() ([]Job, error) { var ( objId int64 jobTypeId int32 userID64 int64 status int32 payload []byte jobs []Job ) t := time.Now().UTC() r := RndInt64() _, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1, j.seq_nr = ? WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", t, r, t, SQLJobSliceSize) logOnError(err, "loadCurrentJobs : update intial rows") stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;") logOnError(err, "loadCurrentJobs : prepare select statement") rows, err := stmt.Query(r) // rows, err := stmt.Query(time.Now()) logOnError(err, "loadCurrentJobs : query select statement") for rows.Next() { err = rows.Scan(&objId, &jobTypeId, &status, &userID64, &payload) logOnError(err, "loadCurrentJobs : scan query rows") job := Job{ ID64: objId, JobTypeID: jobTypeId, Status: status, UserID64: userID64, Payload: payload, } jobs = append(jobs, job) } err = rows.Err() logOnError(err, "loadCurrentJobs : scan end rows") rows.Close() err = stmt.Close() logOnError(err, "loadCurrentJobs : close select statement") return jobs, nil } func jobRescan(j Job) { var r JobPayloadRescanMsg err := setJobStart(j.ID64) logOnError(err, "jobRescan : setJobStart") err = json.Unmarshal(j.Payload, &r) logOnError(err, "jobRescan : Unmarshal payload") start := time.Now() ids := getSQLListID64(r.Query) if len(ids) > 1 { for _, id := range ids { SQLMsgIdentifyQueue <- id } p := JobPayloadSetDone{ JobID64: j.ID64, MsgID64: r.MsgID64, ChatID64: r.ChatID64, Text: fmt.Sprintf("%d messages processed in %s.", len(ids), time.Since(start)), } b, _ := json.Marshal(p) _, err := createJob(objSubTypeJobSetJobDone, objJobPriorityRescanAllMsg, j.UserID64, time.Now().UTC(), b) logOnError(err, "jobRescan : createJob(objSubTypeJobSetJobDone)") } else if len(ids) == 1 { SQLMsgIdentifyQueue <- ids[0] err = setJobDone(j.ID64) logOnError(err, "jobRescan : setJobDone(1)") if r.MsgID64 != 0 || r.ChatID64 != 0 { m := TGCommand{ Type: commandReplyMsg, Text: "One message processed.", FromMsgID64: r.MsgID64, FromChatID64: r.ChatID64, } TGCmdQueue <- m } } else { err = setJobDone(j.ID64) logOnError(err, "jobRescan : setJobDone(0)") if r.MsgID64 != 0 || r.ChatID64 != 0 { m := TGCommand{ Type: commandReplyMsg, Text: "No message processed.", FromMsgID64: r.MsgID64, FromChatID64: r.ChatID64, } TGCmdQueue <- m } } return } func jobSetDone(j Job) { var r JobPayloadSetDone err := setJobStart(j.ID64) logOnError(err, "jobSetDone : setJobStart") err = json.Unmarshal(j.Payload, &r) logOnError(err, "jobSetDone : Unmarshal payload") err = setJobDone(r.JobID64) logOnError(err, "jobSetDone : setJobDone(child)") err = setJobDone(j.ID64) logOnError(err, "jobSetDone : setJobDone") m := TGCommand{ Type: commandReplyMsg, Text: r.Text, FromMsgID64: r.MsgID64, FromChatID64: r.ChatID64, } TGCmdQueue <- m return } func jobPillage(j Job) { var r JobPayloadPillage err := setJobStart(j.ID64) logOnError(err, "jobPillage : setJobStart") err = json.Unmarshal(j.Payload, &r) logOnError(err, "jobPillage : Unmarshal payload") // check if we have a acknoledgment of go or a timeout within 3m30 of the PillageInc from the Job ids := getSQLListID64(` select ox.id from obj ox ,obj_msg omx ,obj op ,obj_msg omp ,obj_job oj where oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + ` and omx.user_id = oj.user_id and omx.sender_user_id = ` + strconv.Itoa(userID64ChtWrsBot) + ` and omx.obj_id = ox.id and ox.obj_sub_type_id in (` + strconv.Itoa(objSubTypeMessagePillageGo) + `, ` + strconv.Itoa(objSubTypeMessagePillageTimeout) + `, ` + strconv.Itoa(objSubTypeMessagePillageLoss) + `, ` + strconv.Itoa(objSubTypeMessagePillageWin) + `) and op.id = ` + strconv.FormatInt(r.ObjID64, 10) + ` and omp.obj_id = op.id and omx.date between omp.date and addtime(omp.date, '0 0:3:30.000000') order by case ox.obj_sub_type_id when ` + strconv.Itoa(objSubTypeMessagePillageWin) + ` then 0 when ` + strconv.Itoa(objSubTypeMessagePillageLoss) + ` then 1 when ` + strconv.Itoa(objSubTypeMessagePillageTimeout) + ` then 2 when ` + strconv.Itoa(objSubTypeMessagePillageGo) + ` then 3 else 4 end asc limit 1;`) if len(ids) > 1 { // issue there ? s := TGCommand{ Type: commandSendMsg, Text: fmt.Sprintf("More than one outcome for pillage #%d", r.ObjID64), ToUserID64: j.UserID64, } TGCmdQueue <- s } else if len(ids) == 1 { // we've got a match, job is done whether we prevented the pillage or not m, err := getObjMsg(ids[0]) logOnError(err, "jobPillage : getMsg(objSubTypeMessagePillageGo, objSubTypeMessagePillageTimeout)") if err == nil { if m.Date.Add(60 * time.Second).After(time.Now().UTC()) { msgTypeID64, err := getObjSubTypeId(ids[0]) logOnError(err, "jobPillage : getObjSubTypeId") if err == nil { if msgTypeID64 == objSubTypeMessagePillageGo { s := TGCommand{ Type: commandSendMsg, Text: fmt.Sprintf("We avoided a pillage (%s)", m.Date.Format(time.RFC3339)), ToUserID64: j.UserID64, } TGCmdQueue <- s } else if msgTypeID64 == objSubTypeMessagePillageWin { s := TGCommand{ Type: commandSendMsg, Text: fmt.Sprintf("We avoided a pillage (%s))", m.Date.Format(time.RFC3339)), ToUserID64: j.UserID64, } TGCmdQueue <- s } else if msgTypeID64 == objSubTypeMessagePillageLoss { s := TGCommand{ Type: commandSendMsg, Text: fmt.Sprintf("We got pillaged (%s)", m.Date.Format(time.RFC3339)), ToUserID64: j.UserID64, } TGCmdQueue <- s } else if msgTypeID64 == objSubTypeMessagePillageTimeout { s := TGCommand{ Type: commandSendMsg, Text: fmt.Sprintf("We got pillaged (%s)", m.Date.Format(time.RFC3339)), ToUserID64: j.UserID64, } TGCmdQueue <- s } else { s := TGCommand{ Type: commandSendMsg, Text: fmt.Sprintf("We don't know what happened (%s)", m.Date.Format(time.RFC3339)), ToUserID64: j.UserID64, } TGCmdQueue <- s } } } } err = setJobDone(j.ID64) logOnError(err, "jobSetDone : setJobDone") return } s := TGCommand{ Type: commandSendMsg, Text: fmt.Sprintf("No outcome for the pillage yet"), ToUserID64: j.UserID64, } TGCmdQueue <- s //no outcome yet, have we sent a "/go" in the last 30 sec ? ids = getSQLListID64(` select ox.id from obj ox ,obj_msg omx ,obj_job oj where oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + ` and omx.user_id = oj.user_id and omx.sender_user_id = oj.user_id and omx.obj_id = ox.id and ox.obj_sub_type_id =` + strconv.Itoa(objSubTypeMessageGo) + ` and omx.date between addtime(oj.schedule, '-30') and oj.schedule;`) if len(ids) > 0 { // we did, so we reschedule the job to check the outcome and wait m, err := getObjMsg(ids[0]) logOnError(err, "jobPillage : getMsg(objSubTypeMessageGo)") if err == nil { s := TGCommand{ Type: commandSendMsg, Text: fmt.Sprintf("We started intercepting the pillage (%s)", m.Date.Format(time.RFC3339)), ToUserID64: j.UserID64, } TGCmdQueue <- s } err = rescheduleJob(j.ID64, j.Status+1, time.Now().Add(30*time.Second).UTC()) logOnError(err, "jobPillage : rescheduleJob(objSubTypeMessageGo)") } else { //no /go in the last 30 sec so we go ahead, send one and reschedule to check again in 25sec clientSendCWMsg(j.UserID64, "/go") err = rescheduleJob(j.ID64, j.Status+1, time.Now().Add(30*time.Second).UTC()) logOnError(err, "jobPillage : rescheduleJob") } return } func jobMsgRefresh(j Job) { var p JobPayloadMsgRefresh // identify whether the message has been properly refreshed ? create new job ? reschedule same job ? err := setJobStart(j.ID64) logOnError(err, "jobMsgRefresh : setJobStart") err = json.Unmarshal(j.Payload, &p) logOnError(err, "jobMsgRefresh : Unmarshal payload") m, err := getObjMsg(p.ObjID64) logOnError(err, "jobMsgRefresh : getObjMsg") err = delObj(p.ObjID64) logOnError(err, "jobMsgRefresh : delObj") clientRefreshCWMsg(m.UserID64, m.ChatID64, m.ID64) err = setJobDone(j.ID64) logOnError(err, "joMsgClient : setJobDone") return } func jobMsgClient(j Job) { var p JobPayloadMsgClient err := setJobStart(j.ID64) logOnError(err, "jobMsgClient : setJobStart") err = json.Unmarshal(j.Payload, &p) logOnError(err, "jobMsgClient : Unmarshal payload") if err == nil { clientSendCWMsg(j.UserID64, p.Text) m := TGCommand{ Type: commandReplyMsg, Text: "Message sent.", FromMsgID64: p.MsgID64, FromChatID64: p.ChatID64, } TGCmdQueue <- m } err = setJobDone(j.ID64) logOnError(err, "joMsgClient : setJobDone") return } func jobBackupExport(j Job) { var p JobPayloadBackupExport err := setJobStart(j.ID64) logOnError(err, "jobBackupExport : setJobStart") err = json.Unmarshal(j.Payload, &p) logOnError(err, "jobBackupExport : Unmarshal payload") if err == nil { m := TGCommand{ Type: commandReplyMsg, Text: "Backup exported.", FromMsgID64: p.MsgID64, FromChatID64: p.ChatID64, } TGCmdQueue <- m } err = setJobDone(j.ID64) logOnError(err, "jobBackupExport : setJobDone") return } func jobBackupImport(j Job) { var p JobPayloadBackupImport err := setJobStart(j.ID64) logOnError(err, "jobBackupImport : setJobStart") err = json.Unmarshal(j.Payload, &p) logOnError(err, "jobBackupImport : Unmarshal payload") if err == nil { m := TGCommand{ Type: commandReplyMsg, Text: "Backup imported.", FromMsgID64: p.MsgID64, FromChatID64: p.ChatID64, } TGCmdQueue <- m } err = setJobDone(j.ID64) logOnError(err, "jobBackupImport : setJobDone") return }