diff --git a/sql.go b/sql.go index 5caa0b5..ca0b537 100644 --- a/sql.go +++ b/sql.go @@ -276,6 +276,7 @@ func initDB() { obj_id BIGINT UNSIGNED NOT NULL ,priority SMALLINT NOT NULL ,user_id BIGINT UNSIGNED NOT NULL + ,sesssion_id SMALLINT ,status SMALLINT NOT NULL ,schedule DATETIME NOT NULL ,is_done TINYINT NOT NULL @@ -1525,8 +1526,8 @@ func createJob(jobTypeID int32, priority int32, userID64 int64, schedule time.Ti return err } - stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, status, schedule, is_done, in_work, inserted, pulled, started, ended, payload) - VALUES (?, ?, ?, ?, ?, 0, 0, ?, NULL, NULL, NULL, ?);`) + stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, session_id, status, schedule, is_done, in_work, inserted, pulled, started, ended, payload) + VALUES (?, ?, ?, NULL, ?, ?, 0, 0, ?, NULL, NULL, NULL, ?);`) logOnError(err, "createJob : prepare insert obj_job") if err != nil { return err @@ -1622,7 +1623,7 @@ func loadMsgParsingRules() (m map[int]MessageParsingRule, err error) { return m, nil } -func loadCurrentJobs() ([]Job, error) { +func loadCurrentJobs(sid int64) ([]Job, error) { var ( objId int64 jobTypeId int32 @@ -1631,18 +1632,14 @@ func loadCurrentJobs() ([]Job, error) { jobs []Job ) - tx, err := db.Begin() - logOnError(err, "loadCurrentJobs : begin transaction") - if err != nil { - return jobs, err - } - defer tx.Rollback() + _, err := db.Exec("UPDATE obj_job j SET session_id = ?, j.pulled = ?, j.in_work = 1 WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", sid, time.Now(), time.Now(), SQLJobSliceSize) + logOnError(err, "loadCurrentJobs : update intial rows") - stmt, err := tx.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 and j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT 1 FOR UPDATE;") + stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.session_id = ? ORDER BY j.priority ASC, j.obj_id ASC;") logOnError(err, "loadCurrentJobs : prepare select statement") - //rows, err := stmt.Query(time.Now(), SQLJobSliceSize) - rows, err := stmt.Query(time.Now()) + rows, err := stmt.Query(sid) + // rows, err := stmt.Query(time.Now()) logOnError(err, "loadCurrentJobs : query select statement") for rows.Next() { @@ -1663,20 +1660,6 @@ func loadCurrentJobs() ([]Job, error) { err = stmt.Close() logOnError(err, "loadCurrentJobs : close select statement") - stmt, err = tx.Prepare("UPDATE obj_job j SET j.in_work = 1, j.pulled = ? WHERE j.obj_id = ?;") - logOnError(err, "loadCurrentJobs : prepare update statement") - - for _, job := range jobs { - _, err = stmt.Exec(time.Now(), job.ID64) - logOnError(err, "loadCurrentJobs : updating row") - } - - err = stmt.Close() - logOnError(err, "loadCurrentJobs : close update statement") - - err = tx.Commit() - logOnError(err, "loadCurrentJobs : commit") - return jobs, nil }