// Package crdt provides a replicated go-datastore (key-value store)
// implementation using Merkle-CRDTs built with IPLD nodes.
//
// This Datastore is agnostic to how new MerkleDAG roots are broadcasted to
// the rest of replicas (`Broadcaster` component) and to how the IPLD nodes
// are made discoverable and retrievable by other replicas (`DAGSyncer`
// component).
//
// The implementation is based on the "Merkle-CRDTs: Merkle-DAGs meet CRDTs"
// paper by Héctor Sanjuán, Samuli Pöyhtäri and Pedro Teixeira.
//
// Note that, in the absence of compaction (which must be performed manually),
// a crdt.Datastore will only grow in size even when keys are deleted.
//
// The time for new Datastore replicas to be fully synced will depend on how
// fast they can retrieve the DAGs announced by the other replicas, but newer
// values will be available before older ones.
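//
// A minimal usage sketch (hedged: ctx, dagService and broadcaster are
// assumed to be provided by the caller, e.g. via ipfs-lite and the
// included PubSubBroadcaster):
//
//	d, err := crdt.New(ds.NewMapDatastore(), ds.NewKey("/crdt"),
//		dagService, broadcaster, crdt.DefaultOptions())
//	if err != nil {
//		// handle error
//	}
//	defer d.Close()
//	// The result behaves like any go-datastore, but writes are replicated.
//	err = d.Put(ctx, ds.NewKey("/hello"), []byte("world"))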
package crdt
import (
"context"
"fmt"
"io"
"math/rand"
"sync"
"sync/atomic"
"time"
dshelp "github.com/ipfs/boxo/datastore/dshelp"
pb "github.com/ipfs/go-ds-crdt/pb"
"go.uber.org/multierr"
"google.golang.org/protobuf/proto"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
query "github.com/ipfs/go-datastore/query"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
"github.com/pkg/errors"
)
var _ ds.Datastore = (*Datastore)(nil)
var _ ds.Batching = (*Datastore)(nil)
// datastore namespace keys. Short keys save space and memory.
const (
headsNs = "h" // heads
setNs = "s" // set
processedBlocksNs = "b" // blocks
dirtyBitKey = "d" // dirty
versionKey = "crdt_version"
)
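// For example, with namespace "/ns", a processed block is recorded under a
// key like "/ns/b/<multihash-key>", set elements live under "/ns/s/...",
// heads under "/ns/h/..." and the dirty bit at "/ns/d".
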
// Common errors.
var (
ErrNoMoreBroadcast = errors.New("receiving blocks aborted since no new blocks will be broadcasted")
)
// A Broadcaster provides a way to send (notify) an opaque payload to
// all replicas and to retrieve payloads broadcasted.
type Broadcaster interface {
// Send payload to other replicas.
Broadcast(context.Context, []byte) error
// Obtain the next payload received from the network.
Next(context.Context) ([]byte, error)
}
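// A minimal sketch of a Broadcaster implementation (hypothetical, for
// illustration only): an in-process loopback that delivers payloads back to
// itself over a channel. Real deployments would normally use the included
// libp2p PubSubBroadcaster instead.
type loopbackBroadcaster struct {
	ch chan []byte
}

var _ Broadcaster = (*loopbackBroadcaster)(nil)

func (b *loopbackBroadcaster) Broadcast(ctx context.Context, data []byte) error {
	select {
	case b.ch <- data: // deliver to the single local subscriber
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (b *loopbackBroadcaster) Next(ctx context.Context) ([]byte, error) {
	select {
	case data := <-b.ch:
		return data, nil
	case <-ctx.Done():
		return nil, ctx.Err() // handleNext treats a cancelled context as a stop signal
	}
}
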
// A SessionDAGService is a Sessions-enabled DAGService. This type of DAG-Service
// provides an optimized NodeGetter to make multiple related requests. The
// same session-enabled NodeGetter is used to download DAG branches when
// the DAGSyncer supports it.
type SessionDAGService interface {
ipld.DAGService
Session(context.Context) ipld.NodeGetter
}
// Options holds configurable values for Datastore.
type Options struct {
Logger logging.StandardLogger
RebroadcastInterval time.Duration
// The PutHook function is triggered whenever an element
// is successfully added to the datastore (either by a local
// or remote update), and only when that addition is considered the
// prevalent value.
PutHook func(k ds.Key, v []byte)
// The DeleteHook function is triggered whenever a version of an
// element is successfully removed from the datastore (either by a
// local or remote update). Unordered and concurrent updates may
// result in the DeleteHook being triggered even though the element is
// still present in the datastore because it was re-added or not fully
// tombstoned. If that is relevant, use Has() to check if the removed
// element is still part of the datastore.
DeleteHook func(k ds.Key)
// NumWorkers specifies the number of workers ready to walk DAGs
NumWorkers int
// DAGSyncerTimeout specifies how long to wait for a DAGSyncer.
// Set to 0 to disable.
DAGSyncerTimeout time.Duration
// MaxBatchDeltaSize triggers an automatic commit of any batch whose
// delta size grows beyond it. This helps keep DAG nodes small
// enough that they will be transferred by the network.
MaxBatchDeltaSize int
// RepairInterval specifies how often to walk the full DAG up to
// the root(s) when the store has been marked dirty. Set to 0 to disable.
RepairInterval time.Duration
// MultiHeadProcessing lets several new heads be processed in
// parallel. This results in more branching in general. More
// branching is not necessarily a bad thing and may improve
// throughput, but everything depends on usage.
MultiHeadProcessing bool
}
func (opts *Options) verify() error {
if opts == nil {
return errors.New("options cannot be nil")
}
if opts.RebroadcastInterval <= 0 {
return errors.New("invalid RebroadcastInterval")
}
if opts.Logger == nil {
return errors.New("the Logger is undefined")
}
if opts.NumWorkers <= 0 {
return errors.New("bad number of NumWorkers")
}
if opts.DAGSyncerTimeout < 0 {
return errors.New("invalid DAGSyncerTimeout")
}
if opts.MaxBatchDeltaSize <= 0 {
return errors.New("invalid MaxBatchDeltaSize")
}
if opts.RepairInterval < 0 {
return errors.New("invalid RepairInterval")
}
return nil
}
// DefaultOptions initializes an Options object with sensible defaults.
func DefaultOptions() *Options {
return &Options{
Logger: logging.Logger("crdt"),
RebroadcastInterval: time.Minute,
PutHook: nil,
DeleteHook: nil,
NumWorkers: 5,
DAGSyncerTimeout: 5 * time.Minute,
// always keeping
// https://github.com/libp2p/go-libp2p-core/blob/master/network/network.go#L23
// in sight
MaxBatchDeltaSize: 1 * 1024 * 1024, // 1MB,
RepairInterval: time.Hour,
MultiHeadProcessing: false,
}
}
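
// exampleOptions is a sketch (hypothetical, not part of the public API)
// showing how the defaults above might be tuned before calling New.
func exampleOptions() *Options {
	opts := DefaultOptions()
	opts.RebroadcastInterval = 30 * time.Second // announce heads more often
	opts.NumWorkers = 10                        // walk more DAG branches in parallel
	opts.PutHook = func(k ds.Key, v []byte) {
		// called for every prevailing local or remote addition
		opts.Logger.Infof("put %s (%d bytes)", k, len(v))
	}
	return opts
}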
// Datastore makes a go-datastore a distributed Key-Value store using
// Merkle-CRDTs and IPLD.
type Datastore struct {
ctx context.Context
cancel context.CancelFunc
opts *Options
logger logging.StandardLogger
// permanent storage
store ds.Datastore
namespace ds.Key
set *set
heads *heads
dagService ipld.DAGService
broadcaster Broadcaster
seenHeadsMux sync.RWMutex
seenHeads map[cid.Cid]struct{}
curDeltaMux sync.Mutex
curDelta *pb.Delta // current, unpublished delta
wg sync.WaitGroup
jobQueue chan *dagJob
sendJobs chan *dagJob
// keep track of children to be fetched so only one job does every
// child
queuedChildren *cidSafeSet
}
type dagJob struct {
ctx context.Context // A job context for tracing
session *sync.WaitGroup // A waitgroup to wait for all related jobs to conclude
nodeGetter *crdtNodeGetter // a node getter to use
root cid.Cid // the root of the branch we are walking down
rootPrio uint64 // the priority of the root delta
delta *pb.Delta // the current delta
node ipld.Node // the current ipld Node
}
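
// Jobs flow through a small pipeline:
//
//	sendNewJobs -> sendJobs -> sendJobWorker -> jobQueue -> dagWorker(s)
//
// sendJobWorker is the only sender on jobQueue, which lets it close that
// channel safely during shutdown.
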
// New returns a Merkle-CRDT-based Datastore using the given one to persist
// all the necessary data under the given namespace. It needs a DAG-Service
// component for IPLD nodes and a Broadcaster component to distribute and
// receive information to and from the rest of replicas. Actual
// implementations of these must be provided by the user; normally this means using
// ipfs-lite (https://github.com/hsanjuan/ipfs-lite) as a DAG Service and the
// included libp2p PubSubBroadcaster as a Broadcaster.
//
// The given Datastore is used to back all CRDT-datastore contents and
// accounting information. When using an asynchronous datastore, the user is
// in charge of calling Sync() regularly. Sync() will persist paths related to
// the given prefix, but note that if other replicas are modifying the
// datastore, the prefixes that will need syncing are not only those modified
// by the local replica. Therefore the user should consider calling Sync("/")
// (the root prefix) in that case, or use a synchronous underlying
// datastore that persists things directly on write.
//
// Close() should be called on the CRDT-Datastore before the given store is closed.
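//
// A sketch of the Sync advice above (assuming an asynchronous underlying
// datastore; d is the *Datastore returned by New):
//
//	// Other replicas may have modified prefixes we never wrote locally,
//	// so periodically persist everything from the root.
//	if err := d.Sync(ctx, ds.NewKey("/")); err != nil {
//		// handle error
//	}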
func New(
store ds.Datastore,
namespace ds.Key,
dagSyncer ipld.DAGService,
bcast Broadcaster,
opts *Options,
) (*Datastore, error) {
if opts == nil {
opts = DefaultOptions()
}
if err := opts.verify(); err != nil {
return nil, err
}
// <namespace>/set
fullSetNs := namespace.ChildString(setNs)
// <namespace>/heads
fullHeadsNs := namespace.ChildString(headsNs)
setPutHook := func(k string, v []byte) {
if opts.PutHook == nil {
return
}
dsk := ds.NewKey(k)
opts.PutHook(dsk, v)
}
setDeleteHook := func(k string) {
if opts.DeleteHook == nil {
return
}
dsk := ds.NewKey(k)
opts.DeleteHook(dsk)
}
ctx, cancel := context.WithCancel(context.Background())
set, err := newCRDTSet(ctx, store, fullSetNs, dagSyncer, opts.Logger, setPutHook, setDeleteHook)
if err != nil {
cancel()
return nil, errors.Wrap(err, "error setting up crdt set")
}
heads, err := newHeads(ctx, store, fullHeadsNs, opts.Logger)
if err != nil {
cancel()
return nil, errors.Wrap(err, "error building heads")
}
dstore := &Datastore{
ctx: ctx,
cancel: cancel,
opts: opts,
logger: opts.Logger,
store: store,
namespace: namespace,
set: set,
heads: heads,
dagService: dagSyncer,
broadcaster: bcast,
seenHeads: make(map[cid.Cid]struct{}),
jobQueue: make(chan *dagJob, opts.NumWorkers),
sendJobs: make(chan *dagJob),
queuedChildren: newCidSafeSet(),
}
err = dstore.applyMigrations(ctx)
if err != nil {
cancel()
return nil, err
}
headList, maxHeight, err := dstore.heads.List(ctx)
if err != nil {
cancel()
return nil, err
}
dstore.logger.Infof(
"crdt Datastore created. Number of heads: %d. Current max-height: %d. Dirty: %t",
len(headList),
maxHeight,
dstore.IsDirty(ctx),
)
// sendJobWorker + NumWorkers
dstore.wg.Add(1 + dstore.opts.NumWorkers)
go func() {
defer dstore.wg.Done()
dstore.sendJobWorker(ctx)
}()
for i := 0; i < dstore.opts.NumWorkers; i++ {
go func() {
defer dstore.wg.Done()
dstore.dagWorker()
}()
}
dstore.wg.Add(4)
go func() {
defer dstore.wg.Done()
dstore.handleNext(ctx)
}()
go func() {
defer dstore.wg.Done()
dstore.rebroadcast(ctx)
}()
go func() {
defer dstore.wg.Done()
dstore.repair(ctx)
}()
go func() {
defer dstore.wg.Done()
dstore.logStats(ctx)
}()
return dstore, nil
}
func (store *Datastore) handleNext(ctx context.Context) {
if store.broadcaster == nil { // offline
return
}
for {
select {
case <-ctx.Done():
return
default:
}
data, err := store.broadcaster.Next(ctx)
if err != nil {
if err == ErrNoMoreBroadcast || ctx.Err() != nil {
return
}
store.logger.Error(err)
continue
}
bCastHeads, err := store.decodeBroadcast(ctx, data)
if err != nil {
store.logger.Error(err)
continue
}
processHead := func(ctx context.Context, c cid.Cid) {
err = store.handleBlock(ctx, c) // handleBlock blocks
if err != nil {
store.logger.Errorf("error processing new head: %s", err)
// For posterity: do not mark the store as
// Dirty if we could not handle a block. If an
// error happens here, it means the node could
// not be fetched, thus it could not be
// processed, thus it did not leave a branch
// half-processed and there's nothing to
// recover.
// disabled: store.MarkDirty()
}
}
// If we have no heads, adopt the broadcasted heads as our own
// immediately. On a fresh start, this allows us to start building on
// top of recent heads, even if we have not fully synced, rather than
// creating new orphan branches.
curHeadCount, err := store.heads.Len(ctx)
if err != nil {
store.logger.Error(err)
continue
}
if curHeadCount == 0 {
dg := &crdtNodeGetter{NodeGetter: store.dagService}
for _, head := range bCastHeads {
prio, err := dg.GetPriority(ctx, head)
if err != nil {
store.logger.Error(err)
continue
}
err = store.heads.Add(ctx, head, prio)
if err != nil {
store.logger.Error(err)
}
}
}
// For each head, we process it.
for _, head := range bCastHeads {
// A thing to try here would be to process heads in
// the same broadcast in parallel, but not to process
// heads from multiple broadcasts in parallel.
if store.opts.MultiHeadProcessing {
go processHead(ctx, head)
} else {
processHead(ctx, head)
}
store.seenHeadsMux.Lock()
store.seenHeads[head] = struct{}{}
store.seenHeadsMux.Unlock()
}
// TODO: We should store trusted-peer signatures associated to
// each head in a timecache. When we broadcast, attach the
// signatures (along with our own) to the broadcast.
// Other peers can use the signatures to verify that the
// received CIDs have been issued by a trusted peer.
}
}
func (store *Datastore) decodeBroadcast(ctx context.Context, data []byte) ([]cid.Cid, error) {
// Make a list of heads we received
bcastData := pb.CRDTBroadcast{}
err := proto.Unmarshal(data, &bcastData)
if err != nil {
return nil, err
}
// Compatibility: we used to publish raw CIDs directly
msgReflect := bcastData.ProtoReflect()
if len(msgReflect.GetUnknown()) > 0 {
// Backwards compatibility
c, err := cid.Cast(msgReflect.GetUnknown())
if err != nil {
return nil, err
}
store.logger.Debugf("a legacy CID broadcast was received for: %s", c)
return []cid.Cid{c}, nil
}
bCastHeads := make([]cid.Cid, len(bcastData.Heads))
for i, protoHead := range bcastData.Heads {
c, err := cid.Cast(protoHead.Cid)
if err != nil {
return bCastHeads, err
}
bCastHeads[i] = c
}
return bCastHeads, nil
}
func (store *Datastore) encodeBroadcast(ctx context.Context, heads []cid.Cid) ([]byte, error) {
bcastData := pb.CRDTBroadcast{}
for _, c := range heads {
bcastData.Heads = append(bcastData.Heads, &pb.Head{Cid: c.Bytes()})
}
return proto.Marshal(&bcastData)
}
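// randomizeInterval returns d adjusted by a random jitter of up to 30% in
// either direction, so that replicas do not rebroadcast in lockstep. For
// example, d = 1 minute yields a value in the [42s, 78s) range.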
func randomizeInterval(d time.Duration) time.Duration {
// 30% of the configured interval
leeway := d * 30 / 100
// A random duration in the [-leeway, +leeway) range
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
randomInterval := time.Duration(randGen.Int63n(int64(leeway*2))) - leeway
return d + randomInterval
}
func (store *Datastore) rebroadcast(ctx context.Context) {
timer := time.NewTimer(randomizeInterval(store.opts.RebroadcastInterval))
for {
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
case <-timer.C:
store.rebroadcastHeads(ctx)
timer.Reset(randomizeInterval(store.opts.RebroadcastInterval))
}
}
}
func (store *Datastore) repair(ctx context.Context) {
if store.opts.RepairInterval == 0 {
return
}
timer := time.NewTimer(0) // fire immediately on start
for {
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
case <-timer.C:
if !store.IsDirty(ctx) {
store.logger.Info("store is marked clean. No need to repair")
} else {
store.logger.Warn("store is marked dirty. Starting DAG repair operation")
err := store.repairDAG(ctx)
if err != nil {
store.logger.Error(err)
}
}
timer.Reset(store.opts.RepairInterval)
}
}
}
// rebroadcastHeads regularly sends out the current heads that we have not
// recently seen broadcasted.
func (store *Datastore) rebroadcastHeads(ctx context.Context) {
// Get our current list of heads
heads, _, err := store.heads.List(ctx)
if err != nil {
store.logger.Error(err)
return
}
var headsToBroadcast []cid.Cid
store.seenHeadsMux.RLock()
{
headsToBroadcast = make([]cid.Cid, 0, len(store.seenHeads))
for _, h := range heads {
if _, ok := store.seenHeads[h]; !ok {
headsToBroadcast = append(headsToBroadcast, h)
}
}
}
store.seenHeadsMux.RUnlock()
// Send them out
err = store.broadcast(ctx, headsToBroadcast)
if err != nil {
store.logger.Warn("broadcast failed: %v", err)
}
// Reset the map
store.seenHeadsMux.Lock()
store.seenHeads = make(map[cid.Cid]struct{})
store.seenHeadsMux.Unlock()
}
// logStats logs some stats every 5 minutes.
func (store *Datastore) logStats(ctx context.Context) {
ticker := time.NewTicker(5 * time.Minute)
for {
select {
case <-ticker.C:
heads, height, err := store.heads.List(ctx)
if err != nil {
store.logger.Errorf("error listing heads: %s", err)
}
store.logger.Infof(
"Number of heads: %d. Current max height: %d. Queued jobs: %d. Dirty: %t",
len(heads),
height,
len(store.jobQueue),
store.IsDirty(ctx),
)
case <-ctx.Done():
ticker.Stop()
return
}
}
}
// handleBlock takes care of vetting, retrieving and applying
// CRDT blocks to the Datastore.
func (store *Datastore) handleBlock(ctx context.Context, c cid.Cid) error {
// Ignore already processed blocks.
// This includes the case when the block is a current
// head.
isProcessed, err := store.isProcessed(ctx, c)
if err != nil {
return errors.Wrapf(err, "error checking for known block %s", c)
}
if isProcessed {
store.logger.Debugf("%s is known. Skip walking tree", c)
return nil
}
return store.handleBranch(ctx, c, c)
}
// handleBranch sends a job to walk down a branch headed by the given head,
// starting at the given CID. This can be used to resume processing of a
// branch from a certain point.
func (store *Datastore) handleBranch(ctx context.Context, head, c cid.Cid) error {
// Walk down from this block
cctx, cancel := context.WithCancel(ctx)
defer cancel()
dg := &crdtNodeGetter{NodeGetter: store.dagService}
if sessionMaker, ok := store.dagService.(SessionDAGService); ok {
dg = &crdtNodeGetter{NodeGetter: sessionMaker.Session(cctx)}
}
var session sync.WaitGroup
err := store.sendNewJobs(ctx, &session, dg, head, 0, []cid.Cid{c})
session.Wait()
return err
}
// dagWorker should run in its own goroutine. Workers are launched during
// initialization in New().
func (store *Datastore) dagWorker() {
for job := range store.jobQueue {
ctx := job.ctx
select {
case <-ctx.Done():
// drain jobs from queue when we are done
job.session.Done()
continue
default:
}
children, err := store.processNode(
ctx,
job.nodeGetter,
job.root,
job.rootPrio,
job.delta,
job.node,
)
if err != nil {
store.logger.Error(err)
store.MarkDirty(ctx)
job.session.Done()
continue
}
go func(j *dagJob) {
err := store.sendNewJobs(ctx, j.session, j.nodeGetter, j.root, j.rootPrio, children)
if err != nil {
store.logger.Error(err)
store.MarkDirty(ctx)
}
j.session.Done()
}(job)
}
}
// sendNewJobs calls GetDeltas (GetMany) on the crdtNodeGetter with the given
// children and sends each response to the workers. It will block until all
// jobs have been queued.
func (store *Datastore) sendNewJobs(ctx context.Context, session *sync.WaitGroup, ng *crdtNodeGetter, root cid.Cid, rootPrio uint64, children []cid.Cid) error {
if len(children) == 0 {
return nil
}
cctx, cancel := context.WithTimeout(ctx, store.opts.DAGSyncerTimeout)
defer cancel()
// Special case for root
if rootPrio == 0 {
prio, err := ng.GetPriority(cctx, children[0])
if err != nil {
return errors.Wrapf(err, "error getting root delta priority")
}
rootPrio = prio
}
goodDeltas := make(map[cid.Cid]struct{})
var err error
loop:
for deltaOpt := range ng.GetDeltas(cctx, children) {
// we abort whenever a delta comes back in error.
if deltaOpt.err != nil {
err = errors.Wrapf(deltaOpt.err, "error getting delta")
break
}
goodDeltas[deltaOpt.node.Cid()] = struct{}{}
session.Add(1)
job := &dagJob{
ctx: ctx,
session: session,
nodeGetter: ng,
root: root,
delta: deltaOpt.delta,
node: deltaOpt.node,
rootPrio: rootPrio,
}
select {
case store.sendJobs <- job:
case <-ctx.Done():
// the job was never sent, so it cannot complete.
session.Done()
// We are in the middle of sending jobs, thus we left
// something unprocessed.
err = ctx.Err()
break loop
}
}
// This is a safeguard in case GetDeltas() returns fewer deltas than
// asked for. It clears from the queue any children that could not be
// fetched. The rest will remove themselves in processNode().
// Hector: as far as I know, this should not execute unless errors
// happened.
for _, child := range children {
if _, ok := goodDeltas[child]; !ok {
store.logger.Warn("GetDeltas did not include all children")
store.queuedChildren.Remove(child)
}
}
return err
}
// sendJobWorker's only purpose is to allow an orderly, race-free shutdown
// of the job workers, by being the only sender on the store.jobQueue
// channel.
func (store *Datastore) sendJobWorker(ctx context.Context) {
for {
select {
case <-ctx.Done():
if len(store.sendJobs) > 0 {
// we left something in the queue
store.MarkDirty(ctx)
}
close(store.jobQueue)
return
case j := <-store.sendJobs:
store.jobQueue <- j
}
}
}
func (store *Datastore) processedBlockKey(c cid.Cid) ds.Key {
return store.namespace.ChildString(processedBlocksNs).ChildString(dshelp.MultihashToDsKey(c.Hash()).String())
}
func (store *Datastore) isProcessed(ctx context.Context, c cid.Cid) (bool, error) {
return store.store.Has(ctx, store.processedBlockKey(c))
}
func (store *Datastore) markProcessed(ctx context.Context, c cid.Cid) error {
return store.store.Put(ctx, store.processedBlockKey(c), nil)
}
func (store *Datastore) dirtyKey() ds.Key {
return store.namespace.ChildString(dirtyBitKey)
}
// MarkDirty marks the Datastore as dirty.
func (store *Datastore) MarkDirty(ctx context.Context) {
store.logger.Warn("marking datastore as dirty")
err := store.store.Put(ctx, store.dirtyKey(), nil)
if err != nil {
store.logger.Errorf("error setting dirty bit: %s", err)
}
}
// IsDirty returns whether the datastore is marked dirty.
func (store *Datastore) IsDirty(ctx context.Context) bool {
ok, err := store.store.Has(ctx, store.dirtyKey())
if err != nil {
store.logger.Errorf("error checking dirty bit: %s", err)
}
return ok
}
// MarkClean removes the dirty mark from the datastore.
func (store *Datastore) MarkClean(ctx context.Context) {
store.logger.Info("marking datastore as clean")
err := store.store.Delete(ctx, store.dirtyKey())
if err != nil {
store.logger.Errorf("error clearing dirty bit: %s", err)
}
}
// processNode merges the delta carried by a node and decides how to
// continue walking the DAG from there.
func (store *Datastore) processNode(ctx context.Context, ng *crdtNodeGetter, root cid.Cid, rootPrio uint64, delta *pb.Delta, node ipld.Node) ([]cid.Cid, error) {
// First, merge the delta in this node.
current := node.Cid()
blockKey := dshelp.MultihashToDsKey(current.Hash()).String()
err := store.set.Merge(ctx, delta, blockKey)
if err != nil {
return nil, errors.Wrapf(err, "error merging delta from %s", current)
}
// Record that we have processed the node so that any other worker
// can skip it.
err = store.markProcessed(ctx, current)
if err != nil {
return nil, errors.Wrapf(err, "error recording %s as processed", current)
}
// Remove from the set that has the children which are queued for
// processing.
store.queuedChildren.Remove(node.Cid())
// Some informative logging
if prio := delta.GetPriority(); prio%50 == 0 {
store.logger.Infof("merged delta from node %s (priority: %d)", current, prio)
} else {
store.logger.Debugf("merged delta from node %s (priority: %d)", current, prio)
}
links := node.Links()
children := []cid.Cid{}
// We reached the bottom. Our head must become a new head.
if len(links) == 0 {
err := store.heads.Add(ctx, root, rootPrio)
if err != nil {
return nil, errors.Wrapf(err, "error adding head %s", root)
}
}
// Return children that:
// a) Are not processed
// b) Are not going to be processed by someone else.
//
// For every other child, add our node as Head.
addedAsHead := false // small optimization to avoid adding as head multiple times.
for _, l := range links {
child := l.Cid
isHead, _, err := store.heads.IsHead(ctx, child)
if err != nil {
return nil, errors.Wrapf(err, "error checking if %s is head", child)
}
isProcessed, err := store.isProcessed(ctx, child)
if err != nil {
return nil, errors.Wrapf(err, "error checking for known block %s", child)
}
if isHead {
// reached one of the current heads. Replace it with
// the tip of this branch
err := store.heads.Replace(ctx, child, root, rootPrio)
if err != nil {
return nil, errors.Wrapf(err, "error replacing head: %s->%s", child, root)
}
addedAsHead = true
// If this head was already processed, continue. This
// protects the case when something is a head but was
// not processed (this could potentially happen during a
// first sync, when heads are set before processing and
// both a node and its child are heads - which I'm not
// sure can happen at all, but it is good to safeguard
// against it).
if isProcessed {
continue
}
}
// If the child has already been processed or someone else has
// reserved it for processing, then we can make ourselves a
// head right away because we are not meant to replace an
// existing head. Otherwise, mark it for processing and
// keep going down this branch.
if isProcessed || !store.queuedChildren.Visit(child) {
if !addedAsHead {
err = store.heads.Add(ctx, root, rootPrio)
if err != nil {
// Don't let this failure prevent us
// from processing the other links.
store.logger.Error(errors.Wrapf(err, "error adding head %s", root))
}
}
addedAsHead = true
continue
}
// We can return this child because it is not processed and we
// reserved it in the queue.
children = append(children, child)
}
return children, nil
}
// repairDAG walks down the DAG from the current heads until a non-processed
// node is found, and at that point queues it for processing.
func (store *Datastore) repairDAG(ctx context.Context) error {
start := time.Now()
defer func() {
store.logger.Infof("DAG repair finished. Took %s", time.Since(start).Truncate(time.Second))
}()
getter := &crdtNodeGetter{store.dagService}
heads, _, err := store.heads.List(ctx)
if err != nil {
return errors.Wrapf(err, "error listing heads")
}
type nodeHead struct {
head cid.Cid
node cid.Cid
}
var nodes []nodeHead
queued := cid.NewSet()
for _, h := range heads {
nodes = append(nodes, nodeHead{head: h, node: h})
queued.Add(h)
}
// For logging
var visitedNodes uint64
var lastPriority uint64
var queuedNodes uint64
exitLogging := make(chan struct{})
defer close(exitLogging)
go func() {
ticker := time.NewTicker(5 * time.Minute)
for {
select {
case <-exitLogging:
ticker.Stop()
return
case <-ticker.C:
store.logger.Infof(
"DAG repair in progress. Visited nodes: %d. Last priority: %d. Queued nodes: %d",
atomic.LoadUint64(&visitedNodes),
atomic.LoadUint64(&lastPriority),
atomic.LoadUint64(&queuedNodes),
)
}
}
}()
for {
// GetDelta does not seem to respond well to context
// cancellations (probably because, deeper down, the Blockstore
// keeps working with a cancelled context), so we need this
// check here.
select {
case <-ctx.Done():
return nil
default:
}
if len(nodes) == 0 {
break
}
nh := nodes[0]
nodes = nodes[1:]
cur := nh.node
head := nh.head
cctx, cancel := context.WithTimeout(ctx, store.opts.DAGSyncerTimeout)
n, delta, err := getter.GetDelta(cctx, cur)
if err != nil {
cancel()
return errors.Wrapf(err, "error getting node for reprocessing %s", cur)
}
cancel()
isProcessed, err := store.isProcessed(ctx, cur)
if err != nil {
return errors.Wrapf(err, "error checking for reprocessed block %s", cur)
}
if !isProcessed {
store.logger.Debugf("reprocessing %s / %d", cur, delta.Priority)
// start syncing from here.
// do not add children to our queue.
err = store.handleBranch(ctx, head, cur)
if err != nil {
return errors.Wrapf(err, "error reprocessing block %s", cur)
}
}
links := n.Links()
for _, l := range links {
if queued.Visit(l.Cid) {
nodes = append(nodes, nodeHead{head: head, node: l.Cid})
}
}
atomic.StoreUint64(&queuedNodes, uint64(len(nodes)))
atomic.AddUint64(&visitedNodes, 1)
atomic.StoreUint64(&lastPriority, delta.Priority)
}
// If we are here, we have successfully reprocessed the chain down to
// the bottom.