chirpnest/job.go
2020-02-07 15:27:14 +08:00

2521 lines
75 KiB
Go
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"archive/zip"
"bytes"
"compress/zlib"
"crypto/aes"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"regexp"
"strconv"
"strings"
"time"
tb "gopkg.in/tucnak/telebot.v2"
)
func createJob(jobTypeID64 int64, priority int32, userID64 int64, trigger int64, schedule time.Time, payload []byte) (int64, error) {
var (
zb bytes.Buffer
zpayload []byte
zipped int
)
if len(payload) > 10000 {
zw := zlib.NewWriter(&zb)
zw.Write(payload)
zw.Close()
zpayload = zb.Bytes()
zipped = 1
} else {
zpayload = payload
zipped = 0
}
if len(zpayload) > 20000 {
return 0, errors.New("payload too long")
}
stmt, err := db.Prepare(`INSERT INTO obj (obj_type_id, obj_sub_type_id)
VALUES (? , ?);`)
logOnError(err, "createJob : prepare insert obj")
if err != nil {
return 0, err
}
defer stmt.Close()
res, err := stmt.Exec(cacheObjType[`job`], jobTypeID64)
s := fmt.Sprintf("createJob, insert obj(%d, %d)", cacheObjType[`job`], jobTypeID64)
logOnError(err, s)
if err != nil {
return 0, err
}
objId, err := res.LastInsertId()
logOnError(err, "createJob : get last insert Id")
if err != nil {
return 0, err
}
stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, trigger_id, seq_nr, schedule, is_done, in_work, inserted, timeout, pulled, started, ended, zipped, payload)
VALUES (?, ?, ?, ?, NULL, ?, 0, 0, ?, ?, NULL, NULL, NULL, ?, ?);`)
logOnError(err, "createJob : prepare insert obj_job")
if err != nil {
return 0, err
}
defer stmt.Close()
_, err = stmt.Exec(objId, priority, userID64, trigger, schedule.UTC(), time.Now().UTC(), time.Unix(maxUnixTimestamp, 0).UTC(), zipped, zpayload)
logOnError(err, "createJob : insert obj_job")
if err != nil {
return 0, err
}
j := new(Job)
j.ID64 = objId
j.JobTypeID64 = jobTypeID64
j.Trigger = trigger
j.Timeout = time.Unix(maxUnixTimestamp, 0).UTC()
j.UserID64 = userID64
j.Payload = payload
muxObjJob.Lock()
cacheObjJob[objId] = *j
muxObjJob.Unlock()
return objId, nil
}
func createJobCallback(jobTypeID64 int64, userID64 int64, msgTypeID64 int64, payload []byte, timeout time.Duration) error {
//t, err := time.Parse(time.RFC3339, "9999-12-31T00:00:00+00:00")
jobID64, err := createJob(jobTypeID64, objJobPriority, userID64, 0, time.Unix(maxUnixTimestamp, 0).UTC(), payload)
if err != nil {
return err
}
setJobCallback(jobID64, userID64, msgTypeID64)
err = setJobTimeout(jobID64, timeout)
logOnError(err, "createJobCallback : setJobTimeout")
return nil
}
// setJobCallback appends jobID64 to the list of jobs registered for the
// (userID64, msgTypeID64) pair in callbacks, creating the per-user map on
// first use. The whole update runs under muxCallbacks.
func setJobCallback(jobID64 int64, userID64 int64, msgTypeID64 int64) {
	muxCallbacks.Lock()
	defer muxCallbacks.Unlock()

	if _, known := callbacks[userID64]; !known {
		callbacks[userID64] = make(map[int64][]int64)
	}
	callbacks[userID64][msgTypeID64] = append(callbacks[userID64][msgTypeID64], jobID64)
}
// setJobTimeout sets the timeout of job jobID64 to the current UTC time plus
// d, first in obj_job and then, under muxObjJob, in the cached Job value.
// A prepare or exec failure is logged and returned; the cache is left
// untouched in that case.
func setJobTimeout(jobID64 int64, d time.Duration) error {
	const query = `UPDATE obj_job j SET j.timeout = ? WHERE j.obj_id = ?;`

	stmt, err := db.Prepare(query)
	logOnError(err, "setJobTimeout : prepare update obj_job")
	if err != nil {
		return err
	}
	defer stmt.Close()

	deadline := time.Now().UTC().Add(d)
	_, err = stmt.Exec(deadline, jobID64)
	logOnError(err, fmt.Sprintf("setJobTimeout : update obj_job(%d)", jobID64))
	if err != nil {
		return err
	}

	// Read-modify-write of the cached value (the map stores Job by value).
	muxObjJob.Lock()
	cached := cacheObjJob[jobID64]
	cached.Timeout = deadline
	cacheObjJob[jobID64] = cached
	muxObjJob.Unlock()

	return nil
}
// setJobPayloadJSON JSON-encodes payload and stores it as the payload of job
// jobID64 through setJobPayload. An encoding failure is logged and returned
// without touching the job.
func setJobPayloadJSON(jobID64 int64, payload interface{}) error {
	encoded, err := json.Marshal(payload)
	logOnError(err, "setJobPayloadJSON")
	if err == nil {
		err = setJobPayload(jobID64, encoded)
	}
	return err
}
// setJobPayloadJSONUnsafe JSON-encodes payload and stores it as the payload
// of job jobID64 through setJobPayloadUnsafe, i.e. without taking muxObjJob
// itself. An encoding failure is logged and returned without touching the job.
func setJobPayloadJSONUnsafe(jobID64 int64, payload interface{}) error {
	encoded, err := json.Marshal(payload)
	logOnError(err, "setJobPayloadJSONUnsafe")
	if err == nil {
		err = setJobPayloadUnsafe(jobID64, encoded)
	}
	return err
}
// setJobPayload is the locking variant of setJobPayloadUnsafe: it holds
// muxObjJob for the duration of the update and returns its result.
func setJobPayload(jobID64 int64, payload []byte) error {
	muxObjJob.Lock()
	defer muxObjJob.Unlock()

	err := setJobPayloadUnsafe(jobID64, payload)
	return err
}
func setJobPayloadUnsafe(jobID64 int64, payload []byte) error {
var (
zb bytes.Buffer
zpayload []byte
zipped int
)
if len(payload) > 10000 {
zw := zlib.NewWriter(&zb)
zw.Write(payload)
zw.Close()
zpayload = zb.Bytes()
zipped = 1
} else {
zpayload = payload
zipped = 0
}
if len(zpayload) > 10000 {
return errors.New("payload too long")
}
stmt, err := db.Prepare(`UPDATE obj_job j SET j.payload = ?, j.zipped = ? WHERE j.obj_id = ?;`)
logOnError(err, "setJobTimeout : prepare update obj_job")
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(zpayload, zipped, jobID64)
logOnError(err, fmt.Sprintf("setJobPayload : update obj_job(%d)", jobID64))
if err != nil {
return err
}
j := cacheObjJob[jobID64]
j.Payload = payload
cacheObjJob[jobID64] = j
//log.Printf("setJobPayload[%d] : %s\n", jobID64, string(payload))
return nil
}
// getJobPayload is the locking variant of getJobPayloadUnsafe: it holds
// muxObjJob while the cached payload of job jobID64 is looked up.
func getJobPayload(jobID64 int64) []byte {
	muxObjJob.Lock()
	defer muxObjJob.Unlock()

	payload := getJobPayloadUnsafe(jobID64)
	return payload
}
// getJobPayloadUnsafe returns the payload cached for job jobID64, or nil when
// the job is not in cacheObjJob. It does not take muxObjJob itself.
func getJobPayloadUnsafe(jobID64 int64) []byte {
	cached, found := cacheObjJob[jobID64]
	if !found {
		return nil
	}
	return cached.Payload
}
// setJobDone flags job jobID64 as done in obj_job: is_done is set, in_work is
// cleared and ended is stamped with the current UTC time. Prepare and exec
// failures are logged and returned.
func setJobDone(jobID64 int64) error {
	const query = `UPDATE obj_job j SET j.is_done = 1, j.in_work = 0, j.ended = ? WHERE j.obj_id = ?;`

	stmt, err := db.Prepare(query)
	logOnError(err, "setJobDone : prepare update obj_job")
	if err != nil {
		return err
	}
	defer stmt.Close()

	_, err = stmt.Exec(time.Now().UTC(), jobID64)
	logOnError(err, fmt.Sprintf("setJobDone, update obj_job(%d)", jobID64))
	return err
}
// setJobStart records the current UTC time as the start of job jobId in
// obj_job. The COALESCE in the statement keeps an already-set started value,
// so only the first call has an effect. Prepare and exec failures are logged
// and returned.
func setJobStart(jobId int64) error {
	const query = `UPDATE obj_job j SET j.started = coalesce(j.started, ?) WHERE j.obj_id = ?;`

	stmt, err := db.Prepare(query)
	logOnError(err, "setJobStart : prepare update obj_job")
	if err != nil {
		return err
	}
	defer stmt.Close()

	_, err = stmt.Exec(time.Now().UTC(), jobId)
	logOnError(err, fmt.Sprintf("setJobStart, update obj_job(%d)", jobId))
	return err
}
// rescheduleJob releases job jobID64 (in_work = 0) and gives it a new
// schedule (stored in UTC) and trigger id in obj_job. Prepare and exec
// failures are logged and returned.
func rescheduleJob(jobID64 int64, trigger int64, schedule time.Time) error {
	const query = `UPDATE obj_job j SET j.in_work = 0, j.schedule = ?, j.trigger_id = ? WHERE j.obj_id = ?;`

	stmt, err := db.Prepare(query)
	logOnError(err, "rescheduleJob : prepare update obj_job")
	if err != nil {
		return err
	}
	defer stmt.Close()

	_, err = stmt.Exec(schedule.UTC(), trigger, jobID64)
	logOnError(err, fmt.Sprintf("rescheduleJob, update obj_job(%d)", jobID64))
	return err
}
// loadCurrentJobs claims and returns the next batch of runnable jobs.
//
// It first tags up to SQLJobSliceSize jobs that are not done, not in work and
// whose schedule is due (ordered by priority, then id) with in_work = 1, the
// current time as pulled and a random seq_nr. It then reads back exactly the
// rows carrying that seq_nr and returns them as Job values with their payload
// decompressed when the zipped flag is set.
//
// Rows that cannot be scanned or decompressed are logged and left out of the
// result. On a query-level error the jobs collected so far are returned
// together with the error.
func loadCurrentJobs() ([]Job, error) {
	var (
		objID       int64
		jobTypeID64 int64
		userID64    int64
		trigger     int64
		timeout     time.Time
		zipped      int
		zpayload    []byte
		jobs        []Job
	)

	now := time.Now().UTC()
	batch := RndInt64()

	_, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1, j.seq_nr = ? WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", now, batch, now, SQLJobSliceSize)
	logOnError(err, "loadCurrentJobs : update intial rows")
	if err != nil {
		return jobs, err
	}

	stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.trigger_id, j.user_id, j.zipped, j.payload, j.timeout FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;")
	logOnError(err, "loadCurrentJobs : prepare select statement")
	if err != nil {
		// stmt is nil when Prepare fails, so there is nothing to close.
		return jobs, err
	}

	rows, err := stmt.Query(batch)
	logOnError(err, "loadCurrentJobs : query select statement")
	if err != nil {
		stmt.Close()
		return jobs, err
	}

	for rows.Next() {
		if err = rows.Scan(&objID, &jobTypeID64, &trigger, &userID64, &zipped, &zpayload, &timeout); err != nil {
			// The scan targets still hold the previous row's values;
			// do not build a job out of them.
			logOnError(err, "loadCurrentJobs : scan query rows")
			continue
		}

		payload := zpayload
		if zipped > 0 {
			zr, zerr := zlib.NewReader(bytes.NewReader(zpayload))
			if zerr != nil {
				logOnError(zerr, "loadCurrentJobs : zlib.NewReader")
				continue
			}
			var plain bytes.Buffer
			_, rerr := plain.ReadFrom(zr)
			zr.Close()
			if rerr != nil {
				logOnError(rerr, "loadCurrentJobs : zlib read")
				continue
			}
			payload = plain.Bytes()
		}

		jobs = append(jobs, Job{
			ID64:        objID,
			JobTypeID64: jobTypeID64,
			Trigger:     trigger,
			UserID64:    userID64,
			Timeout:     timeout,
			Payload:     payload,
		})
	}

	err = rows.Err()
	logOnError(err, "loadCurrentJobs : scan end rows")
	rows.Close()
	if err != nil {
		stmt.Close()
		return jobs, err
	}

	err = stmt.Close()
	logOnError(err, "loadCurrentJobs : close select statement")
	if err != nil {
		return jobs, err
	}

	return jobs, nil
}
// jobRescan runs the query carried in the job payload and pushes every
// returned id onto SQLMsgIdentifyQueue.
//
//   - More than one id: an initial reply announces the count, a progress
//     reply is sent whenever more than a minute has passed since the last
//     one, and a job_set_done job is created (triggered by this job) carrying
//     the closing text. The rescan job itself is not marked done here.
//   - Exactly one id / no id: the job is marked done immediately and, when
//     the payload carries a message or chat id, a short reply is sent.
func jobRescan(j Job) {
	var p JobPayloadRescanMsg

	err := setJobStart(j.ID64)
	logOnError(err, "jobRescan : setJobStart")
	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobRescan : Unmarshal payload")

	// reply answers the message that requested the rescan.
	reply := func(text string) {
		TGCmdQueue <- TGCommand{
			Type:         commandReplyMsg,
			Text:         text,
			FromMsgID64:  p.MsgID64,
			FromChatID64: p.ChatID64,
		}
	}
	hasOrigin := p.MsgID64 != 0 || p.ChatID64 != 0

	began := time.Now()
	lastReport := time.Now()
	ids := getSQLListID64(p.Query)

	switch total := len(ids); {
	case total > 1:
		reply(fmt.Sprintf("Rescanning %d messages.", total))
		for idx, id := range ids {
			SQLMsgIdentifyQueue <- id
			if time.Now().After(lastReport.Add(1 * time.Minute)) {
				reply(fmt.Sprintf("Rescanned %d/%d messages.", idx+1, total))
				lastReport = time.Now()
			}
		}
		closing := JobPayloadSetDone{
			JobID64:  j.ID64,
			MsgID64:  p.MsgID64,
			ChatID64: p.ChatID64,
			Text:     fmt.Sprintf("%d messages processed in %s.", total, time.Since(began)),
		}
		encoded, _ := json.Marshal(closing)
		_, err = createJob(cacheObjSubType[`job_set_done`], objJobPriorityRescanAllMsg, j.UserID64, j.ID64, time.Now().UTC(), encoded)
		logOnError(err, "jobRescan : createJob(cacheObjSubType[`job_set_done`])")

	case total == 1:
		SQLMsgIdentifyQueue <- ids[0]
		err = setJobDone(j.ID64)
		logOnError(err, "jobRescan : setJobDone(1)")
		if hasOrigin {
			reply("One message processed.")
		}

	default:
		err = setJobDone(j.ID64)
		logOnError(err, "jobRescan : setJobDone(0)")
		if hasOrigin {
			reply("No message processed.")
		}
	}
}
// jobSetDone marks as done both the job referenced by the payload (JobID64)
// and this job itself, then replies to the originating message with the text
// carried in the payload. Every step's error is logged; none aborts the
// sequence.
func jobSetDone(j Job) {
	var payload JobPayloadSetDone

	logOnError(setJobStart(j.ID64), "jobSetDone : setJobStart")
	logOnError(json.Unmarshal(j.Payload, &payload), "jobSetDone : Unmarshal payload")
	logOnError(setJobDone(payload.JobID64), "jobSetDone : setJobDone(child)")
	logOnError(setJobDone(j.ID64), "jobSetDone : setJobDone")

	TGCmdQueue <- TGCommand{
		Type:         commandReplyMsg,
		Text:         payload.Text,
		FromMsgID64:  payload.MsgID64,
		FromChatID64: payload.ChatID64,
	}
}
// jobPillage drives the interception of one pillage attempt described by the
// job payload (the pillage message object id and its date).
//
// Each run goes through these steps:
//  1. Look for an outcome message (go / timeout / loss / win) from the game
//     bot, dated within 3m30s after the pillage message. If one exists the
//     job is marked done; the user is told the outcome only when that message
//     is less than 60 seconds old.
//  2. If more than 3m30s have passed since the payload date, tell the user
//     the interception expired and stop.
//  3. Otherwise, if the user sent a "/go" in the 30 seconds before the job's
//     schedule, just reschedule in 30 seconds to check the outcome.
//  4. Otherwise trigger the interception (a "/go" message or the Intervene
//     button, depending on the pillage message sub-type) and reschedule in
//     30 seconds.
//
// NOTE(review): in step 2 the job is neither marked done nor rescheduled —
// confirm something else retires it.
func jobPillage(j Job) {
	var r JobPayloadPillage

	err := setJobStart(j.ID64)
	logOnError(err, "jobPillage : setJobStart")
	err = json.Unmarshal(j.Payload, &r)
	logOnError(err, "jobPillage : Unmarshal payload")

	// notify sends a plain message to the owner of the job.
	notify := func(text string) {
		TGCmdQueue <- TGCommand{
			Type:       commandSendMsg,
			Text:       text,
			ToUserID64: j.UserID64,
		}
	}

	// check if we have a acknowledgment of go or a timeout within 3m30 of the PillageInc from the Job
	ids := getSQLListID64(`SELECT ox.id
		FROM obj ox
		,obj_msg omx
		,obj op
		,obj_msg omp
		,obj_job oj
		WHERE oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + `
		AND omx.user_id = oj.user_id
		AND omx.sender_user_id = ` + strconv.Itoa(userID64ChtWrsBot) + `
		AND omx.obj_id = ox.id
		AND ox.obj_sub_type_id in (` + strconv.FormatInt(cacheObjSubType[`msg_pillage_go`], 10) +
		`, ` + strconv.FormatInt(cacheObjSubType[`msg_pillage_timeout`], 10) +
		`, ` + strconv.FormatInt(cacheObjSubType[`msg_pillage_loss`], 10) +
		`, ` + strconv.FormatInt(cacheObjSubType[`msg_pillage_win`], 10) + `)
		AND op.id = ` + strconv.FormatInt(r.ObjID64, 10) + `
		AND omp.obj_id = op.id
		AND omx.date between omp.date AND ADDTIME(omp.date, '0 0:3:30.000000')
		ORDER BY CASE ox.obj_sub_type_id WHEN ` + strconv.FormatInt(cacheObjSubType[`msg_pillage_win`], 10) + ` THEN 0
		WHEN ` + strconv.FormatInt(cacheObjSubType[`msg_pillage_loss`], 10) + ` THEN 1
		WHEN ` + strconv.FormatInt(cacheObjSubType[`msg_pillage_timeout`], 10) + ` THEN 2
		WHEN ` + strconv.FormatInt(cacheObjSubType[`msg_pillage_go`], 10) + ` THEN 3
		ELSE 4 END ASC
		LIMIT 1;`)

	if len(ids) > 1 { // issue there ?
		notify(fmt.Sprintf("More than one outcome for pillage #%d", r.ObjID64))
	} else if len(ids) == 1 { // we've got a match, job is done whether we prevented the pillage or not
		m, err := getObjMsg(ids[0])
		logOnError(err, "jobPillage : getMsg(cacheObjSubType[`msg_pillage_go`], cacheObjSubType[`msg_pillage_timeout`], 10)")
		// Only report outcomes that are less than a minute old.
		if err == nil && m.Date.Add(60*time.Second).After(time.Now().UTC()) {
			msgTypeID64, err := getObjSubTypeId(ids[0])
			logOnError(err, "jobPillage : getObjSubTypeId")
			if err == nil {
				var verdict string
				switch msgTypeID64 {
				case cacheObjSubType[`msg_pillage_go`], cacheObjSubType[`msg_pillage_win`]:
					verdict = "We avoided a pillage"
				case cacheObjSubType[`msg_pillage_loss`], cacheObjSubType[`msg_pillage_timeout`]:
					verdict = "We got pillaged"
				default:
					verdict = "We don't know what happened"
				}
				notify(fmt.Sprintf("%s (%s)", verdict, m.Date.Format(time.RFC3339)))
			}
		}
		err = setJobDone(j.ID64)
		logOnError(err, "jobPillage : setJobDone")
		return
	}

	// is the job outdated now ?
	if time.Now().UTC().After(r.Date.Add(time.Minute*3 + time.Second*30)) {
		notify("Pillage interception expired")
		return
	}

	notify("No outcome for the pillage yet")

	//no outcome yet, have we sent a "/go" in the last 30 sec ?
	ids = getSQLListID64(` select ox.id
		from obj ox
		,obj_msg omx
		,obj_job oj
		where oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + `
		and omx.user_id = oj.user_id
		and omx.sender_user_id = oj.user_id
		and omx.obj_id = ox.id
		and ox.obj_sub_type_id =` + strconv.FormatInt(cacheObjSubType[`msg_go`], 10) + `
		and omx.date between addtime(oj.schedule, '-30') and oj.schedule;`)

	if len(ids) > 0 { // we did, so we reschedule the job to check the outcome and wait
		m, err := getObjMsg(ids[0])
		logOnError(err, "jobPillage : getMsg")
		if err == nil {
			notify(fmt.Sprintf("We started intercepting the pillage (%s)", m.Date.Format(time.RFC3339)))
		}
		err = rescheduleJob(j.ID64, j.Trigger, time.Now().Add(30*time.Second).UTC())
		logOnError(err, "jobPillage : rescheduleJob(cacheObjSubType[`msg_go`], 10)")
		return
	}

	//no /go in the last 30 sec so we go ahead, send one and reschedule to check again
	t, err := getObjSubTypeId(r.ObjID64)
	logOnError(err, "jobPillage : getObjSubTypeId")
	m, msgErr := getObjMsg(r.ObjID64)
	logOnError(msgErr, "jobPillage : getObjMsg")

	switch {
	case t == cacheObjSubType[`msg_pillage_inc`]:
		clientSendCWMsg(j.UserID64, "/go")
	case msgErr != nil:
		// The remaining branches read the pillage message; without it
		// there is nothing to click or report, so just retry later.
	case t == cacheObjSubType[`msg_pillage_inc2`]:
		if len(m.Callbacks) != 1 {
			notify(fmt.Sprintf("More than one button (%s)", m.Date.Format(time.RFC3339)))
		} else {
			for _, c := range m.Callbacks {
				if c.Name == `🧹Intervene` {
					clientCallback(j.UserID64, m.ID64, m.ChatID64, c.Name, c.Data)
				}
			}
		}
	default:
		notify(fmt.Sprintf("Unknown pillage version (%s)", m.Date.Format(time.RFC3339)))
	}

	err = rescheduleJob(j.ID64, j.Trigger, time.Now().Add(30*time.Second).UTC())
	logOnError(err, "jobPillage : rescheduleJob")
}
// jobMsgRefresh re-fetches a stored message: it loads the message object
// named in the payload, deletes that object, asks the client to refresh the
// message and marks the job done.
//
// When the message cannot be loaded the job is simply marked done; a
// "no rows" error is expected in that case and is not logged.
//
// The previous version returned from both arms of the error check, so the
// delete/refresh steps were never reached even when the message was found.
func jobMsgRefresh(j Job) {
	var p JobPayloadMsgRefresh

	err := setJobStart(j.ID64)
	logOnError(err, "jobMsgRefresh : setJobStart")
	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobMsgRefresh : Unmarshal payload")

	m, err := getObjMsg(p.ObjID64)
	if err != nil {
		if err.Error() != `sql: no rows in result set` {
			logOnError(err, "jobMsgRefresh : getObjMsg")
		}
		err = setJobDone(j.ID64)
		logOnError(err, "jobMsgRefresh : setJobDone")
		return
	}

	err = delObj(p.ObjID64)
	logOnError(err, "jobMsgRefresh : delObj")

	clientRefreshCWMsg(m.TGUserID64, m.ChatID64, m.ID64)

	err = setJobDone(j.ID64)
	logOnError(err, "jobMsgRefresh : setJobDone")
}
// jobMsgClient sends the text carried in the payload through the job owner's
// client and confirms with a "Message sent." reply to the originating
// message. When the payload cannot be decoded nothing is sent. The job is
// marked done in either case.
func jobMsgClient(j Job) {
	var p JobPayloadMsgClient

	logOnError(setJobStart(j.ID64), "jobMsgClient : setJobStart")

	decodeErr := json.Unmarshal(j.Payload, &p)
	logOnError(decodeErr, "jobMsgClient : Unmarshal payload")
	if decodeErr == nil {
		clientSendCWMsg(j.UserID64, p.Text)
		TGCmdQueue <- TGCommand{
			Type:         commandReplyMsg,
			Text:         "Message sent.",
			FromMsgID64:  p.MsgID64,
			FromChatID64: p.ChatID64,
		}
	}

	logOnError(setJobDone(j.ID64), "joMsgClient : setJobDone")
}
// jobMsgFwd forwards the message that triggered the job to the chat named in
// the payload, using the job owner's client, and marks the job done.
//
// Nothing is forwarded when the payload cannot be decoded or the trigger
// message cannot be loaded; both failures are logged and the job is still
// marked done.
func jobMsgFwd(j Job) {
	var p JobPayloadMsgFwd

	err := setJobStart(j.ID64)
	logOnError(err, "jobFwdMsg : setJobStart")

	payloadErr := json.Unmarshal(j.Payload, &p)
	logOnError(payloadErr, "jobFwdMsg : Unmarshal payload")

	msg, msgErr := getObjMsg(j.Trigger)
	logOnError(msgErr, "jobFwdMsg : getObjMsg msg")

	// msg must not be dereferenced after a failed load, and without a
	// decoded payload the destination chat is unknown.
	if payloadErr == nil && msgErr == nil {
		clientFwdCWMsg(j.UserID64, msg.ID64, msg.ChatID64, p.ChatID64)
	}

	err = setJobDone(j.ID64)
	logOnError(err, "jobFwdMsg : setJobDone")
}
// jobMsgDelete deletes a Telegram message, either right away or after the
// delay carried in the payload.
//
// The message to delete is chosen as follows:
//   - If the job has a trigger and the payload names a message type, the
//     trigger message is used, provided its sub-type equals that type;
//     otherwise nothing is deleted.
//   - Otherwise, if the payload names a message object id, that one is used.
//
// With a zero delay the message is deleted through the job owner's client.
// With a non-zero delay a new job_msg_del job is created, scheduled after the
// delay, whose payload has the delay cleared and the message object id set.
// This job is always marked done at the end.
func jobMsgDelete(j Job) {
	var p JobPayloadMsgDel

	err := setJobStart(j.ID64)
	logOnError(err, "jobMsgDel : setJobStart")
	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobMsgDel : Unmarshal payload")

	b, _ := json.Marshal(p) // for logging only
	log.Printf("jobMsgDelete[%d] : %d : Payload => %s.\n", j.ID64, j.UserID64, string(b))

	// deleteOrDefer deletes message object objMsgID64 now, or schedules a
	// follow-up job to do so once p.Delay has elapsed.
	deleteOrDefer := func(objMsgID64 int64) {
		if p.Delay == 0 {
			obj, err := getObjMsg(objMsgID64)
			logOnError(err, "jobMsgDelete : getObjMsg("+strconv.FormatInt(objMsgID64, 10)+")")
			if err == nil {
				clientDelTGMsg(j.UserID64, obj.ID64, obj.ChatID64)
			}
			return
		}

		delay := p.Delay
		p.Delay = 0
		p.ObjMsgID64 = objMsgID64
		next, err := json.Marshal(p)
		logOnError(err, "jobMsgDelete : Marshal")
		if err != nil {
			return
		}
		_, err = createJob(cacheObjSubType[`job_msg_del`], objJobPriority, j.UserID64, 0, time.Now().Add(delay).UTC(), next)
		logOnError(err, "jobMsgDelete : createJob")
	}

	if j.Trigger != 0 && p.MsgTypeID64 != 0 {
		id, err := getObjSubTypeId(j.Trigger)
		logOnError(err, "jobMsgDelete : getObjSubTypeId("+strconv.FormatInt(j.Trigger, 10)+")")
		if id == p.MsgTypeID64 {
			deleteOrDefer(j.Trigger)
		} else {
			log.Printf("jobMsgDelete : cannot identify msg to delete")
		}
	} else if p.ObjMsgID64 != 0 {
		deleteOrDefer(p.ObjMsgID64)
	}

	err = setJobDone(j.ID64)
	logOnError(err, "jobMsgDel : setJobDone")
}
// jobBackupExport exports every stored message to a zip archive and sends it
// to the requesting chat.
//
// All ids from obj_msg are loaded one by one (messages that fail to load are
// logged and skipped), with a progress reply whenever more than a minute has
// passed since the last one. The messages are JSON-encoded into a DataBackup,
// written as backup.json into an in-memory zip archive and sent as a document
// named after the start time. The job is marked done at the end.
//
// If encoding or archiving fails, the user is told the export failed, no
// document is sent and the job is still marked done.
func jobBackupExport(j Job) {
	var p JobPayloadBackupExport

	err := setJobStart(j.ID64)
	logOnError(err, "jobBackupExport : setJobStart")
	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobBackupExport : Unmarshal payload")

	// reply answers the message that requested the export.
	reply := func(text string) {
		TGCmdQueue <- TGCommand{
			Type:         commandReplyMsg,
			Text:         text,
			FromMsgID64:  p.MsgID64,
			FromChatID64: p.ChatID64,
		}
	}
	// finish marks the job done, whatever the outcome.
	finish := func() {
		err := setJobDone(j.ID64)
		logOnError(err, "jobBackupExport : setJobDone")
	}

	bkp := DataBackup{}
	start := time.Now()
	milestone := time.Now()

	var msgs []ChatWarsMessage
	ids := getSQLListID64(`SELECT om.obj_id id FROM obj_msg om;`)
	reply(fmt.Sprintf("Backing up %d messages.", len(ids)))

	for i, id := range ids {
		msg, err := getObjMsg(id)
		logOnError(err, "jobBackupExport : getMsg")
		if err == nil {
			msgs = append(msgs, *msg)
		}
		if time.Now().After(milestone.Add(1 * time.Minute)) {
			reply(fmt.Sprintf("Exported %d/%d messages.", i+1, len(ids)))
			milestone = time.Now()
		}
	}
	bkp.Messages = msgs

	b, err := json.Marshal(bkp)
	logOnError(err, "jobBackupExport : Marshal")
	if err != nil {
		reply(`Export failed.`)
		finish()
		return
	}

	reply(`Compressing archive`)

	zbuf := new(bytes.Buffer)
	zw := zip.NewWriter(zbuf)
	zf, err := zw.Create(`backup.json`)
	logOnError(err, "jobBackupExport : Create")
	if err == nil {
		// zf is only usable when Create succeeded.
		_, err = zf.Write(b)
		logOnError(err, "jobBackupExport : Write")
	}
	closeErr := zw.Close()
	logOnError(closeErr, "jobBackupExport : Close")
	if err != nil || closeErr != nil {
		reply(`Export failed.`)
		finish()
		return
	}

	d := tb.Document{}
	d.File = tb.FromReader(bytes.NewReader(zbuf.Bytes()))
	d.FileName = fmt.Sprintf("%s.backup.zip", start.Format("20060102150405"))
	d.Caption = d.FileName
	d.MIME = `application/zip`

	reply(`Export done.`)
	TGCmdQueue <- TGCommand{
		Type:       commandSendDocument,
		Document:   d,
		ToChatID64: p.ChatID64,
	}

	finish()
}
// jobBackupImport restores messages from a backup archive.
//
// The zip file at the URL carried in the payload is downloaded into memory,
// the entry named backup.json is decoded as a DataBackup and each of its
// messages is pushed onto MQCWMsgQueue. The user is told when the file has
// been downloaded and whether a backup was restored or no backup file was
// found.
//
// Any failure (download, archive parsing, entry read, JSON decoding) is
// logged and ends the job: it is marked done so it does not stay in work.
func jobBackupImport(j Job) {
	var p JobPayloadBackupImport

	err := setJobStart(j.ID64)
	logOnError(err, "jobBackupImport : setJobStart")
	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobBackupImport : Unmarshal payload")

	// reply answers the message that requested the import.
	reply := func(text string) {
		TGCmdQueue <- TGCommand{
			Type:         commandReplyMsg,
			Text:         text,
			FromMsgID64:  p.MsgID64,
			FromChatID64: p.ChatID64,
		}
	}
	// finish marks the job done, whatever the outcome.
	finish := func() {
		err := setJobDone(j.ID64)
		logOnError(err, "jobBackupImport : setJobDone")
	}

	resp, err := http.Get(p.URL)
	logOnError(err, "jobBackupImport : Get")
	if err != nil {
		// resp is nil on error; there is no body to read or close.
		finish()
		return
	}
	z, err := ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	logOnError(err, "jobBackupImport : ReadAll body")
	if err != nil {
		finish()
		return
	}
	reply("File downloaded.")

	zr, err := zip.NewReader(bytes.NewReader(z), int64(len(z)))
	logOnError(err, "jobBackupImport : NewReader")
	if err != nil {
		finish()
		return
	}

	for _, f := range zr.File {
		if f.Name != "backup.json" {
			continue
		}

		rc, err := f.Open()
		logOnError(err, "jobBackupImport : Open")
		if err != nil {
			finish()
			return
		}
		data, err := ioutil.ReadAll(rc)
		rc.Close()
		logOnError(err, "jobBackupImport : ReadAll")
		if err != nil {
			finish()
			return
		}
		log.Printf("jobBackupImport : %d uncompressed bytes.\n", len(data))

		bkp := DataBackup{}
		err = json.Unmarshal(data, &bkp)
		logOnError(err, "jobBackupImport : Unmarshal")
		if err != nil {
			finish()
			return
		}

		for _, msg := range bkp.Messages {
			MQCWMsgQueue <- msg
		}

		reply("Backup restored.")
		finish()
		return
	}

	reply("Not backup file found in archive.")
	finish()
}
// jobGStock reports the size of the guild stock, per item category, to the
// requesting message. It runs in two phases selected by the payload Status:
//
//   - Status 0: create a job_get_vault job for the six stock categories, with
//     this job's id as its JobCallbackID64; store the vault job id and
//     Status 1 in this job's payload; reschedule this job at
//     maxUnixTimestamp. Presumably the vault job wakes this job up once it
//     has its result — confirm against the job_get_vault handler.
//   - Otherwise: read the vault job's cached payload, sum weight*quantity and
//     quantity per category (items with weight -1 are reported to the admin
//     and skipped), reply with the summary and mark the job done.
//
// If the vault job cannot be created the job is marked done right away, since
// nothing would ever provide the data for the second phase.
func jobGStock(j Job) {
	var (
		p                     JobPayloadGStock
		p2                    JobPayloadGetVault
		b                     []byte
		resSize, resCount     int64
		alchSize, alchCount   int64
		miscSize, miscCount   int64
		recSize, recCount     int64
		partSize, partCount   int64
		otherSize, otherCount int64
		totalSize             int64
	)

	err := setJobStart(j.ID64)
	logOnError(err, "jobGStock : setJobStart")
	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobGStock : Unmarshal payload")

	if p.Status == 0 {
		p2.JobCallbackID64 = j.ID64
		p2.ItemTypeList = []int64{
			cacheObjSubType[`item_res`],
			cacheObjSubType[`item_alch`],
			cacheObjSubType[`item_misc`],
			cacheObjSubType[`item_recipe`],
			cacheObjSubType[`item_part`],
			cacheObjSubType[`item_other`],
		}
		b, err = json.Marshal(p2)
		logOnError(err, "jobGStock : Marshal(p2)")

		vaultJobID64, err := createJob(cacheObjSubType[`job_get_vault`], objJobPriority, j.UserID64, 0, time.Now().UTC(), b)
		logOnError(err, "jobGStock : createJob(job_get_vault)")
		if err != nil {
			err = setJobDone(j.ID64)
			logOnError(err, "jobGStock : setJobDone")
			return
		}

		p.Status = 1
		p.VaultJobID64 = vaultJobID64
		err = setJobPayloadJSON(j.ID64, p)
		logOnError(err, "jobGStock : setJobPayloadJSON(p)")

		err = rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC())
		logOnError(err, "jobGStock : rescheduleJob")
		return
	}

	b = getJobPayload(p.VaultJobID64)
	err = json.Unmarshal(b, &p2)
	logOnError(err, "jobGStock : Unmarshal(p2)")

	for _, v := range p2.Vault {
		item, err := getObjItem(v.ItemID64)
		logOnError(err, "jobGStock : getObjItem")
		if err != nil {
			continue
		}
		if item.Weight == -1 {
			TGCmdQueue <- TGCommand{
				Type:       commandSendMsg,
				Text:       fmt.Sprintf("Unknown weight for item : %s - %s\n", item.Code, item.Names[0]),
				ToUserID64: cfg.Bot.Admin,
			}
			continue
		}

		size := item.Weight * v.Quantity
		totalSize += size
		switch item.ItemTypeID {
		case cacheObjSubType[`item_res`]:
			resSize += size
			resCount += v.Quantity
		case cacheObjSubType[`item_alch`]:
			alchSize += size
			alchCount += v.Quantity
		case cacheObjSubType[`item_misc`]:
			miscSize += size
			miscCount += v.Quantity
		case cacheObjSubType[`item_recipe`]:
			recSize += size
			recCount += v.Quantity
		case cacheObjSubType[`item_part`]:
			partSize += size
			partCount += v.Quantity
		case cacheObjSubType[`item_other`]:
			otherSize += size
			otherCount += v.Quantity
		}
	}

	txt := fmt.Sprintf("<code>Current stock [%d/38000] :\n - Resources : %d (%d)\n - Alchemist : %d (%d)\n - Misc stuff : %d (%d)\n - Recipes : %d (%d)\n - Parts : %d (%d)\n - Other : %d (%d)</code>\n", totalSize, resSize, resCount, alchSize, alchCount, miscSize, miscCount, recSize, recCount, partSize, partCount, otherSize, otherCount)
	TGCmdQueue <- TGCommand{
		Type:         commandReplyMsg,
		Text:         txt,
		FromMsgID64:  p.MsgID64,
		FromChatID64: p.ChatID64,
		ParseMode:    cmdParseModeHTML,
	}

	err = setJobDone(j.ID64)
	logOnError(err, "jobGStock : setJobDone")
}
// jobGDepositForward checks whether the message that triggered the job is the
// deposit request this job is waiting for (same item id and quantity as in
// the payload).
//
//   - On a match, the trigger message id is appended to gDepositForwardMsg
//     (under gDepositForwardMux) and the job is marked done.
//   - Otherwise the job is put back to sleep (rescheduled at
//     maxUnixTimestamp) and re-registered as a callback for
//     msg_g_deposit_req messages.
//
// A trigger message that cannot be loaded or parsed is treated as "no match"
// rather than dereferencing the failed results.
func jobGDepositForward(j Job) {
	var p JobPayloadGDepositForward

	err := setJobStart(j.ID64)
	logOnError(err, "jobGDepositForward : setJobStart")
	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobGDepositForward : Unmarshal payload")

	// matches reports whether the trigger message is the expected request.
	matches := func() bool {
		msg, err := getObjMsg(j.Trigger)
		logOnError(err, "jobGDepositForward : getObjMsg")
		if err != nil {
			return false
		}
		rule, err := getMsgParsingRule(msg)
		logOnError(err, "jobGDepositForward : getMsgParsingRule")
		if err != nil {
			return false
		}
		cwm, err := parseSubTypeMessageGDepositReq(msg, rule.re)
		logOnError(err, "jobGDepositForward : parseSubTypeMessageGDepositReq")
		if err != nil {
			return false
		}
		return cwm.ItemID64 == p.ItemID64 && cwm.Quantity == p.Quantity
	}

	if matches() {
		gDepositForwardMux.Lock()
		gDepositForwardMsg = append(gDepositForwardMsg, j.Trigger)
		gDepositForwardMux.Unlock()

		err = setJobDone(j.ID64)
		logOnError(err, "jobGDepositForward : setJobDone")
		return
	}

	err = rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC())
	logOnError(err, "jobGDepositForward : rescheduleJob")
	setJobCallback(j.ID64, j.UserID64, cacheObjSubType[`msg_g_deposit_req`])
}
// jobGDeposit deposits the items listed in the payload (ResObjID64) into the
// guild stock. The payload Status selects the phase:
//
//   - 0: work out which stock categories the items belong to and, for each
//     one, send the matching stock button text (2 seconds apart) and create a
//     job_gdeposit callback job waiting for the answer. The function returns
//     here without marking this job done.
//   - 1: placeholder for categories whose answer type is not wired yet; only
//     logs the trigger.
//   - msg_stock_ack / msg_stock_any_ack: parse the stock listing that
//     triggered the job and, for every listed item that is in ResObjID64,
//     send "/g_deposit <code> <quantity>" and create a job_gdeposit_fwd
//     callback job waiting for the matching deposit request.
//
// Apart from phase 0, the job is marked done at the end. Messages or items
// that cannot be loaded or parsed are logged and skipped.
//
// NOTE(review): phase 0 leaves this job neither done nor rescheduled —
// confirm that is intended.
func jobGDeposit(j Job) {
	var p JobPayloadGDeposit

	err := setJobStart(j.ID64)
	logOnError(err, "jobGDeposit : setJobStart")
	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobGDeposit : Unmarshal payload")

	// deposit sends the deposit command for one item and registers the job
	// that waits for the corresponding deposit request message.
	deposit := func(fwd JobPayloadGDepositForward) {
		item, err := getObjItem(fwd.ItemID64)
		logOnError(err, "jobGDeposit : getObjItem")
		if err != nil {
			return
		}
		clientSendCWMsg(p.ChatID64, fmt.Sprintf("/g_deposit %s %d", item.Code, fwd.Quantity))

		b, err := json.Marshal(fwd)
		logOnError(err, "jobGDeposit : Marshal")
		if err != nil {
			return
		}
		err = createJobCallback(cacheObjSubType[`job_gdeposit_fwd`], j.UserID64, cacheObjSubType[`msg_g_deposit_req`], b, 1*time.Minute)
		logOnError(err, "jobGDeposit : createJobCallback")
	}

	switch p.Status {
	case 0: /* handle remaining resources to be stored */
		var res, alch, misc, craft, equip bool
		for _, id := range p.ResObjID64 {
			obj, err := getObjItem(id)
			logOnError(err, "jobGDeposit : getObjItem")
			if err != nil {
				continue
			}
			switch obj.ItemTypeID {
			case cacheObjSubType[`item_res`]:
				res = true
			case cacheObjSubType[`item_alch`]:
				alch = true
			case cacheObjSubType[`item_misc`]:
				misc = true
			case cacheObjSubType[`item_recipe`], cacheObjSubType[`item_part`]:
				craft = true
			case cacheObjSubType[`item_other`], cacheObjSubType[`item_unique`]:
				equip = true
			}
		}

		// One entry per stock category: the button text to send, the
		// Status stored in the follow-up job's payload and the message
		// type that follow-up job waits for.
		steps := []struct {
			wanted  bool
			button  string
			status  int64
			msgType int64
		}{
			{res, `📦Resources`, cacheObjSubType[`msg_stock_ack`], cacheObjSubType[`msg_stock_ack`]},
			{alch, `Alchemy`, 1, cacheObjSubType[`msg_orderbook_acl`]},        // FIXME UPDATE WITH PROPER TYPE
			{misc, `🗃Misc`, 1, cacheObjSubType[`msg_orderbook_acl`]},          // FIXME UPDATE WITH PROPER TYPE
			{craft, `⚒Crafting`, cacheObjSubType[`msg_stock_any_ack`], cacheObjSubType[`msg_stock_any_ack`]},
			{equip, `🏷Equipment`, 1, cacheObjSubType[`msg_orderbook_acl`]},    // FIXME UPDATE WITH PROPER TYPE
		}

		var delay time.Duration
		for _, s := range steps {
			if !s.wanted {
				continue
			}
			clientSendCWMsgDelay(p.ChatID64, s.button, delay)
			p.Status = s.status
			b, err := json.Marshal(&p)
			logOnError(err, "jobGDeposit : Marshal")
			if err == nil {
				err = createJobCallback(cacheObjSubType[`job_gdeposit`], j.UserID64, s.msgType, b, 1*time.Minute)
				logOnError(err, "jobGDeposit : createJobCallback")
			}
			delay += 2 * time.Second
		}
		return

	case 1: /* handle that one resource from the cacheObjSubType[`msg_orderbook_acl`] msg */
		log.Printf("jobGDeposit : 1 : %d.\n", j.Trigger)

	case cacheObjSubType[`msg_stock_ack`]:
		msg, err := getObjMsg(j.Trigger)
		logOnError(err, "jobGDeposit : getObjMsg")
		if err != nil {
			break
		}
		rule, err := getMsgParsingRule(msg)
		logOnError(err, "jobGDeposit : getMsgParsingRule")
		if err != nil {
			break
		}
		cwm, err := parseSubTypeMessageStockAck(msg, rule.re)
		logOnError(err, "jobGDeposit : parseSubTypeMessageStockAck")
		if err != nil {
			break
		}
		for stockIdx := range cwm.Stock {
			for resIdx := range p.ResObjID64 {
				if cwm.Stock[stockIdx].ItemID64 == p.ResObjID64[resIdx] {
					deposit(JobPayloadGDepositForward{
						ItemID64: p.ResObjID64[resIdx],
						Quantity: cwm.Stock[stockIdx].Quantity,
					})
				}
			}
		}

	case cacheObjSubType[`msg_stock_any_ack`]:
		log.Printf("jobGDeposit : cacheObjSubType[`msg_stock_any_ack`] : %d.\n", j.Trigger)
		msg, err := getObjMsg(j.Trigger)
		logOnError(err, "jobGDeposit : getObjMsg")
		if err != nil {
			break
		}
		rule, err := getMsgParsingRule(msg)
		logOnError(err, "jobGDeposit : getMsgParsingRule")
		if err != nil {
			break
		}
		cwm, err := parseSubTypeMessageStockAnyAck(msg, rule.re)
		logOnError(err, "jobGDeposit : parseSubTypeMessageStockAnyAck")
		if err != nil {
			break
		}
		for stockIdx := range cwm.Stock {
			for resIdx := range p.ResObjID64 {
				if cwm.Stock[stockIdx].ItemID64 == p.ResObjID64[resIdx] {
					log.Printf("jobGDeposit : cacheObjSubType[`msg_stock_any_ack`] : Matching ItemID %d (%d).\n", p.ResObjID64[resIdx], cwm.Stock[stockIdx].Quantity)
					deposit(JobPayloadGDepositForward{
						ItemID64: p.ResObjID64[resIdx],
						Quantity: cwm.Stock[stockIdx].Quantity,
					})
				}
			}
		}
	}

	err = setJobDone(j.ID64)
	logOnError(err, "jobGDeposit : setJobDone")
}
// sendVaultItemStatusReport renders one deposit/withdrawal table (one row per
// user) and queues it as an HTML message for toChatID64.
// userList, depositList and withdrawList are parallel slices: index i of each
// one describes the same user.
func sendVaultItemStatusReport(toChatID64 int64, userList, depositList, withdrawList []int64) {
	var out strings.Builder
	fmt.Fprintf(&out, "<code>%-32s | Depo. | Recv. | Total\n────────────────────────────┼──────┼──────┼──────\n", `User`)
	for i, userID64 := range userList {
		total := depositList[i] - withdrawList[i]
		chat, err := bot.ChatByID(strconv.FormatInt(userID64, 10))
		logOnError(err, "jobVaultItemStatus : ChatByID")
		if err == nil {
			fmt.Fprintf(&out, "@%-31s |%6d |%6d |%6d\n", chat.Username, depositList[i], withdrawList[i], total)
		} else {
			// Username lookup failed: fall back to the raw user id.
			fmt.Fprintf(&out, "#%-31d |%6d |%6d |%6d\n", userID64, depositList[i], withdrawList[i], total)
		}
	}
	out.WriteString("</code>")
	TGCmdQueue <- TGCommand{
		Type:       commandSendMsg,
		Text:       out.String(),
		ToChatID64: toChatID64,
		ParseMode:  cmdParseModeHTML,
	}
}

// jobVaultItemStatus reports, for each item id listed in the job payload, the
// quantity every user deposited to and received from the vault chat
// p.DepositChatID64. One table per item is sent to p.UserID64; when the query
// returns no row a single "Nothing to report" message is sent instead.
// The job is always marked done before returning.
func jobVaultItemStatus(j Job) {
	var (
		p                                   JobPayloadVaultItemStatus
		itemID64, currentItemID64           int64
		user, deposit, withdraw             int64
		userList, depositList, withdrawList []int64
	)

	finish := func() {
		err := setJobDone(j.ID64)
		logOnError(err, "jobVaultItemStatus : setJobDone")
	}
	nothingToReport := func() {
		TGCmdQueue <- TGCommand{
			Type:       commandSendMsg,
			Text:       "Nothing to report",
			ToChatID64: p.UserID64,
			ParseMode:  cmdParseModeHTML,
		}
	}

	err := setJobStart(j.ID64)
	logOnError(err, "jobVaultItemStatus : setJobStart")

	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobVaultItemStatus : Unmarshal payload")
	if err != nil {
		// Without a payload there is neither a query to run nor a recipient.
		finish()
		return
	}
	if len(p.ItemListID64) == 0 {
		// strings.Repeat panics on a negative count, so an empty list must not
		// reach the placeholder expansion below.
		nothingToReport()
		finish()
		return
	}

	// Rows must arrive grouped by item (then by user) because the loop below
	// flushes one table every time item_id changes.
	stmt := `SELECT x.item_id
				,x.user_id
				,(SELECT COALESCE(SUM(omv.quantity), 0)
					FROM obj_msg_vault_v omv
					WHERE omv.user_id = x.user_id
					AND omv.item_id = x.item_id
					AND omv.msg_type_id = ` + strconv.FormatInt(cacheObjSubType[`msg_g_deposit_ack`], 10) + `
					AND omv.chat_id = x.chat_id) deposit
				,(SELECT COALESCE(SUM(omv.quantity), 0)
					FROM obj_msg_vault_v omv
					WHERE omv.user_id = x.user_id
					AND omv.item_id = x.item_id
					AND omv.msg_type_id = ` + strconv.FormatInt(cacheObjSubType[`msg_withdraw_rcv`], 10) + `
					AND omv.chat_id = x.chat_id) withdraw
			FROM (SELECT DISTINCT
						omv.user_id
						,omv.chat_id
						,omv.item_id
					FROM obj_msg_vault_v omv
					WHERE omv.chat_id = ?
					AND omv.item_id in (?` + strings.Repeat(",?", len(p.ItemListID64)-1) + `)) x
			ORDER BY x.item_id ASC
				,x.user_id ASC;`

	args := make([]interface{}, len(p.ItemListID64)+1)
	args[0] = p.DepositChatID64
	for i, id := range p.ItemListID64 {
		args[i+1] = id
	}

	rows, err := db.Query(stmt, args...)
	logOnError(err, "jobVaultItemStatus : Get rows")
	if err != nil {
		finish()
		return
	}
	defer rows.Close()

	// 0 is used as the "no item seen yet" marker.
	currentItemID64 = 0
	for rows.Next() {
		err = rows.Scan(&itemID64, &user, &deposit, &withdraw)
		logOnError(err, "jobVaultItemStatus : scan next val")
		if itemID64 != currentItemID64 {
			if currentItemID64 != 0 {
				// A new item starts: flush the table of the previous one.
				sendVaultItemStatusReport(p.UserID64, userList, depositList, withdrawList)
			}
			currentItemID64 = itemID64
			userList = nil
			depositList = nil
			withdrawList = nil
		}
		userList = append(userList, user)
		depositList = append(depositList, deposit)
		withdrawList = append(withdrawList, withdraw)
	}

	if currentItemID64 != 0 {
		// Flush the table of the last item.
		sendVaultItemStatusReport(p.UserID64, userList, depositList, withdrawList)
	} else {
		nothingToReport()
	}

	err = rows.Err()
	logOnError(err, "jobVaultItemStatus : query end")
	finish()
}
// sendVaultUserStatusReport renders one deposit/withdrawal table (one row per
// item) and queues it as an HTML message for toChatID64.
// itemList, depositList and withdrawList are parallel slices: index i of each
// one describes the same item.
func sendVaultUserStatusReport(toChatID64 int64, itemList, depositList, withdrawList []int64) {
	var out strings.Builder
	fmt.Fprintf(&out, "<code>%-32s | Depo. | Recv. | Total\n────────────────────────────┼──────┼──────┼──────\n", `Item`)
	for i, itemID64 := range itemList {
		total := depositList[i] - withdrawList[i]
		item, err := getObjItem(itemID64)
		logOnError(err, "jobVaultUserStatus : getObjItem")
		if err == nil && item != nil && len(item.Names) > 0 {
			fmt.Fprintf(&out, "%-32s |%6d |%6d |%6d\n", item.Names[0], depositList[i], withdrawList[i], total)
		} else {
			// Item lookup failed: show the raw item id rather than dereferencing
			// a missing item.
			fmt.Fprintf(&out, "#%-31d |%6d |%6d |%6d\n", itemID64, depositList[i], withdrawList[i], total)
		}
	}
	out.WriteString("</code>")
	TGCmdQueue <- TGCommand{
		Type:       commandSendMsg,
		Text:       out.String(),
		ToChatID64: toChatID64,
		ParseMode:  cmdParseModeHTML,
	}
}

// jobVaultUserStatus reports, for each user id listed in the job payload, the
// quantity of every item (restricted to the payload's item types) the user
// deposited to and received from the vault chat p.DepositChatID64. One table
// per user is sent to p.UserID64; nothing is sent when the query returns no
// row. The job is always marked done before returning.
func jobVaultUserStatus(j Job) {
	var (
		p                                   JobPayloadVaultUserStatus
		userID64, currentUserID64           int64
		itemID64, deposit, withdraw         int64
		itemList, depositList, withdrawList []int64
	)

	finish := func() {
		err := setJobDone(j.ID64)
		logOnError(err, "jobVaultUserStatus : setJobDone")
	}

	err := setJobStart(j.ID64)
	logOnError(err, "jobVaultUserStatus : setJobStart")

	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobVaultUserStatus : Unmarshal payload")
	if err != nil || len(p.UserListID64) == 0 || len(p.ItemTypeListID64) == 0 {
		// strings.Repeat panics on a negative count, so an empty list must not
		// reach the placeholder expansion below; there is nothing to query.
		log.Printf("jobVaultUserStatus[%d] : empty user or item type list, nothing to do.\n", j.ID64)
		finish()
		return
	}

	// Rows arrive grouped by user because the loop below flushes one table
	// every time user_id changes.
	stmt := `SELECT x.user_id
				,x.item_id
				,(SELECT COALESCE(SUM(omv.quantity), 0)
					FROM obj_msg_vault_v omv
					WHERE omv.user_id = x.user_id
					AND omv.item_id = x.item_id
					AND omv.msg_type_id = ` + strconv.FormatInt(cacheObjSubType[`msg_g_deposit_ack`], 10) + `
					AND omv.chat_id = x.chat_id) deposit
				,(SELECT COALESCE(SUM(omv.quantity), 0)
					FROM obj_msg_vault_v omv
					WHERE omv.user_id = x.user_id
					AND omv.item_id = x.item_id
					AND omv.msg_type_id = ` + strconv.FormatInt(cacheObjSubType[`msg_withdraw_rcv`], 10) + `
					AND omv.chat_id = x.chat_id) withdraw
			FROM (SELECT DISTINCT
						omv.user_id
						,omv.chat_id
						,omv.item_id
					FROM obj_msg_vault_v omv
					WHERE omv.chat_id = ?
					AND omv.user_id IN (?` + strings.Repeat(",?", len(p.UserListID64)-1) + `)
					AND omv.item_type_id IN (?` + strings.Repeat(",?", len(p.ItemTypeListID64)-1) + `)) x
			ORDER BY x.user_id ASC
				,(SELECT oi.intl_id FROM obj_item oi WHERE oi.obj_id = x.item_id) ASC;`

	// Argument order mirrors placeholder order: chat, users, item types.
	args := make([]interface{}, 0, len(p.UserListID64)+len(p.ItemTypeListID64)+1)
	args = append(args, p.DepositChatID64)
	for _, id := range p.UserListID64 {
		args = append(args, id)
	}
	for _, id := range p.ItemTypeListID64 {
		args = append(args, id)
	}

	rows, err := db.Query(stmt, args...)
	logOnError(err, "jobVaultUserStatus : Get rows")
	if err != nil {
		finish()
		return
	}
	defer rows.Close()

	// 0 is used as the "no user seen yet" marker.
	currentUserID64 = 0
	for rows.Next() {
		err = rows.Scan(&userID64, &itemID64, &deposit, &withdraw)
		logOnError(err, "jobVaultUserStatus : scan next val")
		if userID64 != currentUserID64 {
			if currentUserID64 != 0 {
				// A new user starts: flush the table of the previous one.
				sendVaultUserStatusReport(p.UserID64, itemList, depositList, withdrawList)
			}
			currentUserID64 = userID64
			itemList = nil
			depositList = nil
			withdrawList = nil
		}
		itemList = append(itemList, itemID64)
		depositList = append(depositList, deposit)
		withdrawList = append(withdrawList, withdraw)
	}

	if currentUserID64 != 0 {
		// Flush the table of the last user.
		sendVaultUserStatusReport(p.UserID64, itemList, depositList, withdrawList)
	}

	err = rows.Err()
	logOnError(err, "jobVaultUserStatus : query end")
	finish()
}
// gWithdrawEncodeRef packs jobID64 and userID64 as two little-endian uint64
// values into a single AES block, encrypts it with a key derived from the bot
// token (first aes.BlockSize bytes of its SHA-256 digest) and returns the
// ciphertext hex-encoded.
func gWithdrawEncodeRef(jobID64, userID64 int64) (string, error) {
	digest := sha256.Sum256([]byte(cfg.Telegram.Token))
	block, err := aes.NewCipher(digest[:aes.BlockSize])
	if err != nil {
		return "", err
	}
	plain := make([]byte, aes.BlockSize)
	binary.LittleEndian.PutUint64(plain[:8], uint64(jobID64))
	binary.LittleEndian.PutUint64(plain[8:], uint64(userID64))
	sealed := make([]byte, aes.BlockSize)
	block.Encrypt(sealed, plain)
	return hex.EncodeToString(sealed), nil
}

// gWithdrawDecodeRef is the inverse of gWithdrawEncodeRef. It returns an error
// when ref is not valid hex or is not exactly one AES block long, instead of
// letting the cipher panic on user-supplied input.
func gWithdrawDecodeRef(ref string) (jobID64, userID64 int64, err error) {
	sealed, err := hex.DecodeString(ref)
	if err != nil {
		return 0, 0, err
	}
	if len(sealed) != aes.BlockSize {
		return 0, 0, errors.New("invalid withdrawal reference length")
	}
	digest := sha256.Sum256([]byte(cfg.Telegram.Token))
	block, err := aes.NewCipher(digest[:aes.BlockSize])
	if err != nil {
		return 0, 0, err
	}
	plain := make([]byte, aes.BlockSize)
	block.Decrypt(plain, sealed)
	// Must mirror the fixed-width little-endian layout used when encoding.
	jobID64 = int64(binary.LittleEndian.Uint64(plain[:8]))
	userID64 = int64(binary.LittleEndian.Uint64(plain[8:]))
	return jobID64, userID64, nil
}

// jobGWithdraw drives a guild withdrawal request through several runs of the
// same job, the progress being stored in the payload:
//
//   - Status 0: spawn a job_get_vault job for the item types involved and park.
//   - Status 1: read the vault job result, record availability, and start
//     inspecting the first unique item (code u<digits>) absent from the vault.
//   - Afterwards: react to the triggering message (inspect request/ack,
//     invalid action, validation ack, withdraw code), then post the
//     validation link in the main chat, or reply that no stock is available
//     and mark the job done.
//
// "Parking" means rescheduling the job at maxUnixTimestamp so that only a
// registered callback wakes it up.
func jobGWithdraw(j Job) {
	var (
		p  JobPayloadGWithdraw
		p2 JobPayloadGetVault
	)
	uniqueCode := regexp.MustCompile(`^u[0-9]+$`)

	log.Printf("jobGWithdraw[%d] : Starting handling job.\n", j.ID64)
	err := setJobStart(j.ID64)
	logOnError(err, "jobGWithdraw : setJobStart")
	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobGWithdraw : Unmarshal payload")

	// inspectNext starts the inspection of the next item still flagged
	// Inspect, parks the job and reports true; it reports false when no item
	// is left to inspect.
	inspectNext := func() bool {
		p.Inspecting = ``
		for k, req := range p.Items {
			if !req.Inspect {
				continue
			}
			p.Inspecting = req.Code
			p.Items[k].Inspect = false
			err := setJobPayloadJSON(j.ID64, p)
			logOnError(err, "jobGWithdraw : setJobPayloadJSON")
			err = rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC())
			logOnError(err, "jobGWithdraw : rescheduleJob")
			setJobCallback(j.ID64, j.UserID64, cacheObjSubType[`msg_g_inspect_req`])
			setJobCallback(j.ID64, j.UserID64, cacheObjSubType[`msg_invalid_action`])
			setJobCallback(j.ID64, j.UserID64, cacheObjSubType[`msg_inspect_ack`])
			clientSendCWMsg(j.UserID64, fmt.Sprintf("/g_inspect_%s", p.Inspecting))
			return true
		}
		return false
	}

	// waitForValidation parks the job until the next validation ack.
	waitForValidation := func() {
		err := rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC())
		logOnError(err, "jobGWithdraw : rescheduleJob")
		setJobCallback(j.ID64, int64(bot.Me.ID), cacheObjSubType[`msg_job_gwithdraw_ack`])
	}

	if p.Status == 0 {
		p2.JobCallbackID64 = j.ID64
		p2.ItemTypeList = make([]int64, 0)
		for k, item := range p.Items {
			id := getSilentObjItemID(item.Code, ``)
			if id != 0 {
				obj, err := getObjItem(id)
				logOnError(err, "jobGWithdraw : getObjItem")
				if err == nil && obj != nil {
					p2.ItemTypeList = append(p2.ItemTypeList, obj.ItemTypeID)
				}
			} else if uniqueCode.MatchString(item.Code) {
				// Unknown code shaped like a unique item: it will have to be
				// inspected individually.
				p.Items[k].Inspect = true
				p2.ItemTypeList = append(p2.ItemTypeList, cacheObjSubType[`item_other`])
			}
		}
		b, err := json.Marshal(p2)
		logOnError(err, "jobGWithdraw : Marshal(p2)")
		jobID64, err := createJob(cacheObjSubType[`job_get_vault`], objJobPriority, j.UserID64, 0, time.Now().UTC(), b)
		logOnError(err, "jobGWithdraw : createJob")
		p.Status = 1
		p.VaultJobID64 = jobID64
		err = setJobPayloadJSON(j.ID64, p)
		logOnError(err, "jobGWithdraw : setJobPayloadJSON(p)")
		log.Printf("jobGWithdraw[%d] : Calling GetVault job.\n", j.ID64)
		err = rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC())
		logOnError(err, "jobGWithdraw : rescheduleJob")
		return
	} else if p.Status == 1 {
		/* loop through items and get unique/inspect */
		b := getJobPayload(p.VaultJobID64)
		err = json.Unmarshal(b, &p2)
		logOnError(err, "jobGWithdraw : Unmarshal(p2)")
		vault := make(map[string]int64)
		for _, i := range p2.Vault {
			vault[i.Code] = i.Quantity
		}
		for k, req := range p.Items {
			ava := vault[req.Code]
			p.Items[k].Available = ava
			if uniqueCode.MatchString(req.Code) {
				if ava > 0 {
					p.Items[k].Inspect = false
				} else if p.Inspecting == `` {
					p.Inspecting = req.Code
				}
			}
		}
		p.Status = 2
		err = setJobPayloadJSON(j.ID64, p)
		logOnError(err, "jobGWithdraw : setJobPayloadJSON(p)")
		log.Printf("jobGWithdraw[%d] : received GetVault job.\n", j.ID64)
		if p.Inspecting != `` {
			// NOTE(review): the ack/invalid callbacks are registered for
			// bot.Me.ID here but for j.UserID64 in inspectNext - confirm which
			// sender is expected.
			setJobCallback(j.ID64, j.UserID64, cacheObjSubType[`msg_g_inspect_req`])
			setJobCallback(j.ID64, int64(bot.Me.ID), cacheObjSubType[`msg_inspect_ack`])
			setJobCallback(j.ID64, int64(bot.Me.ID), cacheObjSubType[`msg_invalid_action`])
			clientSendCWMsg(j.UserID64, fmt.Sprintf("/g_inspect_%s", p.Inspecting))
			log.Printf("jobGWithdraw[%d] : Inspecting missing unique item.\n", j.ID64)
			return
		}
	}

	if j.Trigger != 0 {
		id, err := getObjSubTypeId(j.Trigger)
		logOnError(err, "jobGWithdraw : getObjSubType("+strconv.FormatInt(j.Trigger, 10)+")")
		if err == nil {
			m, err := getObjMsg(j.Trigger)
			logOnError(err, "jobGWithdraw : getObjMsg")
			rule, err := getMsgParsingRule(m)
			logOnError(err, "jobGWithdraw : getMsgParsingRule")
			p.CleanupMsg = append(p.CleanupMsg, *m)
			switch id {
			case cacheObjSubType[`msg_g_inspect_req`]:
				log.Printf("jobGWithdraw[%d] : Deleting unique inspect req.\n", j.ID64)
				err = setJobPayloadJSON(j.ID64, p)
				logOnError(err, "jobGWithdraw : setJobPayloadJSON")
				err = rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC())
				logOnError(err, "jobGWithdraw : rescheduleJob")
				// The job is parked until the inspect ack (or invalid action)
				// arrives; do not fall through to the validation link.
				return
			case cacheObjSubType[`msg_inspect_ack`]:
				log.Printf("jobGWithdraw[%d] : Handling unique inspect ack.\n", j.ID64)
				cwm, err := parseSubTypeMessageInspectAck(m, rule.re)
				logOnError(err, "jobGWithdraw : parseSubTypeMessageInspectAck")
				for k, req := range p.Items {
					if req.Code == p.Inspecting {
						p.Items[k].Available = 1
						p.Items[k].Name = cwm.Name
						p.Items[k].Inspect = false
						break
					}
				}
				if inspectNext() {
					return
				}
			case cacheObjSubType[`msg_invalid_action`]:
				log.Printf("jobGWithdraw[%d] : Handling invalid unique item.\n", j.ID64)
				for k, req := range p.Items {
					if req.Code == p.Inspecting {
						p.Items[k].Available = 0
						p.Items[k].Inspect = false
						break
					}
				}
				if inspectNext() {
					return
				}
			case cacheObjSubType[`msg_job_gwithdraw_ack`]:
				log.Printf("jobGWithdraw[%d] : Handling withdrawal validation ack.\n", j.ID64)
				cwm, err := parseSubTypeMessageJobGWithdrawAck(m, rule.re)
				logOnError(err, "jobGWithdraw : parseSubTypeMessageJobGWithdrawAck")
				jobID64, userID64, err := gWithdrawDecodeRef(cwm.Ref)
				logOnError(err, "jobGWithdraw : gWithdrawDecodeRef")
				if err != nil || jobID64 != j.ID64 {
					// Malformed reference or one issued for another job: keep
					// waiting for our own validation.
					waitForValidation()
					return
				}
				if userID64 == cwm.Msg.TGSenderUserID64 {
					cmd := TGCommand{
						Type:         commandReplyMsg,
						Text:         "You cannot validate your own withdrawl",
						FromMsgID64:  cwm.Msg.ID64,
						FromChatID64: cwm.Msg.ChatID64,
					}
					TGCmdQueue <- cmd
					err = setJobPayloadJSON(j.ID64, p)
					logOnError(err, "jobGWithdraw : setJobPayloadJSON")
					waitForValidation()
					return
				}
				p.Validated = true
				setJobCallback(j.ID64, j.UserID64, cacheObjSubType[`msg_withdraw_code`])
				setJobCallback(j.ID64, j.UserID64, cacheObjSubType[`msg_withdraw_req`])
				// TODO: the validated payload is not persisted and the actual
				// withdrawal is not issued yet.
				return
			case cacheObjSubType[`msg_withdraw_code`]:
				log.Printf("jobGWithdraw[%d] : Handling withdraw code.\n", j.ID64)
				if false /* check if it's ours */ {
					for _, d := range p.CleanupMsg {
						if d.TGSenderUserID64 == int64(bot.Me.ID) {
							delmsg := tb.StoredMessage{
								MessageID: fmt.Sprintf("%d", d.ID64),
								ChatID:    d.ChatID64,
							}
							err = bot.Delete(delmsg)
							logOnError(err, "jobGWithdraw : Delete")
						}
					}
					p.CleanupMsg = []ChatWarsMessage{}
				} else {
					// Parked again: returning avoids re-sending the validation
					// link below.
					waitForValidation()
					return
				}
			default:
				log.Printf("jobGWithdraw[%d] : No handling for this message.\n", j.ID64)
			}
		}
	}

	log.Printf("jobGWithdraw[%d] : Preparing withdrawal guild link.\n", j.ID64)
	var stock string
	for _, i := range p.Items {
		if i.Available > i.Required {
			stock = fmt.Sprintf("%s\n%d x %s", stock, i.Required, i.Name)
		} else if i.Available > 0 {
			stock = fmt.Sprintf("%s\n%d x %s", stock, i.Available, i.Name)
		}
	}

	if len(stock) == 0 {
		cmd := TGCommand{
			Type:         commandReplyMsg,
			Text:         "No stock available whatsoever",
			FromMsgID64:  p.MsgID64,
			FromChatID64: p.ChatID64,
			ParseMode:    cmdParseModeHTML,
		}
		TGCmdQueue <- cmd
		err = setJobDone(j.ID64)
		logOnError(err, "jobGWithdraw : setJobDone")
		return
	}

	err = setJobPayloadJSON(j.ID64, p)
	logOnError(err, "jobGWithdraw : setJobPayloadJSON")
	setJobCallback(j.ID64, int64(bot.Me.ID), cacheObjSubType[`msg_job_gwithdraw_ack`])

	ref, err := gWithdrawEncodeRef(j.ID64, p.UserID64)
	logOnError(err, "jobGWithdraw : gWithdrawEncodeRef")
	if err != nil {
		// No usable reference can be published, so the request cannot proceed.
		err = setJobDone(j.ID64)
		logOnError(err, "jobGWithdraw : setJobDone")
		return
	}
	log.Printf("jobGWithdraw[%d] : out string : %s.\n", j.ID64, ref)

	payloadJSON, err := json.Marshal(p)
	logOnError(err, "jobGWithdraw : Marshal(p)")
	log.Printf("jobGWithdraw[%d] : %s\n", j.ID64, string(payloadJSON))

	// Fall back to the numeric id when the requester's chat cannot be resolved.
	username := strconv.FormatInt(p.UserID64, 10)
	u, err := bot.ChatByID(fmt.Sprintf("%d", p.UserID64))
	logOnError(err, "jobGWithdraw : ChatByID")
	if err == nil && u != nil {
		username = u.Username
	}
	msg := fmt.Sprintf("Click to validate @%s's withdrawal of<code>%s</code>\n/withdraw_%s", username, stock, ref)
	cmd := TGCommand{
		Type:       commandSendMsg,
		Text:       msg,
		ToChatID64: cfg.Bot.Mainchat,
		ParseMode:  cmdParseModeHTML,
	}
	TGCmdQueue <- cmd
}
// jobSetDef parses the message that triggered the job as a "me" ack and, when
// the reported state is `🛌Rest`, sends `🛡Defend` on behalf of j.UserID64.
// The job is marked done in every case.
func jobSetDef(j Job) {
	var payload JobPayloadSetDef

	startErr := setJobStart(j.ID64)
	logOnError(startErr, "jobSetDef : setJobStart")

	decodeErr := json.Unmarshal(j.Payload, &payload)
	logOnError(decodeErr, "jobSetDef : Unmarshal payload")

	trigger, err := getObjMsg(j.Trigger)
	logOnError(err, "jobSetDef : getObjMsg msg")

	parsingRule, err := getMsgParsingRule(trigger)
	logOnError(err, "jobSetDef : getMsgParsingRule")

	// The parse error is not inspected here; only the resulting state is used.
	me, _ := parseSubTypeMessageMeAck(trigger, parsingRule.re)
	if me.State == `🛌Rest` {
		clientSendCWMsg(j.UserID64, `🛡Defend`)
	}

	doneErr := setJobDone(j.ID64)
	logOnError(doneErr, "jobSetDef : setJobDone")
}
// jobGetHammerTime parses the message that triggered the job as a time ack
// and, when hammerTimeNow accepts the current time of day and weather, posts
// a notice in the main chat. The notice wording depends on whether
// hammerTimeNext accepts either the forecast or the current weather.
// Nothing is posted otherwise. The job is marked done in every case.
func jobGetHammerTime(j Job) {
	var payload JobPayloadSetDef

	startErr := setJobStart(j.ID64)
	logOnError(startErr, "jobGetHammerTime : setJobStart")

	decodeErr := json.Unmarshal(j.Payload, &payload)
	logOnError(decodeErr, "jobGetHammerTime : Unmarshal payload")

	trigger, err := getObjMsg(j.Trigger)
	logOnError(err, "jobGetHammerTime : getObjMsg msg")

	parsingRule, err := getMsgParsingRule(trigger)
	logOnError(err, "jobGetHammerTime : getMsgParsingRule")

	// The parse error is not inspected here; only the parsed fields are used.
	report, _ := parseSubTypeMessageTimeAck(trigger, parsingRule.re)

	if hammerTimeNow(report.TimeOfDay, report.Weather) {
		text := ``
		if hammerTimeNext(report.TimeOfDay, report.WeatherNext) ||
			hammerTimeNext(report.TimeOfDay, report.Weather) {
			text = `Perfect weather for the next 2 hours, possibly 4.`
		} else {
			text = `Perfect weather only for the next 2 hours.`
		}
		TGCmdQueue <- TGCommand{
			Type:       commandSendMsg,
			Text:       text,
			ToChatID64: cfg.Bot.Mainchat,
			ParseMode:  cmdParseModeHTML,
		}
	}
	/*
		Disabled: notices for the case where the weather is not perfect now.
		} else {
			if hammerTimeNext(cwm.TimeOfDay, cwm.WeatherNext) ||
				hammerTimeNext(cwm.TimeOfDay, cwm.Weather) {
				out = `Perfect weather maybe in 2 hours.`
			} else {
				out = `No perfect weather in sight for the next 4 hours.`
			}
		}
	*/

	doneErr := setJobDone(j.ID64)
	logOnError(doneErr, "jobGetHammerTime : setJobDone")
}
// jobCraftItem computes what is needed to craft p.Quantity units of the item
// p.ObjItemID64 from the guild vault and replies with a summary (total mana,
// required items with vault availability, missing items, intermediate items
// to craft).
//
// It runs in two passes: on the first one (Status 0) it spawns a
// job_get_vault job and parks itself at maxUnixTimestamp; on the next run it
// reads that job's payload and builds the summary, then marks the job done.
// When the requested item cannot be loaded or has no recipe, the job is
// logged and marked done without a reply.
func jobCraftItem(j Job) {
	var (
		p  JobPayloadCraftItem
		p2 JobPayloadGetVault
	)

	err := setJobStart(j.ID64)
	logOnError(err, "jobCraftItem : setJobStart")
	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobCraftItem : Unmarshal payload")

	if p.Status == 0 {
		p2.JobCallbackID64 = j.ID64
		p2.ItemTypeList = []int64{
			cacheObjSubType[`item_res`],
			cacheObjSubType[`item_recipe`],
			cacheObjSubType[`item_part`],
			cacheObjSubType[`item_alch`],
		}
		b, err := json.Marshal(p2)
		logOnError(err, "jobCraftItem : Marshal(p2)")
		jobID64, err := createJob(cacheObjSubType[`job_get_vault`], objJobPriority, j.UserID64, 0, time.Now().UTC(), b)
		logOnError(err, "jobCraftItem : createJob")
		p.Status = 1
		p.VaultJobID64 = jobID64
		err = setJobPayloadJSON(j.ID64, p)
		logOnError(err, "jobCraftItem : setJobPayloadJSON(p)")
		err = rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC())
		logOnError(err, "jobCraftItem : rescheduleJob")
		return
	}

	err = json.Unmarshal(getJobPayload(p.VaultJobID64), &p2)
	logOnError(err, "jobCraftItem : Unmarshal(p2)")

	item, err := getObjItem(p.ObjItemID64)
	logOnError(err, "jobCraftItem : getObjItem")
	if err != nil || item == nil || item.Craft == nil {
		// Nothing can be computed without the recipe of the requested item.
		log.Printf("jobCraftItem[%d] : item %d has no usable recipe.\n", j.ID64, p.ObjItemID64)
		err = setJobDone(j.ID64)
		logOnError(err, "jobCraftItem : setJobDone")
		return
	}

	availableItems := make(map[string]int64)
	requiredItems := make(map[string]int64)
	missingItems := make(map[string]int64)
	craftItems := make(map[string]int64)

	totalMana := item.Craft.Mana * p.Quantity
	for _, v := range item.Craft.Items {
		requiredItems[v.Code] = v.Quantity * p.Quantity
		missingItems[v.Code] = 0
		craftItems[v.Code] = 0
	}
	for _, v := range p2.Vault {
		availableItems[v.Code] = v.Quantity
	}

	// Expand requirements until every required quantity is covered by the
	// vault, by planned crafts or is recorded as missing. A requirement can
	// grow on a later pass (an intermediate item needed by several recipes),
	// so only the uncovered shortfall is propagated each time; re-adding the
	// whole deficit would count mana and ingredients twice.
	for update := true; update; {
		update = false
		for code, req := range requiredItems {
			covered := availableItems[code] + craftItems[code] + missingItems[code]
			if covered >= req {
				continue
			}
			update = true
			shortfall := req - covered

			obj, err := getObjItem(getObjItemID(code, ``))
			logOnError(err, "jobCraftItem : getObjItem")
			known := err == nil && obj != nil

			if known && obj.Craft != nil {
				craftItems[code] += shortfall
				totalMana += shortfall * obj.Craft.Mana
				for _, v := range obj.Craft.Items {
					requiredItems[v.Code] += v.Quantity * shortfall
				}
				continue
			}
			if known && obj.Craftable {
				// Flagged craftable but no recipe on record: tell the admin.
				TGCmdQueue <- TGCommand{
					Type:       commandSendMsg,
					Text:       fmt.Sprintf("Item missing recipe : %s\n", code),
					ToUserID64: cfg.Bot.Admin,
				}
			}
			missingItems[code] += shortfall
		}
	}

	// describe renders "[code] name", falling back to the bare code when the
	// item cannot be looked up.
	describe := func(code string) string {
		obj, err := getObjItem(getObjItemID(code, ``))
		if err != nil || obj == nil || len(obj.Names) == 0 {
			return fmt.Sprintf("[%s] %s", code, code)
		}
		return fmt.Sprintf("[%s] %s", obj.Code, obj.Names[0])
	}

	/* we can finish the job */
	var out strings.Builder
	fmt.Fprintf(&out, "<code>Summary for %d %s\n", p.Quantity, item.Names[0])
	fmt.Fprintf(&out, " Mana : %d\n", totalMana)
	out.WriteString(" Items :\n")
	for k, v := range requiredItems {
		fmt.Fprintf(&out, " %s : %d (%d)\n", describe(k), v, availableItems[k])
	}
	out.WriteString(" Missing :\n")
	for k, v := range missingItems {
		if v > 0 {
			fmt.Fprintf(&out, " %s : %d\n", describe(k), v)
		}
	}
	out.WriteString(" To craft :\n")
	for k, v := range craftItems {
		if v > 0 {
			fmt.Fprintf(&out, " %s : %d\n", describe(k), v)
		}
	}
	out.WriteString("</code>")

	TGCmdQueue <- TGCommand{
		Type:         commandReplyMsg,
		Text:         out.String(),
		FromMsgID64:  p.MsgID64,
		FromChatID64: p.ChatID64,
		ParseMode:    cmdParseModeHTML,
	}

	err = setJobDone(j.ID64)
	logOnError(err, "jobCraftItem : setJobDone")
}
// jobCraftAll summarises which craftable `item_other` items can be assembled
// from the parts and recipes held in the guild vault.
//
// It runs in two passes: on the first one (Status 0) it spawns a
// job_get_vault job for recipes and parts and parks itself at
// maxUnixTimestamp; on the next run it reads that job's payload and replies
// with three sections: items that can be completed (grouped by count), items
// that have a recipe but are exactly one part short, and items with enough
// parts but no recipe. The job is then marked done.
func jobCraftAll(j Job) {
	var (
		p  JobPayloadCraftAll
		p2 JobPayloadGetVault
	)

	err := setJobStart(j.ID64)
	logOnError(err, "jobCraftAll : setJobStart")
	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobCraftAll : Unmarshal payload")

	if p.Status == 0 {
		p2.JobCallbackID64 = j.ID64
		p2.ItemTypeList = []int64{
			cacheObjSubType[`item_recipe`],
			cacheObjSubType[`item_part`],
		}
		b, err := json.Marshal(p2)
		logOnError(err, "jobCraftAll : Marshal(p2)")
		jobID64, err := createJob(cacheObjSubType[`job_get_vault`], objJobPriority, j.UserID64, 0, time.Now().UTC(), b)
		logOnError(err, "jobCraftAll : createJob")
		p.Status = 1
		p.VaultJobID64 = jobID64
		err = setJobPayloadJSON(j.ID64, p)
		logOnError(err, "jobCraftAll : setJobPayloadJSON(p)")
		err = rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC())
		logOnError(err, "jobCraftAll : rescheduleJob")
		return
	}

	err = json.Unmarshal(getJobPayload(p.VaultJobID64), &p2)
	logOnError(err, "jobCraftAll : Unmarshal(p2)")

	itemParts := make(map[int64]string)     // part obj id   -> crafted item code
	itemRecipes := make(map[int64]string)   // recipe obj id -> crafted item code
	ratioItems := make(map[string]int64)    // crafted item code -> parts needed per item
	totalParts := make(map[string]int64)    // crafted item code -> parts in vault
	totalRecipes := make(map[string]int64)  // crafted item code -> recipes in vault
	completeItems := make(map[string]int64) // crafted item code -> items that can be completed
	var maxItems int64

	muxObjItem.RLock()
	for _, o := range objItems {
		if o.ItemTypeID != cacheObjSubType[`item_other`] || !o.Craftable || o.Craft == nil {
			continue
		}
		for _, i := range o.Craft.Items {
			component, err := getObjItem(i.ItemID64)
			logOnError(err, "jobCraftAll : getObjItem")
			if err != nil || component == nil {
				continue
			}
			if component.ItemTypeID == cacheObjSubType[`item_part`] {
				ratioItems[o.Code] = i.Quantity
				itemParts[component.ObjID64] = o.Code
			} else if component.ItemTypeID == cacheObjSubType[`item_recipe`] {
				itemRecipes[component.ObjID64] = o.Code
			}
		}
	}
	muxObjItem.RUnlock()

	for _, i := range p2.Vault {
		if code, ok := itemParts[i.ItemID64]; ok {
			totalParts[code] = i.Quantity
		} else if code, ok := itemRecipes[i.ItemID64]; ok {
			totalRecipes[code] = i.Quantity
		}
	}

	for code, ratio := range ratioItems {
		if ratio <= 0 {
			// A non-positive ratio would divide by zero below.
			continue
		}
		recipes := totalRecipes[code]
		parts := totalParts[code]
		// Track the item when it has enough parts, or when it has a recipe and
		// is exactly one part short: the latter is what the "1 Part missing"
		// section below lists.
		if parts >= ratio || (recipes > 0 && parts == ratio-1) {
			completeItems[code] = MinInt64(recipes*ratio, parts) / ratio
			maxItems = MaxInt64(maxItems, completeItems[code])
		}
	}

	// line renders " code - name", falling back to the code when the item
	// cannot be looked up.
	line := func(code string) string {
		o, err := getObjItem(getSilentObjItemID(code, ``))
		if err != nil || o == nil || len(o.Names) == 0 {
			return fmt.Sprintf(" %s - %s\n", code, code)
		}
		return fmt.Sprintf(" %s - %s\n", code, o.Names[0])
	}

	/* we can finish the job */
	var out strings.Builder
	out.WriteString("<code>Vault crafting summary\n")
	for n := maxItems; n > 0; n-- {
		fmt.Fprintf(&out, "%d Items :\n", n)
		for code, v := range completeItems {
			if v == n {
				out.WriteString(line(code))
			}
		}
	}
	out.WriteString("1 Part missing :\n")
	for code, v := range completeItems {
		if v == 0 && totalParts[code] == ratioItems[code]-1 {
			out.WriteString(line(code))
		}
	}
	out.WriteString("Recipe missing :\n")
	for code, v := range completeItems {
		if v == 0 && totalRecipes[code] == 0 {
			out.WriteString(line(code))
		}
	}
	out.WriteString("</code>")

	TGCmdQueue <- TGCommand{
		Type:         commandReplyMsg,
		Text:         out.String(),
		FromMsgID64:  p.MsgID64,
		FromChatID64: p.ChatID64,
		ParseMode:    cmdParseModeHTML,
	}

	err = setJobDone(j.ID64)
	logOnError(err, "jobCraftAll : setJobDone")
}
// jobCheckVaultLimit fetches the vault content for the item types of the
// items listed in cfg.Bot.VaultLimit.
//
// It runs in two passes: on the first one (Status 0) it spawns a
// job_get_vault job and parks itself at maxUnixTimestamp; on the next run it
// decodes that job's payload and marks the job done.
// NOTE(review): the decoded vault is not compared against any limit yet.
func jobCheckVaultLimit(j Job) {
	var (
		p  JobPayloadCheckVaultLimit
		p2 JobPayloadGetVault
	)

	err := setJobStart(j.ID64)
	logOnError(err, "jobCheckVaultLimit : setJobStart")
	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobCheckVaultLimit : Unmarshal payload")

	if p.Status == 0 {
		p2.JobCallbackID64 = j.ID64
		p2.ItemTypeList = make([]int64, 0)
		for _, c := range cfg.Bot.VaultLimit {
			o, err := getObjItem(getSilentObjItemID(c.Item, ``))
			logOnError(err, "jobCheckVaultLimit : getObjItem")
			if err != nil || o == nil {
				// Unknown item code in the configuration: skip it rather than
				// dereferencing a missing item.
				continue
			}
			p2.ItemTypeList = append(p2.ItemTypeList, o.ItemTypeID)
		}
		b, err := json.Marshal(p2)
		logOnError(err, "jobCheckVaultLimit : Marshal(p2)")
		jobID64, err := createJob(cacheObjSubType[`job_get_vault`], objJobPriority, j.UserID64, 0, time.Now().UTC(), b)
		logOnError(err, "jobCheckVaultLimit : createJob")
		p.Status = 1
		p.VaultJobID64 = jobID64
		err = setJobPayloadJSON(j.ID64, p)
		logOnError(err, "jobCheckVaultLimit : setJobPayloadJSON(p)")
		err = rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC())
		logOnError(err, "jobCheckVaultLimit : rescheduleJob")
		return
	}

	err = json.Unmarshal(getJobPayload(p.VaultJobID64), &p2)
	logOnError(err, "jobCheckVaultLimit : Unmarshal(p2)")

	err = setJobDone(j.ID64)
	logOnError(err, "jobCheckVaultLimit : setJobDone")
}
// jobShops builds the shop summary from the messages collected in the job
// payload (p.Msgs): it keeps the shop main acks, orders them through
// obj_msg_shop_main (guru, open, mana), renders one line per shop grouped by
// guru, deletes every collected message and replies with the summary to
// p.MsgID64 in p.ChatID64. The job is then marked done.
func jobShops(j Job) {
	var p JobPayloadShops

	err := setJobStart(j.ID64)
	logOnError(err, "jobShops : setJobStart")
	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobShops : Unmarshal payload")

	// Collect the object ids of the shop main acks, already formatted for SQL.
	idList := make([]string, 0, len(p.Msgs))
	for _, m := range p.Msgs {
		rule, err := getMsgParsingRule(&m)
		logOnError(err, "jobShops : getMsgParsingRule")
		if err != nil {
			continue
		}
		if rule.MsgTypeID64 == cacheObjSubType[`msg_shop_main_ack`] {
			idList = append(idList, strconv.FormatInt(m.ObjID64, 10))
		}
	}

	var out strings.Builder
	out.WriteString("Shop summary:\n")

	// An empty id list would yield an invalid "in ()" clause, so the query is
	// only issued when at least one ack was collected.
	if len(idList) > 0 {
		query := `SELECT omsm.obj_id id
					FROM obj_msg_shop_main omsm
					WHERE omsm.obj_id in (` + strings.Join(idList, ", ") + `)
					ORDER BY COALESCE(omsm.guru, 'ZZZ') ASC
						,omsm.open DESC
						,omsm.mana DESC;`

		guru := `ZZZ`
		for _, id := range getSQLListID64(query) {
			o, err := getObjMsg(id)
			logOnError(err, "jobShops : getObjMsg")
			if err != nil {
				continue
			}
			rule, err := getMsgParsingRule(o)
			logOnError(err, "jobShops : getMsgParsingRule")
			if err != nil || rule.MsgTypeID64 != cacheObjSubType[`msg_shop_main_ack`] {
				continue
			}
			cwm, err := parseSubTypeMessageShopMainAck(o, rule.re)
			logOnError(err, "jobShops : parseSubTypeMessageShopMainAck")

			if cwm.Guru != guru {
				// Rows are ordered by guru: emit a heading on each change.
				guru = cwm.Guru
				fmt.Fprintf(&out, " - %s\n", guru)
			}
			if cwm.Open {
				out.WriteString(" ✔️")
			} else {
				out.WriteString(" ❌")
			}
			c := getObjCastle(cwm.CastleID64)
			fmt.Fprintf(&out, "%s", c.Logo)
			fmt.Fprintf(&out, "<a href=\"https://t.me/share/url?url=/ws_%s\">%s</a>", cwm.Link, cwm.User)
			fmt.Fprintf(&out, " [%d : %d]", cwm.Mana, cwm.ManaTotal)
			if cwm.Mana < cwm.ManaTotal {
				// Time until the mana pool is full, from the refill rate formula
				// used by this file.
				d := time.Duration(((cwm.ManaTotal-cwm.Mana)*60)/((cwm.ManaTotal/250)+1)) * time.Second
				fmt.Fprintf(&out, " %s\n", d.String())
			} else {
				out.WriteString("\n")
			}
		}
	}

	for _, m := range p.Msgs {
		clientDelTGMsg(m.TGUserID64, m.ID64, m.ChatID64)
	}

	TGCmdQueue <- TGCommand{
		Type:         commandReplyMsg,
		Text:         out.String(),
		FromMsgID64:  p.MsgID64,
		FromChatID64: p.ChatID64,
		ParseMode:    cmdParseModeHTML,
	}

	err = setJobDone(j.ID64)
	logOnError(err, "jobShops : setJobDone")
}
func jobShopsSlave(j Job) {
var p JobPayloadShopsSlave
err := setJobStart(j.ID64)
logOnError(err, "jobShopsSlave : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobShopsSlave : Unmarshal payload")
if j.Trigger != 0 {
id, err := getObjSubTypeId(j.Trigger)
logOnError(err, "jobShopsSlave : getObjSubType("+strconv.FormatInt(j.Trigger, 10)+")")
if err == nil {
m, err := getObjMsg(j.Trigger)
logOnError(err, "jobShopsSlave : getObjMsg")
rule, err := getMsgParsingRule(m)
logOnError(err, "jobShopsSlave : getMsgParsingRule")
switch id {
case cacheObjSubType[`msg_shop_main_req`]:
muxObjJob.Lock()
b2 := getJobPayloadUnsafe(p.JobCallbackID64)
var p2 JobPayloadShops
err = json.Unmarshal(b2, &p2)
p2.Msgs = append(p2.Msgs, *m)
err = setJobPayloadJSONUnsafe(p.JobCallbackID64, p2)
logOnError(err, "jobShopsSlave : setJobPayloadJSONUnsafe")
muxObjJob.Unlock()
rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC()) // reschedule so that it gets picked up by shop main ack callback
return
case cacheObjSubType[`msg_shop_main_ack`]:
cwm, err := parseSubTypeMessageShopMainAck(m, rule.re)
logOnError(err, "jobShopsSlave : parseSubTypeMessageShopMainAck")
muxObjJob.Lock()
b2 := getJobPayloadUnsafe(p.JobCallbackID64)
var p2 JobPayloadShops
err = json.Unmarshal(b2, &p2)
p2.Msgs = append(p2.Msgs, *m)
err = setJobPayloadJSONUnsafe(p.JobCallbackID64, p2)
muxObjJob.Unlock()
for i, link := range p.Shops {
if cwm.Link == link {
p.Shops = append(p.Shops[:i], p.Shops[i+1:]...)
break
}
}
err = setJobPayloadJSON(j.ID64, p)
logOnError(err, "jobShopsSlave : setJobPayloadJSON")
default:
}
}
}
if len(p.Shops) != 0 {
rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC()) // reschedule so that it gets picked up by shop main ack callback
setJobCallback(j.ID64, j.UserID64, cacheObjSubType[`msg_shop_main_req`])
setJobCallback(j.ID64, j.UserID64, cacheObjSubType[`msg_shop_main_ack`])
clientSendCWMsgDelay(j.UserID64, fmt.Sprintf("/ws_%s", p.Shops[0]), 4*time.Second)
return
}
// we update the master status now that the slave job is done
muxObjJob.Lock()
b2 := getJobPayloadUnsafe(p.JobCallbackID64)
var p2 JobPayloadShops
err = json.Unmarshal(b2, &p2)
p2.Status += 1
err = setJobPayloadJSONUnsafe(p.JobCallbackID64, p2)
muxObjJob.Unlock()
// if last job to finish then we wake up the master
if p2.Status == p.Slaves {
rescheduleJob(p.JobCallbackID64, 0, time.Now().UTC())
}
err = setJobDone(j.ID64)
logOnError(err, "jobShopsSlave : setJobDone")
return
}
// jobGetVault drives one step of a "get vault" job.
//
// The job payload (JobPayloadGetVault) carries a Status bitmask with two bits
// per stock category: bit k (0..5) means "category k still has to be
// requested", bit 10+k means "category k has been dealt with". On the first
// run (Status == 0) the mask is initialised from p.ItemTypeList; categories
// that were not asked for are immediately flagged as done.
//
// Each invocation then does the following:
//  1. If the job was woken by a trigger message, it handles that message:
//     a msg_gstock_any_ack is parsed and its stock appended to p.Vault, while
//     a msg_gstock_*_req is only recorded for later deletion and the job is
//     parked again.
//  2. It sends the command for the next category still flagged as "to
//     request", registers the callbacks for the expected messages, parks the
//     job and returns.
//  3. When nothing is left to request, it deletes the recorded messages,
//     reschedules the job identified by p.JobCallbackID64 to run now and marks
//     this job as done.
//
// Errors are reported through logOnError; the function returns nothing.
func jobGetVault(j Job) {
	var p JobPayloadGetVault

	// One entry per stock category. The slice order is significant: the index
	// gives the bit position in p.Status and the order in which requests are
	// sent.
	categories := []struct {
		item string // cacheObjSubType key of the item type
		req  string // cacheObjSubType key of the matching request message
		cmd  string // command handed to clientSendCWMsgDelay
	}{
		{`item_res`, `msg_gstock_res_req`, `/g_stock_res`},
		{`item_alch`, `msg_gstock_alch_req`, `/g_stock_alch`},
		{`item_misc`, `msg_gstock_misc_req`, `/g_stock_misc`},
		{`item_recipe`, `msg_gstock_rec_req`, `/g_stock_rec`},
		{`item_part`, `msg_gstock_part_req`, `/g_stock_parts`},
		{`item_other`, `msg_gstock_oth_req`, `/g_stock_other`},
	}

	// reqTab maps an item type to its "to request" bit, doneTab to its "done" bit.
	reqTab := make(map[int64]int64, len(categories))
	doneTab := make(map[int64]int64, len(categories))
	for k, c := range categories {
		typeID64 := cacheObjSubType[c.item]
		reqTab[typeID64] = 1 << uint(k)
		doneTab[typeID64] = 1 << (10 + uint(k))
	}

	err := setJobStart(j.ID64)
	logOnError(err, "jobGetVault : setJobStart")

	err = json.Unmarshal(j.Payload, &p)
	logOnError(err, "jobGetVault : Unmarshal payload")

	// First run: translate the requested item types into the status bitmask and
	// flag every category that was not requested as already done.
	if p.Status == 0 {
		for _, typeID64 := range p.ItemTypeList {
			p.Status |= reqTab[typeID64]
		}
		for _, c := range categories {
			typeID64 := cacheObjSubType[c.item]
			if p.Status&reqTab[typeID64] == 0 {
				p.Status |= doneTab[typeID64]
			}
		}
	}

	if j.Trigger != 0 {
		id, err := getObjSubTypeId(j.Trigger)
		logOnError(err, "jobGetVault : getObjSubType("+strconv.FormatInt(j.Trigger, 10)+")")
		if err == nil {
			m, err := getObjMsg(j.Trigger)
			logOnError(err, "jobGetVault : getObjMsg")
			// Without the message there is nothing to parse or to clean up;
			// skip the trigger handling (same policy as a failed sub type
			// lookup above) instead of dereferencing a nil message.
			if err == nil && m != nil {
				rule, ruleErr := getMsgParsingRule(m)
				logOnError(ruleErr, "jobGetVault : getMsgParsingRule")
				switch id {
				case cacheObjSubType[`msg_gstock_any_ack`]:
					// The rule is only needed to parse the ack; do not touch
					// it when its lookup failed.
					if ruleErr == nil {
						cwm, err := parseSubTypeMessageGStockAnyAck(m, rule.re)
						logOnError(err, "jobGetVault : parseSubTypeMessageGStockAnyAck")
						if err == nil {
							for _, disp := range cwm.Stock {
								p.Vault = append(p.Vault, ChatWarsItems{
									Code:     disp.Code,
									ItemID64: disp.ItemID64,
									Name:     disp.Name,
									Quantity: disp.Quantity,
								})
							}
						}
					}
					p.CleanupMsg = append(p.CleanupMsg, *m)
				case cacheObjSubType[`msg_gstock_oth_req`],
					cacheObjSubType[`msg_gstock_res_req`],
					cacheObjSubType[`msg_gstock_alch_req`],
					cacheObjSubType[`msg_gstock_misc_req`],
					cacheObjSubType[`msg_gstock_rec_req`],
					cacheObjSubType[`msg_gstock_part_req`]:
					// Request message: remember it for deletion and park the
					// job again. NOTE(review): presumably the ack callback is
					// what wakes the job next - confirm.
					p.CleanupMsg = append(p.CleanupMsg, *m)
					err = setJobPayloadJSON(j.ID64, p)
					logOnError(err, "jobGetVault : setJobPayloadJSON")
					err = rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC())
					logOnError(err, "jobGetVault : rescheduleJob")
					return
				default:
				}
			}
		}
	}

	// Send the request for the first category still flagged as "to request".
	for _, c := range categories {
		typeID64 := cacheObjSubType[c.item]
		reqBit := reqTab[typeID64]
		if p.Status&reqBit != reqBit {
			continue
		}
		// Flip the category from "to request" to "done" before persisting, so
		// the next invocation moves on to the following category.
		p.Status = p.Status &^ reqBit
		p.Status |= doneTab[typeID64]
		err = setJobPayloadJSON(j.ID64, p)
		logOnError(err, "jobGetVault : setJobPayloadJSON")
		setJobCallback(j.ID64, j.UserID64, cacheObjSubType[c.req])
		setJobCallback(j.ID64, j.UserID64, cacheObjSubType[`msg_gstock_any_ack`])
		err = rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC())
		logOnError(err, "jobGetVault : rescheduleJob")
		clientSendCWMsgDelay(j.UserID64, c.cmd, 4*time.Second)
		return
	}

	// Nothing left to request: persist the final payload.
	err = setJobPayloadJSON(j.ID64, p)
	logOnError(err, "jobGetVault : setJobPayloadJSON")

	/* clean up the messages */
	for _, d := range p.CleanupMsg {
		clientDelTGMsg(j.UserID64, d.ID64, d.ChatID64)
	}

	/* wake up the callback */
	err = rescheduleJob(p.JobCallbackID64, 0, time.Now().UTC())
	logOnError(err, "jobGetVault : rescheduleJob")

	/* no more req to send, all ack came through, we can finish the job */
	err = setJobDone(j.ID64)
	logOnError(err, "jobGetVault : setJobDone")
}