Skip to content

Commit 63feee0

Browse files
committed
chore(queue): support custom consumer name
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 119104e commit 63feee0

File tree

2 files changed

+22
-13
lines changed

2 files changed

+22
-13
lines changed

options.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,24 @@ type options struct {
1515
logger queue.Logger
1616
addr string
1717
subj string
18+
tag string
1819
}
1920

20-
// WithAddr setup the addr of NATS
21+
// WithAddr setup the URI
2122
func WithAddr(addr string) Option {
2223
return func(w *options) {
2324
w.addr = "nats://" + addr
2425
}
2526
}
2627

27-
// WithSubj setup the subject of NATS
28+
// WithAddr setup the tag
29+
func WithTag(tag string) Option {
30+
return func(w *options) {
31+
w.tag = tag
32+
}
33+
}
34+
35+
// WithSubj setup the topic
2836
func WithSubj(subj string) Option {
2937
return func(w *options) {
3038
w.subj = subj
@@ -49,6 +57,7 @@ func newOptions(opts ...Option) options {
4957
defaultOpts := options{
5058
addr: "amqp://guest:guest@localhost:5672/",
5159
subj: "queue",
60+
tag: "golang-queue",
5261
logger: queue.NewLogger(),
5362
runFunc: func(context.Context, core.QueuedMessage) error {
5463
return nil

rabbitmq.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ var _ core.Worker = (*Worker)(nil)
1717

1818
// Worker for NSQ
1919
type Worker struct {
20-
client *amqp.Connection
20+
conn *amqp.Connection
2121
channel *amqp.Channel
2222
stop chan struct{}
2323
stopFlag int32
@@ -36,12 +36,12 @@ func NewWorker(opts ...Option) *Worker {
3636
tasks: make(chan amqp.Delivery),
3737
}
3838

39-
w.client, err = amqp.Dial(w.opts.addr)
39+
w.conn, err = amqp.Dial(w.opts.addr)
4040
if err != nil {
4141
panic(err)
4242
}
4343

44-
w.channel, err = w.client.Channel()
44+
w.channel, err = w.conn.Channel()
4545
if err != nil {
4646
panic(err)
4747
}
@@ -66,13 +66,13 @@ func (w *Worker) startConsumer() (err error) {
6666
}
6767

6868
w.tasks, err = w.channel.Consume(
69-
q.Name, // queue
70-
"", // consumer
71-
true, // auto-ack
72-
false, // exclusive
73-
false, // no-local
74-
false, // no-wait
75-
nil, // args
69+
q.Name, // queue
70+
w.opts.tag, // consumer
71+
true, // auto-ack
72+
false, // exclusive
73+
false, // no-local
74+
false, // no-wait
75+
nil, // args
7676
)
7777

7878
if err != nil {
@@ -152,7 +152,7 @@ func (w *Worker) Shutdown() error {
152152
if err := w.channel.Cancel("", true); err != nil {
153153
w.opts.logger.Error(err)
154154
}
155-
if err := w.client.Close(); err != nil {
155+
if err := w.conn.Close(); err != nil {
156156
w.opts.logger.Error(err)
157157
}
158158
})

0 commit comments

Comments
 (0)