From 7eabf7a7d746cfed9afacdea728edbbdd6ee3745 Mon Sep 17 00:00:00 2001 From: shoopea Date: Fri, 17 May 2019 16:30:01 +0800 Subject: [PATCH] test --- bot.go | 80 +++++++++++++++++++++++++++++++++++------------------- def.go | 2 ++ workers.go | 18 ++++++++++-- 3 files changed, 69 insertions(+), 31 deletions(-) diff --git a/bot.go b/bot.go index 2f74fb6..12e88fa 100644 --- a/bot.go +++ b/bot.go @@ -18,13 +18,8 @@ func BotHandlers(b *tb.Bot) { } }) - b.Handle("/msg_rescan", func(m *tb.Message) { - s, err := botMsgRescan(m) - logOnError(err, "/msg_rescan") - if err == nil { - b.Send(m.Sender, s) - } - }) + b.Handle("/test", botTest) + b.Handle("/msg_rescan", botMsgRescan) b.Handle("/msg_rescan_all", botMsgRescanAll) b.Handle(tb.OnPhoto, botPhoto) @@ -69,11 +64,41 @@ func botText(m *tb.Message) { // captured by existing handlers } -func botMsgRescan(m *tb.Message) (string, error) { - fmt.Println("botRescanMsg :", m.Text) +func botTest(m *tb.Message) { if !m.Private() { - fmt.Println("botRescanMsg : !m.Private()") - return ``, nil + return + } + if _, ok := clientsKeepAlive[m.Chat.ID]; ok { + c := TGCommand{ + Type: commandSendMsg, + Text: "🏅Me", + FromUserID64: m.Chat.ID, + ToChatID64: userID64ChtWrsBot, + } + MQTGCmdQueue <- c + + c = TGCommand{ + Type: commandReplyMsg, + Text: "Test sent", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + } else { + c := TGCommand{ + Type: commandReplyMsg, + Text: "Client not registered", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c + } + return +} + +func botMsgRescan(m *tb.Message) { + if !m.Private() { + return } r := regexp.MustCompile("^[0-9]+$") if r.MatchString(m.Payload) { @@ -87,29 +112,28 @@ func botMsgRescan(m *tb.Message) (string, error) { err := createJob(objSubTypeJobRescanMsg, objJobPriorityRescanMsg, int64(m.Sender.ID), time.Now(), b) logOnError(err, "botMsgRescan : createJob(objSubTypeJobRescanMsg)") if err != nil { - return "Error scheduling the rescan for msg #" + m.Payload, nil + c := TGCommand{ + Type: commandReplyMsg, + Text: fmt.Sprint("Error scheduling the rescan for msg #%s", m.Payload), + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c } else { - return "Rescaning msg #" + m.Payload, nil + c := TGCommand{ + Type: commandReplyMsg, + Text: fmt.Sprint("Rescaning msg #%s", m.Payload), "Rescaning all msg scheduled.", + FromMsgID64: int64(m.ID), + FromChatID64: m.Chat.ID, + } + TGCmdQueue <- c } - } r = regexp.MustCompile("^all$") if r.MatchString(m.Payload) { - p := JobPayloadRescanMsg{ - Query: fmt.Sprintf("SELECT o.id FROM obj o WHERE o.obj_type_id = %d AND o.obj_sub_type_id = %d;", objTypeMessage, objSubTypeMessageUnknown), - MsgID64: int64(m.ID), - ChatID64: m.Chat.ID, - } - b, _ := json.Marshal(p) - err := createJob(objSubTypeJobRescanMsg, objJobPriorityRescanAllMsg, int64(m.Sender.ID), time.Now(), b) - logOnError(err, "botMsgRescan : createJob(objSubTypeJobRescanMsg)") - if err != nil { - return "Error scheduling the rescan for all msg", nil - } else { - return "Rescaning all msg scheduled", nil - } + botMsgRescanAll(m) } - return "/msg_rescan msg_id or /msg_rescan all", nil + return } func botMsgRescanAll(m *tb.Message) { diff --git a/def.go b/def.go index f1ea0c6..c4bb13e 100644 --- a/def.go +++ b/def.go @@ -125,6 +125,8 @@ type JobPayloadSetDone struct { } const ( + userID64ChtWrsBot = 408101137 + commandForwardMsg = 1 commandReplyMsg = 2 commandSendMsg = 3 diff --git a/workers.go b/workers.go index 1f10da5..ceabffb 100644 --- a/workers.go +++ b/workers.go @@ -231,9 +231,21 @@ func TGCmdWorker(id int, b *tb.Bot, cmds <-chan TGCommand) { func MQTGCmdWorker(id int, cmds <-chan TGCommand) { log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Starting.") for c := range cmds { - j, err := json.Marshal(c) - logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") - log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j)) + if _, ok := clientsKeepAlive[c.FromUserID64]; ok { + j, err := json.Marshal(c) + logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") + log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j)) + err = clientsQueue[c.FromUserID64].Channel.Publish( + "", // exchange + clientsQueue[c.FromUserID64].Queue.Name, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: []byte(b), + }) + logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Publishing message.") + } } log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Closing.")