Skip to content

Commit de70694

Browse files
committed
Merge branch 'master' into release/v0.6.0
2 parents 90303a5 + 2d101ff commit de70694

File tree

11 files changed

+507
-333
lines changed

11 files changed

+507
-333
lines changed

responsemanager/allocator/allocator.go renamed to allocator/allocator.go

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,34 @@ type Allocator struct {
1212
totalMemoryMax uint64
1313
perPeerMax uint64
1414

15-
allocLk sync.Mutex
16-
total uint64
17-
nextAllocIndex uint64
18-
peerStatuses map[peer.ID]*peerStatus
19-
peerStatusQueue pq.PQ
15+
allocLk sync.RWMutex
16+
totalAllocatedAllPeers uint64
17+
nextAllocIndex uint64
18+
peerStatuses map[peer.ID]*peerStatus
19+
peerStatusQueue pq.PQ
2020
}
2121

2222
func NewAllocator(totalMemoryMax uint64, perPeerMax uint64) *Allocator {
2323
return &Allocator{
24-
totalMemoryMax: totalMemoryMax,
25-
perPeerMax: perPeerMax,
26-
total: 0,
27-
peerStatuses: make(map[peer.ID]*peerStatus),
28-
peerStatusQueue: pq.New(makePeerStatusCompare(perPeerMax)),
24+
totalMemoryMax: totalMemoryMax,
25+
perPeerMax: perPeerMax,
26+
totalAllocatedAllPeers: 0,
27+
peerStatuses: make(map[peer.ID]*peerStatus),
28+
peerStatusQueue: pq.New(makePeerStatusCompare(perPeerMax)),
2929
}
3030
}
3131

32+
func (a *Allocator) AllocatedForPeer(p peer.ID) uint64 {
33+
a.allocLk.RLock()
34+
defer a.allocLk.RUnlock()
35+
36+
status, ok := a.peerStatuses[p]
37+
if !ok {
38+
return 0
39+
}
40+
return status.totalAllocated
41+
}
42+
3243
func (a *Allocator) AllocateBlockMemory(p peer.ID, amount uint64) <-chan error {
3344
responseChan := make(chan error, 1)
3445
a.allocLk.Lock()
@@ -44,8 +55,8 @@ func (a *Allocator) AllocateBlockMemory(p peer.ID, amount uint64) <-chan error {
4455
a.peerStatuses[p] = status
4556
}
4657

47-
if (a.total+amount <= a.totalMemoryMax) && (status.totalAllocated+amount <= a.perPeerMax) && len(status.pendingAllocations) == 0 {
48-
a.total += amount
58+
if (a.totalAllocatedAllPeers+amount <= a.totalMemoryMax) && (status.totalAllocated+amount <= a.perPeerMax) && len(status.pendingAllocations) == 0 {
59+
a.totalAllocatedAllPeers += amount
4960
status.totalAllocated += amount
5061
responseChan <- nil
5162
} else {
@@ -65,8 +76,16 @@ func (a *Allocator) ReleaseBlockMemory(p peer.ID, amount uint64) error {
6576
if !ok {
6677
return errors.New("cannot deallocate from peer with no allocations")
6778
}
68-
status.totalAllocated -= amount
69-
a.total -= amount
79+
if status.totalAllocated > amount {
80+
status.totalAllocated -= amount
81+
} else {
82+
status.totalAllocated = 0
83+
}
84+
if a.totalAllocatedAllPeers > amount {
85+
a.totalAllocatedAllPeers -= amount
86+
} else {
87+
a.totalAllocatedAllPeers = 0
88+
}
7089
a.peerStatusQueue.Update(status.Index())
7190
a.processPendingAllocations()
7291
return nil
@@ -84,7 +103,7 @@ func (a *Allocator) ReleasePeerMemory(p peer.ID) error {
84103
for _, pendingAllocation := range status.pendingAllocations {
85104
pendingAllocation.response <- errors.New("Peer has been deallocated")
86105
}
87-
a.total -= status.totalAllocated
106+
a.totalAllocatedAllPeers -= status.totalAllocated
88107
a.processPendingAllocations()
89108
return nil
90109
}
@@ -111,13 +130,13 @@ func (a *Allocator) processPendingAllocations() {
111130

112131
func (a *Allocator) processNextPendingAllocationForPeer(nextPeer *peerStatus) bool {
113132
pendingAllocation := nextPeer.pendingAllocations[0]
114-
if a.total+pendingAllocation.amount > a.totalMemoryMax {
133+
if a.totalAllocatedAllPeers+pendingAllocation.amount > a.totalMemoryMax {
115134
return false
116135
}
117136
if nextPeer.totalAllocated+pendingAllocation.amount > a.perPeerMax {
118137
return false
119138
}
120-
a.total += pendingAllocation.amount
139+
a.totalAllocatedAllPeers += pendingAllocation.amount
121140
nextPeer.totalAllocated += pendingAllocation.amount
122141
nextPeer.pendingAllocations = nextPeer.pendingAllocations[1:]
123142
pendingAllocation.response <- nil

0 commit comments

Comments
 (0)