Consuming Ethereum Events in a Kotlin Service Using Eventeum

It can be hard to roll your own implementation of a resilient web3 event consumer! You should if you have the time for it, but if you want something reliable and open source, take a look at Eventeum.

Getting Eventeum Up And Running With Our Events

First we need to clone the project: git clone

Then we need to edit the project's application.yml, located at server/src/main/resources/application.yml. For this we're going to use a real-life example from my project, which is in stealth mode for now. Here's the event: event TournamentFinalized(uint _tournamentId, address payable[] _winners, uint[] payouts);

For this event we need to add the following entry to the eventFilters list in application.yml:

- id: "tournamentFinalizedEvent"
  contractAddress: ${CONTRACT_ADDRESS:0x00000000000000000000000000000000000000000}
  eventSpecification:
    eventName: TournamentFinalized
    nonIndexedParameterDefinitions:
      - position: 0
        type: UINT256
      - position: 1
        type: ADDRESS[]
      - position: 2
        type: UINT256[]
  correlationId:
    type: NON_INDEXED_PARAMETER
    index: 0
  startBlock: 9482978

Pay close attention to startBlock: it should be set to the block in which the contract was deployed, so that Eventeum picks up every event from the contract's inception.
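The type names in the filter follow from the Solidity signature: uint is an alias for uint256, and address payable is ABI-encoded as a plain address. As a rough illustration only, the hypothetical helper below (it is not part of Eventeum) maps a Solidity parameter type to the upper-case name used in the filter. It is a sketch that only covers the kinds of types appearing in this event.

```kotlin
// Hypothetical helper, not an Eventeum API: maps a Solidity parameter type
// (without the parameter name) to the upper-case type name used in the filter.
fun toEventeumType(solidityType: String): String {
    val trimmed = solidityType.trim()
    // Remember whether this is a dynamic array so the suffix can be re-attached.
    val arraySuffix = if (trimmed.endsWith("[]")) "[]" else ""
    // Drop the array suffix and the "payable" modifier, which does not change the ABI type.
    val base = trimmed.removeSuffix("[]").trim().removeSuffix("payable").trim()
    // uint and int are aliases for their 256-bit variants.
    val canonical = when (base) {
        "uint" -> "uint256"
        "int" -> "int256"
        else -> base
    }
    return canonical.uppercase() + arraySuffix
}
```

Applied to the three parameters of TournamentFinalized, this gives UINT256, ADDRESS[] and UINT256[], matching positions 0, 1 and 2 in the filter.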

Now, to get Eventeum started we need Kafka (and ZooKeeper), MongoDB, and an Ethereum node to listen on. For this I'll be using Infura; for Infura to work we need its websocket endpoint and the PUBSUB block strategy.

So let's set these environment variables beforehand:

export SPRING_DATA_MONGODB_URI=mongodb://myUser:myPW@localhost:27017/myDB
export ETHEREUM_NODE_URL=wss://
export ZOOKEEPER_ADDRESS=<zookeeper-host:port>
export KAFKA_ADDRESSES=<kafka-host:port>
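A forgotten variable tends to surface only as a connection error at startup, so a small pre-flight check can save some time. The sketch below is a hypothetical helper of my own, not something Eventeum ships; it simply reports which of the four variables above are unset or blank.

```kotlin
// The four variables exported above.
val requiredVariables = listOf(
    "SPRING_DATA_MONGODB_URI",
    "ETHEREUM_NODE_URL",
    "ZOOKEEPER_ADDRESS",
    "KAFKA_ADDRESSES"
)

// Returns the names of required variables that are missing or blank.
// The environment is passed in as a map so the check is easy to test;
// by default it reads the real process environment.
fun missingVariables(env: Map<String, String> = System.getenv()): List<String> =
    requiredVariables.filter { env[it].isNullOrBlank() }
```

Calling missingVariables() before launching the jar and printing the result is enough to catch a typo in a variable name.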

Now let's compile the project by running mvn clean package, and then start it with java -jar ./server/target/eventeum-server.jar.

Soon the events will start to get picked up, saved to MongoDB, and streamed to Kafka on the contract-events topic.

Now let's create our consumer and make sense of this data using the kotlinx.serialization library:


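import mu.KotlinLogging
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Service

@Service
class KafkaEventListenerService {
    private val log = KotlinLogging.logger {}

    // Receives every message Eventeum publishes to the contract-events topic.
    @KafkaListener(topics = ["contract-events"], groupId = "event-listener")
    fun newEventeumEvent(payload: String) {
        try {
            val event = EventeumPayload.fromJson(payload)
            // Dispatch on the Solidity event name carried in the payload details.
            when (event.details.name) {
                "TournamentFinalized" -> tournamentFinalizedEvent(event) // A function to be implemented
            }
        } catch (e: InstantiationError) {
            log.error(e) { "Could not process Eventeum payload" }
        }
    }
}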

Here is the data model, which uses polymorphism to decode the incoming data:

import com.hshar.daory.model.serializer.BigIntegerSerializer
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.UnstableDefault
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonConfiguration
import java.math.BigInteger

@Serializable
data class EventeumPayload(
        val id: String,
        val type: String,
        val details: EventDetails,
        val retries: Int
) {
    companion object {
        // strictMode = false tolerates JSON keys that are not modelled here.
        @UnstableDefault
        fun fromJson(data: String): EventeumPayload {
            return Json(JsonConfiguration(strictMode = false)).parse(serializer(), data)
        }
    }
}

@Serializable
data class EventDetails(
        val name: String,
        val filterId: String,
        val nodeName: String,
        val indexedParameters: List<EventValueType>,
        val nonIndexedParameters: List<EventValueType>,
        val transactionHash: String,
        val logIndex: Int,
        val blockNumber: Int,
        val blockHash: String,
        val address: String,
        val status: String,
        val eventSpecificationSignature: String,
        val networkName: String,
        val id: String
)

@Serializable
sealed class EventValueType {
    // Each subclass is selected by the parameter's "type" field in the JSON,
    // so it also needs a @SerialName set to the matching type string.

    @Serializable
    data class EventValueTypeString(val value: String) : EventValueType()

    @Serializable
    data class EventValueTypeAddress(private val value: String) : EventValueType() {
        val valueLower: String
            get() = value.toLowerCase()
    }

    @Serializable
    data class EventValueTypeInt(@Serializable(BigIntegerSerializer::class) val value: BigInteger) : EventValueType()

    @Serializable
    data class EventValueTypeBool(val value: Int) : EventValueType() // Temporary fix until Eventeum supports true and false booleans

    @Serializable
    data class EventValueTypeIntObject(val value: List<EventValueType>) : EventValueType()

    @Serializable
    data class EventValueTypeAddressObject(val value: List<EventValueType>) : EventValueType()
}
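To give an idea of what the tournamentFinalizedEvent handler might do with the decoded parameters, here is a minimal sketch. It makes several assumptions: ParamValue is a simplified stand-in for the EventValueType hierarchy (no serialization annotations, so it runs on the standard library alone), TournamentResult and decodeTournamentFinalized are hypothetical names, and I'm assuming the contract pairs winners[i] with payouts[i].

```kotlin
import java.math.BigInteger

// Simplified stand-ins for the article's EventValueType subclasses.
sealed class ParamValue {
    data class IntValue(val value: BigInteger) : ParamValue()
    data class AddressValue(val value: String) : ParamValue()
    data class ListValue(val value: List<ParamValue>) : ParamValue()
}

// Hypothetical domain object for a decoded TournamentFinalized event.
data class TournamentResult(
    val tournamentId: BigInteger,
    val payouts: List<Pair<String, BigInteger>> // (lower-cased winner address, payout)
)

// Maps the three non-indexed parameters (positions 0, 1, 2 in the filter)
// onto the domain object. Throws if the shape is not what the filter promises.
fun decodeTournamentFinalized(params: List<ParamValue>): TournamentResult {
    require(params.size == 3) { "Expected 3 parameters, got ${params.size}" }
    val tournamentId = (params[0] as ParamValue.IntValue).value
    val winners = (params[1] as ParamValue.ListValue).value
        .map { (it as ParamValue.AddressValue).value.lowercase() }
    val amounts = (params[2] as ParamValue.ListValue).value
        .map { (it as ParamValue.IntValue).value }
    // Assumption: the two arrays are parallel, one payout per winner.
    require(winners.size == amounts.size) { "winners and payouts differ in length" }
    return TournamentResult(tournamentId, winners.zip(amounts))
}
```

In the real service the same mapping would read event.details.nonIndexedParameters, since none of the event's parameters are indexed. Keeping the payouts as a list of pairs rather than a map avoids silently dropping entries if the same address wins twice.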

And here is the BigInteger serializer referenced above:

package com.hshar.daory.model.serializer

import kotlinx.serialization.*
import kotlinx.serialization.internal.StringDescriptor
import java.math.BigInteger

@Serializer(forClass = BigInteger::class)
class BigIntegerSerializer: KSerializer<BigInteger> {
    override val descriptor: SerialDescriptor =
        StringDescriptor.withName("BigInteger")

    // Write the value in its decimal string form.
    override fun serialize(encoder: Encoder, obj: BigInteger) {
        encoder.encodeString(obj.toString())
    }

    // Read the decimal string form back into a BigInteger.
    override fun deserialize(decoder: Decoder): BigInteger {
        return BigInteger(decoder.decodeString())
    }
}
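Why bother with BigInteger at all? A Solidity uint256 can hold values far beyond what a Kotlin Long can represent, so the serializer goes through the decimal string form instead. The sketch below uses only the standard library to show the round trip the serializer relies on; encode and decode are illustrative names of my own, not part of kotlinx.serialization.

```kotlin
import java.math.BigInteger

// Largest value a Solidity uint256 can hold: 2^256 - 1.
val maxUint256: BigInteger = BigInteger.valueOf(2).pow(256).subtract(BigInteger.ONE)

// The same string round trip the serializer performs.
fun encode(value: BigInteger): String = value.toString()
fun decode(text: String): BigInteger = BigInteger(text)
```

Since maxUint256 needs 256 bits and a Long has only 63 bits for positive values, decoding such a value into a Long or Int would overflow, while decode(encode(maxUint256)) returns the original value unchanged.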

Now kick off your application and let the service do its job! Let me know if you have any questions, happy to help!