
An intro to writing a native DB driver in Scala

Marek Kadek 1 April, 2021 | 18 min read

Most developers will sooner or later need to interact with some database from within the application. We then reach for an established Object Relational mapping or Functional Relational Mapping library of our choice.

Those are frameworks and libraries that help us interact with the database - handle mapping between objects and database’s tables (or graphs, depending on your database), help us with query constructions and so much more.

One such example, familiar to many object-oriented programmers, is Hibernate. Functional programmers might reach for Slick or Quill.

These usually depend on another layer - some driver - API, which is often provided by the database vendor. For example, for Postgres you could use its JDBC implementation. These drivers are fairly low level, and map capabilities of the database to the interface which is exposed to programmers. They also handle communication protocol - for example when you want to query the database “give me all users”, you are not interested in specific byte encoding of such query.

In this article, we’ll write a part of a simple driver that communicates with Cassandra, using non-blocking, asynchronous purely functional code, with the help of ZIO.

The bulk of the article will be about defining building blocks that we can use to build a full driver. We'll build it from the bottom up, starting with our payloads and their serialization, followed by sockets to send them across the wire. We'll finish with an actual example that sends two real messages and receives responses.

A reasonable question is, why would you ever write such a database driver?

  • you work for a database vendor (that is about to release a new fantastic database to the world) and need a driver :-)
  • the existing driver doesn’t meet the quality standards your codebase enforces - i.e. random exceptions, blocking operations all over the place, disregard for proper thread management, public mutable state, and so on.
  • your driver lacks capabilities that the underlying database supports, such as the capability to stream your data out, publish-subscribe to changes, and more.
  • the existing driver doesn’t meet the paradigm your codebase enforces - i.e. you need a purely functional driver.
  • education - you are interested in how things work or could work under the hood.

This doesn't apply only to database drivers. During your career, you may come across times when you need to interact with a system that doesn't expose a typical HTTP REST API (although this is nowadays less common) - such as machinery, robots, etc. If you work for a large corporation, this could be some proprietary or in-house solution.

Writing a driver can be a difficult, yet rewarding endeavor. At best, you have a specification at hand; at worst, you have to reverse engineer it by sniffing packets.

I strongly recommend checking out the full code if you plan to follow along here.

If there is a single thing to take from the article, it would be to not be afraid of doing fairly low-level stuff with Scala & FP. The language and its powerful libraries have your (and my) back, and they make the whole process ever so enjoyable. :-)

Setting up the stage

Topics that we'll go through:

  • how to serialize and deserialize your data from-to bytes using scodec
  • how to use ZIO-NIO for low-level socket communication
  • how to use ZIO to power all the above
  • how to model your data types and structure code to cover the protocol

We will create a functioning vertical slice of us sending and receiving some messages from Cassandra using its native protocol. You can find the specification here. Don’t worry about reading it all - I’ll guide you through the important sections for our sample. We will develop the code in an extensible manner, so that if you wish to cover more functionality, it should not be too difficult. I will link the spec directly to relevant sections as needed.

Setup

We will need a few dependencies - ZIO for functional programming, ZIO-NIO for sending bytes around the wire, scodec so we can define how our case classes look when they are serialized into bytes, and zio-logging for console logging. Let's add them to our build.sbt.

libraryDependencies ++= Seq(
  "org.scodec" %% "scodec-core" % "1.11.7",
  "org.scodec" %% "scodec-bits" % "1.1.24",
  "dev.zio" %% "zio-nio" % "1.0.0-RC10",
  "dev.zio" %% "zio-logging" % "0.5.8")

Bits, bytes, and codecs

Cassandra's envelope has various fields, such as a Version field, which takes 8 bits, followed by a Flags field, taking another 8 bits, and so on. At this time, we don't know the meaning of version or flags. Looking at the envelope, we only see how many bits they take. These layouts are described in section 3. They are the smallest building blocks we can use to eventually build up to the aforementioned envelope.

When the spec says "stream id is of type short", we have to know precisely what the layout of short is - how many bytes it occupies, what endianness, etc. Let's implement at least the most important ones from section 3.
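To make "layout" concrete before reaching for a library, here's what the spec's [int] type (a 4-byte big-endian integer) would look like encoded by hand. This is plain Scala for illustration only - the article uses scodec for all of this:

```scala
// Hand-rolled sketch of the spec's [int] type: 4 bytes, big-endian
// (most significant byte first).
def encodeInt(n: Int): Array[Byte] =
  Array((n >>> 24).toByte, (n >>> 16).toByte, (n >>> 8).toByte, n.toByte)

// Inverse: fold the 4 bytes back into an Int, masking each byte to
// avoid sign extension.
def decodeInt(bytes: Array[Byte]): Int =
  bytes.foldLeft(0)((acc, b) => (acc << 8) | (b & 0xff))
```

Doing this by hand for every type (and composing the results) quickly gets tedious and error-prone, which is exactly what scodec saves us from.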

Create a protocol package with some codecs, using the fantastic scodec library. Codecs define how to encode and decode data into bytes. scodec gives us lots of predefined combinators on these codecs that will come in very handy as we implement more complex types.

Reading from the spec:

[int]             A 4 bytes integer
[long]            A 8 bytes integer
...

would correspond to the following codecs (codecs are defined in bits, and our spec defines types in bytes)

import scodec._
val int: Codec[Int]   = codecs.int(4 * 8) // 4 bytes, each having 8 bits
val long: Codec[Long] = codecs.long(8 * 8)

You can compose these codecs into larger codecs (as you would LEGO pieces). Let's say you wanted to encode, using the above codecs, int followed by long:

val bitVector: Attempt[BitVector] =
  (int ~ long).encode((1, 99999999999L))

We get back an Attempt - representing either success or failure of the encoding operation. Decoding follows a similar pattern:

val decodingResult: Attempt[DecodeResult[(Int, Long)]] =
  (int ~ long).decode(BitVector(Array(1.toByte, 4.toByte)))

Our Attempt now contains the result of decoding - the actual decoded value plus the remaining bits that were not consumed.

Let's define relevant types from the spec. My suggestion is, open up section 3 and compare the description of type against the following implementation:

import scodec._

package object protocol {
  /** from spec, section 3 */
  val int:        Codec[Int]         = codecs.int(4 * 8)
  val long:       Codec[Long]        = codecs.long(8 * 8)
  val byte:       Codec[Byte]        = codecs.byte(1 * 8)
  val short:      Codec[Int]         = codecs.uint(2 * 8)
  val string:     Codec[String]      = codecs.variableSizeBytes(short, codecs.utf8)
  val longString: Codec[String]      = codecs.variableSizeBytes(int, codecs.utf8)
  val stringList: Codec[List[String]]= codecs.listOfN(short, string)
  val shortBytes: Codec[List[Byte]]  = codecs.listOfN(short, byte)

  type StringMap      = Map[String, String]
  type StringMultiMap = Map[String, List[String]]

  val stringMap: Codec[StringMap] = 
    codecs.listOfN(short, string ~ string).xmap(_.toMap, _.toList)
  val stringMultiMap: Codec[StringMultiMap] =
    codecs.listOfN(short, string ~ stringList).xmap(_.toMap, _.toList)
}

Protocol envelope

Cassandra defines the full envelope as follows:

0         8        16        24        32         40
+---------+---------+---------+---------+---------+
| version |  flags  |      stream       | opcode  |
+---------+---------+---------+---------+---------+
|                length                 |
+---------+---------+---------+---------+
|                                       |
.            ...  body ...              .
.                                       .
.                                       .
+----------------------------------------

We have basic primitive types ready. We could now jump straight into implementing a codec for the full message, but that wouldn't be good enough. We'd have a type represented by something like byte (for version), byte (for flags), 2x byte (for stream)... We want to attach semantics to these bytes, based on the spec. This will help us as we work with these types at a higher level, attaching protocol semantics to them (for example - if you want to compress the payload, you set the compression bit in the flags payload).

Let's implement the envelope by parts. Don't be scared by the number of sections or code that follows - once you understand a single part, you understand all of them! :-)

The following sections will be very data-oriented, but you know what they say - data rules the world. Though I don't think they meant envelopes at most a few bytes long ;-)

We'll start with parsing of an envelope header. That is everything except body, from the diagram above.

Version

Spec link

0         8
+---------+
| version |
+---------+

Version is the first real data type that is composed of multiple parts (each with its own semantics), despite being only a single byte! According to the spec, the first bit controls the direction of the message, whether it's a request or a response. The remaining 7 bits designate the version of the protocol. We want our codec to consume one bit and translate it into the direction, followed by 7 bits, translated into the version (this will fit into a Byte). Let's model it as a regular set of Scala classes, and define a codec for each part:

import scodec.Codec
import scodec.codecs._
import scodec.bits.BitVector

/** MSB bit designates request/response, rest is protocol version */
final case class Version(direction: MessageDirection, protocolVersion: Byte)
sealed abstract class MessageDirection extends Product with Serializable
object MessageDirection {
  case object Request  extends MessageDirection
  case object Response extends MessageDirection
}

object Version {
  val direction: Codec[MessageDirection] = bits(1).xmap(
    { b =>
      if (b.get(0)) MessageDirection.Response
      else MessageDirection.Request
    },
    {
      case MessageDirection.Request  => BitVector.zero
      case MessageDirection.Response => BitVector.one
    }
  )

  val protocolVersion = bits(7)
    .xmap(_.toByte(signed = false),
      (x: Byte) => BitVector.fromByte(x, size = 7)
    )

  val codec = (direction ~ protocolVersion)
    .xmap({ case (d, v) => Version(d, v) },
      (x: Version) => (x.direction, x.protocolVersion))
}

If you haven't encountered xmap before, it's a map & contramap together; we specify how to go from type A to type B, and how to go back from type B to A as well.
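Reduced to plain Scala, the bit-splitting this codec performs looks like the following sketch (for illustration only - the codec above is what we actually use). 0x84 is a version-4 response byte, 0x04 a version-4 request:

```scala
// The most significant bit of the version byte is the direction
// (1 = response, 0 = request); the low 7 bits are the protocol version.
def isResponse(versionByte: Byte): Boolean   = (versionByte & 0x80) != 0
def protocolVersion(versionByte: Byte): Byte = (versionByte & 0x7f).toByte
```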

Flags

Spec link

 8        16 
-+---------+-
 |  flags  | 
-+---------+-

Flags give various meanings to the envelope. Each flag can be turned on and off, based on how we flip the particular bit. Cassandra currently supports five flags that fit into a single byte - with the following layout:

8  flags byte 16 
+-------------+
 x x x 1 1 1 1 1 
       | | | | ^- compression flag
       | | | ^--- tracing flag
       | | ^----- custom payload flag
       | ^------- warning flag
       ^--------- use beta features flag
 ^ ^ ^ ---------- unused in current Cassandra

We'll define a FlagsPayload class to wrap the byte and give us a nice API to access the flags in it. Let's also define FlagType, which will give us a typesafe enumeration of possible flags.

import protocol.FlagsPayload._

final case class FlagsPayload(private[protocol] val flags: Byte) {
  def flag(tpe: FlagType): Boolean     = (flags & tpe.byte) != 0
  def flagsMap: Map[FlagType, Boolean] = FlagsPayload.flags.map(x => x -> flag(x)).toMap
}

object FlagsPayload {
  sealed abstract class FlagType(val byte: Byte) extends Product with Serializable
  case object CompressionFlag                    extends FlagType(0x01)
  case object TracingFlag                        extends FlagType(0x02)
  case object CustomPayloadFlag                  extends FlagType(0x04)
  case object WarningFlag                        extends FlagType(0x08)
  case object UseBetaFlag                        extends FlagType(0x10)

  val codec = byte.xmap(FlagsPayload.apply, (x: FlagsPayload) => x.flags)
  val flags = Set(CompressionFlag, TracingFlag, CustomPayloadFlag, WarningFlag, UseBetaFlag)

  def apply(flags: Set[FlagType]): FlagsPayload = {
    val b = flags.foldLeft(0x00) { _ | _.byte }.toByte
    FlagsPayload(b)
  }

  def apply(flags: FlagType*): FlagsPayload = apply(flags.toSet)
}

I don't know about you, but I'd much rather match on FlagType than against bytes :-)
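The masking underneath, reduced to a self-contained plain-Scala sketch (in the final example we'll send TracingFlag and WarningFlag together, i.e. 0x02 | 0x08 = 0x0a, which is why the output there shows FlagsPayload(10)):

```scala
// Flag masks from the layout above.
val Compression: Byte = 0x01
val Tracing: Byte     = 0x02
val Warning: Byte     = 0x08

// A flag is set iff its bit survives the bitwise AND.
def isSet(flags: Byte, mask: Byte): Boolean = (flags & mask) != 0

// Combining flags is a bitwise OR over their masks.
val payload: Byte = (Tracing | Warning).toByte
```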

Stream

Spec link

Disclaimer: this is a naming conflict with our favorite streams that represent a flow of data - it has nothing in common with them.

 16        24        32 
-+---------+---------+-
  |      stream      | 
-+---------+---------+-

Stream is a number. It is usually set by the client and denotes the id of the message. The response (to this request) that you receive from the server will bear the same id. This allows you to pair requests with responses, which is important because the protocol is asynchronous - you can send multiple requests and get the responses back in a different order.

Stream is always positive when set by the client. A negative value of Stream can be emitted by the server in case of the client being subscribed to events (so not a traditional request-response). Since it takes 16 bits, we derive that the protocol supports 2^15 = 32768 concurrent requests (one bit is reserved for the sign).

import protocol.StreamSource.{InitiatedByClient, InitiatedByServer}
import scodec.codecs._

sealed abstract class StreamSource extends Product with Serializable
object StreamSource {
  case object InitiatedByClient extends StreamSource
  case object InitiatedByServer extends StreamSource
}
final case class StreamId(streamId: Short) {
  val initiatedBy: StreamSource =
    if (streamId >= 0) InitiatedByClient
    else InitiatedByServer
}
object StreamId {
  val codec = short16.xmap(StreamId.apply, (x: StreamId) => x.streamId)
}

Opcode

Spec link

32         40
-+---------+
 | opcode  |
-+---------+

OpCode (as in Operation Code) defines the meaning of the envelope. Every request is tagged with this code. We use it to distinguish the contents of the body - think about the body of an authentication request - it surely differs from the body of a query for the contents of some Cassandra table.

We will define it only for some operations, but feel free to take a look at the spec and try to support all of them!

import scodec._
import scodec.bits._

sealed abstract class OpCode(val id: Byte, val name: String) extends Product with Serializable

object OpCode {
  case object OpError        extends OpCode(0x00, "ERROR")
  case object OpStartup      extends OpCode(0x01, "STARTUP")
  case object OpReady        extends OpCode(0x02, "READY")
  case object OpAuthenticate extends OpCode(0x03, "AUTHENTICATE")
  case object OpOptions      extends OpCode(0x05, "OPTIONS")
  case object OpSupported    extends OpCode(0x06, "SUPPORTED")
  // ... and others

  val all: List[OpCode] =
    List(
      OpError,
      OpStartup,
      OpReady,
      OpAuthenticate,
      OpOptions,
      OpSupported
    )

  val byId: Byte => Option[OpCode] =
    all.map(x => x.id -> x).toMap.lift

  val codec = byte.exmap(
    b => Attempt.fromOption(byId(b),
      Err.General(s"Failed to find opcode for byte $b", Nil)),
    (b: OpCode) => Attempt.successful(b.id)
  )
}

The new combinator here is exmap, which is the same as the xmap we used before, except that it allows the operation to fail. In our case, we fail if our byte doesn't have any case object representation. In other words - if we have received some byte that we don't know the meaning of.

Length

Spec link

0         8        16        24        32
+---------+---------+---------+---------+
|                length                 |
+---------+---------+---------+---------+
|                                       |
.            ...  body ...              .
.                                       .
.                                       .
+----------------------------------------

Length is an integer that denotes the length of the body.

final case class LengthPayload(bodyLength: Int) extends AnyVal
object LengthPayload {
  val codec = int.xmap(LengthPayload.apply, (x: LengthPayload) => x.bodyLength)
  val empty = LengthPayload(0)
}

Header

These datatypes together create a header. Let's combine them into a single case class. In one of the first sections I showed you how to combine codecs using the ~ combinator.

import scodec.Codec

final case class Header(version: Version,
                        flagsPayload: FlagsPayload,
                        stream: StreamId,
                        opCode: OpCode)

object Header {
  val codec: Codec[Header] = 
    (Version.codec ~ FlagsPayload.codec ~ StreamId.codec ~ OpCode.codec)
      .xmap(
        { case (((v, f), sid), op) =>
          Header(v, f, sid, op)
        },
        (x: Header) => (((x.version, x.flagsPayload), x.stream), x.opCode)
      )
}

It's a bit clumsy, all those nested brackets such as in case (((v, f), sid), op). If you were to combine even more codecs and you don't have a LISP background, you probably would not enjoy it :-)

Let's use shapeless to make it a bit nicer. You might be wondering where the dependency comes from, since we didn't declare it - it comes transitively from scodec, which plays nicely with it.

import scodec.Codec
import shapeless._

object Header {
  val codec: Codec[Header] = (Version.codec :: FlagsPayload.codec :: StreamId.codec :: OpCode.codec).xmap(
    { case v :: f :: sid :: op :: HNil =>
      Header(v, f, sid, op)
    },
    (x: Header) => x.version :: x.flagsPayload :: x.stream :: x.opCode :: HNil
  )
}

That's nicer. Now we have everything to define a request and a response.

Requests and their encoding

Let's start by defining a Frame. Every frame contains a header. A frame is either a request (the client initiates some operation) or a response (the server's reply to the operation).

package protocol

import scodec._

sealed trait Frame extends Product with Serializable {
  def header: Header
}

sealed trait FrameRequest  extends Frame
sealed trait FrameResponse extends Frame

This represents our basic hierarchy. We'll define requests as FrameRequest descendants, and responses under FrameResponse. We will want to encode requests into bits that Cassandra understands. Let's create a typeclass that gives an encoding capability to our requests.

package protocol

import scodec.bits.BitVector
import scodec.{Attempt, Encoder}

trait RequestEncoder[A <: FrameRequest] {
  def bodyEncoder: Encoder[A]
  
  final def headerEncoder: Encoder[A] = Header.codec.contramap(_.header)

  final def encode(a: A): Attempt[BitVector] = {
    for {
      header  <- headerEncoder.encode(a)
      bodyVec <- bodyEncoder.encode(a)
      lenVec  <- LengthPayload.codec.encode(LengthPayload(bodyVec.toByteVector.size.toInt))
    } yield header ++ lenVec ++ bodyVec
  }
}

We can implement an encoder for any type that is a FrameRequest because we know the header of every such type. We encode the header, then the body, and then the length (we need to encode the body first, because the length field holds the size of the encoded body).
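The resulting wire layout can be sketched in plain Scala over raw byte arrays (illustration only - the RequestEncoder above does this with scodec):

```scala
// A frame on the wire: header bytes, then the 4-byte big-endian body
// length, then the body itself.
def frame(header: Array[Byte], body: Array[Byte]): Array[Byte] = {
  val n = body.length
  val length = Array((n >>> 24).toByte, (n >>> 16).toByte, (n >>> 8).toByte, n.toByte)
  header ++ length ++ body
}
```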

Let's define two requests, an OptionsRequest and a StartupRequest. The OptionsRequest can be used to find out supported versions of options (protocol version, compression, etc.). The StartupRequest will actually initialize the connection, and the server should respond with ReadyResponse.

package protocol

import scodec._

final case class OptionsRequest(protocolVersion: Byte,
                                flagsPayload: FlagsPayload,
                                streamId: StreamId)
  extends FrameRequest {
  def header: Header =
    Header(Version(MessageDirection.Request, protocolVersion),
      flagsPayload, streamId, OpCode.OpOptions)
}

object OptionsRequest {
  implicit val optionsRequestEncoder: RequestEncoder[OptionsRequest] =
    new RequestEncoder[OptionsRequest] {
      def bodyEncoder: Encoder[OptionsRequest] = Encoder[OptionsRequest] { _: OptionsRequest =>
        Attempt.successful(BitVector.empty)
      }
    }
}

OptionsRequest doesn't have any body, and therefore we define an encoder that doesn't encode any actual bits. Notice how our header defines that it's MessageDirection.Request as well as OpCode.OpOptions. Those are very important, after all, we are sending OptionsRequest :-).

We move on to StartupRequest. StartupRequest has a body defined by the spec as StringMap, which we implemented a codec for in a previous section.

final case class StartupRequest(protocolVersion: Byte,
                                flagsPayload: FlagsPayload,
                                streamId: StreamId,
                                body: StringMap)
  extends FrameRequest {
  def header: Header =
    Header(Version(MessageDirection.Request, protocolVersion),
      flagsPayload, streamId, OpCode.OpStartup)
}

object StartupRequest {
  implicit val startupRequestEncoder: RequestEncoder[StartupRequest] = new RequestEncoder[StartupRequest] {
    def bodyEncoder: Encoder[StartupRequest] =
      stringMap.contramap((x: StartupRequest) => x.body)
  }
}

Responses and their decoding

Let's define responses to our requests. For OptionsRequest, we get back a response with the OpSupported opcode. Let's call it SupportedResponse. For StartupRequest, we receive a response with the OpReady opcode. We'll call it, surprise - surprise, ReadyResponse. ReadyResponse does not contain anything besides its header (it has no body).

sealed trait FrameResponse extends Frame

final case class ReadyResponse(header: Header) extends FrameResponse

final case class SupportedResponse(header: Header,
                                   body: StringMultiMap) extends FrameResponse

We don't need to create instances of our RequestEncoder typeclass because these are responses and we will never be encoding them into bytes.

We still don't know how to decode it from bytes though. We need to read the header to see the OpCode, and based on that we need to read only a certain amount of bytes, matching the payload of the message defined by the OpCode. Let's define the decoder based on data from the header:

object FrameResponse {
  def decodeHeader(header: Header): Decoder[FrameResponse] = header.opCode match {
    case OpCode.OpSupported =>
      stringMultiMap.map(x => SupportedResponse(header, x))
    case OpCode.OpReady     =>
      codecs.provide(ReadyResponse(header))
    case h                  =>
      Decoder(_ => Attempt.failure(Err(s"Unable to decode $h")))
  }
}

The decoder for an OpSupported response has to read its body as a stringMultiMap. For OpReady there is no body, so we create a decoder that directly provides a response with the header we already have: codecs.provide(ReadyResponse(header)).

Data over network

Now we have implemented all the required encoders and decoders, ranging from primitive types such as int, through data representing Flags, Stream and so on, up to a full envelope containing a Header and a Body. Let's build the pieces to use these to send data over the wire.

We will create two types of sockets. One to wrap low level Java socket into one that understands structures that our encoding library Scodec uses. The second one (built on top of the first one) encodes a request and decodes a response.

Bit Vector Socket

ZIO-NIO provides low level constructs around Java NIO that we will use to send data. It works with ByteBuffers. Scodec's type of choice is BitVector. As we read the data, we need to do these conversions. We'll separate these concerns because we want nice separation and clear layers.

import protocol._
import scodec.Attempt
import scodec.bits.BitVector
import zio._
import zio.nio.channels._
import zio.nio.core._

trait BitVectorSocket {
  def write(vector: BitVector): Task[Unit]

  def read(numBytes: Int): Task[BitVector]
}

object BitVectorSocket {
  def apply(host: String, port: Int): TaskManaged[BitVectorSocket] = {
    for {
      inet   <- ZManaged.fromEffect(SocketAddress.inetSocketAddress(host, port))
      socket <- SocketChannel.open(inet)
    } yield new BitVectorSocket {
      override def write(vector: BitVector): Task[Unit] = {
        val buffer = Buffer.byteFromJava(vector.toByteBuffer)
        socket.write(buffer).unit
      }

      override def read(numBytes: Int): Task[BitVector] = {
        socket.readChunk(numBytes).map(x => BitVector.bits(x.asBits))
      }
    }
  }
}

We create a socket address, which is an effect. We open the socket using the address, which gives us a Managed type - it will take care of resource cleanup after we are done using it. The TaskManaged type that you see is essentially a managed type without any dependencies that can fail with a Throwable. Less jargon, more code: type TaskManaged[+A] = ZManaged[Any, Throwable, A]

Let's test this socket - just for fun. Open a local socket on port 9042. If you are on Linux, you can use nc -l 9042 | hexdump -C. Let's send the number 11 over the wire.

import scodec.bits.BitVector
import zio._

object HelloBitVector extends App {
  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
    val zio = BitVectorSocket("localhost", 9042).use { bvs =>
      bvs.write(BitVector.fromInt(11))
    }

    zio.exitCode
  }
}

We have received:

00 00 00 0b

0b is 11 in hexadecimal. Our BitVectorSocket seems to be working fine (if you're a pessimist, add "it works for the number 11").

Message Socket

The final missing piece is a socket that can send FrameRequest and receive FrameResponse.

trait MessageSocket {
  def send[A <: FrameRequest: RequestEncoder](req: A): Task[Unit]

  def receive: Task[FrameResponse]
}

We can send anything that is a FrameRequest and that we have a RequestEncoder typeclass instance for. Let's implement it. It will use our BitVectorSocket underneath, therefore it will also need to be of type TaskManaged.

object MessageSocket {
  def apply(host: String, port: Int): TaskManaged[MessageSocket] = {
    for {
      socket <- BitVectorSocket(host, port)
    } yield new MessageSocket {
      override def send[A <: FrameRequest](req: A)(implicit encoder: RequestEncoder[A]): Task[Unit] = {
        val encoded = fromAttempt(encoder.encode(req))
        encoded.flatMap(socket.write)
      }

      override def receive: Task[FrameResponse] = {
        val bytes       = (Header.codec.sizeBound.lowerBound / 8).toInt
        val lengthBytes = (LengthPayload.codec.sizeBound.lowerBound / 8).toInt
        for {
          headerBits     <- socket.read(bytes)
          header         <- fromAttempt(Header.codec.decodeValue(headerBits))
          
          bodyLengthBits <- socket.read(lengthBytes)
          bodyLength     <- fromAttempt(LengthPayload.codec.decodeValue(bodyLengthBits))
          
          bodyBits       <- socket.read(bodyLength.bodyLength)
          bodyDecoder     = FrameResponse.decodeHeader(header)
          body           <- fromAttempt(bodyDecoder.decodeValue(bodyBits))
        } yield body
      }
    }

  }

  private def fromAttempt[T](attempt: Attempt[T]): Task[T] =
    attempt.fold(x => ZIO.fail(new Exception(x.messageWithContext)), x => ZIO.succeed(x))

}

We prepare a method to help us convert Attempts to ZIO's Tasks. If encoding or decoding fails, we want to fail the task. Sending a message is simple - we encode the data to BitVector, and write it to the socket.

For receiving, we read and decode the header, and bodyLength. Based on bodyLength, we read the following bits. Do you remember how the header contains OpCode that gives semantics to the body? We implemented a method that gives us the right decoder based on this header. We called it def decodeHeader(header: Header): Decoder[FrameResponse]. We use it to obtain the required decoder which we use to read the body to get FrameResponse.
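The receive framing above can be sketched in plain Scala over a byte array (illustration only - MessageSocket does this with scodec decoders over the socket):

```scala
// Receive framing: a 5-byte header (version, flags, 2-byte stream,
// opcode), a 4-byte big-endian body length, then exactly that many
// body bytes.
def readFrame(bytes: Array[Byte]): (Array[Byte], Array[Byte]) = {
  val header = bytes.slice(0, 5)
  val len    = bytes.slice(5, 9).foldLeft(0)((a, b) => (a << 8) | (b & 0xff))
  val body   = bytes.slice(9, 9 + len)
  (header, body)
}
```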

Let's test it by sending a couple of messages.

Sending real requests to Cassandra

First, make sure you have Cassandra running with port 9042 exposed.

docker run --rm -p 127.0.0.1:9042:9042 \
 -p 127.0.0.1:9160:9160 \
 --name cassandra \
 -e CASSANDRA_START_RPC=true \
 cassandra:latest

Let's construct requests and send them:

import protocol.FlagsPayload._
import protocol._
import zio._
import zio.logging._

object HelloCassandra extends App {
  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
    val flags   = FlagsPayload(TracingFlag, WarningFlag)
    val version = Version(MessageDirection.Request, 4)

    val request1 = OptionsRequest(version.protocolVersion, flags, StreamId(1))
    val request2 = StartupRequest(version.protocolVersion, flags, StreamId(2), Map("CQL_VERSION" -> "3.0.0"))

    val zio = for {
      _ <- MessageSocket("localhost", 9042).use { socket =>
        for {
          _     <- log.info(s"request: $request1")
          _     <- socket.send(request1)
          resp1 <- socket.receive
          _     <- log.info(s"response: ${resp1.toString}")

          _     <- log.info(s"request: $request2")
          _     <- socket.send(request2)
          resp2 <- socket.receive
          _     <- log.info(s"response: ${resp2.toString}")
        } yield ()
      }
    } yield ()

    val loggingLayer = Logging.console()
    zio.provideCustomLayer(loggingLayer).exitCode
  }
}

After running it, you should see something like this in the console (depending on your Cassandra version):

request: OptionsRequest(4,FlagsPayload(10),StreamId(1))
response: SupportedResponse(
  Header(Version(Response,4),FlagsPayload(0),StreamId(1),OpSupported),
    Map(PROTOCOL_VERSIONS -> List(3/v3, 4/v4, 5/v5-beta),
        COMPRESSION -> List(snappy, lz4),
        CQL_VERSION -> List(3.4.4)))

request: StartupRequest(4,FlagsPayload(10),StreamId(2),
    Map(CQL_VERSION -> 3.0.0))
response: ReadyResponse(Header(Version(Response,4),FlagsPayload(0),StreamId(2),OpReady))

Congratulations - you have successfully sent a message to Cassandra - that understood it, and sent you back a response! Isn't it exciting?

This is where the real fun starts. We are able to talk to our database, and now it's a matter of implementing further types, operations, requests, responses, and protocol logic.

Next steps

Notice how when we receive a message, we receive a generic FrameResponse. We still lack actual protocol logic - for example, for an OptionsRequest we always get back a SupportedResponse, but this is not encoded anywhere. All we can do is send some FrameRequest, and receive some FrameResponse.

In our example, we used the driver synchronously - we sent a message and expected a response. A good driver is able to handle this communication asynchronously (the protocol supports it) - you can have multiple requests in flight in parallel. The driver should pair them based on their Stream id.

  • break the protocol (i.e. change expected sizes of payloads) and see what errors you get. Do you get any response from Cassandra?
  • define remaining types from section 3
  • implement missing OpCodes
  • support additional requests and responses
  • create a layer that understands protocol defined by the spec - i.e. after StartupRequest, we get back typed response of either ReadyResponse or an AuthenticateResponse, instead of a generic FrameResponse.
  • add support for asynchronous communication - the ability to have multiple requests in flight

Maybe I will tackle some of these in one of my following posts.

Conclusion

We have defined our datatypes, opened a socket, and exchanged a few messages with Cassandra. We have a strong foundation and layers that we can extend with additional operations and capabilities to improve the driver.

As a closing remark, I would like to point out a kickass library that does something similar, but for Postgres - skunk. It was my initial motivation for this blog post, and I highly recommend you check it out.

I hope you enjoyed the article at least as much as I enjoyed writing it. If you have some questions feel free to get in touch at my Twitter @MarekKadek.

Originally published on kadek-marek.medium.com
