first callbacks review

This commit is contained in:
shoopea 2019-08-17 14:38:37 +08:00
parent e549468f9c
commit f07c33e9c2
3 changed files with 17 additions and 8 deletions

10
job.go
View File

@ -55,7 +55,8 @@ func createJob(jobTypeID int32, priority int32, userID64 int64, schedule time.Ti
}
func createJobCallback(jobTypeID int32, userID64 int64, msgTypeID64 int64, payload []byte) error {
jobID64, err := createJob(jobTypeID, userID64, time.Parse(time.RFC3339, "9999-12-31T00:00:00+00:00"), payload)
t, err := time.Parse(time.RFC3339, "9999-12-31T00:00:00+00:00")
jobID64, err := createJob(jobTypeID, objJobPriority, userID64, t, payload)
if err != nil {
return err
}
@ -65,7 +66,12 @@ func createJobCallback(jobTypeID int32, userID64 int64, msgTypeID64 int64, paylo
func setJobCallback(jobID64 int64, userID64 int64, msgTypeID64 int64) {
muxCallbacks.Lock()
callbacks = append(callbacks[userID64][msgTypeID64], jobID64)
if _, ok := callbacks[userID64]; !ok {
callbacks[userID64] = make(map[int64][]int)
}
s := callbacks[userID64][msgTypeID64]
s = append(s, jobID64)
callbacks[userID64][msgTypeID64] = s
muxCallbacks.Unlock()
}

View File

@ -64,7 +64,7 @@ var (
clients map[int64]*ChirpClient
muxClients sync.RWMutex
callbacks map[int64][int64][]int64
callbacks map[int64]map[int64][]int64
muxCallbacks sync.Mutex
)
@ -155,6 +155,7 @@ func main() {
MQTGCmdQueue = make(chan TGCommand, MQTGCmdQueueSize)
JobQueue = make(chan Job, JobQueueSize)
clients = make(map[int64]*ChirpClient)
callbacks = make(map[int64]map[int64][]int64)
for w := 1; w <= MQGetMsgWorkers; w++ {
go MQGetMsgWorker(w, MQCWMsgQueue)

View File

@ -469,12 +469,14 @@ func SQLIdentifyMsgWorker(id int, objIds <-chan int64) {
log.Printf("SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Unknwon message type in rule %d : %d (%d)\n%s\n", msgParsingRules[i].ID, msgParsingRules[i].MsgTypeID, objId, m.Text)
}
muxCallbacks.Lock()
if len(callbacks[m.TGUserID64][msgParsingRules[i].MsgTypeID]) > 0 {
for jobID64 := range callbacks[m.TGUserID64][msgParsingRules[i].MsgTypeID] {
err := rescheduleJob(jobID64, objJobStatusNew, time.Now().UTC())
logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : callbacks triggering")
if mc1, mok1 := callbacks[m.TGUserID64]; mok1 {
if mc2, mok2 := mc1[msgParsingRules[i].MsgTypeID]; mok2 {
for jobID64 := range mc2 {
err := rescheduleJob(jobID64, objJobStatusNew, time.Now().UTC())
logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : callbacks triggering")
}
mc1[msgParsingRules[i].MsgTypeID] = make([]int64)
}
callbacks[m.TGUserID64][msgParsingRules[i].MsgTypeID] = new([]int64)
}
muxCallbacks.Unlock()
}