Skip to content

Commit 72b2179

Browse files
committed
test: add Example_direct_queue testing
1 parent ca08324 commit 72b2179

File tree

1 file changed

+49
-13
lines changed

1 file changed

+49
-13
lines changed

example_test.go

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,34 +14,74 @@ func Example_direct_exchange() {
1414
m := mockMessage{
1515
Message: "foo",
1616
}
17-
w := NewWorker(
17+
w1 := NewWorker(
1818
WithSubj("direct_queue"),
1919
WithExchangeName("direct_exchange"),
20-
WithRoutingKey("direct_queue"),
21-
WithTag("direct_queue"),
20+
WithExchangeType("direct"),
21+
WithRoutingKey("direct_exchange"),
2222
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
23-
fmt.Println("get data:", string(m.Bytes()))
23+
fmt.Println("worker01 get data:", string(m.Bytes()))
24+
time.Sleep(100 * time.Millisecond)
2425
return nil
2526
}),
2627
)
28+
29+
q1, err := queue.NewQueue(
30+
queue.WithWorker(w1),
31+
)
32+
if err != nil {
33+
w1.opts.logger.Error(err)
34+
}
35+
q1.Start()
36+
37+
w2 := NewWorker(
38+
WithSubj("direct_queue"),
39+
WithExchangeName("direct_exchange"),
40+
WithExchangeType("direct"),
41+
WithRoutingKey("direct_exchange"),
42+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
43+
fmt.Println("worker02 get data:", string(m.Bytes()))
44+
time.Sleep(100 * time.Millisecond)
45+
return nil
46+
}),
47+
)
48+
49+
q2, err := queue.NewQueue(
50+
queue.WithWorker(w2),
51+
)
52+
if err != nil {
53+
w2.opts.logger.Error(err)
54+
}
55+
q2.Start()
56+
57+
w := NewWorker(
58+
WithExchangeName("direct_exchange"),
59+
WithExchangeType("direct"),
60+
WithRoutingKey("direct_exchange"),
61+
)
62+
2763
q, err := queue.NewQueue(
2864
queue.WithWorker(w),
29-
queue.WithWorkerCount(1),
3065
)
3166
if err != nil {
3267
w.opts.logger.Error(err)
3368
}
3469

35-
q.Start()
3670
time.Sleep(200 * time.Millisecond)
3771
q.Queue(m)
3872
q.Queue(m)
73+
q.Queue(m)
74+
q.Queue(m)
3975
time.Sleep(200 * time.Millisecond)
4076
q.Release()
77+
q1.Release()
78+
q2.Release()
4179

42-
// Output:
43-
// get data: foo
44-
// get data: foo
80+
// Unordered Output:
81+
// worker01 get data: foo
82+
// worker02 get data: foo
83+
// worker01 get data: foo
84+
// worker02 get data: foo
4585
}
4686

4787
// Fanout Exchange
@@ -65,9 +105,7 @@ func Example_fanout_exchange() {
65105
if err != nil {
66106
w1.opts.logger.Error(err)
67107
}
68-
69108
q1.Start()
70-
time.Sleep(200 * time.Millisecond)
71109

72110
w2 := NewWorker(
73111
WithSubj("fanout_queue_2"),
@@ -85,9 +123,7 @@ func Example_fanout_exchange() {
85123
if err != nil {
86124
w2.opts.logger.Error(err)
87125
}
88-
89126
q2.Start()
90-
time.Sleep(200 * time.Millisecond)
91127

92128
w := NewWorker(
93129
WithExchangeName("fanout_exchange"),

0 commit comments

Comments
 (0)