test jobs main shops slaves

This commit is contained in:
shoopea 2020-02-07 10:54:01 +08:00
parent 7aac758ac0
commit 38fdbb8c32
4 changed files with 139 additions and 10 deletions

1
bot.go
View File

@ -791,6 +791,7 @@ func botShops(m *ChatWarsMessage) {
Status: 0,
ChatID64: m.ChatID64,
MsgID64: m.ID64,
Msgs: make([]ChatWarsMessage, 0),
}
b, err := json.Marshal(j)
logOnError(err, "botShops : Marshal")

14
def.go
View File

@ -367,6 +367,11 @@ type ChatWarsMessageTributesStatsAck struct {
Tributes []ChatWarsTribute `json:"tributes"`
}
type ChatWarsMessageShopMainReq struct {
Msg *ChatWarsMessage `json:"msg"`
Link string `json:"link"`
}
type ChatWarsMessageShopMainAck struct {
Msg *ChatWarsMessage `json:"msg"`
Name string `json:"name"`
@ -558,11 +563,10 @@ type JobPayloadGetVault struct {
}
type JobPayloadShops struct {
MsgID64 int64 `json:"msg_id"`
ChatID64 int64 `json:"chat_id"`
Status int64 `json:"status"`
ShopMainAckMsg []ChatWarsMessage `json:"shop_main_ack_msg"`
CleanupMsg []ChatWarsMessage `json:"cleanup_msg"`
MsgID64 int64 `json:"msg_id"`
ChatID64 int64 `json:"chat_id"`
Status int64 `json:"status"`
Msgs []ChatWarsMessage `json:"msgs"`
}
type JobPayloadShopsSlave struct {

123
job.go
View File

@ -154,7 +154,23 @@ func setJobPayloadJSON(jobID64 int64, payload interface{}) error {
return setJobPayload(jobID64, b)
}
func setJobPayloadJSONUnsafe(jobID64 int64, payload interface{}) error {
b, err := json.Marshal(payload)
logOnError(err, "setJobPayloadJSONUnsafe")
if err != nil {
return err
}
return setJobPayloadUnsafe(jobID64, b)
}
func setJobPayload(jobID64 int64, payload []byte) error {
muxObjJob.Lock()
return setJobPayloadUnsafe(jobID64, payload)
defer muxObjJob.Unlock()
}
func setJobPayloadUnsafe(jobID64 int64, payload []byte) error {
var (
zb bytes.Buffer
zpayload []byte
@ -190,11 +206,9 @@ func setJobPayload(jobID64 int64, payload []byte) error {
return err
}
muxObjJob.Lock()
j := cacheObjJob[jobID64]
j.Payload = payload
cacheObjJob[jobID64] = j
muxObjJob.Unlock()
//log.Printf("setJobPayload[%d] : %s\n", jobID64, string(payload))
@ -202,9 +216,13 @@ func setJobPayload(jobID64 int64, payload []byte) error {
}
func getJobPayload(jobID64 int64) []byte {
var b []byte
muxObjJob.Lock()
defer muxObjJob.Unlock()
return getJobPayloadUnsafe(jobID64)
}
func getJobPayloadUnsafe(jobID64 int64) []byte {
var b []byte
if j, ok := cacheObjJob[jobID64]; ok {
b = j.Payload
return b
@ -2180,16 +2198,111 @@ func jobCheckVaultLimit(j Job) {
}
func jobShops(j Job) {
var p JobPayloadShops
err := setJobDone(j.ID64)
err := setJobStart(j.ID64)
logOnError(err, "jobShops : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobShops : Unmarshal payload")
c := TGCommand{
Type: commandReplyMsg,
Text: "Done",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
ParseMode: cmdParseModeHTML,
}
TGCmdQueue <- c
err = setJobDone(j.ID64)
logOnError(err, "jobShops : setJobDone")
return
}
func jobShopsSlave(j Job) {
var p JobPayloadShopsSlave
err := setJobDone(j.ID64)
err := setJobStart(j.ID64)
logOnError(err, "jobShopsSlave : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobShopsSlave : Unmarshal payload")
if j.Trigger != 0 {
id, err := getObjSubTypeId(j.Trigger)
logOnError(err, "jobShopsSlave : getObjSubType("+strconv.FormatInt(j.Trigger, 10)+")")
if err == nil {
m, err := getObjMsg(j.Trigger)
logOnError(err, "jobShopsSlave : getObjMsg")
rule, err := getMsgParsingRule(m)
logOnError(err, "jobShopsSlave : getMsgParsingRule")
switch id {
case cacheObjSubType[`msg_shop_main_req`]:
cwm, err := parseSubTypeMessageShopMainReq(m, rule.re)
logOnError(err, "jobShopsSlave : parseSubTypeMessageShopMainReq")
muxObjJob.Lock()
b2 := getJobPayloadUnsafe(p.JobCallbackID64)
var p2 JobPayloadShops
err = json.Unmarshal(b2, &p2)
p2.Msgs = append(p2.Msgs, m)
err = setJobPayloadJSONUnsafe(p.JobCallbackID64, p2)
logOnError(err, "jobShopsSlave : setJobPayloadJSONUnsafe")
muxObjJob.Unlock()
rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC()) // reschedule so that it gets picked up by shop main ack callback
return
case cacheObjSubType[`msg_shop_main_ack`]:
cwm, err := parseSubTypeMessageShopMainAck(m, rule.re)
logOnError(err, "jobShopsSlave : parseSubTypeMessageShopMainAck")
muxObjJob.Lock()
b2 := getJobPayloadUnsafe(p.JobCallbackID64)
var p2 JobPayloadShops
err = json.Unmarshal(b2, &p2)
p2.Msgs = append(p2.Msgs, m)
err = setJobPayloadJSONUnsafe(p.JobCallbackID64, p2)
muxObjJob.Unlock()
for i, link := range p.Shops {
if cwm.Link == link {
p.Shops = append(p.Shops[:i], p.Shops[i+1:])
break
}
}
err = setJobPayloadJSON(j.ID64, p)
logOnError(err, "jobShopsSlave : setJobPayloadJSON")
default:
}
}
}
if len(p.Shops) != 0 {
rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC()) // reschedule so that it gets picked up by shop main ack callback
setJobCallback(j.ID64, j.UserID64, cacheObjSubType[`msg_shop_main_req`])
setJobCallback(j.ID64, j.UserID64, cacheObjSubType[`msg_shop_main_ack`])
clientSendCWMsgDelay(j.UserID64, fmt.Sprintf("/ws_%s", p.Shops[0]), 4)
return
}
// we update the master status now that the slave job is done
muxObjJob.Lock()
b2 := getJobPayloadUnsafe(p.JobCallbackID64)
var p2 JobPayloadShops
err = json.Unmarshal(b2, &p2)
p2.Status += 1
err = setJobPayloadJSONUnsafe(p.JobCallbackID64, p2)
muxObjJob.Unlock()
// if last job to finish then we wake up the master
if p2.Status == p.Slaves {
rescheduleJob(j.JobCallbackID64, 0, time.Now().UTC())
}
err = setJobDone(j.ID64)
logOnError(err, "jobShopsSlave : setJobDone")
return

11
msg.go
View File

@ -298,6 +298,17 @@ func parseSubTypeMessageTributesStatsAck(m *ChatWarsMessage, r *regexp.Regexp) (
return &cwm, nil
}
func parseSubTypeMessageShopMainReq(m *ChatWarsMessage, r *regexp.Regexp) (*ChatWarsMessageShopMainReq, error) {
cwm := ChatWarsMessageShopMainReq{}
cwm.Msg = m
cwm.Link = r.ReplaceAllString(m.Text, "${Link}")
log.Printf("parseSubTypeMessageShopMainReq : Link : %s\n", cwm.Link)
return &cwm, nil
}
func parseSubTypeMessageShopMainAck(m *ChatWarsMessage, r *regexp.Regexp) (*ChatWarsMessageShopMainAck, error) {
cwm := ChatWarsMessageShopMainAck{}
cwm.Msg = m