chirpnest/job.go
2019-10-13 15:24:11 +08:00

1152 lines
35 KiB
Go
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"archive/zip"
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"regexp"
"strconv"
"strings"
"time"
tb "gopkg.in/tucnak/telebot.v2"
)
func createJob(jobTypeID int32, priority int32, userID64 int64, trigger int64, schedule time.Time, payload []byte) (int64, error) {
stmt, err := db.Prepare(`INSERT INTO obj (obj_type_id, obj_sub_type_id)
VALUES (? , ?);`)
logOnError(err, "createJob : prepare insert obj")
if err != nil {
return 0, err
}
defer stmt.Close()
res, err := stmt.Exec(objTypeJob, jobTypeID)
s := fmt.Sprintf("createJob, insert obj(%d, %d)", objTypeJob, jobTypeID)
logOnError(err, s)
if err != nil {
return 0, err
}
objId, err := res.LastInsertId()
logOnError(err, "createJob : get last insert Id")
if err != nil {
return 0, err
}
stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, trigger_id, seq_nr, schedule, is_done, in_work, inserted, timeout, pulled, started, ended, payload)
VALUES (?, ?, ?, ?, NULL, ?, 0, 0, ?, ?, NULL, NULL, NULL, ?);`)
logOnError(err, "createJob : prepare insert obj_job")
if err != nil {
return 0, err
}
defer stmt.Close()
_, err = stmt.Exec(objId, priority, userID64, trigger, schedule.UTC(), time.Now().UTC(), time.Unix(maxUnixTimestamp, 0).UTC(), payload)
logOnError(err, "createJob : insert obj_job")
if err != nil {
return 0, err
}
return objId, nil
}
func createJobCallback(jobTypeID int32, userID64 int64, msgTypeID64 int64, payload []byte, timeout time.Duration) error {
//t, err := time.Parse(time.RFC3339, "9999-12-31T00:00:00+00:00")
jobID64, err := createJob(jobTypeID, objJobPriority, userID64, 0, time.Unix(maxUnixTimestamp, 0).UTC(), payload)
if err != nil {
return err
}
setJobCallback(jobID64, userID64, msgTypeID64)
err = setJobTimeout(jobID64, timeout)
logOnError(err, "createJobCallback : setJobTimeout")
return nil
}
func setJobCallback(jobID64 int64, userID64 int64, msgTypeID64 int64) {
muxCallbacks.Lock()
if _, ok := callbacks[userID64]; !ok {
callbacks[userID64] = make(map[int64][]int64)
}
s := callbacks[userID64][msgTypeID64]
s = append(s, jobID64)
callbacks[userID64][msgTypeID64] = s
muxCallbacks.Unlock()
}
func setJobTimeout(jobID64 int64, d time.Duration) error {
stmt, err := db.Prepare(`UPDATE obj_job j SET j.timeout = ? WHERE j.obj_id = ?;`)
logOnError(err, "setJobTimeout : prepare update obj_job")
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(time.Now().UTC().Add(d), jobID64)
logOnError(err, fmt.Sprintf("setJobTimeout, update obj_job(%d)", jobID64))
if err != nil {
return err
}
return nil
}
func setJobDone(jobID64 int64) error {
stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 1, j.in_work = 0, j.ended = ? WHERE j.obj_id = ?;`)
logOnError(err, "setJobDone : prepare update obj_job")
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(time.Now().UTC(), jobID64)
s := fmt.Sprintf("setJobDone, update obj_job(%d)", jobID64)
logOnError(err, s)
if err != nil {
return err
}
return nil
}
func setJobStart(jobId int64) error {
stmt, err := db.Prepare(`UPDATE obj_job j SET j.started = coalesce(j.started, ?) WHERE j.obj_id = ?;`)
logOnError(err, "setJobStart : prepare update obj_job")
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(time.Now().UTC(), jobId)
s := fmt.Sprintf("setJobStart, update obj_job(%d)", jobId)
logOnError(err, s)
if err != nil {
return err
}
return nil
}
func rescheduleJob(jobID64 int64, trigger int64, schedule time.Time) error {
stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 0, j.in_work = 0, j.schedule = ?, j.trigger_id = ? WHERE j.obj_id = ?;`)
logOnError(err, "rescheduleJob : prepare update obj_job")
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(schedule.UTC(), trigger, jobID64)
s := fmt.Sprintf("rescheduleJob, update obj_job(%d)", jobID64)
logOnError(err, s)
if err != nil {
return err
}
return nil
}
func loadCurrentJobs() ([]Job, error) {
var (
objId int64
jobTypeId int32
userID64 int64
trigger int64
timeout time.Time
payload []byte
jobs []Job
)
t := time.Now().UTC()
r := RndInt64()
_, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1, j.seq_nr = ? WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", t, r, t, SQLJobSliceSize)
logOnError(err, "loadCurrentJobs : update intial rows")
if err != nil {
return jobs, err
}
stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.trigger_id, j.user_id, j.payload, j.timeout FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;")
logOnError(err, "loadCurrentJobs : prepare select statement")
if err != nil {
stmt.Close()
return jobs, err
}
rows, err := stmt.Query(r)
// rows, err := stmt.Query(time.Now())
logOnError(err, "loadCurrentJobs : query select statement")
if err != nil {
stmt.Close()
return jobs, err
}
for rows.Next() {
err = rows.Scan(&objId, &jobTypeId, &trigger, &userID64, &payload, &timeout)
logOnError(err, "loadCurrentJobs : scan query rows")
job := Job{
ID64: objId,
JobTypeID: jobTypeId,
Trigger: trigger,
UserID64: userID64,
Payload: payload,
Timeout: timeout,
}
jobs = append(jobs, job)
}
err = rows.Err()
logOnError(err, "loadCurrentJobs : scan end rows")
rows.Close()
if err != nil {
stmt.Close()
return jobs, err
}
err = stmt.Close()
logOnError(err, "loadCurrentJobs : close select statement")
if err != nil {
return jobs, err
}
return jobs, nil
}
func jobRescan(j Job) {
var p JobPayloadRescanMsg
err := setJobStart(j.ID64)
logOnError(err, "jobRescan : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobRescan : Unmarshal payload")
start := time.Now()
milestone := time.Now()
ids := getSQLListID64(p.Query)
if len(ids) > 1 {
txt := fmt.Sprintf("Rescanning %d messages.", len(ids))
m := TGCommand{
Type: commandReplyMsg,
Text: txt,
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
i := 0
for _, id := range ids {
SQLMsgIdentifyQueue <- id
i = i + 1
if time.Now().After(milestone.Add(1 * time.Minute)) {
//txt := fmt.Sprintf("Rescanned %d/%d messages.", i, len(ids))
m = TGCommand{
Type: commandReplyMsg,
Text: fmt.Sprintf("Rescanned %d/%d messages.", i, len(ids)),
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
milestone = time.Now()
}
}
r := JobPayloadSetDone{
JobID64: j.ID64,
MsgID64: p.MsgID64,
ChatID64: p.ChatID64,
Text: fmt.Sprintf("%d messages processed in %s.", len(ids), time.Since(start)),
}
b, _ := json.Marshal(r)
_, err := createJob(objSubTypeJobSetJobDone, objJobPriorityRescanAllMsg, j.UserID64, j.ID64, time.Now().UTC(), b)
logOnError(err, "jobRescan : createJob(objSubTypeJobSetJobDone)")
} else if len(ids) == 1 {
SQLMsgIdentifyQueue <- ids[0]
err = setJobDone(j.ID64)
logOnError(err, "jobRescan : setJobDone(1)")
if p.MsgID64 != 0 || p.ChatID64 != 0 {
m := TGCommand{
Type: commandReplyMsg,
Text: "One message processed.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
}
} else {
err = setJobDone(j.ID64)
logOnError(err, "jobRescan : setJobDone(0)")
if p.MsgID64 != 0 || p.ChatID64 != 0 {
m := TGCommand{
Type: commandReplyMsg,
Text: "No message processed.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
}
}
return
}
func jobSetDone(j Job) {
var r JobPayloadSetDone
err := setJobStart(j.ID64)
logOnError(err, "jobSetDone : setJobStart")
err = json.Unmarshal(j.Payload, &r)
logOnError(err, "jobSetDone : Unmarshal payload")
err = setJobDone(r.JobID64)
logOnError(err, "jobSetDone : setJobDone(child)")
err = setJobDone(j.ID64)
logOnError(err, "jobSetDone : setJobDone")
m := TGCommand{
Type: commandReplyMsg,
Text: r.Text,
FromMsgID64: r.MsgID64,
FromChatID64: r.ChatID64,
}
TGCmdQueue <- m
return
}
func jobPillage(j Job) {
var r JobPayloadPillage
err := setJobStart(j.ID64)
logOnError(err, "jobPillage : setJobStart")
err = json.Unmarshal(j.Payload, &r)
logOnError(err, "jobPillage : Unmarshal payload")
// check if we have a acknoledgment of go or a timeout within 3m30 of the PillageInc from the Job
ids := getSQLListID64(`SELECT ox.id
FROM obj ox
,obj_msg omx
,obj op
,obj_msg omp
,obj_job oj
WHERE oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + `
AND omx.user_id = oj.user_id
AND omx.sender_user_id = ` + strconv.Itoa(userID64ChtWrsBot) + `
AND omx.obj_id = ox.id
AND ox.obj_sub_type_id in (` + strconv.Itoa(objSubTypeMessagePillageGo) +
`, ` + strconv.Itoa(objSubTypeMessagePillageTimeout) +
`, ` + strconv.Itoa(objSubTypeMessagePillageLoss) +
`, ` + strconv.Itoa(objSubTypeMessagePillageWin) + `)
AND op.id = ` + strconv.FormatInt(r.ObjID64, 10) + `
AND omp.obj_id = op.id
AND omx.date between omp.date AND ADDTIME(omp.date, '0 0:3:30.000000')
ORDER BY CASE ox.obj_sub_type_id WHEN ` + strconv.Itoa(objSubTypeMessagePillageWin) + ` THEN 0
WHEN ` + strconv.Itoa(objSubTypeMessagePillageLoss) + ` THEN 1
WHEN ` + strconv.Itoa(objSubTypeMessagePillageTimeout) + ` THEN 2
WHEN ` + strconv.Itoa(objSubTypeMessagePillageGo) + ` THEN 3
ELSE 4 END ASC
LIMIT 1;`)
if len(ids) > 1 { // issue there ?
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("More than one outcome for pillage #%d", r.ObjID64),
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
} else if len(ids) == 1 { // we've got a match, job is done whether we prevented the pillage or not
m, err := getObjMsg(ids[0])
logOnError(err, "jobPillage : getMsg(objSubTypeMessagePillageGo, objSubTypeMessagePillageTimeout)")
if err == nil {
if m.Date.Add(60 * time.Second).After(time.Now().UTC()) {
msgTypeID64, err := getObjSubTypeId(ids[0])
logOnError(err, "jobPillage : getObjSubTypeId")
if err == nil {
if msgTypeID64 == objSubTypeMessagePillageGo {
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("We avoided a pillage (%s)", m.Date.Format(time.RFC3339)),
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
} else if msgTypeID64 == objSubTypeMessagePillageWin {
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("We avoided a pillage (%s))", m.Date.Format(time.RFC3339)),
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
} else if msgTypeID64 == objSubTypeMessagePillageLoss {
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("We got pillaged (%s)", m.Date.Format(time.RFC3339)),
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
} else if msgTypeID64 == objSubTypeMessagePillageTimeout {
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("We got pillaged (%s)", m.Date.Format(time.RFC3339)),
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
} else {
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("We don't know what happened (%s)", m.Date.Format(time.RFC3339)),
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
}
}
}
}
err = setJobDone(j.ID64)
logOnError(err, "jobSetDone : setJobDone")
return
}
// is the job outdated now ?
if time.Now().UTC().After(r.Date.Add(time.Minute*3 + time.Second*30)) {
// log.Printf("jobPillage :\n\tPillageTime : %s\n\tNowTime : %s\n", r.Date.Format(time.RFC3339), time.Now().UTC().Format(time.RFC3339))
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("Pillage interception expired"),
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
return
}
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("No outcome for the pillage yet"),
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
//no outcome yet, have we sent a "/go" in the last 30 sec ?
ids = getSQLListID64(` select ox.id
from obj ox
,obj_msg omx
,obj_job oj
where oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + `
and omx.user_id = oj.user_id
and omx.sender_user_id = oj.user_id
and omx.obj_id = ox.id
and ox.obj_sub_type_id =` + strconv.Itoa(objSubTypeMessageGo) + `
and omx.date between addtime(oj.schedule, '-30') and oj.schedule;`)
if len(ids) > 0 { // we did, so we reschedule the job to check the outcome and wait
m, err := getObjMsg(ids[0])
logOnError(err, "jobPillage : getMsg(objSubTypeMessageGo)")
if err == nil {
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("We started intercepting the pillage (%s)", m.Date.Format(time.RFC3339)),
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
}
err = rescheduleJob(j.ID64, j.Trigger, time.Now().Add(30*time.Second).UTC())
logOnError(err, "jobPillage : rescheduleJob(objSubTypeMessageGo)")
} else { //no /go in the last 30 sec so we go ahead, send one and reschedule to check again in 25sec
clientSendCWMsg(j.UserID64, "/go")
err = rescheduleJob(j.ID64, j.Trigger, time.Now().Add(30*time.Second).UTC())
logOnError(err, "jobPillage : rescheduleJob")
}
return
}
func jobMsgRefresh(j Job) {
var p JobPayloadMsgRefresh
// identify whether the message has been properly refreshed ? create new job ? reschedule same job ?
err := setJobStart(j.ID64)
logOnError(err, "jobMsgRefresh : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobMsgRefresh : Unmarshal payload")
m, err := getObjMsg(p.ObjID64)
if err != nil && strings.Compare(err.Error(), `sql: no rows in result set`) == 0 {
err = setJobDone(j.ID64)
logOnError(err, "joMsgClient : setJobDone")
return
} else {
logOnError(err, "jobMsgRefresh : getObjMsg")
err = setJobDone(j.ID64)
logOnError(err, "joMsgClient : setJobDone")
return
}
err = delObj(p.ObjID64)
logOnError(err, "jobMsgRefresh : delObj")
clientRefreshCWMsg(m.TGUserID64, m.ChatID64, m.ID64)
err = setJobDone(j.ID64)
logOnError(err, "joMsgClient : setJobDone")
return
}
func jobMsgClient(j Job) {
var p JobPayloadMsgClient
err := setJobStart(j.ID64)
logOnError(err, "jobMsgClient : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobMsgClient : Unmarshal payload")
if err == nil {
clientSendCWMsg(j.UserID64, p.Text)
m := TGCommand{
Type: commandReplyMsg,
Text: "Message sent.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
}
err = setJobDone(j.ID64)
logOnError(err, "joMsgClient : setJobDone")
return
}
func jobBackupExport(j Job) {
var p JobPayloadBackupExport
err := setJobStart(j.ID64)
logOnError(err, "jobBackupExport : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobBackupExport : Unmarshal payload")
bkp := DataBackup{}
start := time.Now()
milestone := time.Now()
s := new([]ChatWarsMessage)
msgs := *s
ids := getSQLListID64(`SELECT om.obj_id id FROM obj_msg om;`)
txt := fmt.Sprintf("Backing up %d messages.", len(ids))
m := TGCommand{
Type: commandReplyMsg,
Text: txt,
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
i := 0
for _, id := range ids {
msg, err := getObjMsg(id)
logOnError(err, "jobBackupExport : getMsg")
if err == nil {
msgs = append(msgs, *msg)
}
i = i + 1
if time.Now().After(milestone.Add(1 * time.Minute)) {
txt := fmt.Sprintf("Exported %d/%d messages.", i, len(ids))
m = TGCommand{
Type: commandReplyMsg,
Text: txt,
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
milestone = time.Now()
}
}
bkp.Messages = msgs
b, err := json.Marshal(bkp)
logOnError(err, "jobBackupExport : Marshal")
m = TGCommand{
Type: commandReplyMsg,
Text: `Compressing archive`,
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
zbuf := new(bytes.Buffer)
zw := zip.NewWriter(zbuf)
zf, err := zw.Create(`backup.json`)
logOnError(err, "jobBackupExport : Create")
_, err = zf.Write(b)
logOnError(err, "jobBackupExport : Write")
err = zw.Close()
logOnError(err, "jobBackupExport : Close")
d := tb.Document{}
d.File = tb.FromReader(bytes.NewReader(zbuf.Bytes()))
d.FileName = fmt.Sprintf("%s.backup.zip", start.Format("20060102150405"))
d.Caption = d.FileName
d.MIME = `application/zip`
m = TGCommand{
Type: commandReplyMsg,
Text: `Export done.`,
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
m = TGCommand{
Type: commandSendDocument,
Document: d,
ToChatID64: p.ChatID64,
}
TGCmdQueue <- m
err = setJobDone(j.ID64)
logOnError(err, "jobBackupExport : setJobDone")
return
}
func jobBackupImport(j Job) {
var p JobPayloadBackupImport
err := setJobStart(j.ID64)
logOnError(err, "jobBackupImport : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobBackupImport : Unmarshal payload")
resp, err := http.Get(p.URL)
logOnError(err, "jobBackupImport : Get")
defer resp.Body.Close()
buf := new(bytes.Buffer)
buf.ReadFrom(resp.Body)
m := TGCommand{
Type: commandReplyMsg,
Text: "File downloaded.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
z := buf.Bytes()
r := bytes.NewReader(z)
zr, err := zip.NewReader(r, int64(len(z)))
for _, f := range zr.File {
if strings.Compare(f.Name, "backup.json") == 0 {
rc, err := f.Open()
logOnError(err, "jobBackupImport : Open")
if err != nil {
return
}
data, err := ioutil.ReadAll(rc)
logOnError(err, "jobBackupImport : ReadAll")
if err != nil {
return
}
log.Printf("jobBackupImport : %d uncompressed bytes.\n", len(data))
rc.Close()
bkp := DataBackup{}
err = json.Unmarshal(data, &bkp)
logOnError(err, "jobBackupImport : Unmarshal")
if err != nil {
return
}
for _, msg := range bkp.Messages {
MQCWMsgQueue <- msg
}
m := TGCommand{
Type: commandReplyMsg,
Text: "Backup restored.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
err = setJobDone(j.ID64)
logOnError(err, "jobBackupImport : setJobDone")
return
}
}
m = TGCommand{
Type: commandReplyMsg,
Text: "Not backup file found in archive.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
err = setJobDone(j.ID64)
logOnError(err, "jobBackupImport : setJobDone")
return
}
func jobGStock(j Job) {
var p JobPayloadGStock
err := setJobStart(j.ID64)
logOnError(err, "jobGStock : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobGStock : Unmarshal payload")
/*
clientSendCWMsg(m.Chat.ID, "/g_stock_res")
clientSendCWMsg(m.Chat.ID, "/g_stock_alch")
clientSendCWMsg(m.Chat.ID, "/g_stock_misc")
clientSendCWMsg(m.Chat.ID, "/g_stock_rec")
clientSendCWMsg(m.Chat.ID, "/g_stock_parts")
clientSendCWMsg(m.Chat.ID, "/g_stock_other")
*/
if err == nil {
m := TGCommand{
Type: commandReplyMsg,
Text: "Message sent.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
}
err = setJobDone(j.ID64)
logOnError(err, "jobGStock : setJobDone")
return
}
func jobGDepositForward(j Job) {
var p JobPayloadGDepositForward
err := setJobStart(j.ID64)
logOnError(err, "jobGDepositForward : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobGDepositForward : Unmarshal payload")
msg, err := getObjMsg(j.Trigger)
logOnError(err, "jobGDepositForward : getObjMsg")
rule, err := getMsgParsingRule(msg)
logOnError(err, "jobGDepositForward : getMsgParsingRule")
cwm, err := parseSubTypeMessageGDepositReq(msg, rule.re)
if cwm.ItemID64 == p.ItemID64 && cwm.Quantity == p.Quantity {
log.Printf("jobGDepositForward : match (%d / %d).\n", cwm.ItemID64, cwm.Quantity)
gDepositForwardMux.Lock()
gDepositForwardMsg = append(gDepositForwardMsg, j.Trigger)
gDepositForwardMux.Unlock()
err = setJobDone(j.ID64)
logOnError(err, "jobGDeposit : setJobDone")
} else {
log.Printf("jobGDepositForward : found (%d / %d), expected (%d / %d).\n", cwm.ItemID64, cwm.Quantity, p.ItemID64, p.Quantity)
err = rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC())
logOnError(err, "jobGDepositForward : rescheduleJob")
setJobCallback(j.ID64, j.UserID64, objSubTypeMessageGDepositReq)
}
return
}
func jobGDeposit(j Job) {
var p JobPayloadGDeposit
err := setJobStart(j.ID64)
logOnError(err, "jobGDeposit : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobGDeposit : Unmarshal payload")
if p.Status == 0 { /* handle remaining resources to be stored */
var res, misc, alch, craft, equip bool = false, false, false, false, false
var delay time.Duration = 0 * time.Second
var b []byte
if len(p.ResObjID64) > 0 {
for i := range p.ResObjID64 {
obj, err := getObjItem(p.ResObjID64[i])
logOnError(err, "jobGDeposit : getObjItem")
if err == nil {
switch obj.ItemTypeID {
case objSubTypeItemResource:
res = true
case objSubTypeItemAlch:
alch = true
case objSubTypeItemMisc:
misc = true
case objSubTypeItemRecipe:
craft = true
case objSubTypeItemPart:
craft = true
case objSubTypeItemOther:
equip = true
case objSubTypeItemUnique:
equip = true
default:
}
}
}
}
if res {
clientSendCWMsgDelay(p.ChatID64, `📦Resources`, delay)
p.Status = objSubTypeMessageStockAck
b, _ = json.Marshal(&p)
err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, objSubTypeMessageStockAck, b, 1*time.Minute)
logOnError(err, "jobGDeposit : createJobCallback")
delay = delay + 2*time.Second
}
if alch {
clientSendCWMsgDelay(p.ChatID64, `Alchemy`, delay)
p.Status = 1 // FIXME UPDATE WITH PROPER TYPE
b, _ = json.Marshal(&p)
err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, objSubTypeMessageOrderbookAck, b, 1*time.Minute)
logOnError(err, "jobGDeposit : createJobCallback")
delay = delay + 2*time.Second
}
if misc {
clientSendCWMsgDelay(p.ChatID64, `🗃Misc`, delay)
p.Status = 1 // FIXME UPDATE WITH PROPER TYPE
b, _ = json.Marshal(&p)
err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, objSubTypeMessageOrderbookAck, b, 1*time.Minute)
logOnError(err, "jobGDeposit : createJobCallback")
delay = delay + 2*time.Second
}
if craft {
clientSendCWMsgDelay(p.ChatID64, `⚒Crafting`, delay)
p.Status = objSubTypeMessageStockAnyAck
b, _ = json.Marshal(&p)
err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, objSubTypeMessageStockAnyAck, b, 1*time.Minute)
logOnError(err, "jobGDeposit : createJobCallback")
delay = delay + 2*time.Second
}
if equip {
clientSendCWMsgDelay(p.ChatID64, `🏷Equipment`, delay)
p.Status = 1 // FIXME UPDATE WITH PROPER TYPE
b, _ = json.Marshal(&p)
err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, objSubTypeMessageOrderbookAck, b, 1*time.Minute)
logOnError(err, "jobGDeposit : createJobCallback")
delay = delay + 2*time.Second
}
return
} else if p.Status == 1 { /* handle that one resource from the objSubTypeMessageOrderbookAck msg */
log.Printf("jobGDeposit : 1 : %d.\n", j.Trigger)
} else if p.Status == objSubTypeMessageStockAck {
//log.Printf("jobGDeposit : objSubTypeMessageStockAck : %d.\n", j.Trigger)
msg, err := getObjMsg(j.Trigger)
logOnError(err, "jobGDeposit : getObjMsg")
rule, err := getMsgParsingRule(msg)
logOnError(err, "jobGDeposit : getMsgParsingRule")
cwm, err := parseSubTypeMessageStockAck(msg, rule.re)
for stockIdx := range cwm.Stock {
for resIdx := range p.ResObjID64 {
if cwm.Stock[stockIdx].ItemID64 == p.ResObjID64[resIdx] {
//log.Printf("jobGDeposit : objSubTypeMessageStockAck : Matching ItemID %d (%d).\n", p.ResObjID64[resIdx], cwm.Stock[stockIdx].Quantity)
item, _ := getObjItem(p.ResObjID64[resIdx])
clientSendCWMsg(p.ChatID64, fmt.Sprintf("/g_deposit %s %d", item.Code, cwm.Stock[stockIdx].Quantity))
p2 := JobPayloadGDepositForward{
ItemID64: p.ResObjID64[resIdx],
Quantity: cwm.Stock[stockIdx].Quantity,
}
b2, _ := json.Marshal(p2)
err = createJobCallback(objSubTypeJobGDepositForward, j.UserID64, objSubTypeMessageGDepositReq, b2, time.Duration(1*time.Minute))
}
}
}
} else if p.Status == objSubTypeMessageStockAnyAck {
log.Printf("jobGDeposit : objSubTypeMessageStockAnyAck : %d.\n", j.Trigger)
msg, err := getObjMsg(j.Trigger)
logOnError(err, "jobGDeposit : getObjMsg")
rule, err := getMsgParsingRule(msg)
logOnError(err, "jobGDeposit : getMsgParsingRule")
cwm, err := parseSubTypeMessageStockAnyAck(msg, rule.re)
for stockIdx := range cwm.Stock {
for resIdx := range p.ResObjID64 {
if cwm.Stock[stockIdx].ItemID64 == p.ResObjID64[resIdx] {
log.Printf("jobGDeposit : objSubTypeMessageStockAnyAck : Matching ItemID %d (%d).\n", p.ResObjID64[resIdx], cwm.Stock[stockIdx].Quantity)
item, _ := getObjItem(p.ResObjID64[resIdx])
clientSendCWMsg(p.ChatID64, fmt.Sprintf("/g_deposit %s %d", item.Code, cwm.Stock[stockIdx].Quantity))
p2 := JobPayloadGDepositForward{
ItemID64: p.ResObjID64[resIdx],
Quantity: cwm.Stock[stockIdx].Quantity,
}
b2, _ := json.Marshal(p2)
err = createJobCallback(objSubTypeJobGDepositForward, j.UserID64, objSubTypeMessageGDepositReq, b2, time.Duration(1*time.Minute))
}
}
}
}
err = setJobDone(j.ID64)
logOnError(err, "jobGDeposit : setJobDone")
return
}
func jobVaultItemStatus(j Job) {
var (
p JobPayloadVaultItemStatus
itemID64, currentItemID64 int64
user, deposit, withdraw int64
userList, depositList, withdrawList []int64
)
err := setJobStart(j.ID64)
logOnError(err, "jobVaultItemStatus : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobVaultItemStatus : Unmarshal payload")
stmt := `SELECT x.item_id
,x.user_id
,(SELECT COALESCE(SUM(omv.quantity), 0)
FROM obj_msg_vault_v omv
WHERE omv.user_id = x.user_id
AND omv.item_id = x.item_id
AND omv.msg_type_id = ` + strconv.Itoa(objSubTypeMessageGDepositAck) + `
AND omv.chat_id = x.chat_id) deposit
,(SELECT COALESCE(SUM(omv.quantity), 0)
FROM obj_msg_vault_v omv
WHERE omv.user_id = x.user_id
AND omv.item_id = x.item_id
AND omv.msg_type_id = ` + strconv.Itoa(objSubTypeMessageWithdrawRcv) + `
AND omv.chat_id = x.chat_id) withdraw
FROM (SELECT DISTINCT
omv.user_id
,omv.chat_id
,omv.item_id
FROM obj_msg_vault_v omv
WHERE omv.chat_id = ?
AND omv.item_id in (?` + strings.Repeat(",?", len(p.ItemListID64)-1) + `)) x
ORDER BY x.user_id ASC;`
args := make([]interface{}, len(p.ItemListID64)+1)
args[0] = p.DepositChatID64
for i, id := range p.ItemListID64 {
args[i+1] = id
}
rows, err := db.Query(stmt, args...)
logOnError(err, "jobVaultItemStatus : Get rows")
if err != nil {
err = setJobDone(j.ID64)
logOnError(err, "jobVaultItemStatus : setJobDone")
return
}
currentItemID64 = 0
for rows.Next() {
err = rows.Scan(&itemID64, &user, &deposit, &withdraw)
logOnError(err, "jobVaultItemStatus : scan next val")
if itemID64 != currentItemID64 {
if currentItemID64 != 0 {
// display info
out := fmt.Sprintf("<code>%-32s | Depo. | Recv. | Total\n────────────────────────────┼──────┼──────┼──────\n", `User`)
for i, userId := range userList {
logOnError(err, "jobVaultItemStatus : getObjItem")
out = fmt.Sprintf("%s%-32d |%6d |%6d |%6d\n", out, userId, depositList[i], withdrawList[i], depositList[i]-withdrawList[i])
}
out = fmt.Sprintf("%s</code>", out)
c := TGCommand{
Type: commandSendMsg,
Text: out,
ToChatID64: p.UserID64,
ParseMode: cmdParseModeHTML,
}
TGCmdQueue <- c
}
currentItemID64 = itemID64
userList = nil
depositList = nil
withdrawList = nil
}
userList = append(userList, user)
depositList = append(depositList, deposit)
withdrawList = append(withdrawList, withdraw)
}
if currentItemID64 != 0 {
// display info
out := fmt.Sprintf("<code>%-32s | Depo. | Recv. | Total\n────────────────────────────┼──────┼──────┼──────\n", `User`)
for i, userId := range userList {
logOnError(err, "jobVaultItemStatus : getObjItem")
out = fmt.Sprintf("%s%-32d |%6d |%6d |%6d\n", out, userId, depositList[i], withdrawList[i], depositList[i]-withdrawList[i])
}
out = fmt.Sprintf("%s</code>", out)
c := TGCommand{
Type: commandSendMsg,
Text: out,
ToChatID64: p.UserID64,
ParseMode: cmdParseModeHTML,
}
TGCmdQueue <- c
}
err = rows.Err()
logOnError(err, "jobVaultItemStatus : query end")
rows.Close()
err = setJobDone(j.ID64)
logOnError(err, "jobVaultItemStatus : setJobDone")
return
}
func jobVaultUserStatus(j Job) {
var (
p JobPayloadVaultUserStatus
userID64, currentUserID64 int64
itemID64, deposit, withdraw int64
itemList, depositList, withdrawList []int64
)
err := setJobStart(j.ID64)
logOnError(err, "jobVaultUserStatus : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobVaultUserStatus : Unmarshal payload")
stmt := `SELECT x.user_id
,x.item_id
,(SELECT COALESCE(SUM(omv.quantity), 0)
FROM obj_msg_vault_v omv
WHERE omv.user_id = x.user_id
AND omv.item_id = x.item_id
AND omv.msg_type_id = ` + strconv.Itoa(objSubTypeMessageGDepositAck) + `
AND omv.chat_id = x.chat_id) deposit
,(SELECT COALESCE(SUM(omv.quantity), 0)
FROM obj_msg_vault_v omv
WHERE omv.user_id = x.user_id
AND omv.item_id = x.item_id
AND omv.msg_type_id = ` + strconv.Itoa(objSubTypeMessageWithdrawRcv) + `
AND omv.chat_id = x.chat_id) withdraw
FROM (SELECT DISTINCT
omv.user_id
,omv.chat_id
,omv.item_id
FROM obj_msg_vault_v omv
WHERE omv.chat_id = ?
AND omv.user_id IN (?` + strings.Repeat(",?", len(p.UserListID64)-1) + `)
AND omv.item_type_id IN (?` + strings.Repeat(",?", len(p.ItemTypeListID64)-1) + `)) x
ORDER BY x.user_id ASC;`
args := make([]interface{}, len(p.UserListID64)+len(p.ItemTypeListID64)+1)
args[0] = p.DepositChatID64
for i, id := range p.UserListID64 {
args[i+1] = id
}
for i, id := range p.ItemTypeListID64 {
args[i+1+len(p.UserListID64)] = id
}
rows, err := db.Query(stmt, args...)
logOnError(err, "jobVaultUserStatus : Get rows")
if err != nil {
err = setJobDone(j.ID64)
logOnError(err, "jobVaultUserStatus : setJobDone")
return
}
currentUserID64 = 0
for rows.Next() {
err = rows.Scan(&userID64, &itemID64, &deposit, &withdraw)
logOnError(err, "jobVaultUserStatus : scan next val")
if userID64 != currentUserID64 {
if currentUserID64 != 0 {
// display info
out := fmt.Sprintf("<code>%-32s | Depo. | Recv. | Total\n────────────────────────────┼──────┼──────┼──────\n", `Item`)
for i, itemId := range itemList {
item, err := getObjItem(itemId)
logOnError(err, "jobVaultUserStatus : getObjItem")
out = fmt.Sprintf("%s%-32s |%6d |%6d |%6d\n", out, item.Name, depositList[i], withdrawList[i], depositList[i]-withdrawList[i])
}
out = fmt.Sprintf("%s</code>", out)
c := TGCommand{
Type: commandSendMsg,
Text: out,
ToChatID64: p.UserID64,
ParseMode: cmdParseModeHTML,
}
TGCmdQueue <- c
}
currentUserID64 = userID64
itemList = nil
depositList = nil
withdrawList = nil
}
itemList = append(itemList, itemID64)
depositList = append(depositList, deposit)
withdrawList = append(withdrawList, withdraw)
}
if currentUserID64 != 0 {
//display info
out := fmt.Sprintf("<code>%-32s | Depo. | Recv. | Total\n────────────────────────────┼──────┼──────┼──────\n", `Item`)
for i, itemId := range itemList {
item, err := getObjItem(itemId)
logOnError(err, "jobVaultUserStatus : getObjItem")
out = fmt.Sprintf("%s%-32s |%6d |%6d |%6d\n", out, item.Name, depositList[i], withdrawList[i], depositList[i]-withdrawList[i])
}
out = fmt.Sprintf("%s</code>", out)
c := TGCommand{
Type: commandSendMsg,
Text: out,
ToChatID64: p.UserID64,
ParseMode: cmdParseModeHTML,
}
TGCmdQueue <- c
}
err = rows.Err()
logOnError(err, "jobVaultUserStatus : query end")
rows.Close()
err = setJobDone(j.ID64)
logOnError(err, "jobVaultUserStatus : setJobDone")
return
}
func jobGWithdraw(j Job) {
var p JobPayloadGWithdraw
err := setJobStart(j.ID64)
logOnError(err, "jobGWithdraw : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobGWithdraw : Unmarshal payload")
r := regexp.MustCompile("[a-z0-9]+ [0-9]+")
for _, l := range r.FindAllStringSubmatch(p.Request, -1) {
fmt.Printf("jobGWithdraw : %v\n", l)
/*
i := getObjItemID(``, l[1])
q, _ := strconv.ParseInt(l[2], 10, 64)
*/
/*
ChatWarsItems
b, _ := json.Marshal(p)
t := time.Now().UTC()
_, err := createJob(objSubTypeJobGDeposit, objJobPriority, int64(m.Chat.ID), 0, t, b)
*/
}
err = setJobDone(j.ID64)
logOnError(err, "jobGWithdraw : setJobDone")
return
}