diff --git a/job.go b/job.go index 736deba..c17f66e 100644 --- a/job.go +++ b/job.go @@ -97,7 +97,7 @@ func rescheduleJob(jobID64 int64, status int32, schedule time.Time) error { return nil } -func loadCurrentJobs(sid int) ([]Job, error) { +func loadCurrentJobs() ([]Job, error) { var ( objId int64 jobTypeId int32 @@ -107,13 +107,13 @@ func loadCurrentJobs(sid int) ([]Job, error) { jobs []Job ) - _, err := db.Exec("UPDATE obj_job j SET session_id = ?, j.pulled = ?, j.in_work = 1 WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", sid, time.Now(), time.Now(), SQLJobSliceSize) + _, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1 WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", time.Now(), time.Now(), SQLJobSliceSize) logOnError(err, "loadCurrentJobs : update intial rows") - stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.session_id = ? ORDER BY j.priority ASC, j.obj_id ASC;") + stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND ORDER BY j.priority ASC, j.obj_id ASC;") logOnError(err, "loadCurrentJobs : prepare select statement") - rows, err := stmt.Query(sid) + rows, err := stmt.Query() // rows, err := stmt.Query(time.Now()) logOnError(err, "loadCurrentJobs : query select statement") @@ -314,6 +314,13 @@ func jobPillage(j Job) { return } + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("No outcome for the pillage yet(%s)", m.Date.Format(time.RFC3339)), + ToUserID64: j.UserID64, + } + TGCmdQueue <- s + //no outcome yet, have we sent a "/go" in the last 30 sec ? ids = getSQLListID64(` select ox.id from obj ox diff --git a/main.go b/main.go index 5cbff38..c980a4c 100644 --- a/main.go +++ b/main.go @@ -49,6 +49,7 @@ var ( SQLMsgIdentifyQueue chan int64 TGCmdQueue chan TGCommand MQTGCmdQueue chan TGCommand + JobQueue chan Job msgParsingRules map[int]MessageParsingRule clientsKeepAlive map[int64]*MQKeepAlive clientsQueue map[int64]*MQClient @@ -124,6 +125,7 @@ func main() { SQLMsgIdentifyQueue = make(chan int64, SQLMsgIdentifyQueueSize) TGCmdQueue = make(chan TGCommand, TGCmdQueueSize) MQTGCmdQueue = make(chan TGCommand, MQTGCmdQueueSize) + JobQueue = make(chan Job, JobQueueSize) clientsQueue = make(map[int64]*MQClient) clientsKeepAlive = make(map[int64]*MQKeepAlive) clientsCW = make(map[int64]*ChatWarsClient) @@ -137,8 +139,8 @@ func main() { for w := 1; w <= SQLIdentifyMsgWorkers; w++ { go SQLIdentifyMsgWorker(w, SQLMsgIdentifyQueue) } - for w := 1; w <= SQLJobWorkers; w++ { - go SQLJobWorker(w) + for w := 1; w <= jobWorkers; w++ { + go jobWorker(w, JobQueue) } for w := 1; w <= TGCmdWorkers; w++ { go TGCmdWorker(w, b, TGCmdQueue) @@ -146,6 +148,7 @@ func main() { for w := 1; w <= MQTGCmdWorkers; w++ { go MQTGCmdWorker(w, MQTGCmdQueue) } + go SQLJobWorker() go MQKeepAliveWorker() go MQTidyKeepAliveWorker() diff --git a/sql.go b/sql.go index 63a7529..d8919b4 100644 --- a/sql.go +++ b/sql.go @@ -304,7 +304,6 @@ func initDB() { obj_id BIGINT UNSIGNED NOT NULL ,priority SMALLINT NOT NULL ,user_id BIGINT UNSIGNED NOT NULL - ,session_id SMALLINT ,status SMALLINT NOT NULL ,schedule DATETIME NOT NULL ,is_done TINYINT NOT NULL diff --git a/workers.go b/workers.go index 468400d..9c49198 100644 --- a/workers.go +++ b/workers.go @@ -279,33 +279,41 @@ func SQLIdentifyMsgWorker(id int, objIds <-chan int64) { log.Printf("SQLIdentifyMsgWorker[" + strconv.Itoa(id) + "] : Closing.") } -func SQLJobWorker(id int) { - //log.Printf("SQLJobWorker[" + strconv.Itoa(id) + "] : Starting.") +func SQLJobWorker() { + //log.Printf("SQLJobWorker : Starting.") for true { - jobs, err := loadCurrentJobs(id) - logOnError(err, "SQLJobWorker["+strconv.Itoa(id)+"] : loadCurrentJobs") + jobs, err := loadCurrentJobs() + logOnError(err, "SQLJobWorker : loadCurrentJobs") if len(jobs) > 0 { - log.Printf("SQLJobWorker["+strconv.Itoa(id)+"] : %d jobs.\n", len(jobs)) + log.Printf("SQLJobWorker : %d jobs.\n", len(jobs)) } for _, j := range jobs { - switch j.JobTypeID { - case objSubTypeJobRescanMsg: - jobRescan(j) - case objSubTypeJobSetJobDone: - jobSetDone(j) - case objSubTypeJobPillage: - jobPillage(j) - case objSubTypeJobMsgClient: - jobMsgClient(j) - default: - log.Printf("SQLJobWorker["+strconv.Itoa(id)+"] : No handler for job type #%d.\n", j.JobTypeID) - } + JobQueue <- j } if len(jobs) < SQLJobSliceSize { time.Sleep(100 * time.Millisecond) } } - log.Printf("SQLJobWorker[" + strconv.Itoa(id) + "] : Closing.") + log.Printf("SQLJobWorker : Closing.") +} + +func jobWorker(id int, jobs <-chan Job) { + //log.Printf("jobWorker[" + strconv.Itoa(id) + "] : Starting.") + for j := range jobs { + switch j.JobTypeID { + case objSubTypeJobRescanMsg: + jobRescan(j) + case objSubTypeJobSetJobDone: + jobSetDone(j) + case objSubTypeJobPillage: + jobPillage(j) + case objSubTypeJobMsgClient: + jobMsgClient(j) + default: + log.Printf("jobWorker["+strconv.Itoa(id)+"] : No handler for job type #%d.\n", j.JobTypeID) + } + } + log.Printf("jobWorker[" + strconv.Itoa(id) + "] : Closing.") } func TGCmdWorker(id int, b *tb.Bot, cmds <-chan TGCommand) { @@ -355,10 +363,10 @@ func MQTGCmdWorker(id int, cmds <-chan TGCommand) { logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j)) err = clientsQueue[c.FromUserID64].Channel.Publish( - "", // exchange + "", // exchange clientsQueue[c.FromUserID64].Queue.Name, // routing key - false, // mandatory - false, // immediate + false, // mandatory + false, // immediate amqp.Publishing{ ContentType: "application/json", Body: []byte(j),