Skip to content

Commit af14f24

Browse files
committed
refactor: async/await
Still pending PeerId and PeerInfo async/await - they've been done but we can't use them until libp2p update libp2p-secio so use promisify in the interim where necessary, which is only in the tests.
1 parent 0e2eba8 commit af14f24

27 files changed

+415
-743
lines changed

package.json

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,44 +43,47 @@
4343
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
4444
"devDependencies": {
4545
"@nodeutils/defaults-deep": "^1.1.0",
46-
"aegir": "^18.2.1",
46+
"aegir": "^20.3.1",
47+
"async": "^2.6.1",
48+
"async-iterator-all": "^1.0.0",
4749
"benchmark": "^2.1.4",
4850
"chai": "^4.2.0",
4951
"dirty-chai": "^2.0.1",
50-
"ipfs-repo": "~0.26.3",
51-
"libp2p": "~0.24.2",
52-
"libp2p-kad-dht": "~0.15.0",
53-
"libp2p-mplex": "~0.8.4",
52+
"ipfs-repo": "^0.27.1",
53+
"libp2p": "^0.26.1",
54+
"libp2p-kad-dht": "^0.16.0",
55+
"libp2p-mplex": "^0.8.0",
5456
"libp2p-secio": "~0.11.1",
55-
"libp2p-tcp": "~0.13.0",
56-
"lodash": "^4.17.11",
57+
"libp2p-tcp": "^0.13.0",
58+
"lodash.difference": "^4.5.0",
59+
"lodash.flatten": "^4.4.0",
5760
"lodash.range": "^3.2.0",
5861
"lodash.without": "^4.4.0",
5962
"ncp": "^2.0.0",
63+
"p-event": "^4.1.0",
6064
"peer-book": "~0.9.0",
61-
"peer-id": "~0.12.0",
62-
"peer-info": "~0.15.0",
63-
"rimraf": "^2.6.2",
65+
"peer-id": "^0.12.2",
66+
"peer-info": "~0.15.1",
67+
"promisify-es6": "^1.0.3",
68+
"rimraf": "^3.0.0",
6469
"safe-buffer": "^5.1.2",
6570
"stats-lite": "^2.2.0",
6671
"uuid": "^3.3.2"
6772
},
6873
"dependencies": {
69-
"async": "^2.6.1",
70-
"bignumber.js": "^8.0.1",
74+
"bignumber.js": "^9.0.0",
7175
"cids": "~0.7.0",
7276
"debug": "^4.1.0",
7377
"ipfs-block": "~0.8.0",
7478
"just-debounce-it": "^1.1.0",
7579
"lodash.isequalwith": "^4.4.0",
7680
"moving-average": "^1.0.0",
7781
"multicodec": "~0.5.0",
78-
"multihashing-async": "~0.5.1",
79-
"promisify-es6": "^1.0.3",
82+
"multihashing-async": "^0.8.0",
8083
"protons": "^1.0.1",
8184
"pull-length-prefixed": "^1.3.1",
8285
"pull-stream": "^3.6.9",
83-
"typical": "^4.0.0",
86+
"typical": "^5.1.0",
8487
"varint-decoder": "~0.1.1"
8588
},
8689
"pre-push": [

src/decision-engine/index.js

Lines changed: 2 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ class DecisionEngine {
2626
this._outbox = debounce(this._processTasks.bind(this), 100)
2727
}
2828

29-
// _sendBlocks (peer, blocks, cb) {
3029
async _sendBlocks (peer, blocks) {
3130
// split into messages of max 512 * 1024 bytes
3231
const total = blocks.reduce((acc, b) => {
@@ -42,29 +41,6 @@ class DecisionEngine {
4241
let batch = []
4342
let outstanding = blocks.length
4443

45-
// eachSeries(blocks, (b, cb) => {
46-
// outstanding--
47-
// batch.push(b)
48-
// size += b.data.byteLength
49-
50-
// if (size >= MAX_MESSAGE_SIZE ||
51-
// // need to ensure the last remaining items get sent
52-
// outstanding === 0) {
53-
// size = 0
54-
// const nextBatch = batch.slice()
55-
// batch = []
56-
// this._sendSafeBlocks(peer, nextBatch, (err) => {
57-
// if (err) {
58-
// this._log('sendblock error: %s', err.message)
59-
// }
60-
// // not returning the error, so we send as much as we can
61-
// // as otherwise `eachSeries` would cancel
62-
// cb()
63-
// })
64-
// } else {
65-
// nextTick(cb)
66-
// }
67-
// }, cb)
6844
for (const b of blocks) {
6945
outstanding--
7046
batch.push(b)
@@ -86,7 +62,6 @@ class DecisionEngine {
8662
}
8763
}
8864

89-
// _sendSafeBlocks (peer, blocks, cb) {
9065
async _sendSafeBlocks (peer, blocks) {
9166
const msg = new Message(false)
9267
blocks.forEach((b) => msg.addBlock(b))
@@ -106,36 +81,6 @@ class DecisionEngine {
10681
const uniqCids = uniqWith((a, b) => a.equals(b), cids)
10782
const groupedTasks = groupBy(task => task.target.toB58String(), tasks)
10883

109-
// waterfall([
110-
// (callback) => map(uniqCids, (cid, cb) => {
111-
// this.blockstore.get(cid, cb)
112-
// }, callback),
113-
// (blocks, callback) => each(Object.values(groupedTasks), (tasks, cb) => {
114-
// // all tasks have the same target
115-
// const peer = tasks[0].target
116-
// const blockList = cids.map((cid) => {
117-
// return blocks.find(b => b.cid.equals(cid))
118-
// })
119-
120-
// this._sendBlocks(peer, blockList, (err) => {
121-
// if (err) {
122-
// // `_sendBlocks` actually doesn't return any errors
123-
// this._log.error('should never happen: ', err)
124-
// } else {
125-
// blockList.forEach((block) => this.messageSent(peer, block))
126-
// }
127-
128-
// cb()
129-
// })
130-
// }, callback)
131-
// ], (err) => {
132-
// this._tasks = []
133-
134-
// if (err) {
135-
// this._log.error(err)
136-
// }
137-
// })
138-
13984
const blocks = await Promise.all(uniqCids.map(cid => this.blockstore.get(cid)))
14085
await Object.values(groupedTasks).map(async (tasks) => {
14186
// all tasks in the group have the same target
@@ -206,25 +151,7 @@ class DecisionEngine {
206151
}
207152

208153
// Handle incoming messages
209-
// messageReceived (peerId, msg, cb) {
210154
async messageReceived (peerId, msg) {
211-
// const ledger = this._findOrCreate(peerId)
212-
213-
// if (msg.empty) {
214-
// return nextTick(cb)
215-
// }
216-
217-
// // If the message was a full wantlist clear the current one
218-
// if (msg.full) {
219-
// ledger.wantlist = new Wantlist()
220-
// }
221-
222-
// this._processBlocks(msg.blocks, ledger)
223-
224-
// if (msg.wantlist.size === 0) {
225-
// return nextTick(cb)
226-
// }
227-
228155
const ledger = this._findOrCreate(peerId)
229156

230157
if (msg.empty) {
@@ -242,8 +169,8 @@ class DecisionEngine {
242169
return
243170
}
244171

245-
let cancels = []
246-
let wants = []
172+
const cancels = []
173+
const wants = []
247174
msg.wantlist.forEach((entry) => {
248175
if (entry.cancel) {
249176
ledger.cancelWant(entry.cid)
@@ -268,26 +195,7 @@ class DecisionEngine {
268195
}, this._tasks, entries)
269196
}
270197

271-
// _addWants (ledger, peerId, entries, callback) {
272198
async _addWants (ledger, peerId, entries) {
273-
// each(entries, (entry, cb) => {
274-
// // If we already have the block, serve it
275-
// this.blockstore.has(entry.cid, (err, exists) => {
276-
// if (err) {
277-
// this._log.error('failed existence check')
278-
// } else if (exists) {
279-
// this._tasks.push({
280-
// entry: entry.entry,
281-
// target: peerId
282-
// })
283-
// }
284-
// cb()
285-
// })
286-
// }, () => {
287-
// this._outbox()
288-
// callback()
289-
// })
290-
291199
await Promise.all(entries.map(async (entry) => {
292200
// If we already have the block, serve it
293201
let exists

0 commit comments

Comments
 (0)