This commit is contained in:
shoopea 2019-05-16 20:39:12 +08:00
parent 55fcdc3f17
commit c7db99ce7d
5 changed files with 50 additions and 15 deletions

7
def.go
View File

@ -82,23 +82,18 @@ type Job struct {
} }
type JobPayloadPillage struct { type JobPayloadPillage struct {
UserID64 int64 `json:"user_id"`
} }
type JobPayloadTribute struct { type JobPayloadTribute struct {
UserID64 int64 `json:"user_id"`
} }
type JobPayloadStatus struct { type JobPayloadStatus struct {
UserID64 int64 `json:"user_id"`
} }
type JobPayloadWithdrawal struct { type JobPayloadWithdrawal struct {
UserID64 int64 `json:"user_id"`
} }
type JobPayloadGStock struct { type JobPayloadGStock struct {
UserID64 int64 `json:"user_id"`
} }
type JobPayloadRescanMsg struct { type JobPayloadRescanMsg struct {
@ -157,6 +152,7 @@ const (
objSubTypeMessageAuctionUpdAck = 318 objSubTypeMessageAuctionUpdAck = 318
objSubTypeMessageTimeAck = 319 objSubTypeMessageTimeAck = 319
objSubTypeMessageTimeReq = 320 objSubTypeMessageTimeReq = 320
objSubTypeMessageGo = 321
objSubTypeJobPillage = 601 objSubTypeJobPillage = 601
objSubTypeJobTribute = 602 objSubTypeJobTribute = 602
objSubTypeJobStatus = 603 objSubTypeJobStatus = 603
@ -185,6 +181,7 @@ const (
SQLIdentifyMsgWorkers = 6 SQLIdentifyMsgWorkers = 6
SQLJobWorkers = 3 SQLJobWorkers = 3
TGCmdWorkers = 3 TGCmdWorkers = 3
MQTGCmdWorkers = 3
SQLJobSliceSize = 25 SQLJobSliceSize = 25
) )

15
job.go
View File

@ -91,3 +91,18 @@ func jobSetDone(j Job) {
return return
} }
func jobPillage(j Job) {
var r JobPayloadPillage
err := setJobStart(j.ID64)
logOnError(err, "jobPillage : setJobStart")
err = json.Unmarshal(j.Payload, &r)
logOnError(err, "jobPillage : Unmarshal payload")
err = setJobDone(j.ID64)
logOnError(err, "jobPillage : setJobDone")
return
}

View File

@ -105,12 +105,13 @@ func main() {
MQCWMsgQueue = make(chan ChatWarsMessage, 100) MQCWMsgQueue = make(chan ChatWarsMessage, 100)
SQLMsgIdentifyQueue = make(chan int64, 100) SQLMsgIdentifyQueue = make(chan int64, 100)
TGCmdQueue = make(chan TGCommand, 100) TGCmdQueue = make(chan TGCommand, 100)
MQTGCmdQueue = make(chan TGCommand, 100)
for w := 1; w <= MQGetMsgWorkers; w++ { for w := 1; w <= MQGetMsgWorkers; w++ {
go MQGetMsgWorker(w, MQCWMsgQueue) go MQGetMsgWorker(w, MQCWMsgQueue)
} }
for w := 1; w <= SQLCWMsgWorkers; w++ { for w := 1; w <= SQLCWMsgWorkers; w++ {
go SQLCWMsgWorker(w, MQCWMsgQueue, SQLMsgIdentifyQueue) go SQLCWMsgWorker(w, MQCWIncMsgQueue, SQLMsgIdentifyQueue)
} }
for w := 1; w <= SQLIdentifyMsgWorkers; w++ { for w := 1; w <= SQLIdentifyMsgWorkers; w++ {
go SQLIdentifyMsgWorker(w, SQLMsgIdentifyQueue) go SQLIdentifyMsgWorker(w, SQLMsgIdentifyQueue)
@ -121,6 +122,9 @@ func main() {
for w := 1; w <= TGCmdWorkers; w++ { for w := 1; w <= TGCmdWorkers; w++ {
go TGCmdWorker(w, b, TGCmdQueue) go TGCmdWorker(w, b, TGCmdQueue)
} }
for w := 1; w <= MQTGCmdWorkers; w++ {
go MQTGCmdWorker(w, MQTGCmdQueue)
}
log.Println("Bot started !") log.Println("Bot started !")

4
sql.go
View File

@ -1375,7 +1375,7 @@ func putUnprocessedMsg(m ChatWarsMessage) (int64, error) {
func getMsg(objId int64) (*ChatWarsMessage, error) { func getMsg(objId int64) (*ChatWarsMessage, error) {
var m *ChatWarsMessage var m *ChatWarsMessage
stmt, err := db.Prepare(`SELECT om.msg_id, om.chat_id, om.sender_user_id, om.date, om.text FROM obj_msg om WHERE om.obj_id = ?`) stmt, err := db.Prepare(`SELECT om.msg_id, om.chat_id, om.user_id, om.sender_user_id, om.date, om.text FROM obj_msg om WHERE om.obj_id = ?`)
if err != nil { if err != nil {
return m, err return m, err
} }
@ -1383,7 +1383,7 @@ func getMsg(objId int64) (*ChatWarsMessage, error) {
m = new(ChatWarsMessage) m = new(ChatWarsMessage)
err = stmt.QueryRow(objId).Scan(&m.ID64, &m.ChatID64, &m.SenderUserID64, &m.Date, &m.Text) err = stmt.QueryRow(objId).Scan(&m.ID64, &m.ChatID64, &m.UserID64, &m.SenderUserID64, &m.Date, &m.Text)
if err != nil { if err != nil {
return m, err return m, err
} }

View File

@ -110,15 +110,22 @@ func SQLIdentifyMsgWorker(id int, objIds <-chan int64) {
err = setObjSubTypeId(objId, objSubTypeMessageMiniWar) err = setObjSubTypeId(objId, objSubTypeMessageMiniWar)
logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : setObjSubTypeId(MiniWar)") logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : setObjSubTypeId(MiniWar)")
case objSubTypeMessagePillageInc: case objSubTypeMessagePillageInc:
/* cwm, err := parseSubTypeMessagePillageInc(m, r)
cwm, err := parseSubTypeMessagePillageInc(m, r) logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Parsing objSubTypeMessageMiniWar.")
logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Parsing objSubTypeMessageMiniWar.") cwm.ObjID64 = objId
cwm.ObjID64 = objId err = insertMsgPillageInc(cwm)
err = insertMsgPillageInc(cwm) logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : insertMsgMiniWar")
logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : insertMsgMiniWar")
*/
err = setObjSubTypeId(objId, objSubTypeMessagePillageInc) err = setObjSubTypeId(objId, objSubTypeMessagePillageInc)
logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : setObjSubTypeId(PillageInc)") logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : setObjSubTypeId(PillageInc)")
err = createJob(objSubTypeJobPillage, objJobPriority, m.UserID64, time.Now().Add((25+rand.Intn(35))*time.Second))
logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : createJob(JobPillage)")
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("Catching pillage (%s)", m.Date.Format(time.RFC3339)),
ToChatID64: m.UserID64,
}
TGCmdQueue <- s
case objSubTypeMessageGo:
case objSubTypeMessageAuctionAnnounce: case objSubTypeMessageAuctionAnnounce:
cwm := ChatWarsMessageAuctionAnnounce{ cwm := ChatWarsMessageAuctionAnnounce{
ObjID64: objId, ObjID64: objId,
@ -201,3 +208,15 @@ func TGCmdWorker(id int, b *tb.Bot, cmds <-chan TGCommand) {
} }
log.Printf("TGCmdWorker[" + strconv.Itoa(id) + "] : Closing.") log.Printf("TGCmdWorker[" + strconv.Itoa(id) + "] : Closing.")
} }
func MQTGCmdWorker(id int, msgs <-chan TGCommand) {
log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Starting.")
for c := range cmds {
j, err := json.Marshal(c)
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)")
log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j))
}
log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Closing.")
}