diff --git a/bot.go b/bot.go index d83a47d..d14187c 100644 --- a/bot.go +++ b/bot.go @@ -638,10 +638,7 @@ func botGDepositAll(m *tb.Message) { Status: 0, } - p.ResObjID64 = append(p.ResObjID64, getObjItemID(`02`, `Stick`)) - p.ResObjID64 = append(p.ResObjID64, getObjItemID(`04`, `Bone`)) p.ResObjID64 = append(p.ResObjID64, getObjItemID(`09`, `Cloth`)) - p.ResObjID64 = append(p.ResObjID64, getObjItemID(`21`, `Bone powder`)) p.ResObjID64 = append(p.ResObjID64, getObjItemID(`39`, `Stinky Sumac`)) p.ResObjID64 = append(p.ResObjID64, getObjItemID(`s01`, `📕Scroll of Rage`)) p.ResObjID64 = append(p.ResObjID64, getObjItemID(`r01`, `Champion Sword recipe`)) diff --git a/def.go b/def.go index 9a22a2a..7c5757f 100644 --- a/def.go +++ b/def.go @@ -9,6 +9,8 @@ import ( tb "gopkg.in/tucnak/telebot.v2" ) +const maxUnixTimestamp int64 = 253402300799 + type DataBackup struct { Messages []ChatWarsMessage `json:"messages"` } @@ -231,6 +233,7 @@ type Job struct { ID64 int64 JobTypeID int32 Trigger int64 + Timeout time.Time UserID64 int64 Payload []byte } diff --git a/job.go b/job.go index 17323e7..40ee030 100644 --- a/job.go +++ b/job.go @@ -37,8 +37,8 @@ func createJob(jobTypeID int32, priority int32, userID64 int64, trigger int64, s return 0, err } - stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, trigger_id, seq_nr, schedule, is_done, in_work, inserted, pulled, started, ended, payload) - VALUES (?, ?, ?, ?, NULL, ?, 0, 0, ?, NULL, NULL, NULL, ?);`) + stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, trigger_id, seq_nr, schedule, is_done, in_work, inserted, timeout, pulled, started, ended, payload) + VALUES (?, ?, ?, ?, NULL, ?, 0, 0, ?, NULL, NULL, NULL, NULL, ?);`) logOnError(err, "createJob : prepare insert obj_job") if err != nil { return 0, err @@ -54,9 +54,9 @@ func createJob(jobTypeID int32, priority int32, userID64 int64, trigger int64, s return objId, nil } -func createJobCallback(jobTypeID int32, userID64 int64, msgTypeID64 int64, payload []byte) error { - t, err := time.Parse(time.RFC3339, "9999-12-31T00:00:00+00:00") - jobID64, err := createJob(jobTypeID, objJobPriority, userID64, 0, t, payload) +func createJobCallback(jobTypeID int32, userID64 int64, msgTypeID64 int64, payload []byte, timeout time.Duration) error { + //t, err := time.Parse(time.RFC3339, "9999-12-31T00:00:00+00:00") + jobID64, err := createJob(jobTypeID, objJobPriority, userID64, 0, time.Unix(maxUnixTimestamp, 0), payload) if err != nil { return err } @@ -75,6 +75,24 @@ func setJobCallback(jobID64 int64, userID64 int64, msgTypeID64 int64) { muxCallbacks.Unlock() } +func setJobTimeout(jobID64 int64, d time.Duration) error { + stmt, err := db.Prepare(`UPDATE obj_job j SET j.timeout = ? WHERE j.obj_id = ?;`) + logOnError(err, "setJobTimeout : prepare update obj_job") + if err != nil { + return err + } + defer stmt.Close() + + _, err = stmt.Exec(time.Now().UTC().Add(d), jobId) + s := fmt.Sprintf("setJobTimeout, update obj_job(%d)", jobId) + logOnError(err, s) + if err != nil { + return err + } + + return nil +} + func setJobDone(jobId int64) error { stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 1, j.in_work = 0, j.ended = ? WHERE j.obj_id = ?;`) logOnError(err, "setJobDone : prepare update obj_job") @@ -132,6 +150,7 @@ func loadCurrentJobs() ([]Job, error) { jobTypeId int32 userID64 int64 trigger int64 + timeout time.Time payload []byte jobs []Job ) @@ -145,7 +164,7 @@ func loadCurrentJobs() ([]Job, error) { return jobs, err } - stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.trigger_id, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;") + stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.trigger_id, j.user_id, j.payload, j.timeout FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;") logOnError(err, "loadCurrentJobs : prepare select statement") if err != nil { stmt.Close() @@ -161,7 +180,7 @@ func loadCurrentJobs() ([]Job, error) { } for rows.Next() { - err = rows.Scan(&objId, &jobTypeId, &trigger, &userID64, &payload) + err = rows.Scan(&objId, &jobTypeId, &trigger, &userID64, &payload, &timeout) logOnError(err, "loadCurrentJobs : scan query rows") job := Job{ ID64: objId, @@ -169,6 +188,7 @@ func loadCurrentJobs() ([]Job, error) { Trigger: trigger, UserID64: userID64, Payload: payload, + Timeout: timeout, } jobs = append(jobs, job) } @@ -742,55 +762,47 @@ func jobGDeposit(j Job) { } log.Printf("jobGDeposit : Sending messages.\n") var delay int64 = 0 + var b []string + if res || alch || misc || craft || equip { + p.Status = 1 + b, _ = json.Marshal(&p) + } if res { log.Printf("jobGDeposit : Sending res.\n") clientSendCWMsgDelay(p.ChatID64, `📦Resources`, delay) + err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, objSubTypeMessageOrderbookAck, b, 10*time.Second) + logOnError(err, "jobGDeposit : createJobCallback") delay = delay + 2 } if alch { log.Printf("jobGDeposit : Sending alch.\n") clientSendCWMsgDelay(p.ChatID64, `⚗️Alchemy`, delay) + err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, objSubTypeMessageOrderbookAck, b, 10*time.Second) + logOnError(err, "jobGDeposit : createJobCallback") delay = delay + 2 } if misc { log.Printf("jobGDeposit : Sending misc.\n") clientSendCWMsgDelay(p.ChatID64, `🗃Misc`, delay) + err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, objSubTypeMessageOrderbookAck, b, 10*time.Second) + logOnError(err, "jobGDeposit : createJobCallback") delay = delay + 2 } if craft { log.Printf("jobGDeposit : Sending craft.\n") clientSendCWMsgDelay(p.ChatID64, `⚒Crafting`, delay) + err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, objSubTypeMessageOrderbookAck, b, 10*time.Second) + logOnError(err, "jobGDeposit : createJobCallback") delay = delay + 2 } if equip { log.Printf("jobGDeposit : Sending equip.\n") clientSendCWMsgDelay(p.ChatID64, `🏷Equipment`, delay) + err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, objSubTypeMessageOrderbookAck, b, 10*time.Second) + logOnError(err, "jobGDeposit : createJobCallback") delay = delay + 2 } - /* - - p2 := JobPayloadGDeposit{ - MsgID64: p.MsgID64, - ChatID64: p.ChatID64, - ResObjID64: nil, - Status: 1, - } - p2.ResObjID64 = append(p2.ResObjID64, obj.ObjID64) - b2, _ := json.Marshal(&p2) - err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, objSubTypeMessageOrderbookAck, b2) - clientSendCWMsg(p.ChatID64, fmt.Sprintf("/t_%s", obj.Code)) - logOnError(err, "jobGDeposit : createJobCallback") - } - - p.ResObjID64 = p.ResObjID64[1:] - } - if len(p.ResObjID64) > 0 { - b, _ := json.Marshal(&p) - err = createJobCallback(objSubTypeJobGDeposit, j.UserID64, objSubTypeMessageOrderbookAck, b) - logOnError(err, "jobGDeposit : createJobCallback") - } - */ return } else if p.Status == 1 { /* handle that one resource from the objSubTypeMessageOrderbookAck msg */ diff --git a/sql.go b/sql.go index b8aadcb..2e0afe5 100644 --- a/sql.go +++ b/sql.go @@ -362,6 +362,7 @@ func initDB() { ,pulled TIMESTAMP ,started TIMESTAMP ,ended TIMESTAMP + ,timeout TIMESTAMP ,payload VARCHAR(4000) ,FOREIGN KEY (obj_id) REFERENCES obj(id) ON DELETE CASCADE ,KEY (is_done) diff --git a/workers.go b/workers.go index 6942579..89c9c7a 100644 --- a/workers.go +++ b/workers.go @@ -511,6 +511,7 @@ func SQLJobWorker() { func JobWorker(id int, jobs <-chan Job) { //log.Printf("jobWorker[" + strconv.Itoa(id) + "] : Starting.") + // FIXME : discard timed out jobs for j := range jobs { //log.Printf("JobWorker[%d] : Starting job %d.\n", id, j.ID64) switch j.JobTypeID {