2019-05-11 06:54:12 +02:00
package main
import (
2019-05-11 06:55:05 +02:00
"encoding/json"
2019-05-11 06:54:12 +02:00
"fmt"
2019-05-11 06:55:05 +02:00
"time"
2019-05-11 06:54:12 +02:00
)
2019-05-27 05:08:10 +02:00
func createJob ( jobTypeID int32 , priority int32 , userID64 int64 , schedule time . Time , payload [ ] byte ) ( int64 , error ) {
stmt , err := db . Prepare ( ` INSERT INTO obj ( obj_type_id , obj_sub_type_id )
VALUES ( ? , ? ) ; ` )
logOnError ( err , "createJob : prepare insert obj" )
if err != nil {
return 0 , err
}
defer stmt . Close ( )
res , err := stmt . Exec ( objTypeJob , jobTypeID )
s := fmt . Sprintf ( "createJob, insert obj(%d, %d)" , objTypeJob , jobTypeID )
logOnError ( err , s )
if err != nil {
return 0 , err
}
objId , err := res . LastInsertId ( )
logOnError ( err , "createJob : get last insert Id" )
if err != nil {
return 0 , err
}
stmt , err = db . Prepare ( ` INSERT INTO obj_job ( obj_id , priority , user_id , session_id , status , schedule , is_done , in_work , inserted , pulled , started , ended , payload )
VALUES ( ? , ? , ? , NULL , ? , ? , 0 , 0 , ? , NULL , NULL , NULL , ? ) ; ` )
logOnError ( err , "createJob : prepare insert obj_job" )
if err != nil {
return 0 , err
}
defer stmt . Close ( )
_ , err = stmt . Exec ( objId , priority , userID64 , objJobStatusNew , schedule , time . Now ( ) , payload )
logOnError ( err , "createJob : insert obj_job" )
if err != nil {
return 0 , err
}
return objId , nil
}
func setJobDone ( jobId int64 ) error {
stmt , err := db . Prepare ( ` UPDATE obj_job j SET j.is_done = 1, j.in_work = 0, j.ended = ? WHERE j.obj_id = ?; ` )
logOnError ( err , "setJobDone : prepare update obj_job" )
if err != nil {
return err
}
defer stmt . Close ( )
_ , err = stmt . Exec ( time . Now ( ) , jobId )
s := fmt . Sprintf ( "setJobDone, update obj_job(%d)" , jobId )
logOnError ( err , s )
if err != nil {
return err
}
return nil
}
func setJobStart ( jobId int64 ) error {
stmt , err := db . Prepare ( ` UPDATE obj_job j SET j.started = coalesce(j.started, ?) WHERE j.obj_id = ?; ` )
logOnError ( err , "setJobStart : prepare update obj_job" )
if err != nil {
return err
}
defer stmt . Close ( )
_ , err = stmt . Exec ( time . Now ( ) , jobId )
s := fmt . Sprintf ( "setJobStart, update obj_job(%d)" , jobId )
logOnError ( err , s )
if err != nil {
return err
}
return nil
}
func rescheduleJob ( jobID64 int64 , schedule time . Time ) error {
stmt , err := db . Prepare ( ` UPDATE obj_job j SET j.is_done = 0, j.in_work = 0, j.schedule = ? WHERE j.obj_id = ?; ` )
logOnError ( err , "rescheduleJob : prepare update obj_job" )
if err != nil {
return err
}
defer stmt . Close ( )
_ , err = stmt . Exec ( time . Now ( ) , jobId )
s := fmt . Sprintf ( "rescheduleJob, update obj_job(%d)" , jobId )
logOnError ( err , s )
if err != nil {
return err
}
return nil
}
func loadCurrentJobs ( sid int ) ( [ ] Job , error ) {
var (
objId int64
jobTypeId int32
status int32
payload [ ] byte
jobs [ ] Job
)
_ , err := db . Exec ( "UPDATE obj_job j SET session_id = ?, j.pulled = ?, j.in_work = 1 WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;" , sid , time . Now ( ) , time . Now ( ) , SQLJobSliceSize )
logOnError ( err , "loadCurrentJobs : update intial rows" )
stmt , err := db . Prepare ( "SELECT o.id, o.obj_sub_type_id, j.status, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.session_id = ? ORDER BY j.priority ASC, j.obj_id ASC;" )
logOnError ( err , "loadCurrentJobs : prepare select statement" )
rows , err := stmt . Query ( sid )
// rows, err := stmt.Query(time.Now())
logOnError ( err , "loadCurrentJobs : query select statement" )
for rows . Next ( ) {
err = rows . Scan ( & objId , & jobTypeId , & status , & payload )
logOnError ( err , "loadCurrentJobs : scan query rows" )
job := Job {
ID64 : objId ,
JobTypeID : jobTypeId ,
Status : status ,
Payload : payload ,
}
jobs = append ( jobs , job )
}
err = rows . Err ( )
logOnError ( err , "loadCurrentJobs : scan end rows" )
rows . Close ( )
err = stmt . Close ( )
logOnError ( err , "loadCurrentJobs : close select statement" )
return jobs , nil
}
2019-05-11 06:54:49 +02:00
func jobRescan ( j Job ) {
2019-05-11 07:06:40 +02:00
var r JobPayloadRescanMsg
2019-05-11 06:54:12 +02:00
err := setJobStart ( j . ID64 )
logOnError ( err , "jobRescan : setJobStart" )
2019-05-11 07:06:40 +02:00
2019-05-11 06:56:48 +02:00
err = json . Unmarshal ( j . Payload , & r )
2019-05-11 07:06:40 +02:00
logOnError ( err , "jobRescan : Unmarshal payload" )
2019-05-19 05:24:45 +02:00
start := time . Now ( )
2019-05-11 06:54:12 +02:00
ids := getSQLListID64 ( r . Query )
2019-05-11 07:06:40 +02:00
2019-05-11 06:54:12 +02:00
if len ( ids ) > 1 {
2019-05-19 05:20:01 +02:00
for _ , id := range ids {
SQLMsgIdentifyQueue <- id
2019-05-11 06:54:12 +02:00
}
2019-05-19 05:20:01 +02:00
p := JobPayloadSetDone {
JobID64 : j . ID64 ,
MsgID64 : r . MsgID64 ,
ChatID64 : r . ChatID64 ,
2019-05-19 05:24:45 +02:00
Text : fmt . Sprintf ( "%d messages processed in %s." , len ( ids ) , time . Since ( start ) ) ,
2019-05-11 06:54:12 +02:00
}
2019-05-19 05:20:01 +02:00
b , _ := json . Marshal ( p )
2019-05-26 15:46:54 +02:00
_ , err := createJob ( objSubTypeJobSetJobDone , objJobPriorityRescanAllMsg , j . UserID64 , time . Now ( ) , b )
2019-05-19 05:20:01 +02:00
logOnError ( err , "jobRescan : createJob(objSubTypeJobSetJobDone)" )
2019-05-11 06:54:12 +02:00
} else if len ( ids ) == 1 {
SQLMsgIdentifyQueue <- ids [ 0 ]
err = setJobDone ( j . ID64 )
2019-05-11 07:06:40 +02:00
logOnError ( err , "jobRescan : setJobDone(1)" )
2019-05-16 05:06:38 +02:00
if r . MsgID64 != 0 || r . ChatID64 != 0 {
m := TGCommand {
Type : commandReplyMsg ,
Text : "One message processed." ,
FromMsgID64 : r . MsgID64 ,
FromChatID64 : r . ChatID64 ,
}
TGCmdQueue <- m
2019-05-16 04:49:34 +02:00
}
2019-05-11 07:06:40 +02:00
} else {
err = setJobDone ( j . ID64 )
logOnError ( err , "jobRescan : setJobDone(0)" )
2019-05-16 05:06:38 +02:00
if r . MsgID64 != 0 || r . ChatID64 != 0 {
m := TGCommand {
Type : commandReplyMsg ,
2019-05-16 05:11:56 +02:00
Text : "No message processed." ,
2019-05-16 05:06:38 +02:00
FromMsgID64 : r . MsgID64 ,
FromChatID64 : r . ChatID64 ,
}
TGCmdQueue <- m
2019-05-16 04:49:34 +02:00
}
2019-05-11 06:54:12 +02:00
}
return
}
2019-05-11 06:54:49 +02:00
func jobSetDone ( j Job ) {
2019-05-11 07:06:40 +02:00
var r JobPayloadSetDone
2019-05-11 06:54:12 +02:00
err := setJobStart ( j . ID64 )
logOnError ( err , "jobSetDone : setJobStart" )
2019-05-11 07:06:40 +02:00
2019-05-11 06:56:48 +02:00
err = json . Unmarshal ( j . Payload , & r )
2019-05-11 07:06:40 +02:00
logOnError ( err , "jobSetDone : Unmarshal payload" )
2019-05-11 06:54:12 +02:00
err = setJobDone ( r . JobID64 )
logOnError ( err , "jobSetDone : setJobDone(child)" )
2019-05-11 07:06:40 +02:00
2019-05-11 06:54:12 +02:00
err = setJobDone ( j . ID64 )
logOnError ( err , "jobSetDone : setJobDone" )
2019-05-11 07:06:40 +02:00
2019-05-16 04:52:30 +02:00
m := TGCommand {
Type : commandReplyMsg ,
Text : r . Text ,
FromMsgID64 : r . MsgID64 ,
FromChatID64 : r . ChatID64 ,
}
TGCmdQueue <- m
2019-05-11 06:54:12 +02:00
return
}
2019-05-16 14:39:12 +02:00
func jobPillage ( j Job ) {
2019-05-27 05:08:10 +02:00
var r JobPayloadPillage
err := setJobStart ( j . ID64 )
logOnError ( err , "jobPillage : setJobStart" )
err = json . Unmarshal ( j . Payload , & r )
logOnError ( err , "jobPillage : Unmarshal payload" )
// check if we have a acknoledgment of go or a timeout within 3m30 of the PillageInc from the Job
ids := getSQLListID64 ( ` select ox . id
from obj ox
, obj_msg omx
, obj op
, obj_msg omp
, obj_job oj
where oj . obj_id = ` + strconv.Itoa(j.ID64) + `
and omx . user_id = oj . user_id
and omx . sender_user_id = ` + strconv.Itoa(userID64ChtWrsBot) + `
and omx . obj_id = ox . id
and ox . obj_sub_type_id in ( ` + strconv.Itoa(objSubTypeMessagePillageGo) + ` , ` + strconv.Itoa(objSubTypeMessagePillageTimeout) + `
and op . id = ` + strconv.Itoa(r.ObjID64) + `
and omp . obj_id = op . id
and omx . date between omp . date and addtime ( omp . date , ' 0 0 : 3 : 30.000000 ' ) ; ` )
if len ( ids ) > 1 { // issue there ?
s := TGCommand {
Type : commandSendMsg ,
Text : fmt . Sprintf ( "More than one outcome for pillage #%d" , r . ObjID64 ) ,
ToUserID64 : m . UserID64 ,
}
TGCmdQueue <- s
} else if len ( ids ) == 1 { // we've got a match, job is done whether we prevented the pillage or not
m , err := getMsg ( ids [ 0 ] )
logOnError ( err , "jobPillage : getMsg(objSubTypeMessagePillageGo, objSubTypeMessagePillageTimeout)" )
if err == nil {
if m . Date . Add ( 60 * time . Second ) . After ( time . Now ( ) ) {
msgTypeID , err := getObjSubTypeId ( ids [ 0 ] )
logOnError ( err , "jobPillage : getObjSubTypeId" )
if err == nil {
if msgTypeID == objSubTypeMessagePillageGo {
s := TGCommand {
Type : commandSendMsg ,
Text : fmt . Sprintf ( "We avoided a pillage (%s)" , m . Date . Format ( time . RFC3339 ) ) ,
ToUserID64 : m . UserID64 ,
}
TGCmdQueue <- s
} else if msgTypeId == objSubTypeMessagePillageTimeout {
s := TGCommand {
Type : commandSendMsg ,
Text : fmt . Sprintf ( "We got pillaged (%s)" , m . Date . Format ( time . RFC3339 ) ) ,
ToUserID64 : m . UserID64 ,
}
TGCmdQueue <- s
} else {
s := TGCommand {
Type : commandSendMsg ,
Text : fmt . Sprintf ( "We don't know what happened (%s)" , m . Date . Format ( time . RFC3339 ) ) ,
ToUserID64 : m . UserID64 ,
}
TGCmdQueue <- s
}
}
}
}
err := setJobDone ( j . ID64 )
logOnError ( err , "jobSetDone : setJobDone" )
return
}
//no outcome yet, have we sent a "/go" in the last 30 sec ?
ids = getSQLListID64 ( ` select ox . id
from obj ox
, obj_msg omx
, obj_job oj
where oj . obj_id = ` + strconv.Itoa(j.ID64) + `
and omx . user_id = oj . user_id
and omx . sender_user_id = oj . user_id
and omx . obj_id = ox . id
and ox . obj_sub_type_id = ` + strconv.Itoa(objSubTypeMessageGo) + `
and omx . date between addtime ( oj . schedule , ' - 30 ' ) and oj . schedule ; ` )
if len ( ids ) > 0 { // we did, so we reschedule the job to check the outcome and wait
m , err := getMsg ( ids [ 0 ] )
logOnError ( err , "jobPillage : getMsg(objSubTypeMessageGo)" )
if err == nil {
s := TGCommand {
Type : commandSendMsg ,
Text : fmt . Sprintf ( "We started intercepting the pillage (%s)" , m . Date . Format ( time . RFC3339 ) ) ,
ToUserID64 : m . UserID64 ,
}
TGCmdQueue <- s
}
err = rescheduleJob ( j . ID64 , time . Now ( ) . Add ( 30 * time . Second ) )
logOnError ( err , "jobPillage : rescheduleJob(objSubTypeMessageGo)" )
} else { //no /go in the last 30 sec so we go ahead, send one and reschedule to check again in 25sec
c := TGCommand {
Type : commandSendMsg ,
Text : "/go" ,
FromUserID64 : m . Chat . ID ,
ToChatID64 : userID64ChtWrsBot ,
}
MQTGCmdQueue <- c
err = rescheduleJob ( j . ID64 , time . Now ( ) . Add ( 30 * time . Second ) )
logOnError ( err , "jobPillage : rescheduleJob" )
}
2019-05-16 14:39:12 +02:00
return
}
2019-05-26 15:06:12 +02:00
func jobMsgClient ( j Job ) {
var c TGCommand
err := setJobStart ( j . ID64 )
logOnError ( err , "jobMsgClient : setJobStart" )
err = json . Unmarshal ( j . Payload , & c )
logOnError ( err , "jobMsgClient : Unmarshal payload" )
MQTGCmdQueue <- c
2019-05-26 15:07:28 +02:00
err = setJobDone ( j . ID64 )
2019-05-26 15:06:12 +02:00
logOnError ( err , "joMsgClient : setJobDone" )
return
}