This commit is contained in:
shoopea 2019-05-11 12:54:12 +08:00
parent ef2310d26d
commit 55b1acda1d
6 changed files with 140 additions and 20 deletions

6
bot.go
View File

@ -76,9 +76,9 @@ func botMsgRescan(m *tb.Message) (string, error) {
r := regexp.MustCompile("^[0-9]+$")
if r.MatchString(m.Payload) {
p := JobPayloadRescanMsg{}
fmt.Sprintf(p.Query, "SELECT o.id from obj o where o.id = %d and o.obj_type_id = %d and o.obj_sub_type_id = %d;", m.Payload, objTypeMessage, objSubTypeMessageUnknown)
fmt.Sprintf(p.Query, "SELECT o.id from obj o where o.id = %s and o.obj_type_id = %d and o.obj_sub_type_id = %d;", m.Payload, objTypeMessage, objSubTypeMessageUnknown)
b, _ := json.Marshal(p)
err := createJob(objSubTypeJobRescanMsg, 2, time.Now(), b)
err := createJob(objSubTypeJobRescanMsg, objJobPriorityRescanMsg, time.Now(), b)
logOnError(err, "botMsgRescan : createJob(objSubTypeJobRescanMsg)")
if err != nil {
return "Error scheduling the rescan for msg #" + m.Payload, nil
@ -92,7 +92,7 @@ func botMsgRescan(m *tb.Message) (string, error) {
p := JobPayloadRescanMsg{}
fmt.Sprintf(p.Query, "SELECT o.id from obj o where o.obj_type_id = %d and o.obj_sub_type_id = %d;", objTypeMessage, objSubTypeMessageUnknown)
b, _ := json.Marshal(p)
err := createJob(objSubTypeJobRescanMsg, 3, time.Now(), b)
err := createJob(objSubTypeJobRescanMsg, objJobPriorityRescanAllMsg, time.Now(), b)
logOnError(err, "botMsgRescan : createJob(objSubTypeJobRescanMsg)")
if err != nil {
return "Error scheduling the rescan for all msg", nil

12
def.go
View File

@ -45,6 +45,7 @@ type BotMsg struct {
type Job struct {
ID64 int64
JobTypeID int32
Status int32
Payload []byte
}
@ -72,6 +73,10 @@ type JobPayloadRescanMsg struct {
Query string `json:"query"`
}
type JobPayloadSetDone struct {
JobID64 int64 `json:"job_id"`
}
const (
objTypeUser = 1
objTypeGuild = 2
@ -106,7 +111,7 @@ const (
objSubTypeJobWithdrawal = 604
objSubTypeJobGStock = 605
objSubTypeJobRescanMsg = 606
objSubTypeJobMarkAsDone = 607
objSubTypeJobSetJobDone = 607
objSubTypeItemResource = 701
objSubTypeItemAlch = 702
objSubTypeItemMisc = 703
@ -118,6 +123,11 @@ const (
objJonStatusPending = 10
objJobStatusDone = 20
objJobPriority = 1
objJobPriorityRescanMsg = 2
objJobPriorityRescanChildMsg = 3
objJobPriorityRescanAllMsg = 4
MQGetMsgWorkers = 3
SQLCWMsgWorkers = 6
SQLIdentifyMsgWorkers = 6

46
job.go Normal file
View File

@ -0,0 +1,46 @@
package main
import (
"fmt"
"log"
)
func jobRescan(Job j) {
err := setJobStart(j.ID64)
logOnError(err, "jobRescan : setJobStart")
r := JobPayloadRescanMsg{}
err = json.Unmarshal(j.payload, &r)
ids := getSQLListID64(r.Query)
if len(ids) > 1 {
for _, id := range ids {
p := JobPayloadRescanMsg{}
fmt.Sprintf(p.Query, "SELECT o.id from obj o where o.id = %d and o.obj_type_id = %d and o.obj_sub_type_id = %d;", id, objTypeMessage, objSubTypeMessageUnknown)
b, _ := json.Marshal(p)
err := createJob(objSubTypeJobRescanMsg, objJobPriorityRescanChildMsg, time.Now(), b)
logOnError(err, "jobRescan : createJob(objSubTypeJobRescanMsg)")
}
p := JobPayloadSetDone{
JobID64: j.ID64,
}
b, _ := json.Marshal(p)
err := createJob(objSubTypeJobSetJobDone, objJobPriority, time.Now(), b)
logOnError(err, "jobRescan : createJob(objSubTypeJobSetJobDone)")
} else if len(ids) == 1 {
SQLMsgIdentifyQueue <- ids[0]
err = setJobDone(j.ID64)
logOnError(err, "jobRescan : setJobDone")
}
return
}
func jobSetDone(Job j) {
err := setJobStart(j.ID64)
logOnError(err, "jobSetDone : setJobStart")
r := JobPayloadSetDone{}
err = json.Unmarshal(j.payload, &r)
err = setJobDone(r.JobID64)
logOnError(err, "jobSetDone : setJobDone(child)")
err = setJobDone(j.ID64)
logOnError(err, "jobSetDone : setJobDone")
return
}

View File

@ -116,7 +116,7 @@ func main() {
go SQLJobWorker(w)
}
fmt.Println("Started !")
log.Println("Bot started !")
// Main loop
for {

81
sql.go
View File

@ -199,8 +199,11 @@ func initDB() {
,priority SMALLINT NOT NULL
,status SMALLINT NOT NULL
,schedule DATETIME NOT NULL
,start TIMESTAMP
,end TIMESTAMP
,is_done TINYINT NOT NULL
,in_work TINYINT NOT NULL
,pulled TIMESTAMP
,started TIMESTAMP
,ended TIMESTAMP
,payload VARCHAR(4000)
,FOREIGN KEY (obj_id) REFERENCES obj(id) ON DELETE CASCADE
) ENGINE = InnoDB CHARSET=utf8mb4 COLLATE utf8mb4_unicode_ci;`)
@ -240,7 +243,7 @@ func initDB() {
,(` + strconv.Itoa(objSubTypeJobWithdrawal) + `, "job_withdraw", "Withdrawal job", ` + strconv.Itoa(objTypeJob) + `)
,(` + strconv.Itoa(objSubTypeJobGStock) + `, "job_gstock", "GStock job", ` + strconv.Itoa(objTypeJob) + `)
,(` + strconv.Itoa(objSubTypeJobRescanMsg) + `, "job_rescan_msg", "Rescan message job", ` + strconv.Itoa(objTypeJob) + `)
,(` + strconv.Itoa(objSubTypeJobMarkAsDone) + `, "job_mark_as_done", "Mark job as done job", ` + strconv.Itoa(objTypeJob) + `)
,(` + strconv.Itoa(objSubTypeJobSetJobDone) + `, "job_set_done", "Set job as done job", ` + strconv.Itoa(objTypeJob) + `)
,(` + strconv.Itoa(objSubTypeItemResource) + `, "item_res", "Time", ` + strconv.Itoa(objTypeItem) + `)
,(` + strconv.Itoa(objSubTypeItemAlch) + `, "item_alch", "Time", ` + strconv.Itoa(objTypeItem) + `)
,(` + strconv.Itoa(objSubTypeItemMisc) + `, "item_misc", "Time", ` + strconv.Itoa(objTypeItem) + `)
@ -1353,15 +1356,15 @@ func createJob(job_type_id int32, priority int32, schedule time.Time, payload []
return err
}
stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, status, schedule, start, end, payload)
VALUES (?, ?, ?, ?, NULL, NULL, ?);`)
stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, status, schedule, is_done, in_work, inserted, pulled, started, end, payload)
VALUES (?, ?, ?, ?, 0, 0, , ?, NULL, NULL, NULL, ?);`)
logOnError(err, "createJob : prepare insert obj_job")
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(objId, priority, objJobStatusNew, schedule, payload)
_, err = stmt.Exec(objId, priority, objJobStatusNew, schedule, time.Now(), payload)
logOnError(err, "createJob : insert obj_job")
if err != nil {
return err
@ -1370,6 +1373,40 @@ func createJob(job_type_id int32, priority int32, schedule time.Time, payload []
return nil
}
func setJobDone(jobId int64) error {
stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 1, j.in_work = 0, j.ended = ? WHERE j.obj_id = ?;`)
logOnError(err, "setJobDone : prepare update obj_job")
if err != nil {
return err
}
defer stmt.Close()
res, err := stmt.Exec(time.Now(), jobId)
s := fmt.Sprintf("setJobDone, update obj_job(%d)", jobId)
logOnError(err, s)
if err != nil {
return err
}
return nil
}
func setJobStart(jobId int64) error {
stmt, err := db.Prepare(`UPDATE obj_job j SET j.ended = ? WHERE j.obj_id = ?;`)
logOnError(err, "setJobStart : prepare update obj_job")
if err != nil {
return err
}
defer stmt.Close()
res, err := stmt.Exec(time.Now(), jobId)
s := fmt.Sprintf("setJobStart, update obj_job(%d)", jobId)
logOnError(err, s)
if err != nil {
return err
}
return nil
}
func loadMsgParsingRules() (m map[int]MessageParsingRule, err error) {
var (
id int32
@ -1420,6 +1457,7 @@ func loadCurrentJobs() ([]Job, error) {
var (
objId int64
jobTypeId int32
status int32
payload []byte
jobs []Job
)
@ -1431,18 +1469,19 @@ func loadCurrentJobs() ([]Job, error) {
}
defer tx.Rollback()
stmt, err := tx.Prepare("SELECT o.id, o.obj_sub_type_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.status = ? AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ? FOR UPDATE;")
stmt, err := tx.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 and j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ? FOR UPDATE;")
logOnError(err, "loadCurrentJobs : prepare select statement")
rows, err := stmt.Query(objJobStatusNew, time.Now(), SQLJobSliceSize)
rows, err := stmt.Query(time.Now(), SQLJobSliceSize)
logOnError(err, "loadCurrentJobs : query select statement")
for rows.Next() {
err = rows.Scan(&objId, &jobTypeId, &payload)
err = rows.Scan(&objId, &jobTypeId, &status, &payload)
logOnError(err, "loadCurrentJobs : scan query rows")
job := Job{
ID64: objId,
JobTypeID: jobTypeId,
Status: status,
Payload: payload,
}
jobs = append(jobs, job)
@ -1454,11 +1493,11 @@ func loadCurrentJobs() ([]Job, error) {
err = stmt.Close()
logOnError(err, "loadCurrentJobs : close select statement")
stmt, err = tx.Prepare("UPDATE obj_job j SET j.status = ? WHERE j.obj_id = ?;")
stmt, err = tx.Prepare("UPDATE obj_job j SET j.in_work = 1, j.pulled = ? WHERE j.obj_id = ?;")
logOnError(err, "loadCurrentJobs : prepare update statement")
for _, job := range jobs {
_, err = stmt.Exec(objJonStatusPending, job.ID64)
_, err = stmt.Exec(time.Now(), job.ID64)
logOnError(err, "loadCurrentJobs : updating row")
}
@ -1470,3 +1509,23 @@ func loadCurrentJobs() ([]Job, error) {
return jobs, nil
}
func getSQLListID64(q string) []int64 {
var id int64
rows, err := db.Query(q)
s := fmt.Sprintf("getSQLListID64 : Query(%s)", q)
logOnError(err, s)
ids := make([]int64)
for rows.Next() {
err = rows.Scan(&id)
logOnError(err, "getSQLListID64 : scan next val")
ids = append(ids, id)
}
err = rows.Err()
loglOnError(err, "getSQLListID64 : query end")
rows.Close()
return ids
}

View File

@ -146,11 +146,16 @@ func SQLJobWorker(id int) {
if len(jobs) > 0 {
log.Printf("SQLJobWorker["+strconv.Itoa(id)+"] : %d jobs.\n", len(jobs))
}
/*
for _, j := range jobs {
for _, j := range jobs {
switch j.JobTypeID {
case objSubTypeJobRescanMsg:
jobRescan(j)
case objSubTypeJobSetJobDone:
jobSetDone(j)
default:
log.Printf("SQLJobWorker["+strconv.Itoa(id)+"] : No handler for job type #%d.\n", j.JobTypeID)
}
*/
}
if len(jobs) < SQLJobSliceSize {
time.Sleep(100 * time.Millisecond)
}