Skip to content

Commit 1df5b99

Browse files
authored
[ETCM-1053] extend PeerEventBus to support Akka Streams (#1087)
1 parent f2023d6 commit 1df5b99

File tree

8 files changed

+150
-24
lines changed

8 files changed

+150
-24
lines changed

src/main/scala/io/iohk/ethereum/network/Peer.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@ package io.iohk.ethereum.network
22

33
import java.net.InetSocketAddress
44

5+
import akka.NotUsed
56
import akka.actor.ActorRef
7+
import akka.pattern.Patterns.ask
8+
import akka.stream.scaladsl.Source
69
import akka.util.ByteString
710

811
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistId
12+
import io.iohk.ethereum.network.p2p.Message
913

1014
final case class PeerId(value: String) extends BlacklistId
1115

@@ -18,6 +22,7 @@ final case class Peer(
1822
remoteAddress: InetSocketAddress,
1923
ref: ActorRef,
2024
incomingConnection: Boolean,
25+
source: Source[Message, NotUsed] = Source.empty,
2126
nodeId: Option[ByteString] = None,
2227
createTimeMillis: Long = System.currentTimeMillis
2328
)

src/main/scala/io/iohk/ethereum/network/PeerActor.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ package io.iohk.ethereum.network
33
import java.net.InetSocketAddress
44
import java.net.URI
55

6+
import akka.NotUsed
67
import akka.actor.SupervisorStrategy.Escalate
78
import akka.actor._
9+
import akka.stream.OverflowStrategy
10+
import akka.stream.scaladsl.Source
811
import akka.util.ByteString
912

1013
import org.bouncycastle.util.encoders.Hex
@@ -21,6 +24,7 @@ import io.iohk.ethereum.network.handshaker.Handshaker.HandshakeResult
2124
import io.iohk.ethereum.network.handshaker.Handshaker.NextMessage
2225
import io.iohk.ethereum.network.p2p._
2326
import io.iohk.ethereum.network.p2p.messages.Capability
27+
import io.iohk.ethereum.network.p2p.messages.Codes
2428
import io.iohk.ethereum.network.p2p.messages.WireProtocol._
2529
import io.iohk.ethereum.network.rlpx.AuthHandshaker
2630
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler
@@ -289,7 +293,17 @@ class PeerActor[R <: HandshakeResult](
289293
class HandshakedPeer(remoteNodeId: ByteString, rlpxConnection: RLPxConnection, handshakeResult: R) {
290294

291295
val peerId: PeerId = PeerId(Hex.toHexString(remoteNodeId.toArray))
292-
val peer: Peer = Peer(peerId, peerAddress, self, incomingConnection, Some(remoteNodeId))
296+
val source: Source[Message, NotUsed] = PeerEventBusActor
297+
.messageSource(
298+
peerEventBus,
299+
PeerEventBusActor.SubscriptionClassifier
300+
.MessageClassifier(
301+
Set(Codes.BlockBodiesCode, Codes.BlockHeadersCode),
302+
PeerEventBusActor.PeerSelector.WithId(peerId)
303+
)
304+
)
305+
.map(_.message)
306+
val peer: Peer = Peer(peerId, peerAddress, self, incomingConnection, source, Some(remoteNodeId))
293307
peerEventBus ! Publish(PeerHandshakeSuccessful(peer, handshakeResult))
294308

295309
/** main behavior of actor that handles peer communication and subscriptions for messages

src/main/scala/io/iohk/ethereum/network/PeerEventBusActor.scala

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,54 @@
11
package io.iohk.ethereum.network
22

3+
import akka.NotUsed
34
import akka.actor.Actor
45
import akka.actor.ActorRef
56
import akka.actor.Props
7+
import akka.actor.Terminated
68
import akka.event.ActorEventBus
9+
import akka.stream.OverflowStrategy
10+
import akka.stream.WatchedActorTerminatedException
11+
import akka.stream.scaladsl.Source
12+
import akka.util.Timeout
13+
14+
import scala.concurrent.Future
715

816
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
917
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.PeerDisconnected
1018
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.PeerHandshakeSuccessful
1119
import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier._
1220
import io.iohk.ethereum.network.handshaker.Handshaker.HandshakeResult
1321
import io.iohk.ethereum.network.p2p.Message
22+
import io.iohk.ethereum.network.p2p.messages.Codes
1423

1524
object PeerEventBusActor {
1625
def props: Props = Props(new PeerEventBusActor)
1726

27+
/** Handle subscription to the peer event bus via Akka Streams.
28+
*
29+
* @param peerEventBus ref to PeerEventBusActor
30+
* @param messageClassifier specify which messages to subscribe to
31+
* @return Source that subscribes to the peer event bus on materialization
32+
* and unsubscribes on cancellation. It will complete when the event bus
33+
* actor terminates.
34+
*
35+
* Note:
36+
* - subscription is asynchronous so it may miss messages when starting.
37+
* - it does not complete when a specified peerId disconnects.
38+
*/
39+
def messageSource(peerEventBus: ActorRef, messageClassifier: MessageClassifier): Source[MessageFromPeer, NotUsed] =
40+
Source
41+
.fromMaterializer { (mat, _) =>
42+
val (actorRef, src) = Source
43+
.actorRef[MessageFromPeer](PartialFunction.empty, PartialFunction.empty, 1, OverflowStrategy.fail)
44+
.watch(peerEventBus)
45+
.preMaterialize()(mat)
46+
peerEventBus
47+
.tell(Subscribe(messageClassifier), actorRef)
48+
src
49+
}
50+
.mapMaterializedValue(_ => NotUsed)
51+
1852
sealed trait PeerSelector {
1953
def contains(peerId: PeerId): Boolean
2054
}
@@ -28,7 +62,6 @@ object PeerEventBusActor {
2862
case class WithId(peerId: PeerId) extends PeerSelector {
2963
override def contains(p: PeerId): Boolean = p == peerId
3064
}
31-
3265
}
3366

3467
sealed trait SubscriptionClassifier
@@ -196,20 +229,28 @@ object PeerEventBusActor {
196229
case class Unsubscribe(from: Option[SubscriptionClassifier] = None)
197230

198231
case class Publish(ev: PeerEvent)
199-
200232
}
201233

202234
class PeerEventBusActor extends Actor {
203-
204235
import PeerEventBusActor._
205236

206237
val peerEventBus: PeerEventBus = new PeerEventBus
207238

208239
override def receive: Receive = {
209-
case Subscribe(to) => peerEventBus.subscribe(sender(), to)
210-
case Unsubscribe(Some(from)) => peerEventBus.unsubscribe(sender(), from)
211-
case Unsubscribe(None) => peerEventBus.unsubscribe(sender())
212-
case Publish(ev: PeerEvent) => peerEventBus.publish(ev)
213-
}
240+
case Subscribe(to) =>
241+
peerEventBus.subscribe(sender(), to)
242+
context.watch(sender())
243+
244+
case Unsubscribe(Some(from)) =>
245+
peerEventBus.unsubscribe(sender(), from)
246+
247+
case Unsubscribe(None) =>
248+
peerEventBus.unsubscribe(sender())
214249

250+
case Publish(ev: PeerEvent) =>
251+
peerEventBus.publish(ev)
252+
253+
case Terminated(ref) =>
254+
peerEventBus.unsubscribe(ref)
255+
}
215256
}

src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import java.util.Collections.newSetFromMap
66

77
import akka.actor.SupervisorStrategy.Stop
88
import akka.actor._
9+
import akka.stream.scaladsl.Source
910
import akka.util.ByteString
1011
import akka.util.Timeout
1112

@@ -323,9 +324,7 @@ class PeerManagerActor(
323324
PeerId.fromRef(ref),
324325
address,
325326
ref,
326-
incomingConnection,
327-
nodeId = None,
328-
createTimeMillis = System.currentTimeMillis
327+
incomingConnection
329328
)
330329

331330
val newConnectedPeers = connectedPeers.addNewPendingPeer(pendingPeer)

src/test/scala/io/iohk/ethereum/blockchain/sync/fast/FastSyncBranchResolverSpec.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,8 @@ class FastSyncBranchResolverSpec extends AnyWordSpec with Matchers with MockFact
181181
val blocksSavedInPeer: List[Block] =
182182
commonBlocks :++ BlockHelpers.generateChain(ourBestBlock + 1 - highestCommonBlock, commonBlocks.last)
183183

184-
val dummyPeer = Peer(PeerId("dummyPeer"), new InetSocketAddress("foo", 1), ActorRef.noSender, false, None, 0)
184+
val dummyPeer =
185+
Peer(PeerId("dummyPeer"), new InetSocketAddress("foo", 1), ActorRef.noSender, false, createTimeMillis = 0)
185186

186187
val initialSearchState = SearchState(1, 10, dummyPeer)
187188
val ours = blocksSaved.map(b => (b.number, b)).toMap
@@ -256,7 +257,8 @@ class FastSyncBranchResolverSpec extends AnyWordSpec with Matchers with MockFact
256257
val blocksSaved: List[Block] = BlockHelpers.generateChain(8, BlockHelpers.genesis)
257258
val blocksSavedInPeer: List[Block] = BlockHelpers.generateChain(8, BlockHelpers.genesis)
258259

259-
val dummyPeer = Peer(PeerId("dummyPeer"), new InetSocketAddress("foo", 1), ActorRef.noSender, false, None, 0)
260+
val dummyPeer =
261+
Peer(PeerId("dummyPeer"), new InetSocketAddress("foo", 1), ActorRef.noSender, false, createTimeMillis = 0)
260262

261263
val initialSearchState = SearchState(1, 8, dummyPeer)
262264
val ours = blocksSaved.map(b => (b.number, b)).toMap

src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -328,20 +328,20 @@ class EtcPeerManagerSpec extends AnyFlatSpec with Matchers {
328328

329329
val peer1Probe: TestProbe = TestProbe()
330330
val peer1: Peer =
331-
Peer(PeerId("peer1"), new InetSocketAddress("127.0.0.1", 1), peer1Probe.ref, false, Some(fakeNodeId))
331+
Peer(PeerId("peer1"), new InetSocketAddress("127.0.0.1", 1), peer1Probe.ref, false, nodeId = Some(fakeNodeId))
332332
val peer1Info: PeerInfo = initialPeerInfo.withForkAccepted(false)
333333
val peer1InfoETC64: PeerInfo = initialPeerInfoETC64.withForkAccepted(false)
334334
val peer2Probe: TestProbe = TestProbe()
335335
val peer2: Peer =
336-
Peer(PeerId("peer2"), new InetSocketAddress("127.0.0.1", 2), peer2Probe.ref, false, Some(fakeNodeId))
336+
Peer(PeerId("peer2"), new InetSocketAddress("127.0.0.1", 2), peer2Probe.ref, false, nodeId = Some(fakeNodeId))
337337
val peer2Info: PeerInfo = initialPeerInfo.withForkAccepted(false)
338338
val peer3Probe: TestProbe = TestProbe()
339339
val peer3: Peer =
340-
Peer(PeerId("peer3"), new InetSocketAddress("127.0.0.1", 3), peer3Probe.ref, false, Some(fakeNodeId))
340+
Peer(PeerId("peer3"), new InetSocketAddress("127.0.0.1", 3), peer3Probe.ref, false, nodeId = Some(fakeNodeId))
341341

342342
val freshPeerProbe: TestProbe = TestProbe()
343343
val freshPeer: Peer =
344-
Peer(PeerId(""), new InetSocketAddress("127.0.0.1", 4), freshPeerProbe.ref, false, Some(fakeNodeId))
344+
Peer(PeerId(""), new InetSocketAddress("127.0.0.1", 4), freshPeerProbe.ref, false, nodeId = Some(fakeNodeId))
345345
val freshPeerInfo: PeerInfo = initialPeerInfo.withForkAccepted(false)
346346

347347
val peerManager: TestProbe = TestProbe()

src/test/scala/io/iohk/ethereum/network/PeerEventBusActorSpec.scala

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,22 @@ import java.net.InetSocketAddress
44

55
import akka.actor.ActorRef
66
import akka.actor.ActorSystem
7+
import akka.actor.PoisonPill
8+
import akka.stream.WatchedActorTerminatedException
9+
import akka.stream.scaladsl.Flow
10+
import akka.stream.scaladsl.Keep
11+
import akka.stream.scaladsl.Sink
12+
import akka.stream.scaladsl.Source
13+
import akka.testkit.TestActor
714
import akka.testkit.TestProbe
815
import akka.util.ByteString
916

17+
import org.scalatest.concurrent.ScalaFutures
1018
import org.scalatest.flatspec.AnyFlatSpec
1119
import org.scalatest.matchers.should.Matchers
1220

1321
import io.iohk.ethereum.Fixtures
22+
import io.iohk.ethereum.NormalPatience
1423
import io.iohk.ethereum.domain.ChainWeight
1524
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
1625
import io.iohk.ethereum.network.EtcPeerManagerActor.RemoteStatus
@@ -19,11 +28,12 @@ import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.PeerDisconnected
1928
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.PeerHandshakeSuccessful
2029
import io.iohk.ethereum.network.PeerEventBusActor.PeerSelector
2130
import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier._
31+
import io.iohk.ethereum.network.p2p.Message
2232
import io.iohk.ethereum.network.p2p.messages.Capability
2333
import io.iohk.ethereum.network.p2p.messages.WireProtocol.Ping
2434
import io.iohk.ethereum.network.p2p.messages.WireProtocol.Pong
2535

26-
class PeerEventBusActorSpec extends AnyFlatSpec with Matchers {
36+
class PeerEventBusActorSpec extends AnyFlatSpec with Matchers with ScalaFutures with NormalPatience {
2737

2838
"PeerEventBusActor" should "relay messages received to subscribers" in new TestSetup {
2939

@@ -32,6 +42,7 @@ class PeerEventBusActorSpec extends AnyFlatSpec with Matchers {
3242
val classifier1 = MessageClassifier(Set(Ping.code), PeerSelector.WithId(PeerId("1")))
3343
val classifier2 = MessageClassifier(Set(Ping.code), PeerSelector.AllPeers)
3444
peerEventBusActor.tell(PeerEventBusActor.Subscribe(classifier1), probe1.ref)
45+
3546
peerEventBusActor.tell(PeerEventBusActor.Subscribe(classifier2), probe2.ref)
3647

3748
val msgFromPeer = MessageFromPeer(Ping(), PeerId("1"))
@@ -46,6 +57,47 @@ class PeerEventBusActorSpec extends AnyFlatSpec with Matchers {
4657
peerEventBusActor ! PeerEventBusActor.Publish(msgFromPeer2)
4758
probe1.expectNoMessage()
4859
probe2.expectMsg(msgFromPeer2)
60+
61+
}
62+
63+
it should "relay messages via streams" in new TestSetup {
64+
val classifier1 = MessageClassifier(Set(Ping.code), PeerSelector.WithId(PeerId("1")))
65+
val classifier2 = MessageClassifier(Set(Ping.code), PeerSelector.AllPeers)
66+
67+
val peerEventBusProbe = TestProbe()(system)
68+
peerEventBusProbe.setAutoPilot { (sender: ActorRef, msg: Any) =>
69+
peerEventBusActor.tell(msg, sender)
70+
TestActor.KeepRunning
71+
}
72+
73+
val seqOnTermination = Flow[MessageFromPeer]
74+
.recoverWithRetries(1, { case _: WatchedActorTerminatedException => Source.empty })
75+
.toMat(Sink.seq)(Keep.right)
76+
77+
val stream1 = PeerEventBusActor.messageSource(peerEventBusProbe.ref, classifier1).runWith(seqOnTermination)
78+
val stream2 = PeerEventBusActor.messageSource(peerEventBusProbe.ref, classifier2).runWith(seqOnTermination)
79+
80+
// wait for subscriptions to be done
81+
peerEventBusProbe.expectMsgType[PeerEventBusActor.Subscribe]
82+
peerEventBusProbe.expectMsgType[PeerEventBusActor.Subscribe]
83+
84+
val syncProbe = TestProbe()(system)
85+
peerEventBusActor.tell(PeerEventBusActor.Subscribe(classifier2), syncProbe.ref)
86+
87+
val msgFromPeer = MessageFromPeer(Ping(), PeerId("1"))
88+
peerEventBusActor ! PeerEventBusActor.Publish(msgFromPeer)
89+
90+
val msgFromPeer2 = MessageFromPeer(Ping(), PeerId("99"))
91+
peerEventBusActor ! PeerEventBusActor.Publish(msgFromPeer2)
92+
93+
// wait for publications to be done
94+
syncProbe.expectMsg(msgFromPeer)
95+
syncProbe.expectMsg(msgFromPeer2)
96+
97+
peerEventBusProbe.ref ! PoisonPill
98+
99+
whenReady(stream1)(_ shouldEqual Seq(msgFromPeer))
100+
whenReady(stream2)(_ shouldEqual Seq(msgFromPeer, msgFromPeer2))
49101
}
50102

51103
it should "only relay matching message codes" in new TestSetup {
@@ -105,7 +157,13 @@ class PeerEventBusActorSpec extends AnyFlatSpec with Matchers {
105157
peerEventBusActor.tell(PeerEventBusActor.Subscribe(PeerHandshaked), probe2.ref)
106158

107159
val peerHandshaked =
108-
new Peer(PeerId("peer1"), new InetSocketAddress("127.0.0.1", 0), TestProbe().ref, false, Some(ByteString()))
160+
new Peer(
161+
PeerId("peer1"),
162+
new InetSocketAddress("127.0.0.1", 0),
163+
TestProbe().ref,
164+
false,
165+
nodeId = Some(ByteString())
166+
)
109167
val msgPeerHandshaked = PeerHandshakeSuccessful(peerHandshaked, initialPeerInfo)
110168
peerEventBusActor ! PeerEventBusActor.Publish(msgPeerHandshaked)
111169

src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,8 @@ class PeerManagerSpec
195195

196196
// It should have created the next peer for the first incoming connection (probably using a synchronous test scheduler).
197197
val probe2: TestProbe = createdPeers(2).probe
198-
val peer = Peer(PeerId("peer"), incomingPeerAddress1, probe2.ref, incomingConnection = true, Some(incomingNodeId1))
198+
val peer =
199+
Peer(PeerId("peer"), incomingPeerAddress1, probe2.ref, incomingConnection = true, nodeId = Some(incomingNodeId1))
199200
probe2.expectMsg(PeerActor.HandleConnection(incomingConnection1.ref, incomingPeerAddress1))
200201
probe2.reply(PeerEvent.PeerHandshakeSuccessful(peer, initialPeerInfo))
201202

@@ -213,7 +214,13 @@ class PeerManagerSpec
213214
val probe3: TestProbe = createdPeers(3).probe
214215

215216
val secondPeer =
216-
Peer(PeerId("secondPeer"), incomingPeerAddress2, probe3.ref, incomingConnection = true, Some(incomingNodeId2))
217+
Peer(
218+
PeerId("secondPeer"),
219+
incomingPeerAddress2,
220+
probe3.ref,
221+
incomingConnection = true,
222+
nodeId = Some(incomingNodeId2)
223+
)
217224

218225
probe3.expectMsg(PeerActor.HandleConnection(incomingConnection2.ref, incomingPeerAddress2))
219226
probe3.reply(PeerEvent.PeerHandshakeSuccessful(secondPeer, initialPeerInfo))
@@ -287,7 +294,7 @@ class PeerManagerSpec
287294
peerAsIncomingAddress,
288295
peerAsIncomingProbe.ref,
289296
incomingConnection = true,
290-
Some(nodeId)
297+
nodeId = Some(nodeId)
291298
)
292299

293300
peerAsIncomingProbe.expectMsg(
@@ -322,7 +329,7 @@ class PeerManagerSpec
322329
peerAsIncomingAddress,
323330
peerAsIncomingProbe.ref,
324331
incomingConnection = true,
325-
Some(nodeId)
332+
nodeId = Some(nodeId)
326333
)
327334

328335
peerAsIncomingProbe.expectMsg(

0 commit comments

Comments
 (0)