From baec19650a3ff13ae6b8c4309ddde2158cdeca75 Mon Sep 17 00:00:00 2001 From: shoopea Date: Thu, 1 Aug 2019 11:49:05 +0800 Subject: [PATCH] error handling for jobs loading --- job.go | 18 ++++++++++++++++++ workers.go | 14 +++++++++----- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/job.go b/job.go index 97ff11f..68c0982 100644 --- a/job.go +++ b/job.go @@ -120,13 +120,24 @@ func loadCurrentJobs() ([]Job, error) { _, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1, j.seq_nr = ? WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", t, r, t, SQLJobSliceSize) logOnError(err, "loadCurrentJobs : update intial rows") + if err != nil { + return jobs, err + } stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;") logOnError(err, "loadCurrentJobs : prepare select statement") + if err != nil { + stmt.Close() + return jobs, err + } rows, err := stmt.Query(r) // rows, err := stmt.Query(time.Now()) logOnError(err, "loadCurrentJobs : query select statement") + if err != nil { + stmt.Close() + return jobs, err + } for rows.Next() { err = rows.Scan(&objId, &jobTypeId, &status, &userID64, &payload) @@ -143,9 +154,16 @@ func loadCurrentJobs() ([]Job, error) { err = rows.Err() logOnError(err, "loadCurrentJobs : scan end rows") rows.Close() + if err != nil { + stmt.Close() + return jobs, err + } err = stmt.Close() logOnError(err, "loadCurrentJobs : close select statement") + if err != nil { + return jobs, err + } return jobs, nil } diff --git a/workers.go b/workers.go index a010fb7..73ac6a4 100644 --- a/workers.go +++ b/workers.go @@ -472,11 +472,15 @@ func SQLJobWorker() { log.Printf("SQLJobWorker : %d jobs.\n", len(jobs)) } */ - for _, j := range jobs { - JobQueue <- j - } - if len(jobs) < SQLJobSliceSize { - time.Sleep(100 * time.Millisecond) + if err == nil { + for _, j := range jobs { + JobQueue <- j + } + if len(jobs) < SQLJobSliceSize { + time.Sleep(100 * time.Millisecond) + } + } else { + time.Sleep(1 * time.Second) } } log.Printf("SQLJobWorker : Closing.")