diff --git a/bot.go b/bot.go index d3a3f1e..4051a4d 100644 --- a/bot.go +++ b/bot.go @@ -76,9 +76,9 @@ func botMsgRescan(m *tb.Message) (string, error) { r := regexp.MustCompile("^[0-9]+$") if r.MatchString(m.Payload) { p := JobPayloadRescanMsg{} - fmt.Sprintf(p.Query, "SELECT o.id from obj o where o.id = %d and o.obj_type_id = %d and o.obj_sub_type_id = %d;", m.Payload, objTypeMessage, objSubTypeMessageUnknown) + fmt.Sprintf(p.Query, "SELECT o.id from obj o where o.id = %s and o.obj_type_id = %d and o.obj_sub_type_id = %d;", m.Payload, objTypeMessage, objSubTypeMessageUnknown) b, _ := json.Marshal(p) - err := createJob(objSubTypeJobRescanMsg, 2, time.Now(), b) + err := createJob(objSubTypeJobRescanMsg, objJobPriorityRescanMsg, time.Now(), b) logOnError(err, "botMsgRescan : createJob(objSubTypeJobRescanMsg)") if err != nil { return "Error scheduling the rescan for msg #" + m.Payload, nil @@ -92,7 +92,7 @@ func botMsgRescan(m *tb.Message) (string, error) { p := JobPayloadRescanMsg{} fmt.Sprintf(p.Query, "SELECT o.id from obj o where o.obj_type_id = %d and o.obj_sub_type_id = %d;", objTypeMessage, objSubTypeMessageUnknown) b, _ := json.Marshal(p) - err := createJob(objSubTypeJobRescanMsg, 3, time.Now(), b) + err := createJob(objSubTypeJobRescanMsg, objJobPriorityRescanAllMsg, time.Now(), b) logOnError(err, "botMsgRescan : createJob(objSubTypeJobRescanMsg)") if err != nil { return "Error scheduling the rescan for all msg", nil diff --git a/def.go b/def.go index 18dfda6..31577ed 100644 --- a/def.go +++ b/def.go @@ -45,6 +45,7 @@ type BotMsg struct { type Job struct { ID64 int64 JobTypeID int32 + Status int32 Payload []byte } @@ -72,6 +73,10 @@ type JobPayloadRescanMsg struct { Query string `json:"query"` } +type JobPayloadSetDone struct { + JobID64 int64 `json:"job_id"` +} + const ( objTypeUser = 1 objTypeGuild = 2 @@ -106,7 +111,7 @@ const ( objSubTypeJobWithdrawal = 604 objSubTypeJobGStock = 605 objSubTypeJobRescanMsg = 606 - objSubTypeJobMarkAsDone = 607 + objSubTypeJobSetJobDone = 607 objSubTypeItemResource = 701 objSubTypeItemAlch = 702 objSubTypeItemMisc = 703 @@ -118,6 +123,11 @@ const ( objJonStatusPending = 10 objJobStatusDone = 20 + objJobPriority = 1 + objJobPriorityRescanMsg = 2 + objJobPriorityRescanChildMsg = 3 + objJobPriorityRescanAllMsg = 4 + MQGetMsgWorkers = 3 SQLCWMsgWorkers = 6 SQLIdentifyMsgWorkers = 6 diff --git a/job.go b/job.go new file mode 100644 index 0000000..bd092de --- /dev/null +++ b/job.go @@ -0,0 +1,46 @@ +package main + +import ( + "fmt" + "log" +) + +func jobRescan(Job j) { + err := setJobStart(j.ID64) + logOnError(err, "jobRescan : setJobStart") + r := JobPayloadRescanMsg{} + err = json.Unmarshal(j.payload, &r) + ids := getSQLListID64(r.Query) + if len(ids) > 1 { + for _, id := range ids { + p := JobPayloadRescanMsg{} + fmt.Sprintf(p.Query, "SELECT o.id from obj o where o.id = %d and o.obj_type_id = %d and o.obj_sub_type_id = %d;", id, objTypeMessage, objSubTypeMessageUnknown) + b, _ := json.Marshal(p) + err := createJob(objSubTypeJobRescanMsg, objJobPriorityRescanChildMsg, time.Now(), b) + logOnError(err, "jobRescan : createJob(objSubTypeJobRescanMsg)") + } + p := JobPayloadSetDone{ + JobID64: j.ID64, + } + b, _ := json.Marshal(p) + err := createJob(objSubTypeJobSetJobDone, objJobPriority, time.Now(), b) + logOnError(err, "jobRescan : createJob(objSubTypeJobSetJobDone)") + } else if len(ids) == 1 { + SQLMsgIdentifyQueue <- ids[0] + err = setJobDone(j.ID64) + logOnError(err, "jobRescan : setJobDone") + } + return +} + +func jobSetDone(Job j) { + err := setJobStart(j.ID64) + logOnError(err, "jobSetDone : setJobStart") + r := JobPayloadSetDone{} + err = json.Unmarshal(j.payload, &r) + err = setJobDone(r.JobID64) + logOnError(err, "jobSetDone : setJobDone(child)") + err = setJobDone(j.ID64) + logOnError(err, "jobSetDone : setJobDone") + return +} diff --git a/main.go b/main.go index 0db4312..e9cfc6c 100644 --- a/main.go +++ b/main.go @@ -116,7 +116,7 @@ func main() { go SQLJobWorker(w) } - fmt.Println("Started !") + log.Println("Bot started !") // Main loop for { diff --git a/sql.go b/sql.go index fc343c2..d57fb1b 100644 --- a/sql.go +++ b/sql.go @@ -199,8 +199,11 @@ func initDB() { ,priority SMALLINT NOT NULL ,status SMALLINT NOT NULL ,schedule DATETIME NOT NULL - ,start TIMESTAMP - ,end TIMESTAMP + ,is_done TINYINT NOT NULL + ,in_work TINYINT NOT NULL + ,pulled TIMESTAMP + ,started TIMESTAMP + ,ended TIMESTAMP ,payload VARCHAR(4000) ,FOREIGN KEY (obj_id) REFERENCES obj(id) ON DELETE CASCADE ) ENGINE = InnoDB CHARSET=utf8mb4 COLLATE utf8mb4_unicode_ci;`) @@ -240,7 +243,7 @@ func initDB() { ,(` + strconv.Itoa(objSubTypeJobWithdrawal) + `, "job_withdraw", "Withdrawal job", ` + strconv.Itoa(objTypeJob) + `) ,(` + strconv.Itoa(objSubTypeJobGStock) + `, "job_gstock", "GStock job", ` + strconv.Itoa(objTypeJob) + `) ,(` + strconv.Itoa(objSubTypeJobRescanMsg) + `, "job_rescan_msg", "Rescan message job", ` + strconv.Itoa(objTypeJob) + `) - ,(` + strconv.Itoa(objSubTypeJobMarkAsDone) + `, "job_mark_as_done", "Mark job as done job", ` + strconv.Itoa(objTypeJob) + `) + ,(` + strconv.Itoa(objSubTypeJobSetJobDone) + `, "job_set_done", "Set job as done job", ` + strconv.Itoa(objTypeJob) + `) ,(` + strconv.Itoa(objSubTypeItemResource) + `, "item_res", "Time", ` + strconv.Itoa(objTypeItem) + `) ,(` + strconv.Itoa(objSubTypeItemAlch) + `, "item_alch", "Time", ` + strconv.Itoa(objTypeItem) + `) ,(` + strconv.Itoa(objSubTypeItemMisc) + `, "item_misc", "Time", ` + strconv.Itoa(objTypeItem) + `) @@ -1353,15 +1356,15 @@ func createJob(job_type_id int32, priority int32, schedule time.Time, payload [] return err } - stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, status, schedule, start, end, payload) - VALUES (?, ?, ?, ?, NULL, NULL, ?);`) + stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, status, schedule, is_done, in_work, inserted, pulled, started, end, payload) + VALUES (?, ?, ?, ?, 0, 0, , ?, NULL, NULL, NULL, ?);`) logOnError(err, "createJob : prepare insert obj_job") if err != nil { return err } defer stmt.Close() - _, err = stmt.Exec(objId, priority, objJobStatusNew, schedule, payload) + _, err = stmt.Exec(objId, priority, objJobStatusNew, schedule, time.Now(), payload) logOnError(err, "createJob : insert obj_job") if err != nil { return err @@ -1370,6 +1373,40 @@ func createJob(job_type_id int32, priority int32, schedule time.Time, payload [] return nil } +func setJobDone(jobId int64) error { + stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 1, j.in_work = 0, j.ended = ? WHERE j.obj_id = ?;`) + logOnError(err, "setJobDone : prepare update obj_job") + if err != nil { + return err + } + defer stmt.Close() + + res, err := stmt.Exec(time.Now(), jobId) + s := fmt.Sprintf("setJobDone, update obj_job(%d)", jobId) + logOnError(err, s) + if err != nil { + return err + } + return nil +} + +func setJobStart(jobId int64) error { + stmt, err := db.Prepare(`UPDATE obj_job j SET j.ended = ? WHERE j.obj_id = ?;`) + logOnError(err, "setJobStart : prepare update obj_job") + if err != nil { + return err + } + defer stmt.Close() + + res, err := stmt.Exec(time.Now(), jobId) + s := fmt.Sprintf("setJobStart, update obj_job(%d)", jobId) + logOnError(err, s) + if err != nil { + return err + } + return nil +} + func loadMsgParsingRules() (m map[int]MessageParsingRule, err error) { var ( id int32 @@ -1420,6 +1457,7 @@ func loadCurrentJobs() ([]Job, error) { var ( objId int64 jobTypeId int32 + status int32 payload []byte jobs []Job ) @@ -1431,18 +1469,19 @@ func loadCurrentJobs() ([]Job, error) { } defer tx.Rollback() - stmt, err := tx.Prepare("SELECT o.id, o.obj_sub_type_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.status = ? AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ? FOR UPDATE;") + stmt, err := tx.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 and j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ? FOR UPDATE;") logOnError(err, "loadCurrentJobs : prepare select statement") - rows, err := stmt.Query(objJobStatusNew, time.Now(), SQLJobSliceSize) + rows, err := stmt.Query(time.Now(), SQLJobSliceSize) logOnError(err, "loadCurrentJobs : query select statement") for rows.Next() { - err = rows.Scan(&objId, &jobTypeId, &payload) + err = rows.Scan(&objId, &jobTypeId, &status, &payload) logOnError(err, "loadCurrentJobs : scan query rows") job := Job{ ID64: objId, JobTypeID: jobTypeId, + Status: status, Payload: payload, } jobs = append(jobs, job) @@ -1454,11 +1493,11 @@ func loadCurrentJobs() ([]Job, error) { err = stmt.Close() logOnError(err, "loadCurrentJobs : close select statement") - stmt, err = tx.Prepare("UPDATE obj_job j SET j.status = ? WHERE j.obj_id = ?;") + stmt, err = tx.Prepare("UPDATE obj_job j SET j.in_work = 1, j.pulled = ? WHERE j.obj_id = ?;") logOnError(err, "loadCurrentJobs : prepare update statement") for _, job := range jobs { - _, err = stmt.Exec(objJonStatusPending, job.ID64) + _, err = stmt.Exec(time.Now(), job.ID64) logOnError(err, "loadCurrentJobs : updating row") } @@ -1470,3 +1509,23 @@ func loadCurrentJobs() ([]Job, error) { return jobs, nil } + +func getSQLListID64(q string) []int64 { + var id int64 + + rows, err := db.Query(q) + s := fmt.Sprintf("getSQLListID64 : Query(%s)", q) + logOnError(err, s) + + ids := make([]int64) + + for rows.Next() { + err = rows.Scan(&id) + logOnError(err, "getSQLListID64 : scan next val") + ids = append(ids, id) + } + err = rows.Err() + loglOnError(err, "getSQLListID64 : query end") + rows.Close() + return ids +} diff --git a/workers.go b/workers.go index f2896e9..51b8de9 100644 --- a/workers.go +++ b/workers.go @@ -146,11 +146,16 @@ func SQLJobWorker(id int) { if len(jobs) > 0 { log.Printf("SQLJobWorker["+strconv.Itoa(id)+"] : %d jobs.\n", len(jobs)) } - /* - for _, j := range jobs { - + for _, j := range jobs { + switch j.JobTypeID { + case objSubTypeJobRescanMsg: + jobRescan(j) + case objSubTypeJobSetJobDone: + jobSetDone(j) + default: + log.Printf("SQLJobWorker["+strconv.Itoa(id)+"] : No handler for job type #%d.\n", j.JobTypeID) } - */ + } if len(jobs) < SQLJobSliceSize { time.Sleep(100 * time.Millisecond) }