This commit is contained in:
shoopea 2019-05-31 20:24:27 +08:00
parent 43d7943c4c
commit 06dc7c7662
4 changed files with 45 additions and 28 deletions

15
job.go
View File

@ -97,7 +97,7 @@ func rescheduleJob(jobID64 int64, status int32, schedule time.Time) error {
return nil return nil
} }
func loadCurrentJobs(sid int) ([]Job, error) { func loadCurrentJobs() ([]Job, error) {
var ( var (
objId int64 objId int64
jobTypeId int32 jobTypeId int32
@ -107,13 +107,13 @@ func loadCurrentJobs(sid int) ([]Job, error) {
jobs []Job jobs []Job
) )
_, err := db.Exec("UPDATE obj_job j SET session_id = ?, j.pulled = ?, j.in_work = 1 WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", sid, time.Now(), time.Now(), SQLJobSliceSize) _, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1 WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", time.Now(), time.Now(), SQLJobSliceSize)
logOnError(err, "loadCurrentJobs : update intial rows") logOnError(err, "loadCurrentJobs : update intial rows")
stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.session_id = ? ORDER BY j.priority ASC, j.obj_id ASC;") stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND ORDER BY j.priority ASC, j.obj_id ASC;")
logOnError(err, "loadCurrentJobs : prepare select statement") logOnError(err, "loadCurrentJobs : prepare select statement")
rows, err := stmt.Query(sid) rows, err := stmt.Query()
// rows, err := stmt.Query(time.Now()) // rows, err := stmt.Query(time.Now())
logOnError(err, "loadCurrentJobs : query select statement") logOnError(err, "loadCurrentJobs : query select statement")
@ -314,6 +314,13 @@ func jobPillage(j Job) {
return return
} }
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("No outcome for the pillage yet(%s)", m.Date.Format(time.RFC3339)),
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
//no outcome yet, have we sent a "/go" in the last 30 sec ? //no outcome yet, have we sent a "/go" in the last 30 sec ?
ids = getSQLListID64(` select ox.id ids = getSQLListID64(` select ox.id
from obj ox from obj ox

View File

@ -49,6 +49,7 @@ var (
SQLMsgIdentifyQueue chan int64 SQLMsgIdentifyQueue chan int64
TGCmdQueue chan TGCommand TGCmdQueue chan TGCommand
MQTGCmdQueue chan TGCommand MQTGCmdQueue chan TGCommand
JobQueue chan Job
msgParsingRules map[int]MessageParsingRule msgParsingRules map[int]MessageParsingRule
clientsKeepAlive map[int64]*MQKeepAlive clientsKeepAlive map[int64]*MQKeepAlive
clientsQueue map[int64]*MQClient clientsQueue map[int64]*MQClient
@ -124,6 +125,7 @@ func main() {
SQLMsgIdentifyQueue = make(chan int64, SQLMsgIdentifyQueueSize) SQLMsgIdentifyQueue = make(chan int64, SQLMsgIdentifyQueueSize)
TGCmdQueue = make(chan TGCommand, TGCmdQueueSize) TGCmdQueue = make(chan TGCommand, TGCmdQueueSize)
MQTGCmdQueue = make(chan TGCommand, MQTGCmdQueueSize) MQTGCmdQueue = make(chan TGCommand, MQTGCmdQueueSize)
JobQueue = make(chan Job, JobQueueSize)
clientsQueue = make(map[int64]*MQClient) clientsQueue = make(map[int64]*MQClient)
clientsKeepAlive = make(map[int64]*MQKeepAlive) clientsKeepAlive = make(map[int64]*MQKeepAlive)
clientsCW = make(map[int64]*ChatWarsClient) clientsCW = make(map[int64]*ChatWarsClient)
@ -137,8 +139,8 @@ func main() {
for w := 1; w <= SQLIdentifyMsgWorkers; w++ { for w := 1; w <= SQLIdentifyMsgWorkers; w++ {
go SQLIdentifyMsgWorker(w, SQLMsgIdentifyQueue) go SQLIdentifyMsgWorker(w, SQLMsgIdentifyQueue)
} }
for w := 1; w <= SQLJobWorkers; w++ { for w := 1; w <= jobWorkers; w++ {
go SQLJobWorker(w) go jobWorker(w, JobQueue)
} }
for w := 1; w <= TGCmdWorkers; w++ { for w := 1; w <= TGCmdWorkers; w++ {
go TGCmdWorker(w, b, TGCmdQueue) go TGCmdWorker(w, b, TGCmdQueue)
@ -146,6 +148,7 @@ func main() {
for w := 1; w <= MQTGCmdWorkers; w++ { for w := 1; w <= MQTGCmdWorkers; w++ {
go MQTGCmdWorker(w, MQTGCmdQueue) go MQTGCmdWorker(w, MQTGCmdQueue)
} }
go SQLJobWorker()
go MQKeepAliveWorker() go MQKeepAliveWorker()
go MQTidyKeepAliveWorker() go MQTidyKeepAliveWorker()

1
sql.go
View File

@ -304,7 +304,6 @@ func initDB() {
obj_id BIGINT UNSIGNED NOT NULL obj_id BIGINT UNSIGNED NOT NULL
,priority SMALLINT NOT NULL ,priority SMALLINT NOT NULL
,user_id BIGINT UNSIGNED NOT NULL ,user_id BIGINT UNSIGNED NOT NULL
,session_id SMALLINT
,status SMALLINT NOT NULL ,status SMALLINT NOT NULL
,schedule DATETIME NOT NULL ,schedule DATETIME NOT NULL
,is_done TINYINT NOT NULL ,is_done TINYINT NOT NULL

View File

@ -279,33 +279,41 @@ func SQLIdentifyMsgWorker(id int, objIds <-chan int64) {
log.Printf("SQLIdentifyMsgWorker[" + strconv.Itoa(id) + "] : Closing.") log.Printf("SQLIdentifyMsgWorker[" + strconv.Itoa(id) + "] : Closing.")
} }
func SQLJobWorker(id int) { func SQLJobWorker() {
//log.Printf("SQLJobWorker[" + strconv.Itoa(id) + "] : Starting.") //log.Printf("SQLJobWorker : Starting.")
for true { for true {
jobs, err := loadCurrentJobs(id) jobs, err := loadCurrentJobs()
logOnError(err, "SQLJobWorker["+strconv.Itoa(id)+"] : loadCurrentJobs") logOnError(err, "SQLJobWorker : loadCurrentJobs")
if len(jobs) > 0 { if len(jobs) > 0 {
log.Printf("SQLJobWorker["+strconv.Itoa(id)+"] : %d jobs.\n", len(jobs)) log.Printf("SQLJobWorker : %d jobs.\n", len(jobs))
} }
for _, j := range jobs { for _, j := range jobs {
switch j.JobTypeID { JobQueue <- j
case objSubTypeJobRescanMsg:
jobRescan(j)
case objSubTypeJobSetJobDone:
jobSetDone(j)
case objSubTypeJobPillage:
jobPillage(j)
case objSubTypeJobMsgClient:
jobMsgClient(j)
default:
log.Printf("SQLJobWorker["+strconv.Itoa(id)+"] : No handler for job type #%d.\n", j.JobTypeID)
}
} }
if len(jobs) < SQLJobSliceSize { if len(jobs) < SQLJobSliceSize {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
} }
} }
log.Printf("SQLJobWorker[" + strconv.Itoa(id) + "] : Closing.") log.Printf("SQLJobWorker : Closing.")
}
func jobWorker(id int, jobs <-chan Job) {
//log.Printf("jobWorker[" + strconv.Itoa(id) + "] : Starting.")
for j := range jobs {
switch j.JobTypeID {
case objSubTypeJobRescanMsg:
jobRescan(j)
case objSubTypeJobSetJobDone:
jobSetDone(j)
case objSubTypeJobPillage:
jobPillage(j)
case objSubTypeJobMsgClient:
jobMsgClient(j)
default:
log.Printf("jobWorker["+strconv.Itoa(id)+"] : No handler for job type #%d.\n", j.JobTypeID)
}
}
log.Printf("jobWorker[" + strconv.Itoa(id) + "] : Closing.")
} }
func TGCmdWorker(id int, b *tb.Bot, cmds <-chan TGCommand) { func TGCmdWorker(id int, b *tb.Bot, cmds <-chan TGCommand) {
@ -355,10 +363,10 @@ func MQTGCmdWorker(id int, cmds <-chan TGCommand) {
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)")
log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j)) log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j))
err = clientsQueue[c.FromUserID64].Channel.Publish( err = clientsQueue[c.FromUserID64].Channel.Publish(
"", // exchange "", // exchange
clientsQueue[c.FromUserID64].Queue.Name, // routing key clientsQueue[c.FromUserID64].Queue.Name, // routing key
false, // mandatory false, // mandatory
false, // immediate false, // immediate
amqp.Publishing{ amqp.Publishing{
ContentType: "application/json", ContentType: "application/json",
Body: []byte(j), Body: []byte(j),