From d9bec92c80bde61bfc68098922462590a9b35318 Mon Sep 17 00:00:00 2001 From: shoopea Date: Mon, 27 May 2019 11:08:10 +0800 Subject: [PATCH] test --- job.go | 257 +++++++++++++++++++++++++++++++++++++++++++++++++++------ sql.go | 113 ------------------------- 2 files changed, 232 insertions(+), 138 deletions(-) diff --git a/job.go b/job.go index 4b45870..1e1ef17 100644 --- a/job.go +++ b/job.go @@ -6,6 +6,136 @@ import ( "time" ) +func createJob(jobTypeID int32, priority int32, userID64 int64, schedule time.Time, payload []byte) (int64, error) { + stmt, err := db.Prepare(`INSERT INTO obj (obj_type_id, obj_sub_type_id) + VALUES (? , ?);`) + logOnError(err, "createJob : prepare insert obj") + if err != nil { + return 0, err + } + defer stmt.Close() + + res, err := stmt.Exec(objTypeJob, jobTypeID) + s := fmt.Sprintf("createJob, insert obj(%d, %d)", objTypeJob, jobTypeID) + logOnError(err, s) + if err != nil { + return 0, err + } + + objId, err := res.LastInsertId() + logOnError(err, "createJob : get last insert Id") + if err != nil { + return 0, err + } + + stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, session_id, status, schedule, is_done, in_work, inserted, pulled, started, ended, payload) + VALUES (?, ?, ?, NULL, ?, ?, 0, 0, ?, NULL, NULL, NULL, ?);`) + logOnError(err, "createJob : prepare insert obj_job") + if err != nil { + return 0, err + } + defer stmt.Close() + + _, err = stmt.Exec(objId, priority, userID64, objJobStatusNew, schedule, time.Now(), payload) + logOnError(err, "createJob : insert obj_job") + if err != nil { + return 0, err + } + + return objId, nil +} + +func setJobDone(jobId int64) error { + stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 1, j.in_work = 0, j.ended = ? WHERE j.obj_id = ?;`) + logOnError(err, "setJobDone : prepare update obj_job") + if err != nil { + return err + } + defer stmt.Close() + + _, err = stmt.Exec(time.Now(), jobId) + s := fmt.Sprintf("setJobDone, update obj_job(%d)", jobId) + logOnError(err, s) + if err != nil { + return err + } + return nil +} + +func setJobStart(jobId int64) error { + stmt, err := db.Prepare(`UPDATE obj_job j SET j.started = coalesce(j.started, ?) WHERE j.obj_id = ?;`) + logOnError(err, "setJobStart : prepare update obj_job") + if err != nil { + return err + } + defer stmt.Close() + + _, err = stmt.Exec(time.Now(), jobId) + s := fmt.Sprintf("setJobStart, update obj_job(%d)", jobId) + logOnError(err, s) + if err != nil { + return err + } + return nil +} + +func rescheduleJob(jobID64 int64, schedule time.Time) error { + stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 0, j.in_work = 0, j.schedule = ? WHERE j.obj_id = ?;`) + logOnError(err, "rescheduleJob : prepare update obj_job") + if err != nil { + return err + } + defer stmt.Close() + + _, err = stmt.Exec(time.Now(), jobId) + s := fmt.Sprintf("rescheduleJob, update obj_job(%d)", jobId) + logOnError(err, s) + if err != nil { + return err + } + return nil +} + +func loadCurrentJobs(sid int) ([]Job, error) { + var ( + objId int64 + jobTypeId int32 + status int32 + payload []byte + jobs []Job + ) + + _, err := db.Exec("UPDATE obj_job j SET session_id = ?, j.pulled = ?, j.in_work = 1 WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", sid, time.Now(), time.Now(), SQLJobSliceSize) + logOnError(err, "loadCurrentJobs : update intial rows") + + stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.session_id = ? ORDER BY j.priority ASC, j.obj_id ASC;") + logOnError(err, "loadCurrentJobs : prepare select statement") + + rows, err := stmt.Query(sid) + // rows, err := stmt.Query(time.Now()) + logOnError(err, "loadCurrentJobs : query select statement") + + for rows.Next() { + err = rows.Scan(&objId, &jobTypeId, &status, &payload) + logOnError(err, "loadCurrentJobs : scan query rows") + job := Job{ + ID64: objId, + JobTypeID: jobTypeId, + Status: status, + Payload: payload, + } + jobs = append(jobs, job) + } + err = rows.Err() + logOnError(err, "loadCurrentJobs : scan end rows") + rows.Close() + + err = stmt.Close() + logOnError(err, "loadCurrentJobs : close select statement") + + return jobs, nil +} + func jobRescan(j Job) { var r JobPayloadRescanMsg @@ -89,35 +219,112 @@ func jobSetDone(j Job) { } func jobPillage(j Job) { - /* - var r JobPayloadPillage + var r JobPayloadPillage - err := setJobStart(j.ID64) - logOnError(err, "jobPillage : setJobStart") + err := setJobStart(j.ID64) + logOnError(err, "jobPillage : setJobStart") - err = json.Unmarshal(j.Payload, &r) - logOnError(err, "jobPillage : Unmarshal payload") + err = json.Unmarshal(j.Payload, &r) + logOnError(err, "jobPillage : Unmarshal payload") - ids := getSQLListID64(` select og.id - from obj og - ,obj_msg omg - ,obj op - ,obj_msg omp - ,obj_job oj - where omg.obj_id = og.id - and og.obj_sub_type_id = ` + strconv.Itoa(objSubTypeMessageGo) ` - and omg.user_id = oj.user_id - and omg.sender_user_id = oj.user_id - and oj.obj_id = ` + strconv.Itoa(j.ID64) ` - and omp.obj_id = op.id - and op.id = ` + strconv.Itoa(r.ObjID64) ` - and omg.date between omp.date and addtime(omp.date, '0 0:3:0.000000');` + // check if we have a acknoledgment of go or a timeout within 3m30 of the PillageInc from the Job + ids := getSQLListID64(` select ox.id + from obj ox + ,obj_msg omx + ,obj op + ,obj_msg omp + ,obj_job oj + where oj.obj_id = ` + strconv.Itoa(j.ID64) + ` + and omx.user_id = oj.user_id + and omx.sender_user_id = ` + strconv.Itoa(userID64ChtWrsBot) + ` + and omx.obj_id = ox.id + and ox.obj_sub_type_id in (` + strconv.Itoa(objSubTypeMessagePillageGo) + `, ` + strconv.Itoa(objSubTypeMessagePillageTimeout) + ` + and op.id = ` + strconv.Itoa(r.ObjID64) + ` + and omp.obj_id = op.id + and omx.date between omp.date and addtime(omp.date, '0 0:3:30.000000');`) + if len(ids) > 1 { // issue there ? + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("More than one outcome for pillage #%d", r.ObjID64), + ToUserID64: m.UserID64, + } + TGCmdQueue <- s + } else if len(ids) == 1 { // we've got a match, job is done whether we prevented the pillage or not + m, err := getMsg(ids[0]) + logOnError(err, "jobPillage : getMsg(objSubTypeMessagePillageGo, objSubTypeMessagePillageTimeout)") + if err == nil { + if m.Date.Add(60 * time.Second).After(time.Now()) { + msgTypeID, err := getObjSubTypeId(ids[0]) + logOnError(err, "jobPillage : getObjSubTypeId") + if err == nil { + if msgTypeID == objSubTypeMessagePillageGo { + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("We avoided a pillage (%s)", m.Date.Format(time.RFC3339)), + ToUserID64: m.UserID64, + } + TGCmdQueue <- s + } else if msgTypeId == objSubTypeMessagePillageTimeout { + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("We got pillaged (%s)", m.Date.Format(time.RFC3339)), + ToUserID64: m.UserID64, + } + TGCmdQueue <- s + } else { + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("We don't know what happened (%s)", m.Date.Format(time.RFC3339)), + ToUserID64: m.UserID64, + } + TGCmdQueue <- s + } + } + } + } + err := setJobDone(j.ID64) + logOnError(err, "jobSetDone : setJobDone") + return + } + + //no outcome yet, have we sent a "/go" in the last 30 sec ? + ids = getSQLListID64(` select ox.id + from obj ox + ,obj_msg omx + ,obj_job oj + where oj.obj_id = ` + strconv.Itoa(j.ID64) + ` + and omx.user_id = oj.user_id + and omx.sender_user_id = oj.user_id + and omx.obj_id = ox.id + and ox.obj_sub_type_id =` + strconv.Itoa(objSubTypeMessageGo) + ` + and omx.date between addtime(oj.schedule, '-30') and oj.schedule;`) + + if len(ids) > 0 { // we did, so we reschedule the job to check the outcome and wait + m, err := getMsg(ids[0]) + logOnError(err, "jobPillage : getMsg(objSubTypeMessageGo)") + if err == nil { + s := TGCommand{ + Type: commandSendMsg, + Text: fmt.Sprintf("We started intercepting the pillage (%s)", m.Date.Format(time.RFC3339)), + ToUserID64: m.UserID64, + } + TGCmdQueue <- s + } + err = rescheduleJob(j.ID64, time.Now().Add(30*time.Second)) + logOnError(err, "jobPillage : rescheduleJob(objSubTypeMessageGo)") + } else { //no /go in the last 30 sec so we go ahead, send one and reschedule to check again in 25sec + c := TGCommand{ + Type: commandSendMsg, + Text: "/go", + FromUserID64: m.Chat.ID, + ToChatID64: userID64ChtWrsBot, + } + MQTGCmdQueue <- c + err = rescheduleJob(j.ID64, time.Now().Add(30*time.Second)) + logOnError(err, "jobPillage : rescheduleJob") + + } - err = setJobStatus(j.ID64, objJobStatusPillageGo, time.Now().Add(5 * time.second)) - logOnError(err, "jobPillage : setJobDone") - */ - err := setJobDone(j.ID64) - logOnError(err, "jobSetDone : setJobDone") return } diff --git a/sql.go b/sql.go index b7fd3de..c75958d 100644 --- a/sql.go +++ b/sql.go @@ -1505,79 +1505,6 @@ func insertMsgPillageInc(m *ChatWarsMessagePillageInc) error { return nil } -func createJob(jobTypeID int32, priority int32, userID64 int64, schedule time.Time, payload []byte) (int64, error) { - stmt, err := db.Prepare(`INSERT INTO obj (obj_type_id, obj_sub_type_id) - VALUES (? , ?);`) - logOnError(err, "createJob : prepare insert obj") - if err != nil { - return 0, err - } - defer stmt.Close() - - res, err := stmt.Exec(objTypeJob, jobTypeID) - s := fmt.Sprintf("createJob, insert obj(%d, %d)", objTypeJob, jobTypeID) - logOnError(err, s) - if err != nil { - return 0, err - } - - objId, err := res.LastInsertId() - logOnError(err, "createJob : get last insert Id") - if err != nil { - return 0, err - } - - stmt, err = db.Prepare(`INSERT INTO obj_job (obj_id, priority, user_id, session_id, status, schedule, is_done, in_work, inserted, pulled, started, ended, payload) - VALUES (?, ?, ?, NULL, ?, ?, 0, 0, ?, NULL, NULL, NULL, ?);`) - logOnError(err, "createJob : prepare insert obj_job") - if err != nil { - return 0, err - } - defer stmt.Close() - - _, err = stmt.Exec(objId, priority, userID64, objJobStatusNew, schedule, time.Now(), payload) - logOnError(err, "createJob : insert obj_job") - if err != nil { - return 0, err - } - - return objId, nil -} - -func setJobDone(jobId int64) error { - stmt, err := db.Prepare(`UPDATE obj_job j SET j.is_done = 1, j.in_work = 0, j.ended = ? WHERE j.obj_id = ?;`) - logOnError(err, "setJobDone : prepare update obj_job") - if err != nil { - return err - } - defer stmt.Close() - - _, err = stmt.Exec(time.Now(), jobId) - s := fmt.Sprintf("setJobDone, update obj_job(%d)", jobId) - logOnError(err, s) - if err != nil { - return err - } - return nil -} - -func setJobStart(jobId int64) error { - stmt, err := db.Prepare(`UPDATE obj_job j SET j.started = ? WHERE j.obj_id = ?;`) - logOnError(err, "setJobStart : prepare update obj_job") - if err != nil { - return err - } - defer stmt.Close() - - _, err = stmt.Exec(time.Now(), jobId) - s := fmt.Sprintf("setJobStart, update obj_job(%d)", jobId) - logOnError(err, s) - if err != nil { - return err - } - return nil -} - func loadMsgParsingRules() (m map[int]MessageParsingRule, err error) { var ( id int32 @@ -1624,46 +1551,6 @@ func loadMsgParsingRules() (m map[int]MessageParsingRule, err error) { return m, nil } -func loadCurrentJobs(sid int) ([]Job, error) { - var ( - objId int64 - jobTypeId int32 - status int32 - payload []byte - jobs []Job - ) - - _, err := db.Exec("UPDATE obj_job j SET session_id = ?, j.pulled = ?, j.in_work = 1 WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;", sid, time.Now(), time.Now(), SQLJobSliceSize) - logOnError(err, "loadCurrentJobs : update intial rows") - - stmt, err := db.Prepare("SELECT o.id, o.obj_sub_type_id, j.status, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.session_id = ? ORDER BY j.priority ASC, j.obj_id ASC;") - logOnError(err, "loadCurrentJobs : prepare select statement") - - rows, err := stmt.Query(sid) - // rows, err := stmt.Query(time.Now()) - logOnError(err, "loadCurrentJobs : query select statement") - - for rows.Next() { - err = rows.Scan(&objId, &jobTypeId, &status, &payload) - logOnError(err, "loadCurrentJobs : scan query rows") - job := Job{ - ID64: objId, - JobTypeID: jobTypeId, - Status: status, - Payload: payload, - } - jobs = append(jobs, job) - } - err = rows.Err() - logOnError(err, "loadCurrentJobs : scan end rows") - rows.Close() - - err = stmt.Close() - logOnError(err, "loadCurrentJobs : close select statement") - - return jobs, nil -} - func getSQLListID64(q string) []int64 { var ( id int64