package main

import (
	"archive/zip"
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"regexp"
	"strconv"
	"strings"
	"time"

	tb "gopkg.in/tucnak/telebot.v2"
)

// createJob inserts a new job: one row in obj (type objTypeJob, sub type
// jobTypeID) and the matching row in obj_job. The job starts not done, not in
// work, with its timeout set to time.Unix(maxUnixTimestamp, 0). Payloads longer
// than 20000 bytes are rejected. It returns the id of the inserted obj row.
func createJob(jobTypeID int32, priority int32, userID64 int64, trigger int64, schedule time.Time, payload []byte) (int64, error) {
	if len(payload) > 20000 {
		return 0, errors.New("payload too long")
	}

	objStmt, err := db.Prepare(`INSERT INTO obj (obj_type_id, obj_sub_type_id) VALUES (? , ?);`)
	logOnError(err, "createJob : prepare insert obj")
	if err != nil {
		return 0, err
	}
	defer objStmt.Close()

	res, err := objStmt.Exec(objTypeJob, jobTypeID)
	logOnError(err, fmt.Sprintf("createJob, insert obj(%d, %d)", objTypeJob, jobTypeID))
	if err != nil {
		return 0, err
	}

	objId, err := res.LastInsertId()
	logOnError(err, "createJob : get last insert Id")
	if err != nil {
		return 0, err
	}

	jobStmt, err := db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, trigger_id, seq_nr, schedule, is_done, in_work, inserted, timeout, pulled, started, ended, payload)
		VALUES (?, ?, ?, ?, NULL, ?, 0, 0, ?, ?, NULL, NULL, NULL, ?);`)
	logOnError(err, "createJob : prepare insert obj_job")
	if err != nil {
		return 0, err
	}
	defer jobStmt.Close()

	_, err = jobStmt.Exec(objId, priority, userID64, trigger, schedule.UTC(), time.Now().UTC(), time.Unix(maxUnixTimestamp, 0).UTC(), payload)
	logOnError(err, "createJob : insert obj_job")
	if err != nil {
		return 0, err
	}

	return objId, nil
}

// createJobCallback creates a job scheduled at time.Unix(maxUnixTimestamp, 0),
// registers it as a callback for (userID64, msgTypeID64) and sets its timeout
// to now + timeout. Only a createJob failure is returned; a setJobTimeout
// failure is logged and otherwise ignored.
func createJobCallback(jobTypeID int32, userID64 int64, msgTypeID64 int64, payload []byte, timeout time.Duration) error {
	jobID64, err := createJob(jobTypeID, objJobPriority, userID64, 0, time.Unix(maxUnixTimestamp, 0).UTC(), payload)
	if err != nil {
		return err
	}
	setJobCallback(jobID64, userID64, msgTypeID64)
	err = setJobTimeout(jobID64, timeout)
	logOnError(err, "createJobCallback : setJobTimeout")
	return nil
}

// setJobCallback appends jobID64 to the callbacks registered for the given
// user and message type, creating the per-user map on first use.
func setJobCallback(jobID64 int64, userID64 int64, msgTypeID64 int64) {
	muxCallbacks.Lock()
	defer muxCallbacks.Unlock()

	if _, ok := callbacks[userID64]; !ok {
		callbacks[userID64] = make(map[int64][]int64)
	}
	callbacks[userID64][msgTypeID64] = append(callbacks[userID64][msgTypeID64], jobID64)
}

// setJobTimeout sets the timeout of job jobID64 to now (UTC) + d.
func setJobTimeout(jobID64 int64, d time.Duration) error {
	stmt, err := db.Prepare(`UPDATE obj_job j SET j.timeout = ? WHERE j.obj_id = ?;`)
	logOnError(err, "setJobTimeout : prepare update obj_job")
	if err != nil {
		return err
	}
	defer stmt.Close()

	_, err = stmt.Exec(time.Now().UTC().Add(d), jobID64)
	logOnError(err, fmt.Sprintf("setJobTimeout, update obj_job(%d)", jobID64))
	return err
}

// setJobDone flags job jobID64 as done, clears in_work and stamps its end time.
func setJobDone(jobID64 int64) error {
	stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 1, j.in_work = 0, j.ended = ? WHERE j.obj_id = ?;`)
	logOnError(err, "setJobDone : prepare update obj_job")
	if err != nil {
		return err
	}
	defer stmt.Close()

	_, err = stmt.Exec(time.Now().UTC(), jobID64)
	logOnError(err, fmt.Sprintf("setJobDone, update obj_job(%d)", jobID64))
	return err
}

// setJobStart stamps the start time of job jobId unless one is already set
// (the UPDATE uses COALESCE on the existing value).
func setJobStart(jobId int64) error {
	stmt, err := db.Prepare(`UPDATE obj_job j SET j.started = coalesce(j.started, ?) WHERE j.obj_id = ?;`)
	logOnError(err, "setJobStart : prepare update obj_job")
	if err != nil {
		return err
	}
	defer stmt.Close()

	_, err = stmt.Exec(time.Now().UTC(), jobId)
	logOnError(err, fmt.Sprintf("setJobStart, update obj_job(%d)", jobId))
	return err
}

// rescheduleJob releases job jobID64 (in_work = 0) and gives it a new schedule
// and trigger id.
func rescheduleJob(jobID64 int64, trigger int64, schedule time.Time) error {
	stmt, err := db.Prepare(`UPDATE obj_job j SET j.in_work = 0, j.schedule = ?, j.trigger_id = ? WHERE j.obj_id = ?;`)
	logOnError(err, "rescheduleJob : prepare update obj_job")
	if err != nil {
		return err
	}
	defer stmt.Close()

	_, err = stmt.Exec(schedule.UTC(), trigger, jobID64)
	logOnError(err, fmt.Sprintf("rescheduleJob, update obj_job(%d)", jobID64))
	return err
}

// loadCurrentJobs claims up to SQLJobSliceSize due jobs (not done, not in work,
// schedule <= now) by tagging them with a random seq_nr, then reads back the
// rows carrying that tag, ordered by priority and id.
func loadCurrentJobs() ([]Job, error) {
	var (
		objId     int64
		jobTypeId int32
		userID64  int64
		trigger   int64
		timeout   time.Time
		payload   []byte
		jobs      []Job
	)

	t := time.Now().UTC()
	r := RndInt64()

	_, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1, j.seq_nr = ? WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", t, r, t, SQLJobSliceSize)
	logOnError(err, "loadCurrentJobs : update intial rows")
	if err != nil {
		return jobs, err
	}

	stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.trigger_id, j.user_id, j.payload, j.timeout FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;")
	logOnError(err, "loadCurrentJobs : prepare select statement")
	if err != nil {
		// stmt is nil when Prepare fails; closing it would panic.
		return jobs, err
	}

	rows, err := stmt.Query(r)
	logOnError(err, "loadCurrentJobs : query select statement")
	if err != nil {
		stmt.Close()
		return jobs, err
	}

	for rows.Next() {
		err = rows.Scan(&objId, &jobTypeId, &trigger, &userID64, &payload, &timeout)
		logOnError(err, "loadCurrentJobs : scan query rows")
		if err != nil {
			// Do not emit a job built from the previous row's values.
			continue
		}
		jobs = append(jobs, Job{
			ID64:      objId,
			JobTypeID: jobTypeId,
			Trigger:   trigger,
			UserID64:  userID64,
			Payload:   payload,
			Timeout:   timeout,
		})
	}
	err = rows.Err()
	logOnError(err, "loadCurrentJobs : scan end rows")
	rows.Close()
	if err != nil {
		stmt.Close()
		return jobs, err
	}

	err = stmt.Close()
	logOnError(err, "loadCurrentJobs : close select statement")
	if err != nil {
		return jobs, err
	}

	return jobs, nil
}

// jobRescan pushes every message id returned by the payload's query onto
// SQLMsgIdentifyQueue, reporting progress at most once a minute. With more
// than one id it creates a follow-up objSubTypeJobSetJobDone job that closes
// this one; otherwise it closes the job itself.
//
// NOTE(review): p.Query is executed as-is by getSQLListID64, so the payload
// must come from a trusted source.
func jobRescan(j Job) {
	var p JobPayloadRescanMsg

	err := setJobStart(j.ID64)
	logOnError(err, "jobRescan : setJobStart")

	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobRescan : Unmarshal payload")

	start := time.Now()
	milestone := time.Now()

	ids := getSQLListID64(p.Query)

	switch {
	case len(ids) > 1:
		TGCmdQueue <- TGCommand{
			Type:         commandReplyMsg,
			Text:         fmt.Sprintf("Rescanning %d messages.", len(ids)),
			FromMsgID64:  p.MsgID64,
			FromChatID64: p.ChatID64,
		}
		for i, id := range ids {
			SQLMsgIdentifyQueue <- id
			if time.Now().After(milestone.Add(1 * time.Minute)) {
				TGCmdQueue <- TGCommand{
					Type:         commandReplyMsg,
					Text:         fmt.Sprintf("Rescanned %d/%d messages.", i+1, len(ids)),
					FromMsgID64:  p.MsgID64,
					FromChatID64: p.ChatID64,
				}
				milestone = time.Now()
			}
		}
		r := JobPayloadSetDone{
			JobID64:  j.ID64,
			MsgID64:  p.MsgID64,
			ChatID64: p.ChatID64,
			Text:     fmt.Sprintf("%d messages processed in %s.", len(ids), time.Since(start)),
		}
		b, err := json.Marshal(r)
		logOnError(err, "jobRescan : Marshal")
		_, err = createJob(objSubTypeJobSetJobDone, objJobPriorityRescanAllMsg, j.UserID64, j.ID64, time.Now().UTC(), b)
		logOnError(err, "jobRescan : createJob(objSubTypeJobSetJobDone)")

	case len(ids) == 1:
		SQLMsgIdentifyQueue <- ids[0]
		err = setJobDone(j.ID64)
		logOnError(err, "jobRescan : setJobDone(1)")
		if p.MsgID64 != 0 || p.ChatID64 != 0 {
			TGCmdQueue <- TGCommand{
				Type:         commandReplyMsg,
				Text:         "One message processed.",
				FromMsgID64:  p.MsgID64,
				FromChatID64: p.ChatID64,
			}
		}

	default:
		err = setJobDone(j.ID64)
		logOnError(err, "jobRescan : setJobDone(0)")
		if p.MsgID64 != 0 || p.ChatID64 != 0 {
			TGCmdQueue <- TGCommand{
				Type:         commandReplyMsg,
				Text:         "No message processed.",
				FromMsgID64:  p.MsgID64,
				FromChatID64: p.ChatID64,
			}
		}
	}
}

// jobSetDone closes the job named in the payload, then itself, and replies
// with the payload's text.
func jobSetDone(j Job) {
	var r JobPayloadSetDone

	err := setJobStart(j.ID64)
	logOnError(err, "jobSetDone : setJobStart")

	err = json.Unmarshal(j.Payload, &r)
	logOnError(err, "jobSetDone : Unmarshal payload")

	err = setJobDone(r.JobID64)
	logOnError(err, "jobSetDone : setJobDone(child)")

	err = setJobDone(j.ID64)
	logOnError(err, "jobSetDone : setJobDone")

	TGCmdQueue <- TGCommand{
		Type:         commandReplyMsg,
		Text:         r.Text,
		FromMsgID64:  r.MsgID64,
		FromChatID64: r.ChatID64,
	}
}

// jobPillage follows up on a pillage notification. It looks for an outcome
// message (win, loss, timeout or go acknowledgement) dated within 3m30 of the
// pillage message; if one exists the user is told the result and the job is
// closed. Without an outcome, the job is closed once 3m30 have passed since
// the payload date; otherwise a "/go" is sent unless the user already sent one
// in the 30 seconds before the job's schedule, and the job is rescheduled 30
// seconds ahead.
func jobPillage(j Job) {
	var r JobPayloadPillage

	err := setJobStart(j.ID64)
	logOnError(err, "jobPillage : setJobStart")

	err = json.Unmarshal(j.Payload, &r)
	logOnError(err, "jobPillage : Unmarshal payload")

	// check if we have an acknowledgment of go or a timeout within 3m30 of the PillageInc from the Job
	ids := getSQLListID64(`SELECT ox.id
		FROM obj ox
			,obj_msg omx
			,obj op
			,obj_msg omp
			,obj_job oj
		WHERE oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + `
		  AND omx.user_id = oj.user_id
		  AND omx.sender_user_id = ` + strconv.Itoa(userID64ChtWrsBot) + `
		  AND omx.obj_id = ox.id
		  AND ox.obj_sub_type_id in (` + strconv.Itoa(objSubTypeMessagePillageGo) +
		`, ` + strconv.Itoa(objSubTypeMessagePillageTimeout) +
		`, ` + strconv.Itoa(objSubTypeMessagePillageLoss) +
		`, ` + strconv.Itoa(objSubTypeMessagePillageWin) + `)
		  AND op.id = ` + strconv.FormatInt(r.ObjID64, 10) + `
		  AND omp.obj_id = op.id
		  AND omx.date between omp.date AND ADDTIME(omp.date, '0 0:3:30.000000')
		ORDER BY CASE ox.obj_sub_type_id WHEN ` + strconv.Itoa(objSubTypeMessagePillageWin) + ` THEN 0
			WHEN ` + strconv.Itoa(objSubTypeMessagePillageLoss) + ` THEN 1
			WHEN ` + strconv.Itoa(objSubTypeMessagePillageTimeout) + ` THEN 2
			WHEN ` + strconv.Itoa(objSubTypeMessagePillageGo) + ` THEN 3
			ELSE 4 END ASC
		LIMIT 1;`)

	if len(ids) > 1 { // issue there ?
		TGCmdQueue <- TGCommand{
			Type:       commandSendMsg,
			Text:       fmt.Sprintf("More than one outcome for pillage #%d", r.ObjID64),
			ToUserID64: j.UserID64,
		}
	} else if len(ids) == 1 { // we've got a match, job is done whether we prevented the pillage or not
		m, err := getObjMsg(ids[0])
		logOnError(err, "jobPillage : getMsg(objSubTypeMessagePillageGo, objSubTypeMessagePillageTimeout)")
		// Only report outcomes that are less than a minute old.
		if err == nil && m.Date.Add(60*time.Second).After(time.Now().UTC()) {
			msgTypeID64, err := getObjSubTypeId(ids[0])
			logOnError(err, "jobPillage : getObjSubTypeId")
			if err == nil {
				var format string
				switch msgTypeID64 {
				case objSubTypeMessagePillageGo, objSubTypeMessagePillageWin:
					format = "We avoided a pillage (%s)"
				case objSubTypeMessagePillageLoss, objSubTypeMessagePillageTimeout:
					format = "We got pillaged (%s)"
				default:
					format = "We don't know what happened (%s)"
				}
				TGCmdQueue <- TGCommand{
					Type:       commandSendMsg,
					Text:       fmt.Sprintf(format, m.Date.Format(time.RFC3339)),
					ToUserID64: j.UserID64,
				}
			}
		}
		err = setJobDone(j.ID64)
		logOnError(err, "jobPillage : setJobDone")
		return
	}

	// is the job outdated now ?
	if time.Now().UTC().After(r.Date.Add(time.Minute*3 + time.Second*30)) {
		TGCmdQueue <- TGCommand{
			Type:       commandSendMsg,
			Text:       "Pillage interception expired",
			ToUserID64: j.UserID64,
		}
		// Close the job: it is neither rescheduled nor picked up again,
		// so leaving it open would keep it flagged in_work forever.
		err = setJobDone(j.ID64)
		logOnError(err, "jobPillage : setJobDone(expired)")
		return
	}

	TGCmdQueue <- TGCommand{
		Type:       commandSendMsg,
		Text:       "No outcome for the pillage yet",
		ToUserID64: j.UserID64,
	}

	// no outcome yet, have we sent a "/go" in the last 30 sec ?
	ids = getSQLListID64(` select ox.id
		from obj ox
			,obj_msg omx
			,obj_job oj
		where oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + `
		  and omx.user_id = oj.user_id
		  and omx.sender_user_id = oj.user_id
		  and omx.obj_id = ox.id
		  and ox.obj_sub_type_id =` + strconv.Itoa(objSubTypeMessageGo) + `
		  and omx.date between addtime(oj.schedule, '-30') and oj.schedule;`)

	if len(ids) > 0 { // we did, so we reschedule the job to check the outcome and wait
		m, err := getObjMsg(ids[0])
		logOnError(err, "jobPillage : getMsg(objSubTypeMessageGo)")
		if err == nil {
			TGCmdQueue <- TGCommand{
				Type:       commandSendMsg,
				Text:       fmt.Sprintf("We started intercepting the pillage (%s)", m.Date.Format(time.RFC3339)),
				ToUserID64: j.UserID64,
			}
		}
		err = rescheduleJob(j.ID64, j.Trigger, time.Now().Add(30*time.Second).UTC())
		logOnError(err, "jobPillage : rescheduleJob(objSubTypeMessageGo)")
	} else { // no /go in the last 30 sec so we go ahead, send one and reschedule to check again in 30 sec
		clientSendCWMsg(j.UserID64, "/go")
		err = rescheduleJob(j.ID64, j.Trigger, time.Now().Add(30*time.Second).UTC())
		logOnError(err, "jobPillage : rescheduleJob")
	}
}

// jobMsgRefresh deletes the stored object for the payload's message and asks
// the client to refresh that message. If the message cannot be loaded the job
// is simply closed; a "no rows" error is expected and not logged.
func jobMsgRefresh(j Job) {
	var p JobPayloadMsgRefresh

	// identify whether the message has been properly refreshed ? create new job ? reschedule same job ?

	err := setJobStart(j.ID64)
	logOnError(err, "jobMsgRefresh : setJobStart")

	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobMsgRefresh : Unmarshal payload")

	m, err := getObjMsg(p.ObjID64)
	if err != nil {
		if strings.Compare(err.Error(), `sql: no rows in result set`) != 0 {
			logOnError(err, "jobMsgRefresh : getObjMsg")
		}
		err = setJobDone(j.ID64)
		logOnError(err, "jobMsgRefresh : setJobDone")
		return
	}

	err = delObj(p.ObjID64)
	logOnError(err, "jobMsgRefresh : delObj")

	clientRefreshCWMsg(m.TGUserID64, m.ChatID64, m.ID64)

	err = setJobDone(j.ID64)
	logOnError(err, "jobMsgRefresh : setJobDone")
}

// jobMsgClient sends the payload's text through the user's client and
// acknowledges it, then closes the job. Nothing is sent when the payload
// cannot be decoded.
func jobMsgClient(j Job) {
	var p JobPayloadMsgClient

	err := setJobStart(j.ID64)
	logOnError(err, "jobMsgClient : setJobStart")

	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobMsgClient : Unmarshal payload")
	if err == nil {
		clientSendCWMsg(j.UserID64, p.Text)
		TGCmdQueue <- TGCommand{
			Type:         commandReplyMsg,
			Text:         "Message sent.",
			FromMsgID64:  p.MsgID64,
			FromChatID64: p.ChatID64,
		}
	}

	err = setJobDone(j.ID64)
	logOnError(err, "jobMsgClient : setJobDone")
}

// jobBackupExport loads every message listed in obj_msg, marshals them into a
// DataBackup, zips the JSON as backup.json and sends the archive to the
// requesting chat. Progress is reported at most once a minute. If encoding or
// compression fails the user is told and no document is sent.
func jobBackupExport(j Job) {
	var p JobPayloadBackupExport

	err := setJobStart(j.ID64)
	logOnError(err, "jobBackupExport : setJobStart")

	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobBackupExport : Unmarshal payload")

	// reply answers the message that requested the export.
	reply := func(text string) {
		TGCmdQueue <- TGCommand{
			Type:         commandReplyMsg,
			Text:         text,
			FromMsgID64:  p.MsgID64,
			FromChatID64: p.ChatID64,
		}
	}
	// abort reports a failed export and closes the job.
	abort := func() {
		reply(`Export failed.`)
		err := setJobDone(j.ID64)
		logOnError(err, "jobBackupExport : setJobDone")
	}

	bkp := DataBackup{}
	start := time.Now()
	milestone := time.Now()
	var msgs []ChatWarsMessage

	ids := getSQLListID64(`SELECT om.obj_id id FROM obj_msg om;`)

	reply(fmt.Sprintf("Backing up %d messages.", len(ids)))

	for i, id := range ids {
		msg, err := getObjMsg(id)
		logOnError(err, "jobBackupExport : getMsg")
		if err == nil {
			msgs = append(msgs, *msg)
		}
		if time.Now().After(milestone.Add(1 * time.Minute)) {
			reply(fmt.Sprintf("Exported %d/%d messages.", i+1, len(ids)))
			milestone = time.Now()
		}
	}

	bkp.Messages = msgs
	b, err := json.Marshal(bkp)
	logOnError(err, "jobBackupExport : Marshal")
	if err != nil {
		abort()
		return
	}

	reply(`Compressing archive`)

	zbuf := new(bytes.Buffer)
	zw := zip.NewWriter(zbuf)
	zf, err := zw.Create(`backup.json`)
	logOnError(err, "jobBackupExport : Create")
	if err != nil {
		// zf is unusable here; writing to it would panic.
		abort()
		return
	}
	_, err = zf.Write(b)
	logOnError(err, "jobBackupExport : Write")
	if err != nil {
		abort()
		return
	}
	err = zw.Close()
	logOnError(err, "jobBackupExport : Close")
	if err != nil {
		abort()
		return
	}

	d := tb.Document{}
	d.File = tb.FromReader(bytes.NewReader(zbuf.Bytes()))
	d.FileName = fmt.Sprintf("%s.backup.zip", start.Format("20060102150405"))
	d.Caption = d.FileName
	d.MIME = `application/zip`

	reply(`Export done.`)

	TGCmdQueue <- TGCommand{
		Type:       commandSendDocument,
		Document:   d,
		ToChatID64: p.ChatID64,
	}

	err = setJobDone(j.ID64)
	logOnError(err, "jobBackupExport : setJobDone")
}

// jobBackupImport downloads the zip archive at the payload's URL, decodes its
// backup.json entry into a DataBackup and pushes every message onto
// MQCWMsgQueue. Every exit path replies to the requester and closes the job.
func jobBackupImport(j Job) {
	var p JobPayloadBackupImport

	err := setJobStart(j.ID64)
	logOnError(err, "jobBackupImport : setJobStart")

	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobBackupImport : Unmarshal payload")

	// reply answers the message that requested the import.
	reply := func(text string) {
		TGCmdQueue <- TGCommand{
			Type:         commandReplyMsg,
			Text:         text,
			FromMsgID64:  p.MsgID64,
			FromChatID64: p.ChatID64,
		}
	}
	// finish sends a last reply and closes the job.
	finish := func(text string) {
		reply(text)
		err := setJobDone(j.ID64)
		logOnError(err, "jobBackupImport : setJobDone")
	}

	resp, err := http.Get(p.URL)
	logOnError(err, "jobBackupImport : Get")
	if err != nil {
		// resp is nil on error; touching resp.Body would panic.
		finish("Backup download failed.")
		return
	}
	buf := new(bytes.Buffer)
	_, err = buf.ReadFrom(resp.Body)
	resp.Body.Close()
	logOnError(err, "jobBackupImport : ReadFrom")
	if err != nil {
		finish("Backup download failed.")
		return
	}

	reply("File downloaded.")

	z := buf.Bytes()
	zr, err := zip.NewReader(bytes.NewReader(z), int64(len(z)))
	logOnError(err, "jobBackupImport : NewReader")
	if err != nil {
		finish("Backup archive could not be read.")
		return
	}

	for _, f := range zr.File {
		if strings.Compare(f.Name, "backup.json") != 0 {
			continue
		}

		rc, err := f.Open()
		logOnError(err, "jobBackupImport : Open")
		if err != nil {
			finish("Backup import failed.")
			return
		}
		data, err := ioutil.ReadAll(rc)
		rc.Close()
		logOnError(err, "jobBackupImport : ReadAll")
		if err != nil {
			finish("Backup import failed.")
			return
		}
		log.Printf("jobBackupImport : %d uncompressed bytes.\n", len(data))

		bkp := DataBackup{}
		err = json.Unmarshal(data, &bkp)
		logOnError(err, "jobBackupImport : Unmarshal")
		if err != nil {
			finish("Backup import failed.")
			return
		}

		for _, msg := range bkp.Messages {
			MQCWMsgQueue <- msg
		}

		finish("Backup restored.")
		return
	}

	finish("No backup file found in archive.")
}

// jobGStock walks the guild stock listings one command at a time. Each run
// handles the step stored in the payload's Progress field: step 0 sends
// /g_stock_res, steps 1 to 5 record the stock from the triggering message and
// send the next command (alch, misc, rec, parts, other), and step 6 records
// the last listing and replies with size and count totals per item type. Each
// step hands over to a new callback job carrying the accumulated payload and
// then closes the current job.
func jobGStock(j Job) {
	var p JobPayloadGStock
	var resSize, resCount, alchSize, alchCount, miscSize, miscCount, recSize, recCount, partSize, partCount, otherSize, otherCount, totalSize int64

	err := setJobStart(j.ID64)
	logOnError(err, "jobGStock : setJobStart")

	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobGStock : Unmarshal payload")

	fmt.Printf("jobGStock : Progress => %d\n", p.Progress)
	fmt.Printf("jobGStock : UserID64 => %d\n", j.UserID64)

	// collect appends the stock listed in the triggering message to p.Stock.
	// It reports false when the message cannot be loaded or parsed.
	collect := func() bool {
		msg, err := getObjMsg(j.Trigger)
		logOnError(err, "jobGStock : getObjMsg msg")
		if err != nil {
			return false
		}
		rule, err := getMsgParsingRule(msg)
		logOnError(err, "jobGStock : getMsgParsingRule")
		if err != nil {
			return false
		}
		cwm, err := parseSubTypeMessageGStockAnyAck(msg, rule.re)
		logOnError(err, "jobGStock : parseSubTypeMessageGStockAnyAck")
		if err != nil {
			return false
		}
		for _, v := range cwm.Stock {
			p.Stock = append(p.Stock, v)
		}
		return true
	}

	// queueNext moves to the next step: it registers a callback job holding
	// the updated payload, then sends cmd.
	queueNext := func(cmd string) {
		p.Progress++
		b, err := json.Marshal(&p)
		logOnError(err, "jobGStock : Marshal")
		err = createJobCallback(objSubTypeJobGStock, j.UserID64, objSubTypeMessageGStockAnyAck, b, 1*time.Minute)
		logOnError(err, "jobGStock : createJobCallback")
		clientSendCWMsg(j.UserID64, cmd)
	}

	switch p.Progress {
	case 0: // send /g_stock_res
		p.Progress = 1
		b, err := json.Marshal(&p)
		logOnError(err, "jobGStock : Marshal")
		jobID64, err := createJob(objSubTypeJobGStock, objJobPriority, j.UserID64, 0, time.Unix(maxUnixTimestamp, 0).UTC(), b)
		logOnError(err, "jobGStock : createJob")
		if err != nil {
			// No job to attach callbacks to.
			break
		}
		setJobCallback(jobID64, j.UserID64, objSubTypeMessageGStockAnyAck)
		setJobCallback(jobID64, j.UserID64, objSubTypeMessageBusy)
		setJobCallback(jobID64, j.UserID64, objSubTypeMessageBattle)
		err = setJobTimeout(jobID64, 1*time.Minute)
		logOnError(err, "jobGStock : setJobTimeout")
		clientSendCWMsg(j.UserID64, "/g_stock_res")

	case 1: // send /g_stock_alch
		// The first answer may also be a busy/battle notice, so the
		// message type is checked before parsing.
		msg, err := getObjMsg(j.Trigger)
		logOnError(err, "jobGStock : getObjMsg msg")
		if err != nil {
			break
		}
		rule, err := getMsgParsingRule(msg)
		logOnError(err, "jobGStock : getMsgParsingRule")
		if err != nil {
			break
		}
		if rule.MsgTypeID64 == objSubTypeMessageGStockAnyAck {
			cwm, err := parseSubTypeMessageGStockAnyAck(msg, rule.re)
			logOnError(err, "jobGStock : parseSubTypeMessageGStockAnyAck")
			if err != nil {
				break
			}
			for _, v := range cwm.Stock {
				p.Stock = append(p.Stock, v)
			}
			queueNext("/g_stock_alch")
		} else if rule.MsgTypeID64 == objSubTypeMessageBusy || rule.MsgTypeID64 == objSubTypeMessageBattle {
			TGCmdQueue <- TGCommand{
				Type:         commandReplyMsg,
				Text:         "Busy, please retry later.",
				FromMsgID64:  p.MsgID64,
				FromChatID64: p.ChatID64,
				ParseMode:    cmdParseModeHTML,
			}
		}

	case 2: // send /g_stock_misc
		if collect() {
			queueNext("/g_stock_misc")
		}

	case 3: // send /g_stock_rec
		if collect() {
			queueNext("/g_stock_rec")
		}

	case 4: // send /g_stock_parts
		if collect() {
			queueNext("/g_stock_parts")
		}

	case 5: // send /g_stock_other
		if collect() {
			queueNext("/g_stock_other")
		}

	case 6: // collate everything and reply
		if !collect() {
			break
		}
		for _, v := range p.Stock {
			item, err := getObjItem(v.ItemID64)
			logOnError(err, "jobGStock : getObjItem")
			if err != nil {
				continue
			}
			if item.Weight == -1 {
				// A weight of -1 is reported to the admin and left out of the totals.
				TGCmdQueue <- TGCommand{
					Type:       commandSendMsg,
					Text:       fmt.Sprintf("Unknown weight for item : %s - %s\n", item.Code, item.Name),
					ToUserID64: cfg.Bot.Admin,
				}
				continue
			}
			size := item.Weight * v.Quantity
			totalSize += size
			switch item.ItemTypeID {
			case objSubTypeItemResource:
				resSize += size
				resCount += v.Quantity
			case objSubTypeItemAlch:
				alchSize += size
				alchCount += v.Quantity
			case objSubTypeItemMisc:
				miscSize += size
				miscCount += v.Quantity
			case objSubTypeItemRecipe:
				recSize += size
				recCount += v.Quantity
			case objSubTypeItemPart:
				partSize += size
				partCount += v.Quantity
			case objSubTypeItemOther:
				otherSize += size
				otherCount += v.Quantity
			}
		}

		txt := fmt.Sprintf("Current stock [%d/38000] :\n - Resources : %d (%d)\n - Alchemist : %d (%d)\n - Misc stuff : %d (%d)\n - Recipes : %d (%d)\n - Parts : %d (%d)\n - Other : %d (%d)\n", totalSize, resSize, resCount, alchSize, alchCount, miscSize, miscCount, recSize, recCount, partSize, partCount, otherSize, otherCount)

		TGCmdQueue <- TGCommand{
			Type:         commandReplyMsg,
			Text:         txt,
			FromMsgID64:  p.MsgID64,
			FromChatID64: p.ChatID64,
			ParseMode:    cmdParseModeHTML,
		}
	}

	err = setJobDone(j.ID64)
	logOnError(err, "jobGStock : setJobDone")
}

// jobGDepositForward checks whether the triggering deposit request matches the
// item and quantity in the payload. On a match the trigger id is appended to
// gDepositForwardMsg and the job is closed. Otherwise, including when the
// message cannot be loaded or parsed, the job is parked again and re-registered
// as a callback for the next deposit request.
func jobGDepositForward(j Job) {
	var p JobPayloadGDepositForward

	err := setJobStart(j.ID64)
	logOnError(err, "jobGDepositForward : setJobStart")

	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobGDepositForward : Unmarshal payload")

	matched := false
	msg, err := getObjMsg(j.Trigger)
	logOnError(err, "jobGDepositForward : getObjMsg")
	if err == nil {
		rule, err := getMsgParsingRule(msg)
		logOnError(err, "jobGDepositForward : getMsgParsingRule")
		if err == nil {
			cwm, err := parseSubTypeMessageGDepositReq(msg, rule.re)
			logOnError(err, "jobGDepositForward : parseSubTypeMessageGDepositReq")
			if err == nil {
				matched = cwm.ItemID64 == p.ItemID64 && cwm.Quantity == p.Quantity
			}
		}
	}

	if matched {
		gDepositForwardMux.Lock()
		gDepositForwardMsg = append(gDepositForwardMsg, j.Trigger)
		gDepositForwardMux.Unlock()
		err = setJobDone(j.ID64)
		logOnError(err, "jobGDepositForward : setJobDone")
		return
	}

	err = rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC())
	logOnError(err, "jobGDepositForward : rescheduleJob")
	setJobCallback(j.ID64, j.UserID64, objSubTypeMessageGDepositReq)
}

// jobGDeposit drives a guild deposit. With Status 0 it works out which stock
// categories hold the requested items, asks the client for each category
// listing (2 seconds apart) and registers a callback job per listing; it then
// returns without closing the job. With Status set to a stock acknowledgement
// type it sends a /g_deposit for every requested item found in the triggering
// listing and registers a jobGDepositForward callback for each.
func jobGDeposit(j Job) {
	var p JobPayloadGDeposit

	err := setJobStart(j.ID64)
	logOnError(err, "jobGDeposit : setJobStart")

	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobGDeposit : Unmarshal payload")

	if p.Status == 0 { /* handle remaining resources to be stored */
		var res, misc, alch, craft, equip bool
		var delay time.Duration

		for i := range p.ResObjID64 {
			obj, err := getObjItem(p.ResObjID64[i])
			logOnError(err, "jobGDeposit : getObjItem")
			if err != nil {
				continue
			}
			switch obj.ItemTypeID {
			case objSubTypeItemResource:
				res = true
			case objSubTypeItemAlch:
				alch = true
			case objSubTypeItemMisc:
				misc = true
			case objSubTypeItemRecipe, objSubTypeItemPart:
				craft = true
			case objSubTypeItemOther, objSubTypeItemUnique:
				equip = true
			}
		}

		// request asks for one category listing and registers the callback
		// job that will process the answer; p.Status must be set beforehand.
		request := func(button string, msgTypeID64 int64) {
			clientSendCWMsgDelay(p.ChatID64, button, delay)
			b, err := json.Marshal(&p)
			logOnError(err, "jobGDeposit : Marshal")
			err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, msgTypeID64, b, 1*time.Minute)
			logOnError(err, "jobGDeposit : createJobCallback")
			delay = delay + 2*time.Second
		}

		if res {
			p.Status = objSubTypeMessageStockAck
			request(`πŸ“¦Resources`, objSubTypeMessageStockAck)
		}
		if alch {
			p.Status = 1 // FIXME UPDATE WITH PROPER TYPE
			request(`βš—οΈAlchemy`, objSubTypeMessageOrderbookAck)
		}
		if misc {
			p.Status = 1 // FIXME UPDATE WITH PROPER TYPE
			request(`πŸ—ƒMisc`, objSubTypeMessageOrderbookAck)
		}
		if craft {
			p.Status = objSubTypeMessageStockAnyAck
			request(`βš’Crafting`, objSubTypeMessageStockAnyAck)
		}
		if equip {
			p.Status = 1 // FIXME UPDATE WITH PROPER TYPE
			request(`🏷Equipment`, objSubTypeMessageOrderbookAck)
		}
		return
	} else if p.Status == 1 { /* handle that one resource from the objSubTypeMessageOrderbookAck msg */
		log.Printf("jobGDeposit : 1 : %d.\n", j.Trigger)
	} else if p.Status == objSubTypeMessageStockAck {
		msg, err := getObjMsg(j.Trigger)
		logOnError(err, "jobGDeposit : getObjMsg")
		if err == nil {
			rule, err := getMsgParsingRule(msg)
			logOnError(err, "jobGDeposit : getMsgParsingRule")
			if err == nil {
				cwm, err := parseSubTypeMessageStockAck(msg, rule.re)
				logOnError(err, "jobGDeposit : parseSubTypeMessageStockAck")
				if err == nil {
					for stockIdx := range cwm.Stock {
						for resIdx := range p.ResObjID64 {
							if cwm.Stock[stockIdx].ItemID64 != p.ResObjID64[resIdx] {
								continue
							}
							item, err := getObjItem(p.ResObjID64[resIdx])
							logOnError(err, "jobGDeposit : getObjItem")
							if err != nil {
								continue
							}
							clientSendCWMsg(p.ChatID64, fmt.Sprintf("/g_deposit %s %d", item.Code, cwm.Stock[stockIdx].Quantity))
							p2 := JobPayloadGDepositForward{
								ItemID64: p.ResObjID64[resIdx],
								Quantity: cwm.Stock[stockIdx].Quantity,
							}
							b2, err := json.Marshal(p2)
							logOnError(err, "jobGDeposit : Marshal")
							err = createJobCallback(objSubTypeJobGDepositForward, j.UserID64, objSubTypeMessageGDepositReq, b2, 1*time.Minute)
							logOnError(err, "jobGDeposit : createJobCallback")
						}
					}
				}
			}
		}
	} else if p.Status == objSubTypeMessageStockAnyAck {
		log.Printf("jobGDeposit : objSubTypeMessageStockAnyAck : %d.\n", j.Trigger)
		msg, err := getObjMsg(j.Trigger)
		logOnError(err, "jobGDeposit : getObjMsg")
		if err == nil {
			rule, err := getMsgParsingRule(msg)
			logOnError(err, "jobGDeposit : getMsgParsingRule")
			if err == nil {
				cwm, err := parseSubTypeMessageStockAnyAck(msg, rule.re)
				logOnError(err, "jobGDeposit : parseSubTypeMessageStockAnyAck")
				if err == nil {
					for stockIdx := range cwm.Stock {
						for resIdx := range p.ResObjID64 {
							if cwm.Stock[stockIdx].ItemID64 != p.ResObjID64[resIdx] {
								continue
							}
							log.Printf("jobGDeposit : objSubTypeMessageStockAnyAck : Matching ItemID %d (%d).\n", p.ResObjID64[resIdx], cwm.Stock[stockIdx].Quantity)
							item, err := getObjItem(p.ResObjID64[resIdx])
							logOnError(err, "jobGDeposit : getObjItem")
							if err != nil {
								continue
							}
							clientSendCWMsg(p.ChatID64, fmt.Sprintf("/g_deposit %s %d", item.Code, cwm.Stock[stockIdx].Quantity))
							p2 := JobPayloadGDepositForward{
								ItemID64: p.ResObjID64[resIdx],
								Quantity: cwm.Stock[stockIdx].Quantity,
							}
							b2, err := json.Marshal(p2)
							logOnError(err, "jobGDeposit : Marshal")
							err = createJobCallback(objSubTypeJobGDepositForward, j.UserID64, objSubTypeMessageGDepositReq, b2, 1*time.Minute)
							logOnError(err, "jobGDeposit : createJobCallback")
						}
					}
				}
			}
		}
	}

	err = setJobDone(j.ID64)
	logOnError(err, "jobGDeposit : setJobDone")
}

// jobVaultItemStatus sends, for each item in the payload's list, a table of
// per-user deposit and withdrawal totals in the payload's deposit chat. Rows
// are read ordered by item then user so each item yields exactly one table.
func jobVaultItemStatus(j Job) {
	var (
		p                                   JobPayloadVaultItemStatus
		itemID64, currentItemID64           int64
		user, deposit, withdraw             int64
		userList, depositList, withdrawList []int64
	)

	err := setJobStart(j.ID64)
	logOnError(err, "jobVaultItemStatus : setJobStart")

	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobVaultItemStatus : Unmarshal payload")

	if len(p.ItemListID64) == 0 {
		// Nothing to report; also keeps strings.Repeat below from
		// receiving a negative count.
		err = setJobDone(j.ID64)
		logOnError(err, "jobVaultItemStatus : setJobDone")
		return
	}

	stmt := `SELECT x.item_id
			,x.user_id
			,(SELECT COALESCE(SUM(omv.quantity), 0)
				FROM obj_msg_vault_v omv
				WHERE omv.user_id = x.user_id
				  AND omv.item_id = x.item_id
				  AND omv.msg_type_id = ` + strconv.Itoa(objSubTypeMessageGDepositAck) + `
				  AND omv.chat_id = x.chat_id) deposit
			,(SELECT COALESCE(SUM(omv.quantity), 0)
				FROM obj_msg_vault_v omv
				WHERE omv.user_id = x.user_id
				  AND omv.item_id = x.item_id
				  AND omv.msg_type_id = ` + strconv.Itoa(objSubTypeMessageWithdrawRcv) + `
				  AND omv.chat_id = x.chat_id) withdraw
		FROM (SELECT DISTINCT omv.user_id
				,omv.chat_id
				,omv.item_id
			FROM obj_msg_vault_v omv
			WHERE omv.chat_id = ?
			  AND omv.item_id in (?` + strings.Repeat(",?", len(p.ItemListID64)-1) + `)) x
		ORDER BY x.item_id ASC, x.user_id ASC;`

	args := make([]interface{}, len(p.ItemListID64)+1)
	args[0] = p.DepositChatID64
	for i, id := range p.ItemListID64 {
		args[i+1] = id
	}

	rows, err := db.Query(stmt, args...)
	logOnError(err, "jobVaultItemStatus : Get rows")
	if err != nil {
		err = setJobDone(j.ID64)
		logOnError(err, "jobVaultItemStatus : setJobDone")
		return
	}
	defer rows.Close()

	// flush sends the table accumulated for the current item.
	flush := func() {
		var out strings.Builder
		fmt.Fprintf(&out, "%-32s | Depo. | Recv. | Total\n────────────────────────────┼──────┼──────┼──────\n", `User`)
		for i, userId := range userList {
			fmt.Fprintf(&out, "%-32d |%6d |%6d |%6d\n", userId, depositList[i], withdrawList[i], depositList[i]-withdrawList[i])
		}
		TGCmdQueue <- TGCommand{
			Type:       commandSendMsg,
			Text:       out.String(),
			ToChatID64: p.UserID64,
			ParseMode:  cmdParseModeHTML,
		}
	}

	currentItemID64 = 0
	for rows.Next() {
		err = rows.Scan(&itemID64, &user, &deposit, &withdraw)
		logOnError(err, "jobVaultItemStatus : scan next val")
		if err != nil {
			continue
		}

		if itemID64 != currentItemID64 {
			if currentItemID64 != 0 {
				flush()
			}
			currentItemID64 = itemID64
			userList = nil
			depositList = nil
			withdrawList = nil
		}
		userList = append(userList, user)
		depositList = append(depositList, deposit)
		withdrawList = append(withdrawList, withdraw)
	}
	if currentItemID64 != 0 {
		flush()
	}

	err = rows.Err()
	logOnError(err, "jobVaultItemStatus : query end")

	err = setJobDone(j.ID64)
	logOnError(err, "jobVaultItemStatus : setJobDone")
}

// jobVaultUserStatus sends, for each user in the payload's list, a table of
// per-item deposit and withdrawal totals in the payload's deposit chat,
// restricted to the payload's item types.
func jobVaultUserStatus(j Job) {
	var (
		p                                   JobPayloadVaultUserStatus
		userID64, currentUserID64           int64
		itemID64, deposit, withdraw         int64
		itemList, depositList, withdrawList []int64
	)

	err := setJobStart(j.ID64)
	logOnError(err, "jobVaultUserStatus : setJobStart")

	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobVaultUserStatus : Unmarshal payload")

	if len(p.UserListID64) == 0 || len(p.ItemTypeListID64) == 0 {
		// Nothing to report; also keeps strings.Repeat below from
		// receiving a negative count.
		err = setJobDone(j.ID64)
		logOnError(err, "jobVaultUserStatus : setJobDone")
		return
	}

	stmt := `SELECT x.user_id
			,x.item_id
			,(SELECT COALESCE(SUM(omv.quantity), 0)
				FROM obj_msg_vault_v omv
				WHERE omv.user_id = x.user_id
				  AND omv.item_id = x.item_id
				  AND omv.msg_type_id = ` + strconv.Itoa(objSubTypeMessageGDepositAck) + `
				  AND omv.chat_id = x.chat_id) deposit
			,(SELECT COALESCE(SUM(omv.quantity), 0)
				FROM obj_msg_vault_v omv
				WHERE omv.user_id = x.user_id
				  AND omv.item_id = x.item_id
				  AND omv.msg_type_id = ` + strconv.Itoa(objSubTypeMessageWithdrawRcv) + `
				  AND omv.chat_id = x.chat_id) withdraw
		FROM (SELECT DISTINCT omv.user_id
				,omv.chat_id
				,omv.item_id
			FROM obj_msg_vault_v omv
			WHERE omv.chat_id = ?
			  AND omv.user_id IN (?` + strings.Repeat(",?", len(p.UserListID64)-1) + `)
			  AND omv.item_type_id IN (?` + strings.Repeat(",?", len(p.ItemTypeListID64)-1) + `)) x
		ORDER BY x.user_id ASC;`

	args := make([]interface{}, len(p.UserListID64)+len(p.ItemTypeListID64)+1)
	args[0] = p.DepositChatID64
	for i, id := range p.UserListID64 {
		args[i+1] = id
	}
	for i, id := range p.ItemTypeListID64 {
		args[i+1+len(p.UserListID64)] = id
	}

	rows, err := db.Query(stmt, args...)
	logOnError(err, "jobVaultUserStatus : Get rows")
	if err != nil {
		err = setJobDone(j.ID64)
		logOnError(err, "jobVaultUserStatus : setJobDone")
		return
	}
	defer rows.Close()

	// flush sends the table accumulated for the current user.
	flush := func() {
		var out strings.Builder
		fmt.Fprintf(&out, "%-32s | Depo. | Recv. | Total\n────────────────────────────┼──────┼──────┼──────\n", `Item`)
		for i, itemId := range itemList {
			// Fall back to the numeric id when the item cannot be loaded.
			name := strconv.FormatInt(itemId, 10)
			item, err := getObjItem(itemId)
			logOnError(err, "jobVaultUserStatus : getObjItem")
			if err == nil {
				name = item.Name
			}
			fmt.Fprintf(&out, "%-32s |%6d |%6d |%6d\n", name, depositList[i], withdrawList[i], depositList[i]-withdrawList[i])
		}
		TGCmdQueue <- TGCommand{
			Type:       commandSendMsg,
			Text:       out.String(),
			ToChatID64: p.UserID64,
			ParseMode:  cmdParseModeHTML,
		}
	}

	currentUserID64 = 0
	for rows.Next() {
		err = rows.Scan(&userID64, &itemID64, &deposit, &withdraw)
		logOnError(err, "jobVaultUserStatus : scan next val")
		if err != nil {
			continue
		}

		if userID64 != currentUserID64 {
			if currentUserID64 != 0 {
				flush()
			}
			currentUserID64 = userID64
			itemList = nil
			depositList = nil
			withdrawList = nil
		}
		itemList = append(itemList, itemID64)
		depositList = append(depositList, deposit)
		withdrawList = append(withdrawList, withdraw)
	}
	if currentUserID64 != 0 {
		flush()
	}

	err = rows.Err()
	logOnError(err, "jobVaultUserStatus : query end")

	err = setJobDone(j.ID64)
	logOnError(err, "jobVaultUserStatus : setJobDone")
}

// jobGWithdraw currently only prints every "<code> <quantity>" pair found in
// the payload's request and closes the job.
func jobGWithdraw(j Job) {
	var p JobPayloadGWithdraw

	err := setJobStart(j.ID64)
	logOnError(err, "jobGWithdraw : setJobStart")

	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobGWithdraw : Unmarshal payload")

	r := regexp.MustCompile("[a-z0-9]+ [0-9]+")
	for _, l := range r.FindAllStringSubmatch(p.Request, -1) {
		fmt.Printf("jobGWithdraw : %v\n", l)
		// TODO: resolve the item (getObjItemID) and quantity of each
		// match and create the follow-up job; not implemented yet.
	}

	err = setJobDone(j.ID64)
	logOnError(err, "jobGWithdraw : setJobDone")
}

// jobFwdMsg forwards the triggering message to the payload's chat through the
// user's client, then closes the job. Nothing is forwarded when the message
// cannot be loaded.
func jobFwdMsg(j Job) {
	var p JobPayloadFwdMsg

	err := setJobStart(j.ID64)
	logOnError(err, "jobFwdMsg : setJobStart")

	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobFwdMsg : Unmarshal payload")

	msg, err := getObjMsg(j.Trigger)
	logOnError(err, "jobFwdMsg : getObjMsg msg")
	if err == nil {
		clientFwdCWMsg(j.UserID64, msg.ID64, msg.ChatID64, p.ChatID64)
	}

	err = setJobDone(j.ID64)
	logOnError(err, "jobFwdMsg : setJobDone")
}

func jobSetDef(j Job) {
	var p JobPayloadSetDef
	err := setJobStart(j.ID64)
	logOnError(err, "jobSetDef : setJobStart")
	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobSetDef : Unmarshal payload")
	msg, err := getObjMsg(j.Trigger)
	logOnError(err, "jobSetDef : getObjMsg msg")
	rule,
err := getMsgParsingRule(msg) logOnError(err, "jobSetDef : getMsgParsingRule") cwm, err := parseSubTypeMessageMeAck(msg, rule.re) if cwm.State == `πŸ›ŒRest` { clientSendCWMsg(j.UserID64, `πŸ›‘Defend`) } err = setJobDone(j.ID64) logOnError(err, "jobSetDef : setJobDone") return } func jobGetHammerTime(j Job) { var p JobPayloadSetDef err := setJobStart(j.ID64) logOnError(err, "jobGetHammerTime : setJobStart") err = json.Unmarshal(j.Payload, &p) logOnError(err, "jobGetHammerTime : Unmarshal payload") msg, err := getObjMsg(j.Trigger) logOnError(err, "jobGetHammerTime : getObjMsg msg") rule, err := getMsgParsingRule(msg) logOnError(err, "jobGetHammerTime : getMsgParsingRule") cwm, err := parseSubTypeMessageTimeAck(msg, rule.re) out := `` if hammerTimeNow(cwm.TimeOfDay, cwm.Weather) { if hammerTimeNext(cwm.TimeOfDay, cwm.WeatherNext) || hammerTimeNext(cwm.TimeOfDay, cwm.Weather) { out = `Perfect weather for the next 2 hours, possibly 4.` } else { out = `Perfect weather only for the next 2 hours.` } c := TGCommand{ Type: commandSendMsg, Text: out, ToChatID64: cfg.Bot.Mainchat, ParseMode: cmdParseModeHTML, } TGCmdQueue <- c } /* } else { if hammerTimeNext(cwm.TimeOfDay, cwm.WeatherNext) || hammerTimeNext(cwm.TimeOfDay, cwm.Weather) { out = `Perfect weather maybe in 2 hours.` } else { out = `No perfect weather in sight for the next 4 hours.` } } */ err = setJobDone(j.ID64) logOnError(err, "jobGetHammerTime : setJobDone") return }