From c7db99ce7db1ffad8288e8e6b3707f316fbf6a83 Mon Sep 17 00:00:00 2001 From: shoopea Date: Thu, 16 May 2019 20:39:12 +0800 Subject: [PATCH] test --- def.go | 7 ++----- job.go | 15 +++++++++++++++ main.go | 6 +++++- sql.go | 4 ++-- workers.go | 33 ++++++++++++++++++++++++++------- 5 files changed, 50 insertions(+), 15 deletions(-) diff --git a/def.go b/def.go index 570e68a..a1477fe 100644 --- a/def.go +++ b/def.go @@ -82,23 +82,18 @@ type Job struct { } type JobPayloadPillage struct { - UserID64 int64 `json:"user_id"` } type JobPayloadTribute struct { - UserID64 int64 `json:"user_id"` } type JobPayloadStatus struct { - UserID64 int64 `json:"user_id"` } type JobPayloadWithdrawal struct { - UserID64 int64 `json:"user_id"` } type JobPayloadGStock struct { - UserID64 int64 `json:"user_id"` } type JobPayloadRescanMsg struct { @@ -157,6 +152,7 @@ const ( objSubTypeMessageAuctionUpdAck = 318 objSubTypeMessageTimeAck = 319 objSubTypeMessageTimeReq = 320 + objSubTypeMessageGo = 321 objSubTypeJobPillage = 601 objSubTypeJobTribute = 602 objSubTypeJobStatus = 603 @@ -185,6 +181,7 @@ const ( SQLIdentifyMsgWorkers = 6 SQLJobWorkers = 3 TGCmdWorkers = 3 + MQTGCmdWorkers = 3 SQLJobSliceSize = 25 ) diff --git a/job.go b/job.go index 7906d57..afe0361 100644 --- a/job.go +++ b/job.go @@ -91,3 +91,18 @@ func jobSetDone(j Job) { return } + +func jobPillage(j Job) { + var r JobPayloadPillage + + err := setJobStart(j.ID64) + logOnError(err, "jobPillage : setJobStart") + + err = json.Unmarshal(j.Payload, &r) + logOnError(err, "jobPillage : Unmarshal payload") + + err = setJobDone(j.ID64) + logOnError(err, "jobPillage : setJobDone") + + return +} diff --git a/main.go b/main.go index 647a86f..82c8d56 100644 --- a/main.go +++ b/main.go @@ -105,12 +105,13 @@ func main() { MQCWMsgQueue = make(chan ChatWarsMessage, 100) SQLMsgIdentifyQueue = make(chan int64, 100) TGCmdQueue = make(chan TGCommand, 100) + MQTGCmdQueue = make(chan TGCommand, 100) for w := 1; w <= MQGetMsgWorkers; w++ { go MQGetMsgWorker(w, MQCWMsgQueue) } for w := 1; w <= SQLCWMsgWorkers; w++ { - go SQLCWMsgWorker(w, MQCWMsgQueue, SQLMsgIdentifyQueue) + go SQLCWMsgWorker(w, MQCWIncMsgQueue, SQLMsgIdentifyQueue) } for w := 1; w <= SQLIdentifyMsgWorkers; w++ { go SQLIdentifyMsgWorker(w, SQLMsgIdentifyQueue) @@ -121,6 +122,9 @@ func main() { for w := 1; w <= TGCmdWorkers; w++ { go TGCmdWorker(w, b, TGCmdQueue) } + for w := 1; w <= MQTGCmdWorkers; w++ { + go MQTGCmdWorker(w, MQTGCmdQueue) + } log.Println("Bot started !") diff --git a/sql.go b/sql.go index be2fb87..e563ab4 100644 --- a/sql.go +++ b/sql.go @@ -1375,7 +1375,7 @@ func putUnprocessedMsg(m ChatWarsMessage) (int64, error) { func getMsg(objId int64) (*ChatWarsMessage, error) { var m *ChatWarsMessage - stmt, err := db.Prepare(`SELECT om.msg_id, om.chat_id, om.sender_user_id, om.date, om.text FROM obj_msg om WHERE om.obj_id = ?`) + stmt, err := db.Prepare(`SELECT om.msg_id, om.chat_id, om.user_id, om.sender_user_id, om.date, om.text FROM obj_msg om WHERE om.obj_id = ?`) if err != nil { return m, err } @@ -1383,7 +1383,7 @@ func getMsg(objId int64) (*ChatWarsMessage, error) { m = new(ChatWarsMessage) - err = stmt.QueryRow(objId).Scan(&m.ID64, &m.ChatID64, &m.SenderUserID64, &m.Date, &m.Text) + err = stmt.QueryRow(objId).Scan(&m.ID64, &m.ChatID64, &m.UserID64, &m.SenderUserID64, &m.Date, &m.Text) if err != nil { return m, err } diff --git a/workers.go b/workers.go index 97dd491..c3a82d3 100644 --- a/workers.go +++ b/workers.go @@ -110,15 +110,22 @@ func SQLIdentifyMsgWorker(id int, objIds <-chan int64) { err = setObjSubTypeId(objId, objSubTypeMessageMiniWar) logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : setObjSubTypeId(MiniWar)") case objSubTypeMessagePillageInc: - /* - cwm, err := parseSubTypeMessagePillageInc(m, r) - logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Parsing objSubTypeMessageMiniWar.") - cwm.ObjID64 = objId - err = insertMsgPillageInc(cwm) - logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : insertMsgMiniWar") - */ + cwm, err := parseSubTypeMessagePillageInc(m, r) + logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Parsing objSubTypeMessageMiniWar.") + cwm.ObjID64 = objId + err = insertMsgPillageInc(cwm) + logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : insertMsgMiniWar") err = setObjSubTypeId(objId, objSubTypeMessagePillageInc) logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : setObjSubTypeId(PillageInc)") + err = createJob(objSubTypeJobPillage, objJobPriority, m.UserID64, time.Now().Add((25+rand.Intn(35))*time.Second)) + logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : createJob(JobPillage)") + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("Catching pillage (%s)", m.Date.Format(time.RFC3339)), + ToChatID64: m.UserID64, + } + TGCmdQueue <- s + case objSubTypeMessageGo: case objSubTypeMessageAuctionAnnounce: cwm := ChatWarsMessageAuctionAnnounce{ ObjID64: objId, @@ -201,3 +208,15 @@ func TGCmdWorker(id int, b *tb.Bot, cmds <-chan TGCommand) { } log.Printf("TGCmdWorker[" + strconv.Itoa(id) + "] : Closing.") } + +func MQTGCmdWorker(id int, msgs <-chan TGCommand) { + log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Starting.") + for c := range cmds { + j, err := json.Marshal(c) + logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") + log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j)) + } + + log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Closing.") + +}