From 38fdbb8c3298471277a8695acf7e0003bea04e6e Mon Sep 17 00:00:00 2001 From: shoopea Date: Fri, 7 Feb 2020 10:54:01 +0800 Subject: [PATCH] test jobs main shops slaves --- bot.go | 1 + def.go | 14 ++++--- job.go | 123 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- msg.go | 11 ++++++ 4 files changed, 139 insertions(+), 10 deletions(-) diff --git a/bot.go b/bot.go index 549ee23..91c4fe2 100644 --- a/bot.go +++ b/bot.go @@ -791,6 +791,7 @@ func botShops(m *ChatWarsMessage) { Status: 0, ChatID64: m.ChatID64, MsgID64: m.ID64, + Msgs: make([]ChatWarsMessage, 0), } b, err := json.Marshal(j) logOnError(err, "botShops : Marshal") diff --git a/def.go b/def.go index 7c1adb3..368f0d0 100644 --- a/def.go +++ b/def.go @@ -367,6 +367,11 @@ type ChatWarsMessageTributesStatsAck struct { Tributes []ChatWarsTribute `json:"tributes"` } +type ChatWarsMessageShopMainReq struct { + Msg *ChatWarsMessage `json:"msg"` + Link string `json:"link"` +} + type ChatWarsMessageShopMainAck struct { Msg *ChatWarsMessage `json:"msg"` Name string `json:"name"` @@ -558,11 +563,10 @@ type JobPayloadGetVault struct { } type JobPayloadShops struct { - MsgID64 int64 `json:"msg_id"` - ChatID64 int64 `json:"chat_id"` - Status int64 `json:"status"` - ShopMainAckMsg []ChatWarsMessage `json:"shop_main_ack_msg"` - CleanupMsg []ChatWarsMessage `json:"cleanup_msg"` + MsgID64 int64 `json:"msg_id"` + ChatID64 int64 `json:"chat_id"` + Status int64 `json:"status"` + Msgs []ChatWarsMessage `json:"msgs"` } type JobPayloadShopsSlave struct { diff --git a/job.go b/job.go index 17b3e36..9233927 100644 --- a/job.go +++ b/job.go @@ -154,7 +154,23 @@ func setJobPayloadJSON(jobID64 int64, payload interface{}) error { return setJobPayload(jobID64, b) } +func setJobPayloadJSONUnsafe(jobID64 int64, payload interface{}) error { + b, err := json.Marshal(payload) + logOnError(err, "setJobPayloadJSONUnsafe") + if err != nil { + return err + } + + return setJobPayloadUnsafe(jobID64, b) +} + func setJobPayload(jobID64 int64, payload []byte) error { + muxObjJob.Lock() + return setJobPayloadUnsafe(jobID64, payload) + defer muxObjJob.Unlock() +} + +func setJobPayloadUnsafe(jobID64 int64, payload []byte) error { var ( zb bytes.Buffer zpayload []byte @@ -190,11 +206,9 @@ func setJobPayload(jobID64 int64, payload []byte) error { return err } - muxObjJob.Lock() j := cacheObjJob[jobID64] j.Payload = payload cacheObjJob[jobID64] = j - muxObjJob.Unlock() //log.Printf("setJobPayload[%d] : %s\n", jobID64, string(payload)) @@ -202,9 +216,13 @@ func setJobPayload(jobID64 int64, payload []byte) error { } func getJobPayload(jobID64 int64) []byte { - var b []byte muxObjJob.Lock() defer muxObjJob.Unlock() + return getJobPayloadUnsafe(jobID64) +} + +func getJobPayloadUnsafe(jobID64 int64) []byte { + var b []byte if j, ok := cacheObjJob[jobID64]; ok { b = j.Payload return b @@ -2180,16 +2198,111 @@ func jobCheckVaultLimit(j Job) { } func jobShops(j Job) { + var p JobPayloadShops - err := setJobDone(j.ID64) + err := setJobStart(j.ID64) + logOnError(err, "jobShops : setJobStart") + + err = json.Unmarshal(j.Payload, &p) + logOnError(err, "jobShops : Unmarshal payload") + + c := TGCommand{ + Type: commandReplyMsg, + Text: "Done", + FromMsgID64: p.MsgID64, + FromChatID64: p.ChatID64, + ParseMode: cmdParseModeHTML, + } + TGCmdQueue <- c + + err = setJobDone(j.ID64) logOnError(err, "jobShops : setJobDone") return } func jobShopsSlave(j Job) { + var p JobPayloadShopsSlave - err := setJobDone(j.ID64) + err := setJobStart(j.ID64) + logOnError(err, "jobShopsSlave : setJobStart") + + err = json.Unmarshal(j.Payload, &p) + logOnError(err, "jobShopsSlave : Unmarshal payload") + + if j.Trigger != 0 { + id, err := getObjSubTypeId(j.Trigger) + logOnError(err, "jobShopsSlave : getObjSubType("+strconv.FormatInt(j.Trigger, 10)+")") + if err == nil { + m, err := getObjMsg(j.Trigger) + logOnError(err, "jobShopsSlave : getObjMsg") + rule, err := getMsgParsingRule(m) + logOnError(err, "jobShopsSlave : getMsgParsingRule") + + switch id { + case cacheObjSubType[`msg_shop_main_req`]: + cwm, err := parseSubTypeMessageShopMainReq(m, rule.re) + logOnError(err, "jobShopsSlave : parseSubTypeMessageShopMainReq") + + muxObjJob.Lock() + b2 := getJobPayloadUnsafe(p.JobCallbackID64) + var p2 JobPayloadShops + err = json.Unmarshal(b2, &p2) + p2.Msgs = append(p2.Msgs, m) + err = setJobPayloadJSONUnsafe(p.JobCallbackID64, p2) + logOnError(err, "jobShopsSlave : setJobPayloadJSONUnsafe") + muxObjJob.Unlock() + + rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC()) // reschedule so that it gets picked up by shop main ack callback + return + case cacheObjSubType[`msg_shop_main_ack`]: + cwm, err := parseSubTypeMessageShopMainAck(m, rule.re) + logOnError(err, "jobShopsSlave : parseSubTypeMessageShopMainAck") + + muxObjJob.Lock() + b2 := getJobPayloadUnsafe(p.JobCallbackID64) + var p2 JobPayloadShops + err = json.Unmarshal(b2, &p2) + p2.Msgs = append(p2.Msgs, m) + err = setJobPayloadJSONUnsafe(p.JobCallbackID64, p2) + muxObjJob.Unlock() + + for i, link := range p.Shops { + if cwm.Link == link { + p.Shops = append(p.Shops[:i], p.Shops[i+1:]) + break + } + } + + err = setJobPayloadJSON(j.ID64, p) + logOnError(err, "jobShopsSlave : setJobPayloadJSON") + default: + } + } + } + if len(p.Shops) != 0 { + rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC()) // reschedule so that it gets picked up by shop main ack callback + setJobCallback(j.ID64, j.UserID64, cacheObjSubType[`msg_shop_main_req`]) + setJobCallback(j.ID64, j.UserID64, cacheObjSubType[`msg_shop_main_ack`]) + clientSendCWMsgDelay(j.UserID64, fmt.Sprintf("/ws_%s", p.Shops[0]), 4) + return + } + + // we update the master status now that the slave job is done + muxObjJob.Lock() + b2 := getJobPayloadUnsafe(p.JobCallbackID64) + var p2 JobPayloadShops + err = json.Unmarshal(b2, &p2) + p2.Status += 1 + err = setJobPayloadJSONUnsafe(p.JobCallbackID64, p2) + muxObjJob.Unlock() + + // if last job to finish then we wake up the master + if p2.Status == p.Slaves { + rescheduleJob(j.JobCallbackID64, 0, time.Now().UTC()) + } + + err = setJobDone(j.ID64) logOnError(err, "jobShopsSlave : setJobDone") return diff --git a/msg.go b/msg.go index eabbdf5..c9fc4f1 100644 --- a/msg.go +++ b/msg.go @@ -298,6 +298,17 @@ func parseSubTypeMessageTributesStatsAck(m *ChatWarsMessage, r *regexp.Regexp) ( return &cwm, nil } +func parseSubTypeMessageShopMainReq(m *ChatWarsMessage, r *regexp.Regexp) (*ChatWarsMessageShopMainReq, error) { + cwm := ChatWarsMessageShopMainReq{} + cwm.Msg = m + + cwm.Link = r.ReplaceAllString(m.Text, "${Link}") + + log.Printf("parseSubTypeMessageShopMainReq : Link : %s\n", cwm.Link) + + return &cwm, nil +} + func parseSubTypeMessageShopMainAck(m *ChatWarsMessage, r *regexp.Regexp) (*ChatWarsMessageShopMainAck, error) { cwm := ChatWarsMessageShopMainAck{} cwm.Msg = m