From f07c33e9c22cce153de587085b279d6f5f7b1276 Mon Sep 17 00:00:00 2001 From: shoopea Date: Sat, 17 Aug 2019 14:38:37 +0800 Subject: [PATCH] first callbacks review --- job.go | 10 ++++++++-- main.go | 3 ++- workers.go | 12 +++++++----- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/job.go b/job.go index 8e302cc..0b2ce07 100644 --- a/job.go +++ b/job.go @@ -55,7 +55,8 @@ func createJob(jobTypeID int32, priority int32, userID64 int64, schedule time.Ti } func createJobCallback(jobTypeID int32, userID64 int64, msgTypeID64 int64, payload []byte) error { - jobID64, err := createJob(jobTypeID, userID64, time.Parse(time.RFC3339, "9999-12-31T00:00:00+00:00"), payload) + t, err := time.Parse(time.RFC3339, "9999-12-31T00:00:00+00:00") + jobID64, err := createJob(jobTypeID, objJobPriority, userID64, t, payload) if err != nil { return err } @@ -65,7 +66,12 @@ func createJobCallback(jobTypeID int32, userID64 int64, msgTypeID64 int64, paylo func setJobCallback(jobID64 int64, userID64 int64, msgTypeID64 int64) { muxCallbacks.Lock() - callbacks = append(callbacks[userID64][msgTypeID64], jobID64) + if _, ok := callbacks[userID64]; !ok { + callbacks[userID64] = make(map[int64][]int) + } + s := callbacks[userID64][msgTypeID64] + s = append(s, jobID64) + callbacks[userID64][msgTypeID64] = s muxCallbacks.Unlock() } diff --git a/main.go b/main.go index 02613ab..cca1486 100644 --- a/main.go +++ b/main.go @@ -64,7 +64,7 @@ var ( clients map[int64]*ChirpClient muxClients sync.RWMutex - callbacks map[int64][int64][]int64 + callbacks map[int64]map[int64][]int64 muxCallbacks sync.Mutex ) @@ -155,6 +155,7 @@ func main() { MQTGCmdQueue = make(chan TGCommand, MQTGCmdQueueSize) JobQueue = make(chan Job, JobQueueSize) clients = make(map[int64]*ChirpClient) + callbacks = make(map[int64]map[int64][]int64) for w := 1; w <= MQGetMsgWorkers; w++ { go MQGetMsgWorker(w, MQCWMsgQueue) diff --git a/workers.go b/workers.go index 3cca178..93092bb 100644 --- a/workers.go +++ b/workers.go @@ -469,12 +469,14 @@ func SQLIdentifyMsgWorker(id int, objIds <-chan int64) { log.Printf("SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Unknwon message type in rule %d : %d (%d)\n%s\n", msgParsingRules[i].ID, msgParsingRules[i].MsgTypeID, objId, m.Text) } muxCallbacks.Lock() - if len(callbacks[m.TGUserID64][msgParsingRules[i].MsgTypeID]) > 0 { - for jobID64 := range callbacks[m.TGUserID64][msgParsingRules[i].MsgTypeID] { - err := rescheduleJob(jobID64, objJobStatusNew, time.Now().UTC()) - logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : callbacks triggering") + if mc1, mok1 := callbacks[m.TGUserID64]; mok1 { + if mc2, mok2 := mc1[msgParsingRules[i].MsgTypeID]; mok2 { + for jobID64 := range mc2 { + err := rescheduleJob(jobID64, objJobStatusNew, time.Now().UTC()) + logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : callbacks triggering") + } + mc1[msgParsingRules[i].MsgTypeID] = make([]int64) } - callbacks[m.TGUserID64][msgParsingRules[i].MsgTypeID] = new([]int64) } muxCallbacks.Unlock() }