From 6c2b48620aed9b071d0fe23a3a574e0d07ac87e0 Mon Sep 17 00:00:00 2001 From: shoopea Date: Tue, 21 Jan 2020 14:14:58 +0800 Subject: [PATCH] update payload job --- job.go | 28 +++++++++++++++++----------- sql.go | 1 + 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/job.go b/job.go index 26c08a2..d05c12d 100644 --- a/job.go +++ b/job.go @@ -145,7 +145,7 @@ func setJobPayload(jobID64 int64, payload []byte) error { return errors.New("payload too long") } - stmt, err := db.Prepare(`UPDATE obj_job j SET j.payload = ? WHERE j.obj_id = ?;`) + stmt, err := db.Prepare(`UPDATE obj_job j SET j.payload = ?, j.zipped = 1 WHERE j.obj_id = ?;`) logOnError(err, "setJobTimeout : prepare update obj_job") if err != nil { return err @@ -227,6 +227,7 @@ func loadCurrentJobs() ([]Job, error) { userID64 int64 trigger int64 timeout time.Time + zipped int zpayload []byte jobs []Job ) @@ -240,7 +241,7 @@ func loadCurrentJobs() ([]Job, error) { return jobs, err } - stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.trigger_id, j.user_id, j.payload, j.timeout FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;") + stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.trigger_id, j.user_id, j.zipped, j.payload, j.timeout FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;") logOnError(err, "loadCurrentJobs : prepare select statement") if err != nil { stmt.Close() @@ -256,18 +257,23 @@ func loadCurrentJobs() ([]Job, error) { } for rows.Next() { - err = rows.Scan(&objId, &jobTypeID64, &trigger, &userID64, &zpayload, &timeout) + err = rows.Scan(&objId, &jobTypeID64, &trigger, &userID64, &zipped, &zpayload, &timeout) logOnError(err, "loadCurrentJobs : scan query rows") - zb := bytes.NewReader(zpayload) - zr, err := zlib.NewReader(zb) - if err != nil { - logOnError(err, "loadCurrentJobs : zlib.NewReader") - continue + var payload []byte + if zipped > 0 { + zb := bytes.NewReader(zpayload) + zr, err := zlib.NewReader(zb) + if err != nil { + logOnError(err, "loadCurrentJobs : zlib.NewReader") + continue + } + b := new(bytes.Buffer) + b.ReadFrom(zr) + payload = b.Bytes() + } else { + payload = zpayload } - b := new(bytes.Buffer) - b.ReadFrom(zr) - payload := b.Bytes() job := Job{ ID64: objId, diff --git a/sql.go b/sql.go index e8b0550..b1766b2 100644 --- a/sql.go +++ b/sql.go @@ -385,6 +385,7 @@ func initDB() { ,started TIMESTAMP ,ended TIMESTAMP ,timeout TIMESTAMP + ,zipped TINYINT NOT NULL ,payload VARBINARY(20000) ,FOREIGN KEY (obj_id) REFERENCES obj(id) ON DELETE CASCADE ,KEY (is_done)