update job definition

This commit is contained in:
shoopea 2020-01-12 17:11:34 +08:00
parent 1acf0ce5cf
commit fbc31dc7ef
2 changed files with 81 additions and 6 deletions

28
job.go
View File

@ -18,7 +18,13 @@ import (
)
func createJob(jobTypeID64 int64, priority int32, userID64 int64, trigger int64, schedule time.Time, payload []byte) (int64, error) {
if len(payload) > 20000 {
zb := make(bytes.Buffer)
zw := zlib.NewWriter(&zb)
zw.Write(payload)
zw.Close()
zpayload := zb.Bytes()
if len(zpayload) > 20000 {
return 0, errors.New("payload too long")
}
@ -51,7 +57,7 @@ func createJob(jobTypeID64 int64, priority int32, userID64 int64, trigger int64,
}
defer stmt.Close()
_, err = stmt.Exec(objId, priority, userID64, trigger, schedule.UTC(), time.Now().UTC(), time.Unix(maxUnixTimestamp, 0).UTC(), payload)
_, err = stmt.Exec(objId, priority, userID64, trigger, schedule.UTC(), time.Now().UTC(), time.Unix(maxUnixTimestamp, 0).UTC(), zpayload)
logOnError(err, "createJob : insert obj_job")
if err != nil {
return 0, err
@ -158,7 +164,7 @@ func loadCurrentJobs() ([]Job, error) {
userID64 int64
trigger int64
timeout time.Time
payload []byte
zpayload []byte
jobs []Job
)
@ -187,16 +193,28 @@ func loadCurrentJobs() ([]Job, error) {
}
for rows.Next() {
err = rows.Scan(&objId, &jobTypeID64, &trigger, &userID64, &payload, &timeout)
err = rows.Scan(&objId, &jobTypeID64, &trigger, &userID64, &zpayload, &timeout)
logOnError(err, "loadCurrentJobs : scan query rows")
zb := bytes.NewReader(zpayload)
zr, err := zlib.NewReader(zb)
if err != nil {
logOnError(err, "loadCurrentJobs : zlib.NewReader.")
continue
}
b := new(bytes.Buffer)
b.ReadFrom(zb)
payload := b.Bytes()
job := Job{
ID64: objId,
JobTypeID64: jobTypeID64,
Trigger: trigger,
UserID64: userID64,
Payload: payload,
Timeout: timeout,
Payload: payload,
}
jobs = append(jobs, job)
}
err = rows.Err()

59
obj.go
View File

@ -21,9 +21,11 @@ var (
cacheObjSubType map[string]int64
cacheObjItem map[string]ChatWarsItem
cacheObjItemId map[int64]ChatWarsItem
muxObjItem sync.Mutex
cacheObjItemId map[int64]ChatWarsItem
muxObjItemId sync.Mutex
cacheObjJob map[int64]Job
muxObjJob sync.Mutex
)
func initCache() {
@ -58,6 +60,10 @@ func initCache() {
err = loadObjMsg()
logOnError(err, "initCache : caching msgs")
log.Println("Caching jobs ..")
err = loadObjJob()
logOnError(err, "initCache : caching jobs")
}
func loadObjType() error {
@ -1005,3 +1011,54 @@ func loadObjItem() error {
return nil
}
func loadObjJob() error {
var (
id int64
type_id int64
trigger int64
timeout time.Time
user int64
zpayload []byte
)
muxObjJob.Lock()
cacheObjJob = make(map[int64]Job)
muxObjItem.Unlock()
jobs, err := db.Query(`SELECT o.id, o.obj_sub_type_id, oj.trigger_id, oj.timeout, oj.user_id, oj.payload FROM obj o, obj_job oj WHERE o.id = oj.obj_id;;`)
if err != nil {
return err
}
defer jobs.Close()
for jobs.Next() {
err = jobs.Scan(&id, &type_id, &trigger, &timeout, &user, &zpayload)
if err != nil {
return err
}
j := new(Job)
j.ID64 = id
j.JobTypeID64 = type_id
j.Trigger = trigger
j.Timeout = timeout
j.UserID64 = user
zb := bytes.NewReader(zpayload)
zr, err := zlib.NewReader(zb)
if err != nil {
logOnError(err, "loadObjJob : zlib.NewReader.")
continue
}
b := new(bytes.Buffer)
b.ReadFrom(zb)
payload := b.Bytes()
j.Payload = payload
muxObjJob.Lock()
cacheObjJob[id] = *j
muxObjJob.Unlock()
}
return nil
}