diff --git a/job.go b/job.go index 4a98c18..60c96dc 100644 --- a/job.go +++ b/job.go @@ -29,8 +29,8 @@ func createJob(jobTypeID int32, priority int32, userID64 int64, schedule time.Ti return 0, err } - stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, status, schedule, is_done, in_work, inserted, pulled, started, ended, payload) - VALUES (?, ?, ?, ?, ?, 0, 0, ?, NULL, NULL, NULL, ?);`) + stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, status, seq_nr, schedule, is_done, in_work, inserted, pulled, started, ended, payload) + VALUES (?, ?, ?, ?, NULL, ?, 0, 0, ?, NULL, NULL, NULL, ?);`) logOnError(err, "createJob : prepare insert obj_job") if err != nil { return 0, err @@ -108,14 +108,15 @@ func loadCurrentJobs() ([]Job, error) { ) t := time.Now() + r := RndInt64() - _, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1 WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", t, t, SQLJobSliceSize) + _, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1, j.seq_nr = ? WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", t, r, t, SQLJobSliceSize) logOnError(err, "loadCurrentJobs : update intial rows") - stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.pulled = ? ORDER BY j.priority ASC, j.obj_id ASC;") + stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;") logOnError(err, "loadCurrentJobs : prepare select statement") - rows, err := stmt.Query(t) + rows, err := stmt.Query(r) // rows, err := stmt.Query(time.Now()) logOnError(err, "loadCurrentJobs : query select statement") diff --git a/main.go b/main.go index ec506ae..bfbc4ec 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "gopkg.in/gcfg.v1" tb "gopkg.in/tucnak/telebot.v2" "log" + "math/rand" "time" ) @@ -44,16 +45,21 @@ var ( db *sql.DB b *tb.Bot - cfg Config + cfg Config + + RndSrc *rand.Rand + RndMu sync.Mutex + MQCWMsgQueue chan ChatWarsMessage SQLMsgIdentifyQueue chan int64 TGCmdQueue chan TGCommand MQTGCmdQueue chan TGCommand JobQueue chan Job - msgParsingRules map[int]MessageParsingRule - clientsKeepAlive map[int64]*MQKeepAlive - clientsQueue map[int64]*MQClient - clientsCW map[int64]*ChatWarsClient + + msgParsingRules map[int]MessageParsingRule + clientsKeepAlive map[int64]*MQKeepAlive + clientsQueue map[int64]*MQClient + clientsCW map[int64]*ChatWarsClient ) func PrintText(m *tb.Message) { @@ -67,6 +73,9 @@ func main() { // Parsing config flag.Parse() + // randomize + RndSrc = rand.New(rand.NewSource(time.Now())) + err := gcfg.ReadFileInto(&cfg, *config) failOnError(err, "Parsing config") diff --git a/sql.go b/sql.go index 3dd5519..2560cc7 100644 --- a/sql.go +++ b/sql.go @@ -308,6 +308,7 @@ func initDB() { ,schedule DATETIME NOT NULL ,is_done TINYINT NOT NULL ,in_work TINYINT NOT NULL + ,seq_nr BIGINT UNSIGNED ,inserted TIMESTAMP ,pulled TIMESTAMP ,started TIMESTAMP diff --git a/utils.go b/utils.go index ec4b42f..34a5a63 100644 --- a/utils.go +++ b/utils.go @@ -77,3 +77,10 @@ func fromChatWarsDate(d string) (t time.Time, err error) { func toChatWarsDate(t time.Time) (s string, err error) { return "test", nil } + +func RndInt64() int64 { + RndMu.Lock() + i := RndSrc.Uint64() + RndMu.Unlock() + return i +} diff --git a/workers.go b/workers.go index 4cd551e..0fdfc11 100644 --- a/workers.go +++ b/workers.go @@ -293,10 +293,10 @@ func MQTGCmdWorker(id int, cmds <-chan TGCommand) { logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j)) err = clientsQueue[c.FromUserID64].Channel.Publish( - "", // exchange + "", // exchange clientsQueue[c.FromUserID64].Queue.Name, // routing key - false, // mandatory - false, // immediate + false, // mandatory + false, // immediate amqp.Publishing{ ContentType: "application/json", Body: []byte(j),