-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathCouchDbChangeHundler.js
More file actions
91 lines (84 loc) · 2.6 KB
/
CouchDbChangeHundler.js
File metadata and controls
91 lines (84 loc) · 2.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
const debug = require('debug')
const stream = require('stream')
const ChangesStream = require('changes-stream-fix-retry')
const PouchDB = require('pouchdb-core')
.plugin(require('pouchdb-adapter-http'))
.plugin(require('pouchdb-upsert'))
class CouchDbChangeHundler {
constructor() {
this.logTag = 'cch'
this.seqKey = 'seq'
this.exitOnSigInt = true
}
start() {
this._logInfo = debug(`${this.logTag}:info`)
this._logError = debug(`${this.logTag}:error`)
this._registerSigInt()
this._db = new PouchDB(this.options.db, {skipSetup: true, ajax: {timeout: 120000}})
this._logInfo('get last seq')
return this._db.get(this.seqId)
.then(seqDoc => {
this._logInfo('start since', seqDoc[this.seqKey])
this.options.since = seqDoc[this.seqKey]
}, error => {
if ('not_found' === error.name) {
this._logInfo('starting with first seq')
} else {
this._logError('Cannot get seq', error)
throw error
}
})
.then(() => this._startFollow())
.catch(error => this._logError('error get seq', error))
}
stop() {
this._logInfo('stop')
this._changeStream.destroy()
}
_registerSigInt() {
if (this.exitOnSigInt) {
process.on('SIGINT', function() {
process.stdout.write('\n')
process.exit(2)
})
}
}
_startFollow() {
this._logInfo('starting changes stream', this.options)
this._changeStream = new ChangesStream(this.options)
this._changeStream.pipe(new stream.Writable({
write: (change, encoding, next) => {
if (this.seqId === change.id) {
this._logInfo('seq is equals')
return
}
Promise.resolve()
.then(() => this.handler(null, change))
.then(() => {
this._logInfo('upsert', change.seq)
return this._db.upsert(this.seqId, (docSeq) => {
if (!docSeq[this.seqKey] || parseInt(docSeq[this.seqKey], 10) < parseInt(change.seq, 10)) {
docSeq[this.seqKey] = change.seq
this._logInfo('save seq', docSeq[this.seqKey])
return docSeq
}
else {
this._logError('seq is wrong', docSeq[this.seqKey])
}
})
})
.then(() => {
this._logInfo('resume changes')
next()
})
.catch(error => {
this._logError('_onChange', error)
next(error)
return this.handler(error, null)
})
},
objectMode: true
}))
}
}
module.exports = CouchDbChangeHundler