diff --git a/def.go b/def.go index 4260a89..6658661 100644 --- a/def.go +++ b/def.go @@ -36,6 +36,36 @@ type MessageParsingRule struct { re *regexp.Regexp } +type Job struct { + ID64 int64 + JobTypeID int32 + Payload []byte +} + +type JobPayloadPillage struct { + UserID64 int64 `json:"user_id"` +} + +type JobPayloadTribute struct { + UserID64 int64 `json:"user_id"` +} + +type JobPayloadStatus struct { + UserID64 int64 `json:"user_id"` +} + +type JobPayloadWithdrawal struct { + UserID64 int64 `json:"user_id"` +} + +type JobPayloadGStock struct { + UserID64 int64 `json:"user_id"` +} + +type JobPayloadRescanMsg struct { + MsgID64 int64 `json:"msg_id"` +} + const ( objTypeUser = 1 objTypeGuild = 2 @@ -69,6 +99,7 @@ const ( objSubTypeJobStatus = 603 objSubTypeJobWithdrawal = 604 objSubTypeJobGStock = 605 + objSubTypeJobRescanMsg = 606 objSubTypeItemResource = 701 objSubTypeItemAlch = 702 objSubTypeItemMisc = 703 @@ -76,6 +107,10 @@ const ( objSubTypeItemPart = 705 objSubTypeItemOther = 706 + objJobStatusNew = 0 + objJonStatusPending = 10 + objJobStatusDone = 20 + MQGetMsgWorkers = 3 SQLCWMsgWorkers = 6 SQLIdentifyMsgWorkers = 6 diff --git a/main.go b/main.go index 41d719a..d82c0d8 100644 --- a/main.go +++ b/main.go @@ -105,6 +105,9 @@ func main() { for w := 1; w <= SQLIdentifyMsgWorkers; w++ { go SQLIdentifyMsgWorker(w, SQLMsgIdentifyQueue) } + for w := 1; w <= SQLJobWorkers; w++ { + go SQLJobWorker(w) + } fmt.Println("Started !") diff --git a/sql.go b/sql.go index 730055e..c50f96c 100644 --- a/sql.go +++ b/sql.go @@ -20,7 +20,7 @@ func initDB() { failOnError(err, "initDB : set foreign_key_checks = 0") var name string - rows, err := db.Query("show tables") + rows, err := tx.Query("show tables") failOnError(err, "initDB : show tables") for rows.Next() { @@ -64,6 +64,7 @@ func initDB() { id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT ,obj_type_id SMALLINT UNSIGNED NOT NULL ,obj_sub_type_id SMALLINT UNSIGNED NOT NULL + ,timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ,PRIMARY KEY (id) ,FOREIGN KEY (obj_type_id) REFERENCES code_obj_type(id) ON DELETE CASCADE ,FOREIGN KEY (obj_sub_type_id) REFERENCES code_obj_sub_type(id) ON DELETE CASCADE @@ -191,6 +192,18 @@ func initDB() { ) ENGINE = InnoDB CHARSET=utf8mb4 COLLATE utf8mb4_unicode_ci;`) failOnError(err, "initDB : create table obj_auction_announce") + _, err = db.Exec(`CREATE TABLE obj_job ( + obj_id BIGINT UNSIGNED NOT NULL + ,priority SMALLINT NOT NULL + ,status SMALLINT NOT NULL + ,schedule DATETIME NOT NULL + ,start TIMESTAMP + ,end TIMESTAMP + ,payload VARCHAR(4000) + ,FOREIGN KEY (obj_id) REFERENCES obj(id) ON DELETE CASCADE + ) ENGINE = InnoDB CHARSET=utf8mb4 COLLATE utf8mb4_unicode_ci;`) + failOnError(err, "initDB : create table obj_job") + _, err = db.Exec(`INSERT INTO code_obj_type (id, intl_id, name) VALUES (` + strconv.Itoa(objTypeUser) + `, "user", "User") ,(` + strconv.Itoa(objTypeGuild) + `, "guild", "Guild") @@ -1355,3 +1368,58 @@ func loadMsgParsingRules() (m map[int]MessageParsingRule, err error) { return m, nil } + +func loadCurrentJobs() ([]Job, error) { + var ( + objId int64 + jobTypeId int32 + payload []byte + jobs []Job + ) + + tx, err := db.Begin() + logOnError(err, "loadCurrentJobs : begin transaction") + if err != nil { + return 0, err + } + defer tx.Rollback() + + stmt, err := tx.Prepare("SELECT o.id, o.obj_sub_type_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.status = ? AND j.schedule <= ? ORDER BY j.prio ASC LIMIT 10 FOR UPDATE;") + logOnError(err, "loadCurrentJobs : prepare select statement") + + rows, err := stmt.Query(objJobStatusNew, time.Now()) + logOnError(err, "loadCurrentJobs : query select statement") + + for rows.Next() { + err = rows.Scan(&objId, &jobTypeId, &payload) + logOnError(err, "loadCurrentJobs : scan query rows") + job := Job{ + ID64: objId, + JobTypeID: jobTypeId, + Payload: &payload, + } + jobs = append(jobs, job) + } + err = rows.Err() + logOnError(err, "loadCurrentJobs : scan end rows") + rows.Close() + + err = stmt.Close() + logOnError(err, "loadCurrentJobs : close select statement") + + stmt, err = tx.Prepare("UPDATE obj_job j SET j.status = ? WHERE j.obj_id = ?;") + logOnError(err, "loadCurrentJobs : prepare update statement") + + for _, job := range jobs { + err = stmt.Exec(objJonStatusPending, job.ID64) + logOnError(err, "loadCurrentJobs : updating row") + } + + err = stmt.Close() + logOnError(err, "loadCurrentJobs : close update statement") + + err = tx.Commit() + logOnError(err, "loadCurrentJobs : commit") + + return jobs, nil +} diff --git a/workers.go b/workers.go index dbb7103..ed25e8b 100644 --- a/workers.go +++ b/workers.go @@ -127,7 +127,7 @@ func SQLIdentifyMsgWorker(id int, objIds <-chan int64) { log.Printf("SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Year : %s (%d)\n", r.ReplaceAllString(m.Text, "${Year}"), objId) */ t, err := fromChatWarsDate(r.ReplaceAllString(m.Text, "${Day}") + " " + r.ReplaceAllString(m.Text, "${Month}") + " " + r.ReplaceAllString(m.Text, "${Year}") + " " + r.ReplaceAllString(m.Text, "${Hour}") + ":" + r.ReplaceAllString(m.Text, "${Minute}")) - logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Time : time") + logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : fromChatWarsDate") log.Println(t) default: log.Printf("SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Unknwon message type in rule %d : %d (%d)\n", msgParsingRules[i].ID, msgParsingRules[i].MsgTypeID, objId) @@ -140,10 +140,19 @@ func SQLIdentifyMsgWorker(id int, objIds <-chan int64) { func SQLJobWorker(id int) { // var tick time.Time + var ( + objId int64 + priority int32 + status int32 + payload string + j []Job + ) + log.Printf("SQLJobWorker[" + strconv.Itoa(id) + "] : Starting.") for true { + jobs, err := loadCurrentJobs() + logOnError(err, "SQLJobWorker["+strconv.Itoa(id)+"] : loadCurrentJobs") time.Sleep(100 * time.Millisecond) - // tick = time.Now() } log.Printf("SQLJobWorker[" + strconv.Itoa(id) + "] : Closing.") }