error handling for jobs loading

This commit is contained in:
shoopea 2019-08-01 11:49:05 +08:00
parent 6dde8d308b
commit baec19650a
2 changed files with 27 additions and 5 deletions

18
job.go
View File

@ -120,13 +120,24 @@ func loadCurrentJobs() ([]Job, error) {
_, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1, j.seq_nr = ? WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", t, r, t, SQLJobSliceSize) _, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1, j.seq_nr = ? WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", t, r, t, SQLJobSliceSize)
logOnError(err, "loadCurrentJobs : update intial rows") logOnError(err, "loadCurrentJobs : update intial rows")
if err != nil {
return jobs, err
}
stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;") stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;")
logOnError(err, "loadCurrentJobs : prepare select statement") logOnError(err, "loadCurrentJobs : prepare select statement")
if err != nil {
stmt.Close()
return jobs, err
}
rows, err := stmt.Query(r) rows, err := stmt.Query(r)
// rows, err := stmt.Query(time.Now()) // rows, err := stmt.Query(time.Now())
logOnError(err, "loadCurrentJobs : query select statement") logOnError(err, "loadCurrentJobs : query select statement")
if err != nil {
stmt.Close()
return jobs, err
}
for rows.Next() { for rows.Next() {
err = rows.Scan(&objId, &jobTypeId, &status, &userID64, &payload) err = rows.Scan(&objId, &jobTypeId, &status, &userID64, &payload)
@ -143,9 +154,16 @@ func loadCurrentJobs() ([]Job, error) {
err = rows.Err() err = rows.Err()
logOnError(err, "loadCurrentJobs : scan end rows") logOnError(err, "loadCurrentJobs : scan end rows")
rows.Close() rows.Close()
if err != nil {
stmt.Close()
return jobs, err
}
err = stmt.Close() err = stmt.Close()
logOnError(err, "loadCurrentJobs : close select statement") logOnError(err, "loadCurrentJobs : close select statement")
if err != nil {
return jobs, err
}
return jobs, nil return jobs, nil
} }

View File

@ -472,11 +472,15 @@ func SQLJobWorker() {
log.Printf("SQLJobWorker : %d jobs.\n", len(jobs)) log.Printf("SQLJobWorker : %d jobs.\n", len(jobs))
} }
*/ */
for _, j := range jobs { if err == nil {
JobQueue <- j for _, j := range jobs {
} JobQueue <- j
if len(jobs) < SQLJobSliceSize { }
time.Sleep(100 * time.Millisecond) if len(jobs) < SQLJobSliceSize {
time.Sleep(100 * time.Millisecond)
}
} else {
time.Sleep(1 * time.Second)
} }
} }
log.Printf("SQLJobWorker : Closing.") log.Printf("SQLJobWorker : Closing.")