Skip to content

Commit 506636a

Browse files
committed
socket: separate read/write network timeouts
Splits DialInfo.Timeout (defaults to 60s when using mgo.Dial()) into ReadTimeout and WriteTimeout to address #160. Read/write timeout defaults to DialInfo.Timeout to preserve existing behaviour.
1 parent 9a3d363 commit 506636a

File tree

5 files changed

+226
-70
lines changed

5 files changed

+226
-70
lines changed

cluster.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,17 @@ type isMasterResult struct {
146146
}
147147

148148
func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResult) error {
149+
// I'm not really sure why the timeout was hard coded to these values (I
150+
// assume because everything is passed as a func arg, and thus this info is
151+
// unavailable here), but leaving them as is for backwards compatibility.
152+
config := &DialInfo{
153+
Timeout: 10 * time.Second,
154+
ReadTimeout: 10 * time.Second,
155+
WriteTimeout: 10 * time.Second,
156+
}
157+
149158
// Monotonic let's it talk to a slave and still hold the socket.
150-
session := newSession(Monotonic, cluster, 10*time.Second)
159+
session := newSession(Monotonic, cluster, config)
151160
session.setSocket(socket)
152161

153162
var cmd = bson.D{{Name: "isMaster", Value: 1}}
@@ -624,9 +633,7 @@ func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout
624633
// AcquireSocketWithPoolTimeout returns a socket to a server in the cluster. If slaveOk is
625634
// true, it will attempt to return a socket to a slave server. If it is
626635
// false, the socket will necessarily be to a master server.
627-
func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(
628-
mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int, poolTimeout time.Duration,
629-
) (s *mongoSocket, err error) {
636+
func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(mode Mode, slaveOk bool, syncTimeout time.Duration, serverTags []bson.D, info *DialInfo) (s *mongoSocket, err error) {
630637
var started time.Time
631638
var syncCount uint
632639
for {
@@ -670,7 +677,7 @@ func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(
670677
continue
671678
}
672679

673-
s, abended, err := server.AcquireSocketWithBlocking(poolLimit, socketTimeout, poolTimeout)
680+
s, abended, err := server.AcquireSocketWithBlocking(info)
674681
if err == errPoolTimeout {
675682
// No need to remove servers from the topology if acquiring a socket fails for this reason.
676683
return nil, err

server.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -124,29 +124,31 @@ var errServerClosed = errors.New("server was closed")
124124
// use in this server is greater than the provided limit, errPoolLimit is
125125
// returned.
126126
func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
127-
return server.acquireSocketInternal(poolLimit, timeout, false, 0*time.Millisecond)
127+
info := &DialInfo{
128+
PoolLimit: poolLimit,
129+
ReadTimeout: timeout,
130+
WriteTimeout: timeout,
131+
Timeout: timeout,
132+
}
133+
return server.acquireSocketInternal(info, false)
128134
}
129135

130136
// AcquireSocketWithBlocking wraps AcquireSocket, but if a socket is not available, it will _not_
131137
// return errPoolLimit. Instead, it will block waiting for a socket to become available. If poolTimeout
132138
// should elapse before a socket is available, it will return errPoolTimeout.
133-
func (server *mongoServer) AcquireSocketWithBlocking(
134-
poolLimit int, socketTimeout time.Duration, poolTimeout time.Duration,
135-
) (socket *mongoSocket, abended bool, err error) {
136-
return server.acquireSocketInternal(poolLimit, socketTimeout, true, poolTimeout)
139+
func (server *mongoServer) AcquireSocketWithBlocking(info *DialInfo) (socket *mongoSocket, abended bool, err error) {
140+
return server.acquireSocketInternal(info, true)
137141
}
138142

139-
func (server *mongoServer) acquireSocketInternal(
140-
poolLimit int, timeout time.Duration, shouldBlock bool, poolTimeout time.Duration,
141-
) (socket *mongoSocket, abended bool, err error) {
143+
func (server *mongoServer) acquireSocketInternal(info *DialInfo, shouldBlock bool) (socket *mongoSocket, abended bool, err error) {
142144
for {
143145
server.Lock()
144146
abended = server.abended
145147
if server.closed {
146148
server.Unlock()
147149
return nil, abended, errServerClosed
148150
}
149-
if poolLimit > 0 {
151+
if info.poolLimit() > 0 {
150152
if shouldBlock {
151153
// Beautiful. Golang conditions don't have a WaitWithTimeout, so I've implemented the timeout
152154
// with a wait + broadcast. The broadcast will cause the loop here to re-check the timeout,
@@ -158,11 +160,11 @@ func (server *mongoServer) acquireSocketInternal(
158160
// https://github.com/golang/go/issues/16620, since the lock needs to be held in _this_ goroutine.
159161
waitDone := make(chan struct{})
160162
timeoutHit := false
161-
if poolTimeout > 0 {
163+
if info.PoolTimeout > 0 {
162164
go func() {
163165
select {
164166
case <-waitDone:
165-
case <-time.After(poolTimeout):
167+
case <-time.After(info.PoolTimeout):
166168
// timeoutHit is part of the wait condition, so needs to be changed under mutex.
167169
server.Lock()
168170
defer server.Unlock()
@@ -172,7 +174,7 @@ func (server *mongoServer) acquireSocketInternal(
172174
}()
173175
}
174176
timeSpentWaiting := time.Duration(0)
175-
for len(server.liveSockets)-len(server.unusedSockets) >= poolLimit && !timeoutHit {
177+
for len(server.liveSockets)-len(server.unusedSockets) >= info.poolLimit() && !timeoutHit {
176178
// We only count time spent in Wait(), and not time evaluating the entire loop,
177179
// so that in the happy non-blocking path where the condition above evaluates true
178180
// first time, we record a nice round zero wait time.
@@ -191,7 +193,7 @@ func (server *mongoServer) acquireSocketInternal(
191193
// Record that we fetched a connection of of a socket list and how long we spent waiting
192194
stats.noticeSocketAcquisition(timeSpentWaiting)
193195
} else {
194-
if len(server.liveSockets)-len(server.unusedSockets) >= poolLimit {
196+
if len(server.liveSockets)-len(server.unusedSockets) >= info.poolLimit() {
195197
server.Unlock()
196198
return nil, false, errPoolLimit
197199
}
@@ -202,15 +204,15 @@ func (server *mongoServer) acquireSocketInternal(
202204
socket = server.unusedSockets[n-1]
203205
server.unusedSockets[n-1] = nil // Help GC.
204206
server.unusedSockets = server.unusedSockets[:n-1]
205-
info := server.info
207+
serverInfo := server.info
206208
server.Unlock()
207-
err = socket.InitialAcquire(info, timeout)
209+
err = socket.InitialAcquire(serverInfo, info)
208210
if err != nil {
209211
continue
210212
}
211213
} else {
212214
server.Unlock()
213-
socket, err = server.Connect(timeout)
215+
socket, err = server.Connect(info)
214216
if err == nil {
215217
server.Lock()
216218
// We've waited for the Connect, see if we got
@@ -231,20 +233,18 @@ func (server *mongoServer) acquireSocketInternal(
231233

232234
// Connect establishes a new connection to the server. This should
233235
// generally be done through server.AcquireSocket().
234-
func (server *mongoServer) Connect(timeout time.Duration) (*mongoSocket, error) {
236+
func (server *mongoServer) Connect(info *DialInfo) (*mongoSocket, error) {
235237
server.RLock()
236238
master := server.info.Master
237239
dial := server.dial
238240
server.RUnlock()
239241

240-
logf("Establishing new connection to %s (timeout=%s)...", server.Addr, timeout)
242+
logf("Establishing new connection to %s (timeout=%s)...", server.Addr, info.Timeout)
241243
var conn net.Conn
242244
var err error
243245
switch {
244246
case !dial.isSet():
245-
// Cannot do this because it lacks timeout support. :-(
246-
//conn, err = net.DialTCP("tcp", nil, server.tcpaddr)
247-
conn, err = net.DialTimeout("tcp", server.ResolvedAddr, timeout)
247+
conn, err = net.DialTimeout("tcp", server.ResolvedAddr, info.Timeout)
248248
if tcpconn, ok := conn.(*net.TCPConn); ok {
249249
tcpconn.SetKeepAlive(true)
250250
} else if err == nil {
@@ -264,7 +264,7 @@ func (server *mongoServer) Connect(timeout time.Duration) (*mongoSocket, error)
264264
logf("Connection to %s established.", server.Addr)
265265

266266
stats.conn(+1, master)
267-
return newSocket(server, conn, timeout), nil
267+
return newSocket(server, conn, info), nil
268268
}
269269

270270
// Close forces closing all sockets that are alive, whether

0 commit comments

Comments
 (0)