This commit is contained in:
shoopea 2019-06-03 09:19:06 +08:00
parent a1b3fff0e8
commit 5e75ca3334
5 changed files with 31 additions and 13 deletions

11
job.go
View File

@ -29,8 +29,8 @@ func createJob(jobTypeID int32, priority int32, userID64 int64, schedule time.Ti
return 0, err return 0, err
} }
stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, status, schedule, is_done, in_work, inserted, pulled, started, ended, payload) stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, status, seq_nr, schedule, is_done, in_work, inserted, pulled, started, ended, payload)
VALUES (?, ?, ?, ?, ?, 0, 0, ?, NULL, NULL, NULL, ?);`) VALUES (?, ?, ?, ?, NULL, ?, 0, 0, ?, NULL, NULL, NULL, ?);`)
logOnError(err, "createJob : prepare insert obj_job") logOnError(err, "createJob : prepare insert obj_job")
if err != nil { if err != nil {
return 0, err return 0, err
@ -108,14 +108,15 @@ func loadCurrentJobs() ([]Job, error) {
) )
t := time.Now() t := time.Now()
r := RndInt64()
_, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1 WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", t, t, SQLJobSliceSize) _, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1, j.seq_nr = ? WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", t, r, t, SQLJobSliceSize)
logOnError(err, "loadCurrentJobs : update intial rows") logOnError(err, "loadCurrentJobs : update intial rows")
stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.pulled = ? ORDER BY j.priority ASC, j.obj_id ASC;") stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;")
logOnError(err, "loadCurrentJobs : prepare select statement") logOnError(err, "loadCurrentJobs : prepare select statement")
rows, err := stmt.Query(t) rows, err := stmt.Query(r)
// rows, err := stmt.Query(time.Now()) // rows, err := stmt.Query(time.Now())
logOnError(err, "loadCurrentJobs : query select statement") logOnError(err, "loadCurrentJobs : query select statement")

19
main.go
View File

@ -8,6 +8,7 @@ import (
"gopkg.in/gcfg.v1" "gopkg.in/gcfg.v1"
tb "gopkg.in/tucnak/telebot.v2" tb "gopkg.in/tucnak/telebot.v2"
"log" "log"
"math/rand"
"time" "time"
) )
@ -44,16 +45,21 @@ var (
db *sql.DB db *sql.DB
b *tb.Bot b *tb.Bot
cfg Config cfg Config
RndSrc *rand.Rand
RndMu sync.Mutex
MQCWMsgQueue chan ChatWarsMessage MQCWMsgQueue chan ChatWarsMessage
SQLMsgIdentifyQueue chan int64 SQLMsgIdentifyQueue chan int64
TGCmdQueue chan TGCommand TGCmdQueue chan TGCommand
MQTGCmdQueue chan TGCommand MQTGCmdQueue chan TGCommand
JobQueue chan Job JobQueue chan Job
msgParsingRules map[int]MessageParsingRule
clientsKeepAlive map[int64]*MQKeepAlive msgParsingRules map[int]MessageParsingRule
clientsQueue map[int64]*MQClient clientsKeepAlive map[int64]*MQKeepAlive
clientsCW map[int64]*ChatWarsClient clientsQueue map[int64]*MQClient
clientsCW map[int64]*ChatWarsClient
) )
func PrintText(m *tb.Message) { func PrintText(m *tb.Message) {
@ -67,6 +73,9 @@ func main() {
// Parsing config // Parsing config
flag.Parse() flag.Parse()
// randomize
RndSrc = rand.New(rand.NewSource(time.Now()))
err := gcfg.ReadFileInto(&cfg, *config) err := gcfg.ReadFileInto(&cfg, *config)
failOnError(err, "Parsing config") failOnError(err, "Parsing config")

1
sql.go
View File

@ -308,6 +308,7 @@ func initDB() {
,schedule DATETIME NOT NULL ,schedule DATETIME NOT NULL
,is_done TINYINT NOT NULL ,is_done TINYINT NOT NULL
,in_work TINYINT NOT NULL ,in_work TINYINT NOT NULL
,seq_nr BIGINT UNSIGNED
,inserted TIMESTAMP ,inserted TIMESTAMP
,pulled TIMESTAMP ,pulled TIMESTAMP
,started TIMESTAMP ,started TIMESTAMP

View File

@ -77,3 +77,10 @@ func fromChatWarsDate(d string) (t time.Time, err error) {
func toChatWarsDate(t time.Time) (s string, err error) { func toChatWarsDate(t time.Time) (s string, err error) {
return "test", nil return "test", nil
} }
func RndInt64() int64 {
RndMu.Lock()
i := RndSrc.Uint64()
RndMu.Unlock()
return i
}

View File

@ -293,10 +293,10 @@ func MQTGCmdWorker(id int, cmds <-chan TGCommand) {
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)")
log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j)) log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j))
err = clientsQueue[c.FromUserID64].Channel.Publish( err = clientsQueue[c.FromUserID64].Channel.Publish(
"", // exchange "", // exchange
clientsQueue[c.FromUserID64].Queue.Name, // routing key clientsQueue[c.FromUserID64].Queue.Name, // routing key
false, // mandatory false, // mandatory
false, // immediate false, // immediate
amqp.Publishing{ amqp.Publishing{
ContentType: "application/json", ContentType: "application/json",
Body: []byte(j), Body: []byte(j),