@@ -45,6 +45,7 @@ func (s *DispatcherSuite) TestContextAdapters(c *gc.C) {
45
45
func (s * DispatcherSuite ) TestDispatchCases (c * gc.C ) {
46
46
var cc mockClientConn
47
47
var disp = dispatcherBuilder {zone : "local" }.Build (& cc , balancer.BuildOptions {}).(* dispatcher )
48
+ cc .disp = disp
48
49
close (disp .sweepDoneCh ) // Disable async sweeping.
49
50
50
51
// Case: Called without a dispatchRoute. Expect it panics.
@@ -58,75 +59,75 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
58
59
// SubConn to the default service address is started.
59
60
var _ , err = disp .Pick (balancer.PickInfo {Ctx : ctx })
60
61
c .Check (err , gc .Equals , balancer .ErrNoSubConnAvailable )
61
- c .Check (cc .created , gc .DeepEquals , []mockSubConn {"default.addr" })
62
+ c .Check (cc .created , gc .DeepEquals , []mockSubConn {mockSubConn { Name : "default.addr" , disp : disp } })
62
63
cc .created = nil
63
64
64
65
// Case: Default connection transitions to Ready. Expect it's now returned.
65
- disp . UpdateSubConnState ( mockSubConn ( "default.addr" ), balancer.SubConnState {ConnectivityState : connectivity .Ready })
66
+ mockSubConn { Name : "default.addr" , disp : disp }. UpdateState ( balancer.SubConnState {ConnectivityState : connectivity .Ready })
66
67
67
68
result , err := disp .Pick (balancer.PickInfo {Ctx : ctx })
68
69
c .Check (err , gc .IsNil )
69
70
c .Check (result .Done , gc .IsNil )
70
- c .Check (result .SubConn , gc .Equals , mockSubConn ( "default.addr" ) )
71
+ c .Check (result .SubConn , gc .Equals , mockSubConn { Name : "default.addr" , disp : disp } )
71
72
72
73
// Case: Specific remote peer is dispatched to.
73
74
ctx = WithDispatchRoute (context .Background (),
74
75
buildRouteFixture (), ProcessSpec_ID {Zone : "remote" , Suffix : "primary" })
75
76
76
77
result , err = disp .Pick (balancer.PickInfo {Ctx : ctx })
77
78
c .Check (err , gc .Equals , balancer .ErrNoSubConnAvailable )
78
- c .Check (cc .created , gc .DeepEquals , []mockSubConn {"remote.addr" })
79
+ c .Check (cc .created , gc .DeepEquals , []mockSubConn {mockSubConn { Name : "remote.addr" , disp : disp } })
79
80
cc .created = nil
80
81
81
- disp . UpdateSubConnState ( mockSubConn ( "remote.addr" ), balancer.SubConnState {ConnectivityState : connectivity .Ready })
82
+ mockSubConn { Name : "remote.addr" , disp : disp }. UpdateState ( balancer.SubConnState {ConnectivityState : connectivity .Ready })
82
83
83
84
result , err = disp .Pick (balancer.PickInfo {Ctx : ctx })
84
85
c .Check (err , gc .IsNil )
85
86
c .Check (result .Done , gc .IsNil )
86
- c .Check (result .SubConn , gc .Equals , mockSubConn ( "remote.addr" ) )
87
+ c .Check (result .SubConn , gc .Equals , mockSubConn { Name : "remote.addr" , disp : disp } )
87
88
88
89
// Case: Route allows for multiple members. A local one is now dialed.
89
90
ctx = WithDispatchRoute (context .Background (), buildRouteFixture (), ProcessSpec_ID {})
90
91
91
92
_ , err = disp .Pick (balancer.PickInfo {Ctx : ctx })
92
93
c .Check (err , gc .Equals , balancer .ErrNoSubConnAvailable )
93
- c .Check (cc .created , gc .DeepEquals , []mockSubConn {"local.addr" })
94
+ c .Check (cc .created , gc .DeepEquals , []mockSubConn {mockSubConn { Name : "local.addr" , disp : disp } })
94
95
cc .created = nil
95
96
96
- disp . UpdateSubConnState ( mockSubConn ( "local.addr" ), balancer.SubConnState {ConnectivityState : connectivity .Ready })
97
+ mockSubConn { Name : "local.addr" , disp : disp }. UpdateState ( balancer.SubConnState {ConnectivityState : connectivity .Ready })
97
98
98
99
result , err = disp .Pick (balancer.PickInfo {Ctx : ctx })
99
100
c .Check (err , gc .IsNil )
100
101
c .Check (result .Done , gc .IsNil )
101
- c .Check (result .SubConn , gc .Equals , mockSubConn ( "local.addr" ) )
102
+ c .Check (result .SubConn , gc .Equals , mockSubConn { Name : "local.addr" , disp : disp } )
102
103
103
104
// Case: One local addr is marked as failed. Another is dialed.
104
- disp . UpdateSubConnState ( mockSubConn ( "local.addr" ), balancer.SubConnState {ConnectivityState : connectivity .TransientFailure })
105
+ mockSubConn { Name : "local.addr" , disp : disp }. UpdateState ( balancer.SubConnState {ConnectivityState : connectivity .TransientFailure })
105
106
106
107
_ , err = disp .Pick (balancer.PickInfo {Ctx : ctx })
107
108
c .Check (err , gc .Equals , balancer .ErrNoSubConnAvailable )
108
- c .Check (cc .created , gc .DeepEquals , []mockSubConn {"local.otherAddr" })
109
+ c .Check (cc .created , gc .DeepEquals , []mockSubConn {mockSubConn { Name : "local.otherAddr" , disp : disp } })
109
110
cc .created = nil
110
111
111
- disp . UpdateSubConnState ( mockSubConn ( "local.otherAddr" ), balancer.SubConnState {ConnectivityState : connectivity .Ready })
112
+ mockSubConn { Name : "local.otherAddr" , disp : disp }. UpdateState ( balancer.SubConnState {ConnectivityState : connectivity .Ready })
112
113
113
114
result , err = disp .Pick (balancer.PickInfo {Ctx : ctx })
114
115
c .Check (err , gc .IsNil )
115
116
c .Check (result .Done , gc .IsNil )
116
- c .Check (result .SubConn , gc .Equals , mockSubConn ( "local.otherAddr" ) )
117
+ c .Check (result .SubConn , gc .Equals , mockSubConn { Name : "local.otherAddr" , disp : disp } )
117
118
118
119
// Case: otherAddr is also failed. Expect that an error is returned,
119
120
// rather than dispatch to remote addr. (Eg we prefer to wait for a
120
121
// local replica to recover or the route to change, vs using a remote
121
122
// endpoint which incurs more networking cost).
122
- disp . UpdateSubConnState ( mockSubConn ( "local.otherAddr" ), balancer.SubConnState {ConnectivityState : connectivity .TransientFailure })
123
+ mockSubConn { Name : "local.otherAddr" , disp : disp }. UpdateState ( balancer.SubConnState {ConnectivityState : connectivity .TransientFailure })
123
124
124
125
_ , err = disp .Pick (balancer.PickInfo {Ctx : ctx })
125
126
c .Check (err , gc .Equals , balancer .ErrTransientFailure )
126
127
127
128
// Case: local.addr is Ready again. However, primary is required and has failed.
128
- disp . UpdateSubConnState ( mockSubConn ( "local.addr" ), balancer.SubConnState {ConnectivityState : connectivity .Ready })
129
- disp . UpdateSubConnState ( mockSubConn ( "remote.addr" ), balancer.SubConnState {ConnectivityState : connectivity .TransientFailure })
129
+ mockSubConn { Name : "local.addr" , disp : disp }. UpdateState ( balancer.SubConnState {ConnectivityState : connectivity .Ready })
130
+ mockSubConn { Name : "remote.addr" , disp : disp }. UpdateState ( balancer.SubConnState {ConnectivityState : connectivity .TransientFailure })
130
131
131
132
ctx = WithDispatchRoute (context .Background (),
132
133
buildRouteFixture (), ProcessSpec_ID {Zone : "remote" , Suffix : "primary" })
@@ -150,7 +151,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
150
151
result , err = disp .Pick (balancer.PickInfo {Ctx : ctx })
151
152
c .Check (err , gc .IsNil )
152
153
c .Check (result .Done , gc .NotNil )
153
- c .Check (result .SubConn , gc .Equals , mockSubConn ( "local.addr" ) )
154
+ c .Check (result .SubConn , gc .Equals , mockSubConn { Name : "local.addr" , disp : disp } )
154
155
155
156
// Closure callback with an Unavailable error (only) will trigger an invalidation.
156
157
result .Done (balancer.DoneInfo {Err : nil })
@@ -164,6 +165,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
164
165
func (s * DispatcherSuite ) TestDispatchMarkAndSweep (c * gc.C ) {
165
166
var cc mockClientConn
166
167
var disp = dispatcherBuilder {zone : "local" }.Build (& cc , balancer.BuildOptions {}).(* dispatcher )
168
+ cc .disp = disp
167
169
defer disp .Close ()
168
170
169
171
var err error
@@ -177,11 +179,11 @@ func (s *DispatcherSuite) TestDispatchMarkAndSweep(c *gc.C) {
177
179
_ , err = disp .Pick (balancer.PickInfo {Ctx : localCtx })
178
180
c .Check (err , gc .Equals , balancer .ErrNoSubConnAvailable )
179
181
180
- c .Check (cc .created , gc .DeepEquals , []mockSubConn {"remote.addr" , "local.addr" })
182
+ c .Check (cc .created , gc .DeepEquals , []mockSubConn {mockSubConn { Name : "remote.addr" , disp : disp }, mockSubConn { Name : "local.addr" , disp : disp } })
181
183
cc .created = nil
182
184
183
- disp . UpdateSubConnState ( mockSubConn ( "remote.addr" ), balancer.SubConnState {ConnectivityState : connectivity .Ready })
184
- disp . UpdateSubConnState ( mockSubConn ( "local.addr" ), balancer.SubConnState {ConnectivityState : connectivity .Connecting })
185
+ mockSubConn { Name : "remote.addr" , disp : disp }. UpdateState ( balancer.SubConnState {ConnectivityState : connectivity .Ready })
186
+ mockSubConn { Name : "local.addr" , disp : disp }. UpdateState ( balancer.SubConnState {ConnectivityState : connectivity .Connecting })
185
187
186
188
disp .sweep ()
187
189
c .Check (cc .removed , gc .IsNil )
@@ -205,14 +207,14 @@ func (s *DispatcherSuite) TestDispatchMarkAndSweep(c *gc.C) {
205
207
206
208
// This time, expect that local.addr is swept.
207
209
disp .sweep ()
208
- c .Check (cc .removed , gc .DeepEquals , []mockSubConn {"local.addr" })
210
+ c .Check (cc .removed , gc .DeepEquals , []mockSubConn {mockSubConn { Name : "local.addr" , disp : disp } })
209
211
cc .removed = nil
210
- disp . UpdateSubConnState ( mockSubConn ( "local.addr" ), balancer.SubConnState {ConnectivityState : connectivity .Shutdown })
212
+ mockSubConn { Name : "local.addr" , disp : disp }. UpdateState ( balancer.SubConnState {ConnectivityState : connectivity .Shutdown })
211
213
212
214
disp .sweep () // Now remote.addr is swept.
213
- c .Check (cc .removed , gc .DeepEquals , []mockSubConn {"remote.addr" })
215
+ c .Check (cc .removed , gc .DeepEquals , []mockSubConn {mockSubConn { Name : "remote.addr" , disp : disp } })
214
216
cc .removed = nil
215
- disp . UpdateSubConnState ( mockSubConn ( "remote.addr" ), balancer.SubConnState {ConnectivityState : connectivity .Shutdown })
217
+ mockSubConn { Name : "remote.addr" , disp : disp }. UpdateState ( balancer.SubConnState {ConnectivityState : connectivity .Shutdown })
216
218
217
219
// No connections remain.
218
220
c .Check (disp .idConn , gc .HasLen , 0 )
@@ -223,10 +225,10 @@ func (s *DispatcherSuite) TestDispatchMarkAndSweep(c *gc.C) {
223
225
_ , err = disp .Pick (balancer.PickInfo {Ctx : localCtx })
224
226
c .Check (err , gc .Equals , balancer .ErrNoSubConnAvailable )
225
227
226
- c .Check (cc .created , gc .DeepEquals , []mockSubConn {"local.addr" })
228
+ c .Check (cc .created , gc .DeepEquals , []mockSubConn {mockSubConn { Name : "local.addr" , disp : disp } })
227
229
cc .created = nil
228
230
229
- disp . UpdateSubConnState ( mockSubConn ( "local.addr" ), balancer.SubConnState {ConnectivityState : connectivity .Ready })
231
+ mockSubConn { Name : "local.addr" , disp : disp }. UpdateState ( balancer.SubConnState {ConnectivityState : connectivity .Ready })
230
232
_ , err = disp .Pick (balancer.PickInfo {Ctx : localCtx })
231
233
c .Check (err , gc .IsNil )
232
234
}
@@ -235,31 +237,42 @@ type mockClientConn struct {
235
237
err error
236
238
created []mockSubConn
237
239
removed []mockSubConn
240
+ disp * dispatcher
238
241
}
239
242
240
- type mockSubConn string
243
+ type mockSubConn struct {
244
+ Name string
245
+ disp * dispatcher
246
+ }
247
+
248
+ func (s1 mockSubConn ) Equal (s2 mockSubConn ) bool {
249
+ return s1 .Name == s2 .Name
250
+ }
241
251
242
- func (s mockSubConn ) UpdateAddresses ([]resolver.Address ) {}
252
+ func (s mockSubConn ) UpdateAddresses ([]resolver.Address ) { panic ("deprecated" ) }
253
+ func (s mockSubConn ) UpdateState (state balancer.SubConnState ) { s .disp .updateSubConnState (s , state ) }
243
254
func (s mockSubConn ) Connect () {}
244
255
func (s mockSubConn ) GetOrBuildProducer (balancer.ProducerBuilder ) (balancer.Producer , func ()) {
245
256
return nil , func () {}
246
257
}
247
- func (s mockSubConn ) Shutdown () {}
258
+ func (s mockSubConn ) Shutdown () {
259
+ var c = s .disp .cc .(* mockClientConn )
260
+ c .removed = append (c .removed , s )
261
+ }
248
262
249
263
func (c * mockClientConn ) NewSubConn (a []resolver.Address , _ balancer.NewSubConnOptions ) (balancer.SubConn , error ) {
250
- var sc = mockSubConn ( a [0 ].Addr )
264
+ var sc = mockSubConn { Name : a [0 ].Addr , disp : c . disp }
251
265
c .created = append (c .created , sc )
252
266
return sc , c .err
253
267
}
254
268
255
- func (c * mockClientConn ) RemoveSubConn (sc balancer.SubConn ) {
256
- c .removed = append (c .removed , sc .(mockSubConn ))
257
- }
258
-
259
269
func (c * mockClientConn ) UpdateAddresses (balancer.SubConn , []resolver.Address ) {}
260
270
func (c * mockClientConn ) UpdateState (balancer.State ) {}
261
271
func (c * mockClientConn ) ResolveNow (resolver.ResolveNowOptions ) {}
262
272
func (c * mockClientConn ) Target () string { return "default.addr" }
273
+ func (c * mockClientConn ) RemoveSubConn (balancer.SubConn ) {
274
+ panic ("deprecated" )
275
+ }
263
276
264
277
type mockRouter struct { invalidated string }
265
278
0 commit comments