Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit 6db5abf

Browse files
committed
Implement pinning core
1 parent 3f9c2fa commit 6db5abf

File tree

5 files changed

+840
-1
lines changed

5 files changed

+840
-1
lines changed

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
"bs58": "^3.0.0",
6262
"debug": "^2.2.0",
6363
"detect-node": "^2.0.3",
64+
"fnv": "^0.1.3",
6465
"fs-blob-store": "^5.2.1",
6566
"glob": "^7.0.3",
6667
"hapi": "^13.4.1",
@@ -123,4 +124,4 @@
123124
"kumavis <[email protected]>",
124125
"nginnever <[email protected]>"
125126
]
126-
}
127+
}

src/core/index.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ const repo = require('./ipfs/repo')
1717
const init = require('./ipfs/init')
1818
const bootstrap = require('./ipfs/bootstrap')
1919
const config = require('./ipfs/config')
20+
const pinner = require('./ipfs/pinner')
2021
const block = require('./ipfs/block')
2122
const object = require('./ipfs/object')
2223
const libp2p = require('./ipfs/libp2p')
@@ -53,6 +54,7 @@ function IPFS (repoInstance) {
5354
this.init = init(this)
5455
this.bootstrap = bootstrap(this)
5556
this.config = config(this)
57+
this.pinner = pinner(this)
5658
this.block = block(this)
5759
this.object = object(this)
5860
this.libp2p = libp2p(this)

src/core/ipfs/pinner-utils.js

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
'use strict'
2+
3+
const bs58 = require('bs58')
4+
const protobuf = require('protocol-buffers')
5+
const crypto = require('crypto')
6+
const fnv = require('fnv')
7+
const mDAG = require('ipfs-merkle-dag')
8+
const DAGNode = mDAG.DAGNode
9+
const DAGLink = mDAG.DAGLink
10+
const varint = require('varint')
11+
12+
const emptyKeyHash = 'QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n'
13+
const emptyKey = new Buffer(bs58.decode(emptyKeyHash))
14+
const defaultFanout = 256
15+
const maxItems = 8192
16+
17+
// Protobuf interface
18+
const pbSchema = (
19+
// from go-ipfs/pin/internal/pb/header.proto
20+
'message Set { ' +
21+
// 1 for now
22+
'optional uint32 version = 1; ' +
23+
// how many of the links are subtrees
24+
'optional uint32 fanout = 2; ' +
25+
// hash seed for subtree selection, a random number
26+
'optional fixed32 seed = 3; ' +
27+
'}'
28+
)
29+
const pb = protobuf(pbSchema)
30+
function readHeader (rootNode) {
31+
// rootNode.data should be a buffer of the format:
32+
// < varint(headerLength) | header | itemData... >
33+
var rootData = rootNode.data
34+
var hdrLength = varint.decode(rootData)
35+
var vBytes = varint.decode.bytes
36+
if (vBytes <= 0) {
37+
return { err: 'Invalid Set header length' }
38+
}
39+
if (vBytes + hdrLength > rootData.length) {
40+
return { err: 'Impossibly large set header length' }
41+
}
42+
var hdrSlice = rootData.slice(vBytes, hdrLength + vBytes)
43+
var header = pb.Set.decode(hdrSlice)
44+
if (header.version !== 1) {
45+
return { err: 'Unsupported Set version: ' + header.version }
46+
}
47+
if (header.fanout > rootNode.links.length) {
48+
return { err: 'Impossibly large fanout' }
49+
}
50+
return {
51+
header: header,
52+
data: rootData.slice(hdrLength + vBytes)
53+
}
54+
}
55+
56+
exports = module.exports = function (dagS) {
57+
var pinnerUtils = {
58+
// should this be part of `object` rather than `pinner`?
59+
hasChild: (root, childhash, callback, _links, _checked, _seen) => {
60+
// callback (err, has)
61+
if (callback.fired) { return }
62+
if (typeof childhash === 'object') {
63+
childhash = bs58.encode(childhash).toString()
64+
}
65+
_links = _links || root.links.length
66+
_checked = _checked || 0
67+
_seen = _seen || {}
68+
69+
if (!root.links.length && _links === _checked) {
70+
// all nodes have been checked
71+
return callback(null, false)
72+
}
73+
root.links.forEach((link) => {
74+
var bs58link = bs58.encode(link.hash).toString()
75+
if (bs58link === childhash) {
76+
callback.fired = true
77+
return callback(null, true)
78+
}
79+
dagS.get(link.hash, (err, obj) => {
80+
if (err) {
81+
callback.fired = true
82+
return callback(err)
83+
}
84+
// don't check the same links twice
85+
if (bs58link in _seen) { return }
86+
_seen[bs58link] = true
87+
88+
_checked++
89+
_links += obj.links.length
90+
pinnerUtils.hasChild(obj, childhash, callback, _links, _checked, _seen)
91+
})
92+
})
93+
},
94+
95+
storeSet: (keys, logInternalKey, callback) => {
96+
// callback (err, rootNode)
97+
var items = keys.map((key) => {
98+
return {
99+
key: key,
100+
data: null
101+
}
102+
})
103+
pinnerUtils.storeItems(items, logInternalKey, (err, rootNode) => {
104+
if (err) { return callback(err) }
105+
dagS.add(rootNode, (err) => {
106+
if (err) { return callback(err) }
107+
logInternalKey(rootNode.multihash())
108+
callback(null, rootNode)
109+
})
110+
})
111+
},
112+
113+
storeItems: (items, logInternalKey, callback, _subcalls, _done) => {
114+
// callback (err, rootNode)
115+
var seed = crypto.randomBytes(4).readUInt32LE(0, true)
116+
var pbHeader = pb.Set.encode({
117+
version: 1,
118+
fanout: defaultFanout,
119+
seed: seed
120+
})
121+
var rootData = Buffer.concat([
122+
new Buffer(varint.encode(pbHeader.length)), pbHeader
123+
])
124+
var rootLinks = []
125+
var i
126+
for (i = 0; i < defaultFanout; i++) {
127+
rootLinks.push(new DAGLink('', null, emptyKey))
128+
}
129+
logInternalKey(emptyKey)
130+
131+
if (items.length <= maxItems) {
132+
// the items will fit in a single root node
133+
var itemLinks = []
134+
var itemData = []
135+
var indices = []
136+
for (i = 0; i < items.length; i++) {
137+
itemLinks.push(new DAGLink('', null, items[i].key))
138+
itemData.push(items[i].data || new Buffer([]))
139+
indices.push(i)
140+
}
141+
indices.sort((a, b) => {
142+
var x = Buffer.compare(itemLinks[a].hash, itemLinks[b].hash)
143+
if (x) { return x }
144+
return (a < b ? -1 : 1)
145+
})
146+
var sortedLinks = indices.map((i) => { return itemLinks[i] })
147+
var sortedData = indices.map((i) => { return itemData[i] })
148+
rootLinks = rootLinks.concat(sortedLinks)
149+
rootData = Buffer.concat([rootData].concat(sortedData))
150+
readHeader(new DAGNode(rootData, rootLinks)) //
151+
return callback(null, new DAGNode(rootData, rootLinks))
152+
} else {
153+
// need to split up the items into multiple root nodes
154+
// (using go-ipfs "wasteful but simple" approach for consistency)
155+
_subcalls = _subcalls || 0
156+
_done = _done || 0
157+
var h
158+
var hashed = {}
159+
var hashFn = (seed, key) => {
160+
var buf = new Buffer(4)
161+
var h = new fnv.FNV()
162+
buf.writeUInt32LE(seed, 0)
163+
h.update(buf)
164+
h.update(bs58.encode(key).toString())
165+
return h.value()
166+
}
167+
// items will be distributed among `defaultFanout` bins
168+
for (i = 0; i < items.length; i++) {
169+
h = hashFn(seed, items[i].key) % defaultFanout
170+
hashed[h] = hashed[h] || []
171+
hashed[h].push(items[i])
172+
}
173+
var storeItemsCb = (err, child) => {
174+
if (callback.fired) { return }
175+
if (err) {
176+
callback.fired = true
177+
return callback(err)
178+
}
179+
dagS.add(child, (err) => {
180+
if (callback.fired) { return }
181+
if (err) {
182+
callback.fired = true
183+
return callback(err)
184+
}
185+
logInternalKey(child.multihash())
186+
rootLinks[this.h] = new DAGLink(
187+
'', child.size(), child.multihash()
188+
)
189+
_done++
190+
if (_done === _subcalls) {
191+
// all finished
192+
return callback(null, new DAGNode(rootData, rootLinks))
193+
}
194+
})
195+
}
196+
_subcalls += Object.keys(hashed).length
197+
for (h in hashed) {
198+
if (hashed.hasOwnProperty(h)) {
199+
pinnerUtils.storeItems(
200+
hashed[h],
201+
logInternalKey,
202+
storeItemsCb.bind({h: h}),
203+
_subcalls,
204+
_done
205+
)
206+
}
207+
}
208+
}
209+
},
210+
211+
loadSet: (rootNode, name, logInternalKey, callback) => {
212+
// callback (err, keys)
213+
var link = rootNode.links.filter((link) => {
214+
return link.name === name
215+
}).pop()
216+
if (!link) { return callback('No link found with name ' + name) }
217+
logInternalKey(link.hash)
218+
dagS.get(link.hash, (err, obj) => {
219+
if (err) { return callback(err) }
220+
var keys = []
221+
var walkerFn = (link) => {
222+
keys.push(link.hash)
223+
}
224+
pinnerUtils.walkItems(obj, walkerFn, logInternalKey, (err) => {
225+
if (err) { return callback(err) }
226+
return callback(null, keys)
227+
})
228+
})
229+
},
230+
231+
walkItems: (node, walkerFn, logInternalKey, callback) => {
232+
// callback (err)
233+
var h = readHeader(node)
234+
if (h.err) { return callback(h.err) }
235+
var fanout = h.header.fanout
236+
var subwalks = 0
237+
var finished = 0
238+
239+
var walkCb = (err) => {
240+
if (err) { return callback(err) }
241+
finished++
242+
if (subwalks === finished) {
243+
return callback()
244+
}
245+
}
246+
247+
for (var i = 0; i < node.links.length; i++) {
248+
var link = node.links[i]
249+
if (i >= fanout) {
250+
// item link
251+
walkerFn(link, i, h.data)
252+
} else {
253+
// fanout link
254+
logInternalKey(link.hash)
255+
if (!emptyKey.equals(link.hash)) {
256+
subwalks++
257+
dagS.get(link.hash, (err, obj) => {
258+
if (err) { return callback(err) }
259+
pinnerUtils.walkItems(obj, walkerFn, logInternalKey, walkCb)
260+
})
261+
}
262+
}
263+
}
264+
if (!subwalks) {
265+
return callback()
266+
}
267+
}
268+
}
269+
return pinnerUtils
270+
}

0 commit comments

Comments
 (0)