-
Notifications
You must be signed in to change notification settings - Fork 8
/
reference-verify.js
224 lines (188 loc) · 6.25 KB
/
reference-verify.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
'use strict'
const Status = require('./reference-status-enum')
const { Buffer } = require('buffer')
const kHadEnd = Symbol('had end')
const kHadError = Symbol('had error')
const kSentError = Symbol('sent error')
const kPullInProgress = Symbol('pull in progress')
const kPullInProgressError = Symbol('pull in progress error')
class Verify {
constructor () {
this.sink = null
this.source = null
this[kHadEnd] = false
this[kHadError] = false
this[kSentError] = false
this[kPullInProgress] = false
this[kPullInProgressError] = null
}
bindSource (source) {
if (this.source !== null) {
throw new Error('[verify] already bound to a Source')
}
if (typeof source !== 'object') {
throw new TypeError('[verify] source must be an object')
}
if (typeof source.pull !== 'function') {
throw new TypeError('[verify] source must have a pull() function')
}
if (typeof source.bindSink !== 'function') {
throw new TypeError('[verify] source must have a bindSink() function')
}
// `stop` must be a function if it exists on a source (extension-stop)
if (source.stop !== undefined &&
typeof source.stop !== 'function') {
throw new TypeError(`[verify] extension-stop conflict with stop() property on source: ${source.stop}`)
}
source.bindSink(this)
this.source = source
return this
}
bindSink (sink) {
if (this.sink !== null) {
throw new Error('[verify] already bound to a Sink')
}
if (typeof sink !== 'object') {
throw new TypeError('[verify] sink must be an object')
}
if (typeof sink.next !== 'function') {
throw new TypeError('[verify] sink must have a next() function')
}
// Start() must always exist on non-passthroughs
if (typeof sink.pull !== 'function' &&
typeof sink.next !== 'function') {
throw new TypeError('[verify] non-passthrough sink must have a start() function')
}
this.sink = sink
}
next (status, error, buffer, bytes) {
checkBind(this)
// console.error(`Verify.next [${Status[status]}]`)
if (this[kHadError]) {
if (this[kSentError]) {
return
}
this[kSentError] = true
this.sink.next(Status.error, error, buffer, bytes)
}
// next after end
if (this[kHadEnd]) {
throw new Error('[verify] next was called after stream ended')
}
// the proper place to notify of a multiple pull
if (this[kPullInProgressError] !== null) {
// console.error('Verify.next kPullInProgressError')
this.source.pull(this[kPullInProgressError], Buffer.alloc(0))
this[kPullInProgressError] = null
return
}
try {
// status
if (typeof status !== 'number' &&
Status[Status] === undefined) {
throw new TypeError(`[verify] status passed to next() was not a valid status: ${status}`)
}
// error
if (error !== null && !(error instanceof Error)) {
throw new TypeError(`[verify] error passed to next() was non-null and not an Error: ${error}`)
}
if (error !== null && status !== Status.error) {
throw new TypeError(`[verify] error passed to next() but status was not 'error', instead: ${Status[status]}`)
}
// buffer
if (error !== null && !Buffer.isBuffer(buffer)) {
throw new TypeError(`[verify] buffer passed to next() was not a Buffer: ${buffer}`)
}
// bytes
if (error !== null && typeof bytes !== 'number') {
throw new TypeError(`[verify] bytes passed to next() was not a number: ${bytes}`)
}
// buffer and bytes are mutual
if ((buffer && bytes === undefined) || (bytes !== undefined && !buffer)) {
throw new TypeError(`[verify] buffer and bytes must be mutually provided`)
}
} catch (err) {
this[kHadError] = true
this.source.pull(err, Buffer.alloc(0))
return
}
// duplicate or unwarrented next()
if (!this[kPullInProgress]) {
// If doing this explodes things were wrong enough to warrent it.
this[kHadError] = true
// console.error('Verify.next unwarrentedNext')
this.source.pull(
new Error('[verify] next() was called without a pull in progress'),
Buffer.alloc(0)
)
return
}
this[kPullInProgress] = false
if (status === Status.end) {
this[kHadEnd] = true
}
if (error !== null) {
this[kHadError] = true
this[kSentError] = true
}
this.sink.next(status, error, buffer, bytes)
}
pull (error, buffer) {
checkBind(this)
// console.error(`Verify.pull [${error}]`)
// multiple pull()
if (this[kPullInProgress]) {
this[kPullInProgressError] = new Error(`[verify] a pull() was already in progress, came from: ${this.sink}`)
return
}
this[kPullInProgress] = true
// pull after errored
if (this[kHadError]) {
if (this[kSentError]) {
return
}
// If this occurs all upstream components are already in a closed or errored state.
// Return to the nearest available handler, the component which sent us the new pull.
this[kSentError] = true
this.sink.next(
Status.error,
new Error(`[verify] verify has already tracked an error, invalid post-mortem pull() made by: ${this.sink}`),
Buffer.alloc(0),
0
)
return
}
// pull after end
if (this[kHadEnd]) {
throw new Error('[verify] pull was called after stream ended')
}
try {
// error
if (error !== null && !(error instanceof Error)) {
throw new TypeError(`[verify] error passed to pull() was non-null and not an Error: ${error}`)
}
// buffer
if (buffer !== undefined && !Buffer.isBuffer(buffer)) {
throw new TypeError(`[verify] buffer passed to pull() was not a Buffer: ${buffer}`)
}
} catch (err) {
this[kHadError] = true
// console.error('Verify.pull kHadError')
this.source.pull(err, Buffer.alloc(0))
return
}
if (error !== null) {
this[kHadError] = true
}
this.source.pull(error, buffer)
}
}
function checkBind (self) {
if (!self.source) {
throw new Error('[verify] source not yet bound')
}
if (!self.sink) {
throw new Error('[verify] sink not yet bound')
}
}
module.exports = Verify