update payload job

This commit is contained in:
shoopea 2020-01-21 14:14:58 +08:00
parent 94a03fdaa6
commit 6c2b48620a
2 changed files with 18 additions and 11 deletions

28
job.go
View File

@ -145,7 +145,7 @@ func setJobPayload(jobID64 int64, payload []byte) error {
return errors.New("payload too long") return errors.New("payload too long")
} }
stmt, err := db.Prepare(`UPDATE obj_job j SET j.payload = ? WHERE j.obj_id = ?;`) stmt, err := db.Prepare(`UPDATE obj_job j SET j.payload = ?, j.zipped = 1 WHERE j.obj_id = ?;`)
logOnError(err, "setJobTimeout : prepare update obj_job") logOnError(err, "setJobTimeout : prepare update obj_job")
if err != nil { if err != nil {
return err return err
@ -227,6 +227,7 @@ func loadCurrentJobs() ([]Job, error) {
userID64 int64 userID64 int64
trigger int64 trigger int64
timeout time.Time timeout time.Time
zipped int
zpayload []byte zpayload []byte
jobs []Job jobs []Job
) )
@ -240,7 +241,7 @@ func loadCurrentJobs() ([]Job, error) {
return jobs, err return jobs, err
} }
stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.trigger_id, j.user_id, j.payload, j.timeout FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;") stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.trigger_id, j.user_id, j.zipped, j.payload, j.timeout FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;")
logOnError(err, "loadCurrentJobs : prepare select statement") logOnError(err, "loadCurrentJobs : prepare select statement")
if err != nil { if err != nil {
stmt.Close() stmt.Close()
@ -256,18 +257,23 @@ func loadCurrentJobs() ([]Job, error) {
} }
for rows.Next() { for rows.Next() {
err = rows.Scan(&objId, &jobTypeID64, &trigger, &userID64, &zpayload, &timeout) err = rows.Scan(&objId, &jobTypeID64, &trigger, &userID64, &zipped, &zpayload, &timeout)
logOnError(err, "loadCurrentJobs : scan query rows") logOnError(err, "loadCurrentJobs : scan query rows")
zb := bytes.NewReader(zpayload) var payload []byte
zr, err := zlib.NewReader(zb) if zipped > 0 {
if err != nil { zb := bytes.NewReader(zpayload)
logOnError(err, "loadCurrentJobs : zlib.NewReader") zr, err := zlib.NewReader(zb)
continue if err != nil {
logOnError(err, "loadCurrentJobs : zlib.NewReader")
continue
}
b := new(bytes.Buffer)
b.ReadFrom(zr)
payload = b.Bytes()
} else {
payload = zpayload
} }
b := new(bytes.Buffer)
b.ReadFrom(zr)
payload := b.Bytes()
job := Job{ job := Job{
ID64: objId, ID64: objId,

1
sql.go
View File

@ -385,6 +385,7 @@ func initDB() {
,started TIMESTAMP ,started TIMESTAMP
,ended TIMESTAMP ,ended TIMESTAMP
,timeout TIMESTAMP ,timeout TIMESTAMP
,zipped TINYINT NOT NULL
,payload VARBINARY(20000) ,payload VARBINARY(20000)
,FOREIGN KEY (obj_id) REFERENCES obj(id) ON DELETE CASCADE ,FOREIGN KEY (obj_id) REFERENCES obj(id) ON DELETE CASCADE
,KEY (is_done) ,KEY (is_done)