// chirpnest/job.go

package main
import (
"archive/zip"
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"strconv"
"strings"
"time"
tb "gopkg.in/tucnak/telebot.v2"
)
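// createJob inserts a new job (an obj row plus its obj_job row) with the given type, priority, user, trigger, schedule and JSON payload, and returns the new object id.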
func createJob(jobTypeID int32, priority int32, userID64 int64, trigger int64, schedule time.Time, payload []byte) (int64, error) {
stmt, err := db.Prepare(`INSERT INTO obj (obj_type_id, obj_sub_type_id)
VALUES (? , ?);`)
logOnError(err, "createJob : prepare insert obj")
if err != nil {
return 0, err
}
defer stmt.Close()
res, err := stmt.Exec(objTypeJob, jobTypeID)
s := fmt.Sprintf("createJob, insert obj(%d, %d)", objTypeJob, jobTypeID)
logOnError(err, s)
if err != nil {
return 0, err
}
objId, err := res.LastInsertId()
logOnError(err, "createJob : get last insert Id")
if err != nil {
return 0, err
}
stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, trigger_id, seq_nr, schedule, is_done, in_work, inserted, timeout, pulled, started, ended, payload)
VALUES (?, ?, ?, ?, NULL, ?, 0, 0, ?, ?, NULL, NULL, NULL, ?);`)
logOnError(err, "createJob : prepare insert obj_job")
if err != nil {
return 0, err
}
defer stmt.Close()
_, err = stmt.Exec(objId, priority, userID64, trigger, schedule.UTC(), time.Now().UTC(), time.Unix(maxUnixTimestamp, 0).UTC(), payload)
logOnError(err, "createJob : insert obj_job")
if err != nil {
return 0, err
}
return objId, nil
}
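// createJobCallback creates a job that is not time-scheduled (its schedule is set to the maximum timestamp) but instead fires as a callback for the given message type, with a timeout.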
func createJobCallback(jobTypeID int32, userID64 int64, msgTypeID64 int64, payload []byte, timeout time.Duration) error {
//t, err := time.Parse(time.RFC3339, "9999-12-31T00:00:00+00:00")
jobID64, err := createJob(jobTypeID, objJobPriority, userID64, 0, time.Unix(maxUnixTimestamp, 0).UTC(), payload)
if err != nil {
return err
}
setJobCallback(jobID64, userID64, msgTypeID64)
err = setJobTimeout(jobID64, timeout)
logOnError(err, "createJobCallback : setJobTimeout")
return nil
}
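// setJobCallback registers jobID64 in the in-memory callbacks map under (userID64, msgTypeID64).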
func setJobCallback(jobID64 int64, userID64 int64, msgTypeID64 int64) {
muxCallbacks.Lock()
if _, ok := callbacks[userID64]; !ok {
callbacks[userID64] = make(map[int64][]int64)
}
s := callbacks[userID64][msgTypeID64]
s = append(s, jobID64)
callbacks[userID64][msgTypeID64] = s
muxCallbacks.Unlock()
}
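// setJobTimeout sets the job timeout to now + d.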
func setJobTimeout(jobID64 int64, d time.Duration) error {
stmt, err := db.Prepare(`UPDATE obj_job j SET j.timeout = ? WHERE j.obj_id = ?;`)
logOnError(err, "setJobTimeout : prepare update obj_job")
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(time.Now().UTC().Add(d), jobID64)
logOnError(err, fmt.Sprintf("setJobTimeout, update obj_job(%d)", jobID64))
if err != nil {
return err
}
return nil
}
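// setJobDone flags the job as done, clears its in-work flag and records the end time.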
func setJobDone(jobID64 int64) error {
stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 1, j.in_work = 0, j.ended = ? WHERE j.obj_id = ?;`)
logOnError(err, "setJobDone : prepare update obj_job")
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(time.Now().UTC(), jobID64)
s := fmt.Sprintf("setJobDone, update obj_job(%d)", jobID64)
logOnError(err, s)
if err != nil {
return err
}
return nil
}
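// setJobStart records the job start time, keeping the existing value if it is already set.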
func setJobStart(jobId int64) error {
stmt, err := db.Prepare(`UPDATE obj_job j SET j.started = coalesce(j.started, ?) WHERE j.obj_id = ?;`)
logOnError(err, "setJobStart : prepare update obj_job")
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(time.Now().UTC(), jobId)
s := fmt.Sprintf("setJobStart, update obj_job(%d)", jobId)
logOnError(err, s)
if err != nil {
return err
}
return nil
}
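// rescheduleJob resets the job to pending and updates its schedule and trigger.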
func rescheduleJob(jobID64 int64, trigger int64, schedule time.Time) error {
stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 0, j.in_work = 0, j.schedule = ?, j.trigger_id = ? WHERE j.obj_id = ?;`)
logOnError(err, "rescheduleJob : prepare update obj_job")
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(schedule.UTC(), trigger, jobID64)
s := fmt.Sprintf("rescheduleJob, update obj_job(%d)", jobID64)
logOnError(err, s)
if err != nil {
return err
}
return nil
}
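// loadCurrentJobs claims a batch of pending, due jobs (marking them in-work with a random sequence number) and returns them ordered by priority.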
func loadCurrentJobs() ([]Job, error) {
var (
objId int64
jobTypeId int32
userID64 int64
trigger int64
timeout time.Time
payload []byte
jobs []Job
)
t := time.Now().UTC()
r := RndInt64()
_, err := db.Exec("UPDATE obj_job j SET j.pulled = ?, j.in_work = 1, j.seq_nr = ? WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", t, r, t, SQLJobSliceSize)
logOnError(err, "loadCurrentJobs : update initial rows")
if err != nil {
return jobs, err
}
stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.trigger_id, j.user_id, j.payload, j.timeout FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;")
logOnError(err, "loadCurrentJobs : prepare select statement")
if err != nil {
return jobs, err
}
rows, err := stmt.Query(r)
// rows, err := stmt.Query(time.Now())
logOnError(err, "loadCurrentJobs : query select statement")
if err != nil {
stmt.Close()
return jobs, err
}
for rows.Next() {
err = rows.Scan(&objId, &jobTypeId, &trigger, &userID64, &payload, &timeout)
logOnError(err, "loadCurrentJobs : scan query rows")
job := Job{
ID64: objId,
JobTypeID: jobTypeId,
Trigger: trigger,
UserID64: userID64,
Payload: payload,
Timeout: timeout,
}
jobs = append(jobs, job)
}
err = rows.Err()
logOnError(err, "loadCurrentJobs : scan end rows")
rows.Close()
if err != nil {
stmt.Close()
return jobs, err
}
err = stmt.Close()
logOnError(err, "loadCurrentJobs : close select statement")
if err != nil {
return jobs, err
}
return jobs, nil
}
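// jobRescan pushes every message returned by the payload query back onto the identification queue, reporting progress to the originating chat.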
func jobRescan(j Job) {
var p JobPayloadRescanMsg
err := setJobStart(j.ID64)
logOnError(err, "jobRescan : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobRescan : Unmarshal payload")
start := time.Now()
milestone := time.Now()
ids := getSQLListID64(p.Query)
if len(ids) > 1 {
txt := fmt.Sprintf("Rescanning %d messages.", len(ids))
m := TGCommand{
Type: commandReplyMsg,
Text: txt,
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
i := 0
for _, id := range ids {
SQLMsgIdentifyQueue <- id
i = i + 1
if time.Now().After(milestone.Add(1 * time.Minute)) {
//txt := fmt.Sprintf("Rescanned %d/%d messages.", i, len(ids))
m = TGCommand{
Type: commandReplyMsg,
Text: fmt.Sprintf("Rescanned %d/%d messages.", i, len(ids)),
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
milestone = time.Now()
}
}
r := JobPayloadSetDone{
JobID64: j.ID64,
MsgID64: p.MsgID64,
ChatID64: p.ChatID64,
Text: fmt.Sprintf("%d messages processed in %s.", len(ids), time.Since(start)),
}
b, _ := json.Marshal(r)
_, err := createJob(objSubTypeJobSetJobDone, objJobPriorityRescanAllMsg, j.UserID64, j.ID64, time.Now().UTC(), b)
logOnError(err, "jobRescan : createJob(objSubTypeJobSetJobDone)")
} else if len(ids) == 1 {
SQLMsgIdentifyQueue <- ids[0]
err = setJobDone(j.ID64)
logOnError(err, "jobRescan : setJobDone(1)")
if p.MsgID64 != 0 || p.ChatID64 != 0 {
m := TGCommand{
Type: commandReplyMsg,
Text: "One message processed.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
}
} else {
err = setJobDone(j.ID64)
logOnError(err, "jobRescan : setJobDone(0)")
if p.MsgID64 != 0 || p.ChatID64 != 0 {
m := TGCommand{
Type: commandReplyMsg,
Text: "No message processed.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
}
}
return
}
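// jobSetDone marks the job referenced in the payload as done and replies with the payload text.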
func jobSetDone(j Job) {
var r JobPayloadSetDone
err := setJobStart(j.ID64)
logOnError(err, "jobSetDone : setJobStart")
err = json.Unmarshal(j.Payload, &r)
logOnError(err, "jobSetDone : Unmarshal payload")
err = setJobDone(r.JobID64)
logOnError(err, "jobSetDone : setJobDone(child)")
err = setJobDone(j.ID64)
logOnError(err, "jobSetDone : setJobDone")
m := TGCommand{
Type: commandReplyMsg,
Text: r.Text,
FromMsgID64: r.MsgID64,
FromChatID64: r.ChatID64,
}
TGCmdQueue <- m
return
}
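// jobPillage checks whether a pillage attempt has been resolved (go, win, loss or timeout) and notifies the user; while no outcome exists it sends /go if needed and reschedules itself until the interception window expires.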
func jobPillage(j Job) {
var r JobPayloadPillage
err := setJobStart(j.ID64)
logOnError(err, "jobPillage : setJobStart")
err = json.Unmarshal(j.Payload, &r)
logOnError(err, "jobPillage : Unmarshal payload")
// check whether we have an acknowledgement of /go or a timeout within 3m30 of the PillageInc message from the job
ids := getSQLListID64(` select ox.id
from obj ox
,obj_msg omx
,obj op
,obj_msg omp
,obj_job oj
where oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + `
and omx.user_id = oj.user_id
and omx.sender_user_id = ` + strconv.Itoa(userID64ChtWrsBot) + `
and omx.obj_id = ox.id
and ox.obj_sub_type_id in (` + strconv.Itoa(objSubTypeMessagePillageGo) +
`, ` + strconv.Itoa(objSubTypeMessagePillageTimeout) +
`, ` + strconv.Itoa(objSubTypeMessagePillageLoss) +
`, ` + strconv.Itoa(objSubTypeMessagePillageWin) + `)
and op.id = ` + strconv.FormatInt(r.ObjID64, 10) + `
and omp.obj_id = op.id
and omx.date between omp.date and addtime(omp.date, '0 0:3:30.000000')
order by case ox.obj_sub_type_id when ` + strconv.Itoa(objSubTypeMessagePillageWin) + ` then 0
when ` + strconv.Itoa(objSubTypeMessagePillageLoss) + ` then 1
when ` + strconv.Itoa(objSubTypeMessagePillageTimeout) + ` then 2
when ` + strconv.Itoa(objSubTypeMessagePillageGo) + ` then 3
else 4 end asc
limit 1;`)
if len(ids) > 1 { // more than one outcome: this should not happen
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("More than one outcome for pillage #%d", r.ObjID64),
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
} else if len(ids) == 1 { // we've got a match, job is done whether we prevented the pillage or not
m, err := getObjMsg(ids[0])
logOnError(err, "jobPillage : getMsg(objSubTypeMessagePillageGo, objSubTypeMessagePillageTimeout)")
if err == nil {
if m.Date.Add(60 * time.Second).After(time.Now().UTC()) {
msgTypeID64, err := getObjSubTypeId(ids[0])
logOnError(err, "jobPillage : getObjSubTypeId")
if err == nil {
if msgTypeID64 == objSubTypeMessagePillageGo {
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("We avoided a pillage (%s)", m.Date.Format(time.RFC3339)),
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
} else if msgTypeID64 == objSubTypeMessagePillageWin {
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("We avoided a pillage (%s)", m.Date.Format(time.RFC3339)),
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
} else if msgTypeID64 == objSubTypeMessagePillageLoss {
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("We got pillaged (%s)", m.Date.Format(time.RFC3339)),
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
} else if msgTypeID64 == objSubTypeMessagePillageTimeout {
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("We got pillaged (%s)", m.Date.Format(time.RFC3339)),
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
} else {
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("We don't know what happened (%s)", m.Date.Format(time.RFC3339)),
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
}
}
}
}
err = setJobDone(j.ID64)
logOnError(err, "jobPillage : setJobDone")
return
}
// is the job outdated now ?
if time.Now().UTC().After(r.Date.Add(time.Minute*3 + time.Second*30)) {
// log.Printf("jobPillage :\n\tPillageTime : %s\n\tNowTime : %s\n", r.Date.Format(time.RFC3339), time.Now().UTC().Format(time.RFC3339))
s := TGCommand{
Type: commandSendMsg,
Text: "Pillage interception expired",
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
return
}
s := TGCommand{
Type: commandSendMsg,
Text: "No outcome for the pillage yet",
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
//no outcome yet, have we sent a "/go" in the last 30 sec ?
ids = getSQLListID64(` select ox.id
from obj ox
,obj_msg omx
,obj_job oj
where oj.obj_id = ` + strconv.FormatInt(j.ID64, 10) + `
and omx.user_id = oj.user_id
and omx.sender_user_id = oj.user_id
and omx.obj_id = ox.id
and ox.obj_sub_type_id =` + strconv.Itoa(objSubTypeMessageGo) + `
and omx.date between addtime(oj.schedule, '-30') and oj.schedule;`)
if len(ids) > 0 { // we did, so we reschedule the job to check the outcome and wait
m, err := getObjMsg(ids[0])
logOnError(err, "jobPillage : getMsg(objSubTypeMessageGo)")
if err == nil {
s := TGCommand{
Type: commandSendMsg,
Text: fmt.Sprintf("We started intercepting the pillage (%s)", m.Date.Format(time.RFC3339)),
ToUserID64: j.UserID64,
}
TGCmdQueue <- s
}
err = rescheduleJob(j.ID64, j.Trigger, time.Now().Add(30*time.Second).UTC())
logOnError(err, "jobPillage : rescheduleJob(objSubTypeMessageGo)")
} else { // no /go in the last 30 sec, so we send one and reschedule to check again in 30 sec
clientSendCWMsg(j.UserID64, "/go")
err = rescheduleJob(j.ID64, j.Trigger, time.Now().Add(30*time.Second).UTC())
logOnError(err, "jobPillage : rescheduleJob")
}
return
}
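// jobMsgRefresh deletes the stored copy of a message and asks the client to fetch it again.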
func jobMsgRefresh(j Job) {
var p JobPayloadMsgRefresh
// TODO: identify whether the message has been properly refreshed; create a new job or reschedule the same one?
err := setJobStart(j.ID64)
logOnError(err, "jobMsgRefresh : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobMsgRefresh : Unmarshal payload")
m, err := getObjMsg(p.ObjID64)
if err != nil && strings.Compare(err.Error(), `sql: no rows in result set`) == 0 {
err = setJobDone(j.ID64)
logOnError(err, "jobMsgRefresh : setJobDone")
return
} else if err != nil {
logOnError(err, "jobMsgRefresh : getObjMsg")
err = setJobDone(j.ID64)
logOnError(err, "jobMsgRefresh : setJobDone")
return
}
err = delObj(p.ObjID64)
logOnError(err, "jobMsgRefresh : delObj")
clientRefreshCWMsg(m.TGUserID64, m.ChatID64, m.ID64)
err = setJobDone(j.ID64)
logOnError(err, "jobMsgRefresh : setJobDone")
return
}
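// jobMsgClient sends the payload text through the client on behalf of the user and confirms it to the originating chat.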
func jobMsgClient(j Job) {
var p JobPayloadMsgClient
err := setJobStart(j.ID64)
logOnError(err, "jobMsgClient : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobMsgClient : Unmarshal payload")
if err == nil {
clientSendCWMsg(j.UserID64, p.Text)
m := TGCommand{
Type: commandReplyMsg,
Text: "Message sent.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
}
err = setJobDone(j.ID64)
logOnError(err, "jobMsgClient : setJobDone")
return
}
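// jobBackupExport dumps every stored message to a zipped JSON archive and sends it back as a document.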
func jobBackupExport(j Job) {
var p JobPayloadBackupExport
err := setJobStart(j.ID64)
logOnError(err, "jobBackupExport : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobBackupExport : Unmarshal payload")
bkp := DataBackup{}
start := time.Now()
milestone := time.Now()
s := new([]ChatWarsMessage)
msgs := *s
ids := getSQLListID64(`SELECT om.obj_id id FROM obj_msg om;`)
txt := fmt.Sprintf("Backing up %d messages.", len(ids))
m := TGCommand{
Type: commandReplyMsg,
Text: txt,
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
i := 0
for _, id := range ids {
msg, err := getObjMsg(id)
logOnError(err, "jobBackupExport : getMsg")
if err == nil {
msgs = append(msgs, *msg)
}
i = i + 1
if time.Now().After(milestone.Add(1 * time.Minute)) {
txt := fmt.Sprintf("Exported %d/%d messages.", i, len(ids))
m = TGCommand{
Type: commandReplyMsg,
Text: txt,
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
milestone = time.Now()
}
}
bkp.Messages = msgs
b, err := json.Marshal(bkp)
logOnError(err, "jobBackupExport : Marshal")
m = TGCommand{
Type: commandReplyMsg,
Text: `Compressing archive`,
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
zbuf := new(bytes.Buffer)
zw := zip.NewWriter(zbuf)
zf, err := zw.Create(`backup.json`)
logOnError(err, "jobBackupExport : Create")
_, err = zf.Write(b)
logOnError(err, "jobBackupExport : Write")
err = zw.Close()
logOnError(err, "jobBackupExport : Close")
d := tb.Document{}
d.File = tb.FromReader(bytes.NewReader(zbuf.Bytes()))
d.FileName = fmt.Sprintf("%s.backup.zip", start.Format("20060102150405"))
d.Caption = d.FileName
d.MIME = `application/zip`
m = TGCommand{
Type: commandReplyMsg,
Text: `Export done.`,
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
m = TGCommand{
Type: commandSendDocument,
Document: d,
ToChatID64: p.ChatID64,
}
TGCmdQueue <- m
err = setJobDone(j.ID64)
logOnError(err, "jobBackupExport : setJobDone")
return
}
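// jobBackupImport downloads a zip archive, extracts backup.json and requeues every message it contains.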
func jobBackupImport(j Job) {
var p JobPayloadBackupImport
err := setJobStart(j.ID64)
logOnError(err, "jobBackupImport : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobBackupImport : Unmarshal payload")
resp, err := http.Get(p.URL)
logOnError(err, "jobBackupImport : Get")
if err != nil {
return
}
defer resp.Body.Close()
buf := new(bytes.Buffer)
buf.ReadFrom(resp.Body)
m := TGCommand{
Type: commandReplyMsg,
Text: "File downloaded.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
z := buf.Bytes()
r := bytes.NewReader(z)
zr, err := zip.NewReader(r, int64(len(z)))
logOnError(err, "jobBackupImport : NewReader")
if err != nil {
return
}
for _, f := range zr.File {
if strings.Compare(f.Name, "backup.json") == 0 {
rc, err := f.Open()
logOnError(err, "jobBackupImport : Open")
if err != nil {
return
}
data, err := ioutil.ReadAll(rc)
logOnError(err, "jobBackupImport : ReadAll")
if err != nil {
return
}
log.Printf("jobBackupImport : %d uncompressed bytes.\n", len(data))
rc.Close()
bkp := DataBackup{}
err = json.Unmarshal(data, &bkp)
logOnError(err, "jobBackupImport : Unmarshal")
if err != nil {
return
}
for _, msg := range bkp.Messages {
MQCWMsgQueue <- msg
}
m := TGCommand{
Type: commandReplyMsg,
Text: "Backup restored.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
err = setJobDone(j.ID64)
logOnError(err, "jobBackupImport : setJobDone")
return
}
}
m = TGCommand{
Type: commandReplyMsg,
Text: "No backup file found in archive.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
err = setJobDone(j.ID64)
logOnError(err, "jobBackupImport : setJobDone")
return
}
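// jobGStock acknowledges a guild stock request; the actual /g_stock_* commands are currently commented out.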
func jobGStock(j Job) {
var p JobPayloadGStock
err := setJobStart(j.ID64)
logOnError(err, "jobGStock : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobGStock : Unmarshal payload")
/*
clientSendCWMsg(m.Chat.ID, "/g_stock_res")
clientSendCWMsg(m.Chat.ID, "/g_stock_alch")
clientSendCWMsg(m.Chat.ID, "/g_stock_misc")
clientSendCWMsg(m.Chat.ID, "/g_stock_rec")
clientSendCWMsg(m.Chat.ID, "/g_stock_parts")
clientSendCWMsg(m.Chat.ID, "/g_stock_other")
*/
if err == nil {
m := TGCommand{
Type: commandReplyMsg,
Text: "Message sent.",
FromMsgID64: p.MsgID64,
FromChatID64: p.ChatID64,
}
TGCmdQueue <- m
}
err = setJobDone(j.ID64)
logOnError(err, "jobGStock : setJobDone")
return
}
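// jobGDepositForward checks whether the triggering /g_deposit request matches the expected item and quantity; on a match it queues the message for forwarding, otherwise it re-arms the callback.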
func jobGDepositForward(j Job) {
var p JobPayloadGDepositForward
err := setJobStart(j.ID64)
logOnError(err, "jobGDepositForward : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobGDepositForward : Unmarshal payload")
msg, err := getObjMsg(j.Trigger)
logOnError(err, "jobGDepositForward : getObjMsg")
rule, err := getMsgParsingRule(msg)
logOnError(err, "jobGDepositForward : getMsgParsingRule")
cwm, err := parseSubTypeMessageGDepositReq(msg, rule.re)
logOnError(err, "jobGDepositForward : parseSubTypeMessageGDepositReq")
if cwm.ItemID64 == p.ItemID64 && cwm.Quantity == p.Quantity {
log.Printf("jobGDepositForward : match (%d / %d).\n", cwm.ItemID64, cwm.Quantity)
gDepositForwardMux.Lock()
gDepositForwardMsg = append(gDepositForwardMsg, j.Trigger)
gDepositForwardMux.Unlock()
err = setJobDone(j.ID64)
logOnError(err, "jobGDepositForward : setJobDone")
} else {
log.Printf("jobGDepositForward : found (%d / %d), expected (%d / %d).\n", cwm.ItemID64, cwm.Quantity, p.ItemID64, p.Quantity)
err = rescheduleJob(j.ID64, 0, time.Unix(maxUnixTimestamp, 0).UTC())
logOnError(err, "jobGDepositForward : rescheduleJob")
setJobCallback(j.ID64, j.UserID64, objSubTypeMessageGDepositReq)
}
return
}
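// jobGDeposit drives the guild deposit workflow: it requests the relevant stock listings, then, on each stock acknowledgement, issues /g_deposit commands for the requested items and sets up forward callbacks.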
func jobGDeposit(j Job) {
var p JobPayloadGDeposit
err := setJobStart(j.ID64)
logOnError(err, "jobGDeposit : setJobStart")
err = json.Unmarshal(j.Payload, &p)
logOnError(err, "jobGDeposit : Unmarshal payload")
if p.Status == 0 { /* handle remaining resources to be stored */
var res, misc, alch, craft, equip bool
var delay time.Duration
var b []byte
if len(p.ResObjID64) > 0 {
for i := range p.ResObjID64 {
obj, err := getObjItem(p.ResObjID64[i])
logOnError(err, "jobGDeposit : getObjItem")
if err == nil {
switch obj.ItemTypeID {
case objSubTypeItemResource:
res = true
case objSubTypeItemAlch:
alch = true
case objSubTypeItemMisc:
misc = true
case objSubTypeItemRecipe:
craft = true
case objSubTypeItemPart:
craft = true
case objSubTypeItemOther:
equip = true
case objSubTypeItemUnique:
equip = true
default:
}
}
}
}
if res {
clientSendCWMsgDelay(p.ChatID64, `📦Resources`, delay)
p.Status = objSubTypeMessageStockAck
b, _ = json.Marshal(&p)
err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, objSubTypeMessageStockAck, b, 1*time.Minute)
logOnError(err, "jobGDeposit : createJobCallback")
delay = delay + 2*time.Second
}
if alch {
clientSendCWMsgDelay(p.ChatID64, `Alchemy`, delay)
p.Status = 1 // FIXME UPDATE WITH PROPER TYPE
b, _ = json.Marshal(&p)
err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, objSubTypeMessageOrderbookAck, b, 1*time.Minute)
logOnError(err, "jobGDeposit : createJobCallback")
delay = delay + 2*time.Second
}
if misc {
clientSendCWMsgDelay(p.ChatID64, `🗃Misc`, delay)
p.Status = 1 // FIXME UPDATE WITH PROPER TYPE
b, _ = json.Marshal(&p)
err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, objSubTypeMessageOrderbookAck, b, 1*time.Minute)
logOnError(err, "jobGDeposit : createJobCallback")
delay = delay + 2*time.Second
}
if craft {
clientSendCWMsgDelay(p.ChatID64, `⚒Crafting`, delay)
p.Status = objSubTypeMessageStockAnyAck
b, _ = json.Marshal(&p)
err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, objSubTypeMessageStockAnyAck, b, 1*time.Minute)
logOnError(err, "jobGDeposit : createJobCallback")
delay = delay + 2*time.Second
}
if equip {
clientSendCWMsgDelay(p.ChatID64, `🏷Equipment`, delay)
p.Status = 1 // FIXME UPDATE WITH PROPER TYPE
b, _ = json.Marshal(&p)
err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, objSubTypeMessageOrderbookAck, b, 1*time.Minute)
logOnError(err, "jobGDeposit : createJobCallback")
delay = delay + 2*time.Second
}
return
} else if p.Status == 1 { /* handle that one resource from the objSubTypeMessageOrderbookAck msg */
log.Printf("jobGDeposit : 1 : %d.\n", j.Trigger)
} else if p.Status == objSubTypeMessageStockAck {
//log.Printf("jobGDeposit : objSubTypeMessageStockAck : %d.\n", j.Trigger)
msg, err := getObjMsg(j.Trigger)
logOnError(err, "jobGDeposit : getObjMsg")
rule, err := getMsgParsingRule(msg)
logOnError(err, "jobGDeposit : getMsgParsingRule")
cwm, err := parseSubTypeMessageStockAck(msg, rule.re)
logOnError(err, "jobGDeposit : parseSubTypeMessageStockAck")
for stockIdx := range cwm.Stock {
for resIdx := range p.ResObjID64 {
if cwm.Stock[stockIdx].ItemID64 == p.ResObjID64[resIdx] {
//log.Printf("jobGDeposit : objSubTypeMessageStockAck : Matching ItemID %d (%d).\n", p.ResObjID64[resIdx], cwm.Stock[stockIdx].Quantity)
item, _ := getObjItem(p.ResObjID64[resIdx])
clientSendCWMsg(p.ChatID64, fmt.Sprintf("/g_deposit %s %d", item.Code, cwm.Stock[stockIdx].Quantity))
p2 := JobPayloadGDepositForward{
ItemID64: p.ResObjID64[resIdx],
Quantity: cwm.Stock[stockIdx].Quantity,
}
b2, _ := json.Marshal(p2)
err = createJobCallback(objSubTypeJobGDepositForward, j.UserID64, objSubTypeMessageGDepositReq, b2, 1*time.Minute)
}
}
}
} else if p.Status == objSubTypeMessageStockAnyAck {
log.Printf("jobGDeposit : objSubTypeMessageStockAnyAck : %d.\n", j.Trigger)
msg, err := getObjMsg(j.Trigger)
logOnError(err, "jobGDeposit : getObjMsg")
rule, err := getMsgParsingRule(msg)
logOnError(err, "jobGDeposit : getMsgParsingRule")
cwm, err := parseSubTypeMessageStockAnyAck(msg, rule.re)
logOnError(err, "jobGDeposit : parseSubTypeMessageStockAnyAck")
for stockIdx := range cwm.Stock {
for resIdx := range p.ResObjID64 {
if cwm.Stock[stockIdx].ItemID64 == p.ResObjID64[resIdx] {
log.Printf("jobGDeposit : objSubTypeMessageStockAnyAck : Matching ItemID %d (%d).\n", p.ResObjID64[resIdx], cwm.Stock[stockIdx].Quantity)
item, _ := getObjItem(p.ResObjID64[resIdx])
clientSendCWMsg(p.ChatID64, fmt.Sprintf("/g_deposit %s %d", item.Code, cwm.Stock[stockIdx].Quantity))
p2 := JobPayloadGDepositForward{
ItemID64: p.ResObjID64[resIdx],
Quantity: cwm.Stock[stockIdx].Quantity,
}
b2, _ := json.Marshal(p2)
err = createJobCallback(objSubTypeJobGDepositForward, j.UserID64, objSubTypeMessageGDepositReq, b2, 1*time.Minute)
}
}
}
}
err = setJobDone(j.ID64)
logOnError(err, "jobGDeposit : setJobDone")
return
}