From fbc31dc7efb34e1ef20dfb84abf9ac9e5d96d3af Mon Sep 17 00:00:00 2001 From: shoopea Date: Sun, 12 Jan 2020 17:11:34 +0800 Subject: [PATCH] update job definition --- job.go | 28 +++++++++++++++++++++++----- obj.go | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 81 insertions(+), 6 deletions(-) diff --git a/job.go b/job.go index 6f51bd0..2e3768e 100644 --- a/job.go +++ b/job.go @@ -18,7 +18,13 @@ import ( ) func createJob(jobTypeID64 int64, priority int32, userID64 int64, trigger int64, schedule time.Time, payload []byte) (int64, error) { - if len(payload) > 20000 { + zb := make(bytes.Buffer) + zw := zlib.NewWriter(&zb) + zw.Write(payload) + zw.Close() + zpayload := zb.Bytes() + + if len(zpayload) > 20000 { return 0, errors.New("payload too long") } @@ -51,7 +57,7 @@ func createJob(jobTypeID64 int64, priority int32, userID64 int64, trigger int64, } defer stmt.Close() - _, err = stmt.Exec(objId, priority, userID64, trigger, schedule.UTC(), time.Now().UTC(), time.Unix(maxUnixTimestamp, 0).UTC(), payload) + _, err = stmt.Exec(objId, priority, userID64, trigger, schedule.UTC(), time.Now().UTC(), time.Unix(maxUnixTimestamp, 0).UTC(), zpayload) logOnError(err, "createJob : insert obj_job") if err != nil { return 0, err @@ -158,7 +164,7 @@ func loadCurrentJobs() ([]Job, error) { userID64 int64 trigger int64 timeout time.Time - payload []byte + zpayload []byte jobs []Job ) @@ -187,16 +193,28 @@ func loadCurrentJobs() ([]Job, error) { } for rows.Next() { - err = rows.Scan(&objId, &jobTypeID64, &trigger, &userID64, &payload, &timeout) + err = rows.Scan(&objId, &jobTypeID64, &trigger, &userID64, &zpayload, &timeout) logOnError(err, "loadCurrentJobs : scan query rows") + + zb := bytes.NewReader(zpayload) + zr, err := zlib.NewReader(zb) + if err != nil { + logOnError(err, "loadCurrentJobs : zlib.NewReader.") + continue + } + b := new(bytes.Buffer) + b.ReadFrom(zb) + payload := b.Bytes() + job := Job{ ID64: objId, JobTypeID64: jobTypeID64, Trigger: trigger, UserID64: userID64, - Payload: payload, Timeout: timeout, + Payload: payload, } + jobs = append(jobs, job) } err = rows.Err() diff --git a/obj.go b/obj.go index 9fa4332..311d52c 100644 --- a/obj.go +++ b/obj.go @@ -21,9 +21,11 @@ var ( cacheObjSubType map[string]int64 cacheObjItem map[string]ChatWarsItem - cacheObjItemId map[int64]ChatWarsItem muxObjItem sync.Mutex + cacheObjItemId map[int64]ChatWarsItem muxObjItemId sync.Mutex + cacheObjJob map[int64]Job + muxObjJob sync.Mutex ) func initCache() { @@ -58,6 +60,10 @@ func initCache() { err = loadObjMsg() logOnError(err, "initCache : caching msgs") + log.Println("Caching jobs ..") + err = loadObjJob() + logOnError(err, "initCache : caching jobs") + } func loadObjType() error { @@ -1005,3 +1011,54 @@ func loadObjItem() error { return nil } + +func loadObjJob() error { + var ( + id int64 + type_id int64 + trigger int64 + timeout time.Time + user int64 + zpayload []byte + ) + + muxObjJob.Lock() + cacheObjJob = make(map[int64]Job) + muxObjItem.Unlock() + + jobs, err := db.Query(`SELECT o.id, o.obj_sub_type_id, oj.trigger_id, oj.timeout, oj.user_id, oj.payload FROM obj o, obj_job oj WHERE o.id = oj.obj_id;;`) + if err != nil { + return err + } + defer jobs.Close() + + for jobs.Next() { + err = jobs.Scan(&id, &type_id, &trigger, &timeout, &user, &zpayload) + if err != nil { + return err + } + j := new(Job) + j.ID64 = id + j.JobTypeID64 = type_id + j.Trigger = trigger + j.Timeout = timeout + j.UserID64 = user + + zb := bytes.NewReader(zpayload) + zr, err := zlib.NewReader(zb) + if err != nil { + logOnError(err, "loadObjJob : zlib.NewReader.") + continue + } + b := new(bytes.Buffer) + b.ReadFrom(zb) + payload := b.Bytes() + j.Payload = payload + + muxObjJob.Lock() + cacheObjJob[id] = *j + muxObjJob.Unlock() + } + + return nil +}