Skip to content

Commit f5a323b

Browse files
Ensure that two functions added for the same work key get run
Only delete the key if the function we executed is the last function.
1 parent d3a7139 commit f5a323b

File tree

1 file changed

+20
-11
lines changed

1 file changed

+20
-11
lines changed

pkg/util/writerlease/writerlease.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ const (
6666

6767
var nowFn = time.Now
6868

69+
type work struct {
70+
id int
71+
fn WorkFunc
72+
}
73+
6974
type WriterLease struct {
7075
name string
7176
backoff wait.Backoff
@@ -74,7 +79,8 @@ type WriterLease struct {
7479
once chan struct{}
7580

7681
lock sync.Mutex
77-
queued map[string]WorkFunc
82+
id int
83+
queued map[string]*work
7884
queue workqueue.DelayingInterface
7985
state State
8086
expires time.Time
@@ -95,7 +101,7 @@ func New(leaseDuration, retryInterval time.Duration) *WriterLease {
95101
maxBackoff: leaseDuration,
96102
retryInterval: retryInterval,
97103

98-
queued: make(map[string]WorkFunc),
104+
queued: make(map[string]*work),
99105
queue: workqueue.NewDelayingQueue(),
100106
once: make(chan struct{}),
101107
}
@@ -110,7 +116,7 @@ func NewWithBackoff(name string, leaseDuration, retryInterval time.Duration, bac
110116
maxBackoff: leaseDuration,
111117
retryInterval: retryInterval,
112118

113-
queued: make(map[string]WorkFunc),
119+
queued: make(map[string]*work),
114120
queue: workqueue.NewNamedDelayingQueue(name),
115121
once: make(chan struct{}),
116122
}
@@ -155,7 +161,8 @@ func (l *WriterLease) WaitUntil(t time.Duration) (bool, bool) {
155161
func (l *WriterLease) Try(key string, fn WorkFunc) {
156162
l.lock.Lock()
157163
defer l.lock.Unlock()
158-
l.queued[key] = fn
164+
l.id++
165+
l.queued[key] = &work{fn: fn, id: l.id}
159166
if l.state == Follower {
160167
delay := l.expires.Sub(nowFn())
161168
// no matter what, always wait at least some amount of time as a follower to give the nominal
@@ -196,7 +203,7 @@ func (l *WriterLease) Remove(key string) {
196203
delete(l.queued, key)
197204
}
198205

199-
func (l *WriterLease) get(key string) WorkFunc {
206+
func (l *WriterLease) get(key string) *work {
200207
l.lock.Lock()
201208
defer l.lock.Unlock()
202209
return l.queued[key]
@@ -215,8 +222,8 @@ func (l *WriterLease) work() bool {
215222
}
216223
key := item.(string)
217224

218-
fn := l.get(key)
219-
if fn == nil {
225+
work := l.get(key)
226+
if work == nil {
220227
glog.V(4).Infof("[%s] Work item %s was cleared, done", l.name, key)
221228
l.queue.Done(key)
222229
return true
@@ -236,7 +243,7 @@ func (l *WriterLease) work() bool {
236243
glog.V(4).Infof("[%s] Lease owner or electing, running %s", l.name, key)
237244
}
238245

239-
isLeader, retry := fn()
246+
isLeader, retry := work.fn()
240247
if retry {
241248
// come back in a bit
242249
glog.V(4).Infof("[%s] Retrying %s", l.name, key)
@@ -245,11 +252,11 @@ func (l *WriterLease) work() bool {
245252
return true
246253
}
247254

248-
l.finishKey(key, isLeader)
255+
l.finishKey(key, isLeader, work.id)
249256
return true
250257
}
251258

252-
func (l *WriterLease) finishKey(key string, isLeader bool) {
259+
func (l *WriterLease) finishKey(key string, isLeader bool, id int) {
253260
l.lock.Lock()
254261
defer l.lock.Unlock()
255262

@@ -271,7 +278,9 @@ func (l *WriterLease) finishKey(key string, isLeader bool) {
271278
}
272279
l.expires = nowFn().Add(l.nextBackoff())
273280
}
274-
delete(l.queued, key)
281+
if l.queued[key].id == id {
282+
delete(l.queued, key)
283+
}
275284
// close the channel before we remove the key from the queue to prevent races in Wait
276285
if resolvedElection {
277286
close(l.once)

0 commit comments

Comments
 (0)