hshar

5 min read - Posted 17 Feb 20

Consuming Ethereum Events To Kotlin Service Using Eventeum

It might be hard to roll your own implementation of a resilient web3 event consumer! You should if you have the time for it, but if you want something reliable and open source take a look at Eventeum

Getting Eventeum Up And Running With Our Events

First we need to clone the project: git clone https://github.com/ConsenSys/eventeum.git

Then we need to edit the project's application.yml located in server/src/main/resources/application.yml -- For this we're going to use a real life example from my project that's in stealth mode for now. But here's the event: event TournamentFinalized(uint _tournamentId, address payable[] _winners, uint[] payouts);

For this event we need to add this to the application.yml:

- id: "tournamentFinalizedEvent"
   contractAddress: ${CONTRACT_ADDRESS:0x00000000000000000000000000000000000000000}
   eventSpecification:
     eventName: TournamentFinalized
     nonIndexedParameterDefinitions:
       - position: 0
         type: UINT256
       - position: 1
         type: ADDRESS[]
       - position: 2
         type: UINT256[]
   correlationId:
     type: NON_INDEXED_PARAMETER
     index: 0
   startBlock: 9482978

It's really important to note the startBlock, it should be set to the block of the contract's inception.

Now to get Eventeum started we need Kafka (and zookeeper), Mongodb, and an Ethereum node to listen on (for this I'll be using Infura. For Infura to work we need the websocket node and use the PUBSUB block strategy.)

So let's set these environment variables beforehand:

export SPRING_DATA_MONGODB_URI = mongodb://myUser:myPW@localhost:27017/myDB
export ETHEREUM_NODE_URL = wss://rinkeby.infura.io/ws/v3/MY_KEY
export ZOOKEEPER_ADDRESS = <zookeeper-host:port>
export KAFKA_ADDRESSES=<kafka-host:port>
export ETHEREUM_BLOCKSTRATEGY = PUBSUB

Now let's compile the project by running mvn clean package and then running it through java -jar ./server/target/eventeum-server.jar

Soon the events will start to get picked up and pushed to MongoDB and streamed to Kafka under the topic's name contract-events

Now let's create our consumer and make sense of this data using Kotlinx's Serialization library

Consumer:

@Service
class KafkaEventListenerService {
    protected val log = KotlinLogging.logger {}

    @KafkaListener(topics = ["contract-events"], groupId = "event-listener")
    fun newEventeumEvent(payload: String) {
        log.info(payload)
        try {
            val event = EventeumPayload.fromJson(payload)
            when (event.details.name) {
                "TournamentFinalized" -> tournamentFinalizedEvent(event) // A function to be implemented
            }
        } catch (e: InstantiationError) {
            log.error(e.localizedMessage)
        }
    }
}

Data model (Uses polymorphism to decode the incoming data)

import BigIntegerSerializer
import kotlinx.serialization.SerialName
import kotlinx.serialization.json.Json
import kotlinx.serialization.Serializable
import kotlinx.serialization.UnstableDefault
import kotlinx.serialization.json.JsonConfiguration
import java.math.BigInteger

@Serializable
data class EventeumPayload(
        val id: String,
        val type: String,
        val details: EventDetails,
        val retries: Int
) {
    companion object {
        @UnstableDefault
        fun fromJson(data: String): EventeumPayload {
            return Json(JsonConfiguration(strictMode = false)).parse(serializer(), data)
        }
    }
}

@Serializable
data class EventDetails(
        val name: String,
        val filterId: String,
        val nodeName: String,
        val indexedParameters: List<EventValueType>,
        val nonIndexedParameters: List<EventValueType>,
        val transactionHash: String,
        val logIndex: Int,
        val blockNumber: Int,
        val blockHash: String,
        val address: String,
        val status: String,
        val eventSpecificationSignature: String,
        val networkName: String,
        val id: String
)

@Serializable
sealed class EventValueType {

    @Serializable
    @SerialName("string")
    data class EventValueTypeString(val value: String) : EventValueType()

    @Serializable
    @SerialName("address")
    data class EventValueTypeAddress(private val value: String) : EventValueType() {
        val valueLower: String
            get() = value.toLowerCase()
    }

    @Serializable
    @SerialName("uint256")
    data class EventValueTypeInt(@Serializable(BigIntegerSerializer::class) val value: BigInteger) : EventValueType()

    @Serializable
    @SerialName("bool")
    data class EventValueTypeBool(val value: Int) : EventValueType() // Temporary fix until Eventeum supports true and false booleans

    @Serializable
    @SerialName("uint256[]")
    data class EventValueTypeIntObject(val value: List<EventValueType>) : EventValueType()

    @Serializable
    @SerialName("address[]")
    data class EventValueTypeAddressObject(val value: List<EventValueType>) : EventValueType()
}

Also the BigIntegers' serializer that's mentioned:

package com.hshar.daory.model.serializer

import kotlinx.serialization.*
import kotlinx.serialization.internal.StringDescriptor
import java.math.BigInteger

@Serializer(forClass = BigInteger::class)
class BigIntegerSerializer: KSerializer<BigInteger> {
    override val descriptor: SerialDescriptor =
            StringDescriptor.withName("WithCustomDefault")

    override fun serialize(encoder: Encoder, obj: BigInteger) {
        encoder.encodeString(obj.toString())
    }

    override fun deserialize(decoder: Decoder): BigInteger {
        return BigInteger(decoder.decodeString())
    }
}

Now kick off your application and let the service do its job! Let me know if you have any questions, happy to help!

Created with Sketch.Content is"CC-BY-SA 4.0" licensed
Article On-chain
Article Author

Hayder Sharhan

Software Engineer

4

1

0

0 Comments
Related Articles
Using Eventeum to Build a Java Smart Contract Data Cache

In this tutorial, I am going to walk you through how to build a service that caches data emitted via smart contract events, so that this data can be consumed by other services in your system. Why would I want to do this? It all boils down to usability of your dApp. Web3 applications with no middleware layer at all generally do not provide the kind of user experience that end users are familiar with in the traditional web2 world. Operations such as complex searches across data stored within smart

Listening to Ethereum Events with Eventeum

What is Eventeum? Eventeum is an Ethereum event listener service that provides a bridge between your smart contracts and middleware layer. Events subscriptions can be registered dynamically, and on emission, a message containing the details of the event are broadcast onto a message bus (currently either Kafka or RabbitMQ) which can then be consumed by your backend services. It was developed by the kauri.io team, and the source code is freely available under the Apache 2.0 license. Click here to