Stream data from two sources where one is eventually consistent and the other one loses its tail
This library provides the ability to use Kafka as storage for events. Kafka is a perfect fit if you want streaming capabilities for your events. However, kafka-journal also uses Cassandra to keep the performance of data access at an acceptable level and to overcome Kafka's retention policy.
Cassandra is the default choice, but you may use any other storage which satisfies the following interfaces:
- Read side, called within the client library: `EventualJournal`
- Write side, called from the replicator app: `ReplicatedJournal`
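To give a feel for the split, here is a minimal, purely illustrative sketch; the real `EventualJournal` and `ReplicatedJournal` traits in kafka-journal have richer, tagless-final signatures, and every name below is a simplified stand-in:

```scala
import java.time.Instant

final case class Key(id: String, topic: String)
final case class Event(seqNr: Long, payload: Array[Byte])

// read side: the client recovers state from eventual (e.g. Cassandra) storage
trait EventualJournalLike[F[_]] {
  def read(key: Key, fromSeqNr: Long): F[List[Event]]
  def offset(key: Key): F[Option[Long]]
}

// write side: the replicator copies events from Kafka into eventual storage
trait ReplicatedJournalLike[F[_]] {
  def append(key: Key, events: List[Event], timestamp: Instant): F[Unit]
  def delete(key: Key, toSeqNr: Long): F[Unit]
}
```

Any storage able to implement both sides could, in principle, take Cassandra's place.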
- Journal client publishes events to Kafka
- Replicator app stores events in Cassandra
- Client publishes a special marker `Mark` to Kafka, so we can make sure there are no more events to expect
- Client reads events from Cassandra; at this point, however, we are not yet sure that all events have been replicated from Kafka to Cassandra
- Client reads events from Kafka using the offset of the last event found in Cassandra
- We consider recovery finished when the marker `Mark` is read from Kafka (the whole flow is sketched below)
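Here is a minimal, self-contained sketch of that recovery flow. All types and functions are hypothetical stand-ins for illustration only, not the kafka-journal API:

```scala
object RecoverySketch {

  final case class Event(seqNr: Long, offset: Long, payload: String)

  // stand-ins for real Kafka / Cassandra access
  trait Kafka {
    def publishMarker(id: String): Long // returns the marker's offset
    def read(id: String, from: Long, until: Long): List[Event]
  }
  trait Cassandra {
    def read(id: String): List[Event]
  }

  def recover(id: String, kafka: Kafka, cassandra: Cassandra): List[Event] = {
    // 1. publish the marker, so we know there are no more events to expect past it
    val markerOffset = kafka.publishMarker(id)

    // 2. read events already replicated to Cassandra (which may lag behind Kafka)
    val replicated = cassandra.read(id)

    // 3. read the remaining tail from Kafka, starting after the last replicated
    //    offset and stopping at the marker
    val from = replicated.lastOption.fold(0L)(_.offset + 1)
    val tail = kafka.read(id, from, until = markerOffset)

    // 4. once the marker is reached, recovery is complete
    replicated ++ tail
  }
}
```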
- one Kafka topic may be used for many different entities
- not all events must be on Kafka, as long as they are stored in Cassandra
- there is no snapshot support yet
- Replicator is a separate application running concurrently with Journal clients
- it is relatively easy to replace Cassandra with some other relational database
Performance of reading events depends on finding the closest offset to the marker and on replication latency (the time difference between the moment an event is published to Kafka and the moment it becomes available in Cassandra).
The same Kafka consumer may be shared across many simultaneous recoveries.
- Client is allowed to read+write Kafka and read Cassandra
- Replicator is allowed to read Kafka and read+write Cassandra
Hence, we recommend configuring access rights accordingly.
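On the Kafka side, one way to express this is with ACLs via the Kafka Admin API; the sketch below is only an example (topic name, principals and bootstrap server are placeholders), and the Cassandra side would be restricted separately, e.g. with CQL GRANT statements:

```scala
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclOperation, AclPermissionType}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}

object JournalAcls {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = Admin.create(props)

    // ALLOW `op` on the (example) journal topic for `principal`
    def acl(principal: String, op: AclOperation): AclBinding =
      new AclBinding(
        new ResourcePattern(ResourceType.TOPIC, "journal", PatternType.LITERAL),
        new AccessControlEntry(principal, "*", op, AclPermissionType.ALLOW)
      )

    val bindings = List(
      acl("User:journal-client", AclOperation.READ),  // client: read + write Kafka
      acl("User:journal-client", AclOperation.WRITE),
      acl("User:replicator", AclOperation.READ)       // replicator: read Kafka only
    )
    admin.createAcls(bindings.asJava).all().get()
    admin.close()
  }
}
```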
Kafka client tends to log some exceptions at error level; however, in reality those are harmless if the operation is later retried successfully. Retriable exceptions usually extend `RetriableException`.
List of known "error" cases which may be ignored:
- Offset commit failed on partition .. at offset ..: The request timed out.
- Offset commit failed on partition .. at offset ..: The coordinator is loading and hence can't process requests.
- Offset commit failed on partition .. at offset ..: This is not the correct coordinator.
- Offset commit failed on partition .. at offset ..: This server does not host this topic-partition.
In order to use kafka-journal as an akka persistence plugin, you have to add the following to your `*.conf` file:
akka.persistence.journal.plugin = "evolutiongaming.kafka-journal.persistence.journal"
Unfortunately, akka persistence's snapshot plugin is not implemented yet.
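With the plugin configured, persistent actors are written as usual; nothing journal-specific appears in application code. A minimal (illustrative) classic PersistentActor:

```scala
import akka.actor.Props
import akka.persistence.PersistentActor

// Events of this actor are written through the configured journal plugin,
// i.e. kafka-journal, without any journal-specific code here.
class Counter extends PersistentActor {
  override def persistenceId: String = "counter-1"

  private var count = 0

  override def receiveCommand: Receive = {
    case "inc" => persist("incremented") { _ => count += 1 }
    case "get" => sender() ! count
  }

  override def receiveRecover: Receive = {
    case "incremented" => count += 1
  }
}

object Counter {
  def props: Props = Props(new Counter)
}
```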
In order to use kafka-journal as a pekko persistence plugin, you have to add the following to your `*.conf` file:
pekko.persistence.journal.plugin = "evolutiongaming.kafka-journal.persistence.journal"
Unfortunately, pekko persistence's snapshot plugin is not implemented yet.
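To add the library to your build, put the addSbtPlugin line below into project/plugins.sbt and the libraryDependencies lines into build.sbt: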
addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2")
libraryDependencies += "com.evolution" %% "kafka-journal" % "5.0.0"
// for akka persistence
//libraryDependencies += "com.evolution" %% "kafka-journal-akka-persistence" % "5.0.0"
// for pekko persistence
libraryDependencies += "com.evolution" %% "kafka-journal-pekko-persistence" % "5.0.0"
libraryDependencies += "com.evolution" %% "kafka-journal-replicator" % "5.0.0"
libraryDependencies += "com.evolution" %% "kafka-journal-eventual-cassandra" % "5.0.0"
In 5.0.0 we adopted Evolution's new organization name and changed packages.
To migrate code from 4.3.1 to 5.0.0:
- change the organization in libraryDependencies from `com.evolutiongaming` to `com.evolution`
- if used, update artefact names from `kafka-journal-persistence*` to `kafka-journal-akka-persistence*` or `kafka-journal-pekko-persistence*`
- change imports:
  - from `import com.evolutiongaming.kafka.journal` to `import com.evolution.kafka.journal`
  - from `import akka.persistence.kafka.journal` to `import com.evolution.kafka.journal.akka.persistence`
- tech-related extensions were moved from core to corresponding tech modules, thus extra imports are required, like `com.evolution.kafka.journal.akka.OriginExtension` or `com.evolution.kafka.journal.pekko.OriginExtension`
- all Cassandra `encode*` and `decode*` extensions were moved to the `kafka-journal-cassandra` module
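Schematically, for an akka-based project the change looks roughly like this (artifact and import lists shortened for illustration; the pekko case is analogous):

```scala
// build.sbt, before (4.3.1)
libraryDependencies += "com.evolutiongaming" %% "kafka-journal-persistence" % "4.3.1"
// build.sbt, after (5.0.0)
libraryDependencies += "com.evolution" %% "kafka-journal-akka-persistence" % "5.0.0"
```

```scala
// imports, before (4.3.1)
import com.evolutiongaming.kafka.journal._
import akka.persistence.kafka.journal._

// imports, after (5.0.0)
import com.evolution.kafka.journal._
import com.evolution.kafka.journal.akka.persistence._
import com.evolution.kafka.journal.akka.OriginExtension // moved out of core
```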
Presentations:
- Jan 2019 Riga Scala Community
- Apr 2019 Amsterdam.scala
To run unit tests, you have to have a Docker environment running (Docker Desktop, Rancher Desktop, etc.). Some tests expect /var/run/docker.sock to be available. In the case of Rancher Desktop, one might need to amend the local setup with:
sudo ln -s $HOME/.rd/docker.sock /var/run/docker.sock
Developer's Must Read