This commit is contained in:
shoopea 2019-05-25 20:22:31 +08:00
parent cd721708a8
commit eba3491206

35
sql.go
View File

@ -276,6 +276,7 @@ func initDB() {
obj_id BIGINT UNSIGNED NOT NULL
,priority SMALLINT NOT NULL
,user_id BIGINT UNSIGNED NOT NULL
,sesssion_id SMALLINT
,status SMALLINT NOT NULL
,schedule DATETIME NOT NULL
,is_done TINYINT NOT NULL
@ -1525,8 +1526,8 @@ func createJob(jobTypeID int32, priority int32, userID64 int64, schedule time.Ti
return err
}
stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, status, schedule, is_done, in_work, inserted, pulled, started, ended, payload)
VALUES (?, ?, ?, ?, ?, 0, 0, ?, NULL, NULL, NULL, ?);`)
stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, session_id, status, schedule, is_done, in_work, inserted, pulled, started, ended, payload)
VALUES (?, ?, ?, NULL, ?, ?, 0, 0, ?, NULL, NULL, NULL, ?);`)
logOnError(err, "createJob : prepare insert obj_job")
if err != nil {
return err
@ -1622,7 +1623,7 @@ func loadMsgParsingRules() (m map[int]MessageParsingRule, err error) {
return m, nil
}
func loadCurrentJobs() ([]Job, error) {
func loadCurrentJobs(sid int64) ([]Job, error) {
var (
objId int64
jobTypeId int32
@ -1631,18 +1632,14 @@ func loadCurrentJobs() ([]Job, error) {
jobs []Job
)
tx, err := db.Begin()
logOnError(err, "loadCurrentJobs : begin transaction")
if err != nil {
return jobs, err
}
defer tx.Rollback()
_, err := db.Exec("UPDATE obj_job j SET session_id = ?, j.pulled = ?, j.in_work = 1 WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", sid, time.Now(), time.Now(), SQLJobSliceSize)
logOnError(err, "loadCurrentJobs : update intial rows")
stmt, err := tx.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 and j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT 1 FOR UPDATE;")
stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.session_id = ? ORDER BY j.priority ASC, j.obj_id ASC;")
logOnError(err, "loadCurrentJobs : prepare select statement")
//rows, err := stmt.Query(time.Now(), SQLJobSliceSize)
rows, err := stmt.Query(time.Now())
rows, err := stmt.Query(sid)
// rows, err := stmt.Query(time.Now())
logOnError(err, "loadCurrentJobs : query select statement")
for rows.Next() {
@ -1663,20 +1660,6 @@ func loadCurrentJobs() ([]Job, error) {
err = stmt.Close()
logOnError(err, "loadCurrentJobs : close select statement")
stmt, err = tx.Prepare("UPDATE obj_job j SET j.in_work = 1, j.pulled = ? WHERE j.obj_id = ?;")
logOnError(err, "loadCurrentJobs : prepare update statement")
for _, job := range jobs {
_, err = stmt.Exec(time.Now(), job.ID64)
logOnError(err, "loadCurrentJobs : updating row")
}
err = stmt.Close()
logOnError(err, "loadCurrentJobs : close update statement")
err = tx.Commit()
logOnError(err, "loadCurrentJobs : commit")
return jobs, nil
}