-
Notifications
You must be signed in to change notification settings - Fork 8
/
amqp.go
138 lines (116 loc) · 2.84 KB
/
amqp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright 2015 The rpcmq Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package rpcmq
import (
"crypto/tls"
"fmt"
"sync"
"time"
"github.com/streadway/amqp"
)
// RetrySleep is the time to wait between reconnection attempts when the
// connection with the broker is lost. It is slept after every failed
// attempt to re-initialize the AMQP client, so a larger value reduces the
// reconnection rate. Being a package-level variable, it can be adjusted by
// the importing program.
var RetrySleep = 2 * time.Second
// publishing pairs an outgoing AMQP message with a timestamp.
// NOTE(review): the type is not used in this file; presumably the timestamp
// records when the message was queued or sent — confirm against its users.
type publishing struct {
// msg is the AMQP message to publish.
msg amqp.Publishing
// timestamp is the time associated with msg (see the note above).
timestamp time.Time
}
// DeliveryMode represents the broker's delivery mode. Its valid values are
// the Persistent and Transient constants, which mirror the delivery modes
// defined by the amqp package.
type DeliveryMode uint8
// Supported delivery modes. Each constant is the corresponding amqp package
// value converted to DeliveryMode.
const (
// Persistent delivery mode means that messages will be restored to
// durable queues during server restart.
Persistent = DeliveryMode(amqp.Persistent)
// Transient delivery mode means higher throughput but messages will
// not be restored on broker restart.
Transient = DeliveryMode(amqp.Transient)
)
// amqpClient holds the state of a single connection to the AMQP broker:
// the connection and channel themselves, the publisher-confirm channels and
// the hooks needed to re-establish everything after a connection loss.
type amqpClient struct {
// uri is the broker address passed to amqp.DialTLS by init.
uri string
// consumerTag identifies the consumer to cancel on shutdown. When it is
// empty, shutdown skips the cancel step.
consumerTag string
// conn and channel are (re)assigned by every call to init.
conn *amqp.Connection
channel *amqp.Channel
// done is received from by shutdown between cancelling the consumer and
// closing the channel. The sender is not visible in this file.
done chan bool
// The mutex protects the acks/nacks channels from being used
// simultaneously by multiple publishing processes.
mu sync.Mutex
// acks and nacks are the confirmation channels registered with
// NotifyConfirm in init.
acks, nacks chan uint64
// setupFunc is called by handleMsgs after a successful re-initialization;
// an error returned by it causes a panic there.
setupFunc func() error
// tlsConfig is passed to amqp.DialTLS by init.
tlsConfig *tls.Config
}
// newAmqpClient allocates an amqpClient for the broker at uri. Only the uri
// and the unbuffered done channel are populated; no connection is opened
// until init is called.
func newAmqpClient(uri string) *amqpClient {
	client := new(amqpClient)
	client.uri = uri
	client.done = make(chan bool)
	return client
}
// init connects to the broker at ac.uri using ac.tlsConfig, opens a channel,
// puts the channel into confirm mode and registers ac.acks/ac.nacks to
// receive publisher confirmations. On success it starts handleMsgs in a new
// goroutine to watch the channel for returned messages and close events.
//
// It returns an error prefixed with the name of the failing step ("DialTLS",
// "Channel" or "Confirm"). If a step fails after the connection has been
// established, the connection is closed before returning so that repeated
// retries do not leak broker connections.
func (ac *amqpClient) init() error {
	var err error
	ac.conn, err = amqp.DialTLS(ac.uri, ac.tlsConfig)
	if err != nil {
		return fmt.Errorf("DialTLS: %v", err)
	}
	ac.channel, err = ac.conn.Channel()
	if err != nil {
		// The connection is useless without a channel. The close error is
		// deliberately ignored: the Channel error is the one worth reporting.
		_ = ac.conn.Close()
		return fmt.Errorf("Channel: %v", err)
	}
	if err := ac.channel.Confirm(false); err != nil {
		// Closing the connection also tears down the channel opened above.
		// The close error is ignored in favor of the Confirm error.
		_ = ac.conn.Close()
		return fmt.Errorf("Confirm: %v", err)
	}
	ac.acks, ac.nacks = ac.channel.NotifyConfirm(make(chan uint64), make(chan uint64))
	go ac.handleMsgs()
	return nil
}
// handleMsgs watches the current channel for returned messages and close
// notifications until one of the notification channels is closed or the
// client has been reconnected.
//
// A returned message with reply code amqp.NoRoute causes a panic. When a
// close error is received, the client is shut down and init is retried every
// RetrySleep until it succeeds; setupFunc is then run (panicking if it
// fails). After a successful reconnection this goroutine returns, because
// init has already started a replacement handleMsgs for the new channel.
func (ac *amqpClient) handleMsgs() {
	returnCh := ac.channel.NotifyReturn(make(chan amqp.Return))
	closeCh := ac.channel.NotifyClose(make(chan *amqp.Error))

	for {
		select {
		case returned, open := <-returnCh:
			if !open {
				logf("returns channel closed")
				return
			}
			if returned.ReplyCode == amqp.NoRoute {
				panic("no route")
			}

		case _, open := <-closeCh:
			if !open {
				logf("errors channel closed")
				return
			}
			logf("shutdown")
			ac.shutdown()

			// Keep trying to re-initialize until the broker is reachable.
			logf("retrying")
			initErr := ac.init()
			for initErr != nil {
				logf("amqpClient init: %v", initErr)
				time.Sleep(RetrySleep)
				logf("retrying")
				initErr = ac.init()
			}

			setupErr := ac.setupFunc()
			if setupErr != nil {
				panic(fmt.Errorf("setup: %v", setupErr))
			}
			logf("connected")
			return
		}
	}
}
// shutdown tears the client down in order: it cancels the consumer (only if
// a consumer tag is set), blocks until a value is received on ac.done, and
// then closes the channel followed by the connection. Failures of the
// individual steps are logged and do not stop the remaining steps.
func (ac *amqpClient) shutdown() {
	if tag := ac.consumerTag; tag != "" {
		cancelErr := ac.channel.Cancel(tag, false)
		if cancelErr != nil {
			logf("Channel Cancel: %v", cancelErr)
		}
	}

	// Wait for the done signal before closing anything.
	<-ac.done

	channelErr := ac.channel.Close()
	if channelErr != nil {
		logf("Channel Close: %v", channelErr)
	}

	connErr := ac.conn.Close()
	if connErr != nil {
		logf("Connection Close: %v", connErr)
	}
}