test jobs

This commit is contained in:
shoopea 2019-05-10 17:23:47 +08:00
parent 4839c6b120
commit e7a947bbd3
4 changed files with 118 additions and 3 deletions

35
def.go
View File

@ -36,6 +36,36 @@ type MessageParsingRule struct {
re *regexp.Regexp re *regexp.Regexp
} }
type Job struct {
ID64 int64
JobTypeID int32
Payload []byte
}
type JobPayloadPillage struct {
UserID64 int64 `json:"user_id"`
}
type JobPayloadTribute struct {
UserID64 int64 `json:"user_id"`
}
type JobPayloadStatus struct {
UserID64 int64 `json:"user_id"`
}
type JobPayloadWithdrawal struct {
UserID64 int64 `json:"user_id"`
}
type JobPayloadGStock struct {
UserID64 int64 `json:"user_id"`
}
type JobPayloadRescanMsg struct {
MsgID64 int64 `json:"msg_id"`
}
const ( const (
objTypeUser = 1 objTypeUser = 1
objTypeGuild = 2 objTypeGuild = 2
@ -69,6 +99,7 @@ const (
objSubTypeJobStatus = 603 objSubTypeJobStatus = 603
objSubTypeJobWithdrawal = 604 objSubTypeJobWithdrawal = 604
objSubTypeJobGStock = 605 objSubTypeJobGStock = 605
objSubTypeJobRescanMsg = 606
objSubTypeItemResource = 701 objSubTypeItemResource = 701
objSubTypeItemAlch = 702 objSubTypeItemAlch = 702
objSubTypeItemMisc = 703 objSubTypeItemMisc = 703
@ -76,6 +107,10 @@ const (
objSubTypeItemPart = 705 objSubTypeItemPart = 705
objSubTypeItemOther = 706 objSubTypeItemOther = 706
objJobStatusNew = 0
objJonStatusPending = 10
objJobStatusDone = 20
MQGetMsgWorkers = 3 MQGetMsgWorkers = 3
SQLCWMsgWorkers = 6 SQLCWMsgWorkers = 6
SQLIdentifyMsgWorkers = 6 SQLIdentifyMsgWorkers = 6

View File

@ -105,6 +105,9 @@ func main() {
for w := 1; w <= SQLIdentifyMsgWorkers; w++ { for w := 1; w <= SQLIdentifyMsgWorkers; w++ {
go SQLIdentifyMsgWorker(w, SQLMsgIdentifyQueue) go SQLIdentifyMsgWorker(w, SQLMsgIdentifyQueue)
} }
for w := 1; w <= SQLJobWorkers; w++ {
go SQLJobWorker(w)
}
fmt.Println("Started !") fmt.Println("Started !")

70
sql.go
View File

@ -20,7 +20,7 @@ func initDB() {
failOnError(err, "initDB : set foreign_key_checks = 0") failOnError(err, "initDB : set foreign_key_checks = 0")
var name string var name string
rows, err := db.Query("show tables") rows, err := tx.Query("show tables")
failOnError(err, "initDB : show tables") failOnError(err, "initDB : show tables")
for rows.Next() { for rows.Next() {
@ -64,6 +64,7 @@ func initDB() {
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT
,obj_type_id SMALLINT UNSIGNED NOT NULL ,obj_type_id SMALLINT UNSIGNED NOT NULL
,obj_sub_type_id SMALLINT UNSIGNED NOT NULL ,obj_sub_type_id SMALLINT UNSIGNED NOT NULL
,timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
,PRIMARY KEY (id) ,PRIMARY KEY (id)
,FOREIGN KEY (obj_type_id) REFERENCES code_obj_type(id) ON DELETE CASCADE ,FOREIGN KEY (obj_type_id) REFERENCES code_obj_type(id) ON DELETE CASCADE
,FOREIGN KEY (obj_sub_type_id) REFERENCES code_obj_sub_type(id) ON DELETE CASCADE ,FOREIGN KEY (obj_sub_type_id) REFERENCES code_obj_sub_type(id) ON DELETE CASCADE
@ -191,6 +192,18 @@ func initDB() {
) ENGINE = InnoDB CHARSET=utf8mb4 COLLATE utf8mb4_unicode_ci;`) ) ENGINE = InnoDB CHARSET=utf8mb4 COLLATE utf8mb4_unicode_ci;`)
failOnError(err, "initDB : create table obj_auction_announce") failOnError(err, "initDB : create table obj_auction_announce")
_, err = db.Exec(`CREATE TABLE obj_job (
obj_id BIGINT UNSIGNED NOT NULL
,priority SMALLINT NOT NULL
,status SMALLINT NOT NULL
,schedule DATETIME NOT NULL
,start TIMESTAMP
,end TIMESTAMP
,payload VARCHAR(4000)
,FOREIGN KEY (obj_id) REFERENCES obj(id) ON DELETE CASCADE
) ENGINE = InnoDB CHARSET=utf8mb4 COLLATE utf8mb4_unicode_ci;`)
failOnError(err, "initDB : create table obj_job")
_, err = db.Exec(`INSERT INTO code_obj_type (id, intl_id, name) _, err = db.Exec(`INSERT INTO code_obj_type (id, intl_id, name)
VALUES (` + strconv.Itoa(objTypeUser) + `, "user", "User") VALUES (` + strconv.Itoa(objTypeUser) + `, "user", "User")
,(` + strconv.Itoa(objTypeGuild) + `, "guild", "Guild") ,(` + strconv.Itoa(objTypeGuild) + `, "guild", "Guild")
@ -1355,3 +1368,58 @@ func loadMsgParsingRules() (m map[int]MessageParsingRule, err error) {
return m, nil return m, nil
} }
func loadCurrentJobs() ([]Job, error) {
var (
objId int64
jobTypeId int32
payload []byte
jobs []Job
)
tx, err := db.Begin()
logOnError(err, "loadCurrentJobs : begin transaction")
if err != nil {
return 0, err
}
defer tx.Rollback()
stmt, err := tx.Prepare("SELECT o.id, o.obj_sub_type_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.status = ? AND j.schedule <= ? ORDER BY j.prio ASC LIMIT 10 FOR UPDATE;")
logOnError(err, "loadCurrentJobs : prepare select statement")
rows, err := stmt.Query(objJobStatusNew, time.Now())
logOnError(err, "loadCurrentJobs : query select statement")
for rows.Next() {
err = rows.Scan(&objId, &jobTypeId, &payload)
logOnError(err, "loadCurrentJobs : scan query rows")
job := Job{
ID64: objId,
JobTypeID: jobTypeId,
Payload: &payload,
}
jobs = append(jobs, job)
}
err = rows.Err()
logOnError(err, "loadCurrentJobs : scan end rows")
rows.Close()
err = stmt.Close()
logOnError(err, "loadCurrentJobs : close select statement")
stmt, err = tx.Prepare("UPDATE obj_job j SET j.status = ? WHERE j.obj_id = ?;")
logOnError(err, "loadCurrentJobs : prepare update statement")
for _, job := range jobs {
err = stmt.Exec(objJonStatusPending, job.ID64)
logOnError(err, "loadCurrentJobs : updating row")
}
err = stmt.Close()
logOnError(err, "loadCurrentJobs : close update statement")
err = tx.Commit()
logOnError(err, "loadCurrentJobs : commit")
return jobs, nil
}

View File

@ -127,7 +127,7 @@ func SQLIdentifyMsgWorker(id int, objIds <-chan int64) {
log.Printf("SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Year : %s (%d)\n", r.ReplaceAllString(m.Text, "${Year}"), objId) log.Printf("SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Year : %s (%d)\n", r.ReplaceAllString(m.Text, "${Year}"), objId)
*/ */
t, err := fromChatWarsDate(r.ReplaceAllString(m.Text, "${Day}") + " " + r.ReplaceAllString(m.Text, "${Month}") + " " + r.ReplaceAllString(m.Text, "${Year}") + " " + r.ReplaceAllString(m.Text, "${Hour}") + ":" + r.ReplaceAllString(m.Text, "${Minute}")) t, err := fromChatWarsDate(r.ReplaceAllString(m.Text, "${Day}") + " " + r.ReplaceAllString(m.Text, "${Month}") + " " + r.ReplaceAllString(m.Text, "${Year}") + " " + r.ReplaceAllString(m.Text, "${Hour}") + ":" + r.ReplaceAllString(m.Text, "${Minute}"))
logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Time : time") logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : fromChatWarsDate")
log.Println(t) log.Println(t)
default: default:
log.Printf("SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Unknwon message type in rule %d : %d (%d)\n", msgParsingRules[i].ID, msgParsingRules[i].MsgTypeID, objId) log.Printf("SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Unknwon message type in rule %d : %d (%d)\n", msgParsingRules[i].ID, msgParsingRules[i].MsgTypeID, objId)
@ -140,10 +140,19 @@ func SQLIdentifyMsgWorker(id int, objIds <-chan int64) {
func SQLJobWorker(id int) { func SQLJobWorker(id int) {
// var tick time.Time // var tick time.Time
var (
objId int64
priority int32
status int32
payload string
j []Job
)
log.Printf("SQLJobWorker[" + strconv.Itoa(id) + "] : Starting.") log.Printf("SQLJobWorker[" + strconv.Itoa(id) + "] : Starting.")
for true { for true {
jobs, err := loadCurrentJobs()
logOnError(err, "SQLJobWorker["+strconv.Itoa(id)+"] : loadCurrentJobs")
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
// tick = time.Now()
} }
log.Printf("SQLJobWorker[" + strconv.Itoa(id) + "] : Closing.") log.Printf("SQLJobWorker[" + strconv.Itoa(id) + "] : Closing.")
} }