2019-05-11 06:54:12 +02:00
package main
import (
2019-06-11 17:39:04 +02:00
"archive/zip"
"bytes"
2019-05-11 06:55:05 +02:00
"encoding/json"
2019-05-11 06:54:12 +02:00
"fmt"
2019-06-11 17:39:04 +02:00
"io/ioutil"
2019-06-11 17:53:29 +02:00
"log"
2019-06-11 17:40:24 +02:00
"net/http"
2019-05-27 05:11:16 +02:00
"strconv"
2019-06-11 16:56:48 +02:00
"strings"
2019-05-11 06:55:05 +02:00
"time"
2019-06-11 17:55:31 +02:00
tb "gopkg.in/tucnak/telebot.v2"
2019-05-11 06:54:12 +02:00
)
2019-06-11 16:56:48 +02:00
func createJob ( jobTypeID int32 , priority int32 , userID64 int64 , schedule time . Time , payload [ ] byte ) ( int64 , error ) {
stmt , err := db . Prepare ( ` INSERT INTO obj ( obj_type_id , obj_sub_type_id )
VALUES ( ? , ? ) ; ` )
logOnError ( err , "createJob : prepare insert obj" )
if err != nil {
return 0 , err
}
defer stmt . Close ( )
2019-05-27 05:08:10 +02:00
2019-06-11 16:56:48 +02:00
res , err := stmt . Exec ( objTypeJob , jobTypeID )
s := fmt . Sprintf ( "createJob, insert obj(%d, %d)" , objTypeJob , jobTypeID )
logOnError ( err , s )
if err != nil {
return 0 , err
}
2019-05-27 05:08:10 +02:00
2019-06-11 16:56:48 +02:00
objId , err := res . LastInsertId ( )
logOnError ( err , "createJob : get last insert Id" )
if err != nil {
return 0 , err
}
2019-05-27 05:08:10 +02:00
2019-06-11 16:56:48 +02:00
stmt , err = db . Prepare ( ` INSERT INTO obj_job ( obj_id , priority , user_id , status , seq_nr , schedule , is_done , in_work , inserted , pulled , started , ended , payload )
VALUES ( ? , ? , ? , ? , NULL , ? , 0 , 0 , ? , NULL , NULL , NULL , ? ) ; ` )
logOnError ( err , "createJob : prepare insert obj_job" )
if err != nil {
return 0 , err
}
defer stmt . Close ( )
2019-05-27 05:08:10 +02:00
2019-06-11 16:56:48 +02:00
_ , err = stmt . Exec ( objId , priority , userID64 , objJobStatusNew , schedule . UTC ( ) , time . Now ( ) . UTC ( ) , payload )
logOnError ( err , "createJob : insert obj_job" )
if err != nil {
return 0 , err
}
2019-05-27 05:08:10 +02:00
2019-06-11 16:56:48 +02:00
return objId , nil
2019-06-11 16:50:01 +02:00
}
2019-05-27 05:08:10 +02:00
2019-06-11 16:56:48 +02:00
func setJobDone ( jobId int64 ) error {
stmt , err := db . Prepare ( ` UPDATE obj_job j SET j.is_done = 1, j.in_work = 0, j.ended = ? WHERE j.obj_id = ?; ` )
logOnError ( err , "setJobDone : prepare update obj_job" )
if err != nil {
return err
2019-05-27 05:08:10 +02:00
}
2019-06-11 16:56:48 +02:00
defer stmt . Close ( )
2019-05-27 05:08:10 +02:00
2019-06-11 16:56:48 +02:00
_ , err = stmt . Exec ( time . Now ( ) . UTC ( ) , jobId )
s := fmt . Sprintf ( "setJobDone, update obj_job(%d)" , jobId )
logOnError ( err , s )
if err != nil {
return err
}
return nil
2019-06-11 16:50:01 +02:00
}
2019-05-27 05:08:10 +02:00
2019-06-11 16:56:48 +02:00
func setJobStart ( jobId int64 ) error {
stmt , err := db . Prepare ( ` UPDATE obj_job j SET j.started = coalesce(j.started, ?) WHERE j.obj_id = ?; ` )
logOnError ( err , "setJobStart : prepare update obj_job" )
if err != nil {
return err
}
defer stmt . Close ( )
2019-05-27 05:08:10 +02:00
2019-06-11 16:56:48 +02:00
_ , err = stmt . Exec ( time . Now ( ) . UTC ( ) , jobId )
s := fmt . Sprintf ( "setJobStart, update obj_job(%d)" , jobId )
logOnError ( err , s )
if err != nil {
return err
}
return nil
2019-05-27 05:08:10 +02:00
}
2019-06-11 16:56:48 +02:00
func rescheduleJob ( jobID64 int64 , status int32 , schedule time . Time ) error {
stmt , err := db . Prepare ( ` UPDATE obj_job j SET j.is_done = 0, j.in_work = 0, j.schedule = ?, j.status = ? WHERE j.obj_id = ?; ` )
logOnError ( err , "rescheduleJob : prepare update obj_job" )
if err != nil {
return err
2019-06-11 16:50:01 +02:00
}
2019-06-11 16:56:48 +02:00
defer stmt . Close ( )
_ , err = stmt . Exec ( schedule . UTC ( ) , status , jobID64 )
s := fmt . Sprintf ( "rescheduleJob, update obj_job(%d)" , jobID64 )
logOnError ( err , s )
if err != nil {
return err
2019-06-11 16:50:01 +02:00
}
2019-06-11 16:56:48 +02:00
return nil
2019-06-11 16:50:01 +02:00
}
2019-05-11 07:06:40 +02:00
2019-06-11 16:56:48 +02:00
func loadCurrentJobs ( ) ( [ ] Job , error ) {
var (
objId int64
jobTypeId int32
userID64 int64
status int32
payload [ ] byte
jobs [ ] Job
)
t := time . Now ( ) . UTC ( )
r := RndInt64 ( )
_ , err := db . Exec ( "UPDATE obj_job j SET j.pulled = ?, j.in_work = 1, j.seq_nr = ? WHERE j.is_done = 0 AND j.in_work = 0 AND j.schedule <= ? ORDER BY j.priority ASC, j.obj_id ASC LIMIT ?;" , t , r , t , SQLJobSliceSize )
logOnError ( err , "loadCurrentJobs : update intial rows" )
2019-08-01 05:49:05 +02:00
if err != nil {
return jobs , err
}
2019-06-11 16:56:48 +02:00
stmt , err := db . Prepare ( "SELECT o.id, o.obj_sub_type_id, j.status, j.user_id, j.payload FROM obj_job j, obj o WHERE j.obj_id = o.id AND j.is_done = 0 AND j.in_work = 1 AND j.seq_nr = ? ORDER BY j.priority ASC, j.obj_id ASC;" )
logOnError ( err , "loadCurrentJobs : prepare select statement" )
2019-08-01 05:49:05 +02:00
if err != nil {
stmt . Close ( )
return jobs , err
}
2019-06-11 16:56:48 +02:00
rows , err := stmt . Query ( r )
// rows, err := stmt.Query(time.Now())
logOnError ( err , "loadCurrentJobs : query select statement" )
2019-08-01 05:49:05 +02:00
if err != nil {
stmt . Close ( )
return jobs , err
}
2019-06-11 16:56:48 +02:00
for rows . Next ( ) {
err = rows . Scan ( & objId , & jobTypeId , & status , & userID64 , & payload )
logOnError ( err , "loadCurrentJobs : scan query rows" )
job := Job {
ID64 : objId ,
JobTypeID : jobTypeId ,
Status : status ,
UserID64 : userID64 ,
Payload : payload ,
}
jobs = append ( jobs , job )
2019-06-11 16:50:01 +02:00
}
2019-06-11 16:56:48 +02:00
err = rows . Err ( )
logOnError ( err , "loadCurrentJobs : scan end rows" )
rows . Close ( )
2019-08-01 05:49:05 +02:00
if err != nil {
stmt . Close ( )
return jobs , err
}
2019-05-11 07:06:40 +02:00
2019-06-11 16:56:48 +02:00
err = stmt . Close ( )
logOnError ( err , "loadCurrentJobs : close select statement" )
2019-08-01 05:49:05 +02:00
if err != nil {
return jobs , err
}
2019-06-11 16:56:48 +02:00
return jobs , nil
2019-06-11 16:50:01 +02:00
}
2019-05-11 07:06:40 +02:00
2019-06-11 16:56:48 +02:00
func jobRescan ( j Job ) {
2019-06-14 06:18:26 +02:00
var p JobPayloadRescanMsg
2019-05-19 05:24:45 +02:00
2019-06-11 16:56:48 +02:00
err := setJobStart ( j . ID64 )
logOnError ( err , "jobRescan : setJobStart" )
2019-05-11 07:06:40 +02:00
2019-06-14 06:18:26 +02:00
err = json . Unmarshal ( j . Payload , & p )
2019-06-11 16:56:48 +02:00
logOnError ( err , "jobRescan : Unmarshal payload" )
start := time . Now ( )
2019-06-14 06:15:45 +02:00
milestone := time . Now ( )
2019-06-11 16:50:01 +02:00
2019-06-14 06:19:28 +02:00
ids := getSQLListID64 ( p . Query )
2019-06-11 16:56:48 +02:00
if len ( ids ) > 1 {
2019-06-14 06:15:45 +02:00
txt := fmt . Sprintf ( "Rescanning %d messages." , len ( ids ) )
m := TGCommand {
Type : commandReplyMsg ,
Text : txt ,
FromMsgID64 : p . MsgID64 ,
FromChatID64 : p . ChatID64 ,
}
TGCmdQueue <- m
i := 0
2019-06-11 16:56:48 +02:00
for _ , id := range ids {
SQLMsgIdentifyQueue <- id
2019-06-14 06:15:45 +02:00
i = i + 1
if time . Now ( ) . After ( milestone . Add ( 1 * time . Minute ) ) {
2019-06-14 06:18:26 +02:00
//txt := fmt.Sprintf("Rescanned %d/%d messages.", i, len(ids))
2019-06-14 06:15:45 +02:00
m = TGCommand {
Type : commandReplyMsg ,
2019-06-14 06:18:26 +02:00
Text : fmt . Sprintf ( "Rescanned %d/%d messages." , i , len ( ids ) ) ,
2019-06-14 06:15:45 +02:00
FromMsgID64 : p . MsgID64 ,
FromChatID64 : p . ChatID64 ,
}
TGCmdQueue <- m
milestone = time . Now ( )
}
2019-06-11 16:56:48 +02:00
}
2019-06-14 06:15:45 +02:00
2019-06-14 06:18:26 +02:00
r := JobPayloadSetDone {
2019-06-11 16:56:48 +02:00
JobID64 : j . ID64 ,
2019-06-14 06:18:26 +02:00
MsgID64 : p . MsgID64 ,
ChatID64 : p . ChatID64 ,
2019-06-11 16:56:48 +02:00
Text : fmt . Sprintf ( "%d messages processed in %s." , len ( ids ) , time . Since ( start ) ) ,
2019-05-11 06:54:12 +02:00
}
2019-06-14 06:18:26 +02:00
b , _ := json . Marshal ( r )
2019-06-11 16:56:48 +02:00
_ , err := createJob ( objSubTypeJobSetJobDone , objJobPriorityRescanAllMsg , j . UserID64 , time . Now ( ) . UTC ( ) , b )
logOnError ( err , "jobRescan : createJob(objSubTypeJobSetJobDone)" )
} else if len ( ids ) == 1 {
SQLMsgIdentifyQueue <- ids [ 0 ]
err = setJobDone ( j . ID64 )
logOnError ( err , "jobRescan : setJobDone(1)" )
2019-06-14 06:18:52 +02:00
if p . MsgID64 != 0 || p . ChatID64 != 0 {
2019-06-11 16:56:48 +02:00
m := TGCommand {
2019-05-16 05:06:38 +02:00
Type : commandReplyMsg ,
2019-06-11 16:56:48 +02:00
Text : "One message processed." ,
2019-06-14 06:18:52 +02:00
FromMsgID64 : p . MsgID64 ,
FromChatID64 : p . ChatID64 ,
2019-05-16 05:06:38 +02:00
}
2019-06-11 16:56:48 +02:00
TGCmdQueue <- m
}
} else {
err = setJobDone ( j . ID64 )
logOnError ( err , "jobRescan : setJobDone(0)" )
2019-06-14 06:19:28 +02:00
if p . MsgID64 != 0 || p . ChatID64 != 0 {
2019-06-11 16:56:48 +02:00
m := TGCommand {
2019-05-16 05:06:38 +02:00
Type : commandReplyMsg ,
2019-06-11 16:56:48 +02:00
Text : "No message processed." ,
2019-06-14 06:18:52 +02:00
FromMsgID64 : p . MsgID64 ,
FromChatID64 : p . ChatID64 ,
2019-05-16 05:06:38 +02:00
}
2019-06-11 16:56:48 +02:00
TGCmdQueue <- m
2019-05-16 04:49:34 +02:00
}
2019-05-11 06:54:12 +02:00
}
return
}
2019-06-11 16:56:48 +02:00
func jobSetDone ( j Job ) {
var r JobPayloadSetDone
2019-05-11 07:06:40 +02:00
2019-06-11 16:56:48 +02:00
err := setJobStart ( j . ID64 )
logOnError ( err , "jobSetDone : setJobStart" )
2019-05-11 07:06:40 +02:00
2019-06-11 16:56:48 +02:00
err = json . Unmarshal ( j . Payload , & r )
logOnError ( err , "jobSetDone : Unmarshal payload" )
2019-05-11 07:06:40 +02:00
2019-06-11 16:56:48 +02:00
err = setJobDone ( r . JobID64 )
logOnError ( err , "jobSetDone : setJobDone(child)" )
2019-05-11 07:06:40 +02:00
2019-06-11 16:56:48 +02:00
err = setJobDone ( j . ID64 )
logOnError ( err , "jobSetDone : setJobDone" )
2019-06-11 16:50:01 +02:00
2019-06-11 16:56:48 +02:00
m := TGCommand {
Type : commandReplyMsg ,
Text : r . Text ,
FromMsgID64 : r . MsgID64 ,
FromChatID64 : r . ChatID64 ,
2019-06-11 16:50:01 +02:00
}
2019-06-11 16:56:48 +02:00
TGCmdQueue <- m
2019-05-16 04:52:30 +02:00
2019-05-11 06:54:12 +02:00
return
}
2019-05-16 14:39:12 +02:00
2019-06-11 16:56:48 +02:00
func jobPillage ( j Job ) {
var r JobPayloadPillage
err := setJobStart ( j . ID64 )
logOnError ( err , "jobPillage : setJobStart" )
err = json . Unmarshal ( j . Payload , & r )
logOnError ( err , "jobPillage : Unmarshal payload" )
// check if we have a acknoledgment of go or a timeout within 3m30 of the PillageInc from the Job
ids := getSQLListID64 ( ` select ox . id
from obj ox
, obj_msg omx
, obj op
, obj_msg omp
, obj_job oj
where oj . obj_id = ` + strconv.FormatInt(j.ID64, 10) + `
and omx . user_id = oj . user_id
and omx . sender_user_id = ` + strconv.Itoa(userID64ChtWrsBot) + `
and omx . obj_id = ox . id
and ox . obj_sub_type_id in ( ` + strconv . Itoa ( objSubTypeMessagePillageGo ) +
` , ` + strconv . Itoa ( objSubTypeMessagePillageTimeout ) +
` , ` + strconv . Itoa ( objSubTypeMessagePillageLoss ) +
` , ` + strconv . Itoa ( objSubTypeMessagePillageWin ) + ` )
and op . id = ` + strconv.FormatInt(r.ObjID64, 10) + `
and omp . obj_id = op . id
and omx . date between omp . date and addtime ( omp . date , ' 0 0 : 3 : 30.000000 ' )
order by case ox . obj_sub_type_id when ` + strconv.Itoa(objSubTypeMessagePillageWin) + ` then 0
when ` + strconv.Itoa(objSubTypeMessagePillageLoss) + ` then 1
when ` + strconv.Itoa(objSubTypeMessagePillageTimeout) + ` then 2
when ` + strconv.Itoa(objSubTypeMessagePillageGo) + ` then 3
else 4 end asc
limit 1 ; ` )
if len ( ids ) > 1 { // issue there ?
s := TGCommand {
Type : commandSendMsg ,
Text : fmt . Sprintf ( "More than one outcome for pillage #%d" , r . ObjID64 ) ,
ToUserID64 : j . UserID64 ,
}
TGCmdQueue <- s
} else if len ( ids ) == 1 { // we've got a match, job is done whether we prevented the pillage or not
m , err := getObjMsg ( ids [ 0 ] )
logOnError ( err , "jobPillage : getMsg(objSubTypeMessagePillageGo, objSubTypeMessagePillageTimeout)" )
if err == nil {
if m . Date . Add ( 60 * time . Second ) . After ( time . Now ( ) . UTC ( ) ) {
msgTypeID64 , err := getObjSubTypeId ( ids [ 0 ] )
logOnError ( err , "jobPillage : getObjSubTypeId" )
if err == nil {
if msgTypeID64 == objSubTypeMessagePillageGo {
s := TGCommand {
Type : commandSendMsg ,
Text : fmt . Sprintf ( "We avoided a pillage (%s)" , m . Date . Format ( time . RFC3339 ) ) ,
ToUserID64 : j . UserID64 ,
}
TGCmdQueue <- s
} else if msgTypeID64 == objSubTypeMessagePillageWin {
s := TGCommand {
Type : commandSendMsg ,
Text : fmt . Sprintf ( "We avoided a pillage (%s))" , m . Date . Format ( time . RFC3339 ) ) ,
ToUserID64 : j . UserID64 ,
}
TGCmdQueue <- s
} else if msgTypeID64 == objSubTypeMessagePillageLoss {
s := TGCommand {
Type : commandSendMsg ,
Text : fmt . Sprintf ( "We got pillaged (%s)" , m . Date . Format ( time . RFC3339 ) ) ,
ToUserID64 : j . UserID64 ,
}
TGCmdQueue <- s
} else if msgTypeID64 == objSubTypeMessagePillageTimeout {
s := TGCommand {
Type : commandSendMsg ,
Text : fmt . Sprintf ( "We got pillaged (%s)" , m . Date . Format ( time . RFC3339 ) ) ,
ToUserID64 : j . UserID64 ,
}
TGCmdQueue <- s
} else {
s := TGCommand {
Type : commandSendMsg ,
Text : fmt . Sprintf ( "We don't know what happened (%s)" , m . Date . Format ( time . RFC3339 ) ) ,
ToUserID64 : j . UserID64 ,
}
TGCmdQueue <- s
}
}
}
2019-05-27 05:08:10 +02:00
}
2019-06-11 16:56:48 +02:00
err = setJobDone ( j . ID64 )
logOnError ( err , "jobSetDone : setJobDone" )
2019-05-27 05:08:10 +02:00
return
}
2019-07-31 04:06:05 +02:00
// is the job outdated now ?
2019-08-01 12:37:11 +02:00
if time . Now ( ) . UTC ( ) . After ( r . Date . Add ( time . Minute * 3 + time . Second * 30 ) ) {
2019-08-04 16:33:23 +02:00
// log.Printf("jobPillage :\n\tPillageTime : %s\n\tNowTime : %s\n", r.Date.Format(time.RFC3339), time.Now().UTC().Format(time.RFC3339))
2019-07-31 04:06:05 +02:00
s := TGCommand {
Type : commandSendMsg ,
Text : fmt . Sprintf ( "Pillage interception expired" ) ,
ToUserID64 : j . UserID64 ,
}
TGCmdQueue <- s
return
}
2019-06-11 16:56:48 +02:00
s := TGCommand {
Type : commandSendMsg ,
Text : fmt . Sprintf ( "No outcome for the pillage yet" ) ,
ToUserID64 : j . UserID64 ,
}
TGCmdQueue <- s
//no outcome yet, have we sent a "/go" in the last 30 sec ?
ids = getSQLListID64 ( ` select ox . id
from obj ox
, obj_msg omx
, obj_job oj
where oj . obj_id = ` + strconv.FormatInt(j.ID64, 10) + `
and omx . user_id = oj . user_id
and omx . sender_user_id = oj . user_id
and omx . obj_id = ox . id
and ox . obj_sub_type_id = ` + strconv.Itoa(objSubTypeMessageGo) + `
and omx . date between addtime ( oj . schedule , ' - 30 ' ) and oj . schedule ; ` )
if len ( ids ) > 0 { // we did, so we reschedule the job to check the outcome and wait
m , err := getObjMsg ( ids [ 0 ] )
logOnError ( err , "jobPillage : getMsg(objSubTypeMessageGo)" )
if err == nil {
s := TGCommand {
Type : commandSendMsg ,
Text : fmt . Sprintf ( "We started intercepting the pillage (%s)" , m . Date . Format ( time . RFC3339 ) ) ,
ToUserID64 : j . UserID64 ,
}
TGCmdQueue <- s
2019-05-27 05:08:10 +02:00
}
2019-06-11 16:56:48 +02:00
err = rescheduleJob ( j . ID64 , j . Status + 1 , time . Now ( ) . Add ( 30 * time . Second ) . UTC ( ) )
logOnError ( err , "jobPillage : rescheduleJob(objSubTypeMessageGo)" )
} else { //no /go in the last 30 sec so we go ahead, send one and reschedule to check again in 25sec
clientSendCWMsg ( j . UserID64 , "/go" )
err = rescheduleJob ( j . ID64 , j . Status + 1 , time . Now ( ) . Add ( 30 * time . Second ) . UTC ( ) )
logOnError ( err , "jobPillage : rescheduleJob" )
2019-05-27 05:08:10 +02:00
}
2019-05-16 14:39:12 +02:00
return
}
2019-05-26 15:06:12 +02:00
2019-06-11 16:56:48 +02:00
func jobMsgRefresh ( j Job ) {
var p JobPayloadMsgRefresh
2019-06-10 06:03:47 +02:00
2019-06-11 16:56:48 +02:00
// identify whether the message has been properly refreshed ? create new job ? reschedule same job ?
err := setJobStart ( j . ID64 )
logOnError ( err , "jobMsgRefresh : setJobStart" )
err = json . Unmarshal ( j . Payload , & p )
logOnError ( err , "jobMsgRefresh : Unmarshal payload" )
m , err := getObjMsg ( p . ObjID64 )
if err != nil && strings . Compare ( err . Error ( ) , ` sql: no rows in result set ` ) == 0 {
err = setJobDone ( j . ID64 )
logOnError ( err , "joMsgClient : setJobDone" )
return
2019-06-11 09:44:28 +02:00
} else {
2019-06-11 16:56:48 +02:00
logOnError ( err , "jobMsgRefresh : getObjMsg" )
err = setJobDone ( j . ID64 )
logOnError ( err , "joMsgClient : setJobDone" )
2019-06-11 09:44:28 +02:00
return
}
2019-06-10 06:03:47 +02:00
2019-06-11 16:56:48 +02:00
err = delObj ( p . ObjID64 )
logOnError ( err , "jobMsgRefresh : delObj" )
2019-06-10 06:03:47 +02:00
2019-07-31 10:57:45 +02:00
clientRefreshCWMsg ( m . TGUserID64 , m . ChatID64 , m . ID64 )
2019-06-10 06:03:47 +02:00
2019-06-11 16:56:48 +02:00
err = setJobDone ( j . ID64 )
logOnError ( err , "joMsgClient : setJobDone" )
2019-06-10 06:03:47 +02:00
return
}
2019-06-11 16:56:48 +02:00
func jobMsgClient ( j Job ) {
var p JobPayloadMsgClient
err := setJobStart ( j . ID64 )
logOnError ( err , "jobMsgClient : setJobStart" )
2019-05-26 15:06:12 +02:00
2019-06-11 16:56:48 +02:00
err = json . Unmarshal ( j . Payload , & p )
logOnError ( err , "jobMsgClient : Unmarshal payload" )
if err == nil {
clientSendCWMsg ( j . UserID64 , p . Text )
m := TGCommand {
2019-06-03 03:01:18 +02:00
Type : commandReplyMsg ,
2019-06-11 16:56:48 +02:00
Text : "Message sent." ,
FromMsgID64 : p . MsgID64 ,
FromChatID64 : p . ChatID64 ,
2019-06-03 03:01:18 +02:00
}
2019-06-11 16:56:48 +02:00
TGCmdQueue <- m
2019-05-30 06:12:01 +02:00
}
2019-05-26 15:06:12 +02:00
2019-06-11 16:56:48 +02:00
err = setJobDone ( j . ID64 )
logOnError ( err , "joMsgClient : setJobDone" )
2019-05-26 15:06:12 +02:00
return
}
2019-06-11 03:59:20 +02:00
2019-06-11 16:56:48 +02:00
func jobBackupExport ( j Job ) {
var p JobPayloadBackupExport
err := setJobStart ( j . ID64 )
logOnError ( err , "jobBackupExport : setJobStart" )
2019-06-11 03:59:20 +02:00
2019-06-11 16:56:48 +02:00
err = json . Unmarshal ( j . Payload , & p )
logOnError ( err , "jobBackupExport : Unmarshal payload" )
bkp := DataBackup { }
start := time . Now ( )
milestone := time . Now ( )
s := new ( [ ] ChatWarsMessage )
msgs := * s
ids := getSQLListID64 ( ` SELECT om.obj_id id FROM obj_msg om; ` )
txt := fmt . Sprintf ( "Backing up %d messages." , len ( ids ) )
m := TGCommand {
Type : commandReplyMsg ,
Text : txt ,
FromMsgID64 : p . MsgID64 ,
FromChatID64 : p . ChatID64 ,
2019-06-11 03:59:20 +02:00
}
2019-06-11 16:56:48 +02:00
TGCmdQueue <- m
2019-06-11 03:59:20 +02:00
2019-06-11 16:56:48 +02:00
i := 0
for _ , id := range ids {
2019-06-11 17:41:31 +02:00
msg , err := getObjMsg ( id )
2019-06-11 16:56:48 +02:00
logOnError ( err , "jobBackupExport : getMsg" )
if err == nil {
2019-06-11 17:41:31 +02:00
msgs = append ( msgs , * msg )
2019-06-11 16:50:01 +02:00
}
2019-06-11 16:56:48 +02:00
i = i + 1
2019-06-11 18:00:20 +02:00
if time . Now ( ) . After ( milestone . Add ( 1 * time . Minute ) ) {
2019-06-11 16:56:48 +02:00
txt := fmt . Sprintf ( "Exported %d/%d messages." , i , len ( ids ) )
m = TGCommand {
Type : commandReplyMsg ,
Text : txt ,
FromMsgID64 : p . MsgID64 ,
FromChatID64 : p . ChatID64 ,
}
TGCmdQueue <- m
milestone = time . Now ( )
2019-06-11 16:50:01 +02:00
}
}
2019-06-11 03:59:20 +02:00
2019-06-11 16:56:48 +02:00
bkp . Messages = msgs
b , err := json . Marshal ( bkp )
logOnError ( err , "jobBackupExport : Marshal" )
m = TGCommand {
Type : commandReplyMsg ,
Text : ` Compressing archive ` ,
FromMsgID64 : p . MsgID64 ,
FromChatID64 : p . ChatID64 ,
2019-06-11 16:50:01 +02:00
}
2019-06-11 16:56:48 +02:00
TGCmdQueue <- m
zbuf := new ( bytes . Buffer )
zw := zip . NewWriter ( zbuf )
zf , err := zw . Create ( ` backup.json ` )
logOnError ( err , "jobBackupExport : Create" )
_ , err = zf . Write ( b )
logOnError ( err , "jobBackupExport : Write" )
err = zw . Close ( )
logOnError ( err , "jobBackupExport : Close" )
d := tb . Document { }
d . File = tb . FromReader ( bytes . NewReader ( zbuf . Bytes ( ) ) )
d . FileName = fmt . Sprintf ( "%s.backup.zip" , start . Format ( "20060102150405" ) )
d . Caption = d . FileName
d . MIME = ` application/zip `
m = TGCommand {
2019-06-11 16:50:01 +02:00
Type : commandReplyMsg ,
2019-06-11 16:56:48 +02:00
Text : ` Export done. ` ,
FromMsgID64 : p . MsgID64 ,
FromChatID64 : p . ChatID64 ,
}
TGCmdQueue <- m
m = TGCommand {
Type : commandSendDocument ,
Document : d ,
ToChatID64 : p . ChatID64 ,
2019-06-11 16:50:01 +02:00
}
2019-06-11 16:56:48 +02:00
TGCmdQueue <- m
err = setJobDone ( j . ID64 )
logOnError ( err , "jobBackupExport : setJobDone" )
2019-06-11 16:50:01 +02:00
return
}
2019-06-11 03:59:20 +02:00
2019-06-11 16:56:48 +02:00
func jobBackupImport ( j Job ) {
var p JobPayloadBackupImport
err := setJobStart ( j . ID64 )
logOnError ( err , "jobBackupImport : setJobStart" )
2019-06-11 16:50:01 +02:00
2019-06-11 16:56:48 +02:00
err = json . Unmarshal ( j . Payload , & p )
logOnError ( err , "jobBackupImport : Unmarshal payload" )
2019-06-11 16:50:01 +02:00
2019-06-11 17:38:12 +02:00
resp , err := http . Get ( p . URL )
logOnError ( err , "jobBackupImport : Get" )
defer resp . Body . Close ( )
buf := new ( bytes . Buffer )
buf . ReadFrom ( resp . Body )
m := TGCommand {
Type : commandReplyMsg ,
Text : "File downloaded." ,
FromMsgID64 : p . MsgID64 ,
FromChatID64 : p . ChatID64 ,
}
TGCmdQueue <- m
z := buf . Bytes ( )
r := bytes . NewReader ( z )
zr , err := zip . NewReader ( r , int64 ( len ( z ) ) )
for _ , f := range zr . File {
if strings . Compare ( f . Name , "backup.json" ) == 0 {
rc , err := f . Open ( )
logOnError ( err , "jobBackupImport : Open" )
if err != nil {
2019-06-11 17:53:29 +02:00
return
2019-06-11 17:38:12 +02:00
}
data , err := ioutil . ReadAll ( rc )
logOnError ( err , "jobBackupImport : ReadAll" )
if err != nil {
2019-06-11 17:53:29 +02:00
return
2019-06-11 17:38:12 +02:00
}
log . Printf ( "jobBackupImport : %d uncompressed bytes.\n" , len ( data ) )
rc . Close ( )
bkp := DataBackup { }
err = json . Unmarshal ( data , & bkp )
logOnError ( err , "jobBackupImport : Unmarshal" )
if err != nil {
2019-06-11 17:53:29 +02:00
return
2019-06-11 17:38:12 +02:00
}
2019-06-11 17:58:19 +02:00
for _ , msg := range bkp . Messages {
MQCWMsgQueue <- msg
2019-06-11 17:38:12 +02:00
}
m := TGCommand {
Type : commandReplyMsg ,
Text : "Backup restored." ,
FromMsgID64 : p . MsgID64 ,
FromChatID64 : p . ChatID64 ,
}
TGCmdQueue <- m
err = setJobDone ( j . ID64 )
logOnError ( err , "jobBackupImport : setJobDone" )
return
2019-06-11 16:56:48 +02:00
}
2019-06-11 03:59:20 +02:00
}
2019-06-11 17:55:31 +02:00
m = TGCommand {
2019-06-11 17:38:12 +02:00
Type : commandReplyMsg ,
Text : "Not backup file found in archive." ,
FromMsgID64 : p . MsgID64 ,
FromChatID64 : p . ChatID64 ,
}
TGCmdQueue <- m
2019-06-11 16:56:48 +02:00
err = setJobDone ( j . ID64 )
logOnError ( err , "jobBackupImport : setJobDone" )
2019-06-11 03:59:20 +02:00
return
}
2019-08-08 14:39:23 +02:00
func jobGStock ( j Job ) {
var p JobPayloadGStock
err := setJobStart ( j . ID64 )
logOnError ( err , "jobGStock : setJobStart" )
err = json . Unmarshal ( j . Payload , & p )
logOnError ( err , "jobGStock : Unmarshal payload" )
/ *
clientSendCWMsg ( m . Chat . ID , "/g_stock_res" )
clientSendCWMsg ( m . Chat . ID , "/g_stock_alch" )
clientSendCWMsg ( m . Chat . ID , "/g_stock_misc" )
clientSendCWMsg ( m . Chat . ID , "/g_stock_rec" )
clientSendCWMsg ( m . Chat . ID , "/g_stock_parts" )
clientSendCWMsg ( m . Chat . ID , "/g_stock_other" )
* /
if err == nil {
m := TGCommand {
Type : commandReplyMsg ,
Text : "Message sent." ,
FromMsgID64 : p . MsgID64 ,
FromChatID64 : p . ChatID64 ,
}
TGCmdQueue <- m
}
err = setJobDone ( j . ID64 )
logOnError ( err , "jobGStock : setJobDone" )
return
}