An Introduction to Finagle by example
It is a challenging task to build a large-scale web application, there are fundamental characteristics to take into account: for example, efficiency, safety and robustness. Finagle is a asynchronous, Netty based JVM RPC system made by Twitter which makes it easy to build high available clients and servers in Java and Scala. And it can even simplify your application architecture. Here I want to show you how powerful Finagle is.
Quickstart
Let’s first have a quick look about how to create a Finagle micro web service and a Finagle http client to consume this api.Create a sbt project and import dependencies.
libraryDependencies ++= Seq(
"com.twitter" %% "finagle-http" % "6.38.0",
"org.scalatest" %% "scalatest" % "2.2.4" % "test"
)
First, let’s define a service. Here we define a service to receive a http request and get its url parameter as Integer then return a http response by plus 10.
import com.twitter.finagle.Serviceimport
import com.twitter.util.Futureimport
import com.twitter.finagle.http
// This is a plus 10 service
class PlusTenService extends Service[http.Request, http.Response] {
override def apply(request: http.Request): Future[http.Response] = {
Future {
val input = request.getIntParam("num")
val output = input + 10
val response = http.Response(request.version, http.Status.Ok)
response.setContentString(output.toString)
response
}
}
}
Then initiate and start our server
import com.twitter.finagle.{http, Service, Http}
import com.twitter.util.Await
object QuickLookServer {
def main(args: Array[String]): Unit = {
val service: Service[http.Request, http.Response] = new PlusTenService val server = Http.serve(":9090", service)
Await.ready(server)
}
}
Last let’s define a client to consume this server.
import com.twitter.finagle.{Service, Http}
import com.twitter.finagle.http
import com.twitter.util.Await
object QuickLookClient {
def main(args: Array[String]): Unit = {
//define a client
val client: Service[http.Request, http.Response] = Http.newService("localhost:9090")
//define a request
val request = http.Request(http.Method.Get, "/?num=5")
//apply request on the client
val response = client(request)
// print response
response.foreach(rep => println(rep.getContentString()))
Await.result(response)
}
}
If you run the two application you will see the server running on localhost:9090 and client get response 15. Simple right? As you can see our service and client are both type of Service[http.Request, http.Response] . This data type really confuse me in the beginning. I will explan what’s the differences between them.
The core of Finagle
Service
Now let’s first have a look at the core of finagle Service[-Req, +Rep] . You can find the definition in com.twitter.finagle.Service . In Finagle 6.38.0 the definition of Service is an abstract class, in previous version it was a trait
abstract class Service[-Req, +Rep] extends (Req => Future[Rep])
A service is a function that takes request of type Req, and return a response of Future of Rep. This Services type are used to represent both clients and servers. To answer my previous question, the differences between service and client is that a Finagle client “imports” a Service from the network. However, a Finagle server “exports” a Service to the network.Note: the Future here is twitter future not scala future. There is no differences on conception.
Filter
Some times we want to add application-agnostic behaviour, we can use Filter to achieve this.
abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn]
extends ((ReqIn, Service[ReqOut, RepIn])
=> Future[RepOut])
If it is not clear please check image below.
In most common cases, ReqIn is equal to ReqOut, and RepIn is equal to RepOut. So we have this SimpleFilter class
abstract class SimpleFilter[Req, Rep] extends Filter[Req, Rep, Req, Rep]
A filter can attached to client and server side. Let’s try to implement a simple timeout filter.
import com.twitter.finagle.{SimpleFilter, Service}
import com.twitter.util.{Duration, Timer, Future}
class TimeoutFilter[Req, Rep](timeout: Duration, timer: Timer)
extends SimpleFilter[Req, Rep] {
def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
val res = service(request)
res.within(timer, timeout)
}
}
Here, a timeout filter is a class extends SimpleFilter trait. Below is how to use this filter on client side
client = Http.newService("localhost:9090")
val timeoutFilter = new TimeoutFilter[http.Request, http.Response](Duration.fromSeconds(1),
new JavaTimer(false))
val clientWithTimeoutFilter = timeoutFilter.andThen(quickLookClient)
A filter can be applied on server side too. Here is an example. First let’s define a filter.
class CountFilter[Req, Rep](countClient: Service[http.Request, http.Response]) extends SimpleFilter[Req, Rep] {
override def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
val countRequest = http.Request(http.Method.Post, "/?count=5")
countClient(countRequest) service(request)
}
}
And then let’s use it on our plusTen service
val service: Service[http.Request, http.Response] = new PlusTenService
val countClient = Http.newService("localhost:9010")
val countFilter = new CountFilter[http.Request, http.Response](countClient)
val serviceWithCountFilter = countFilter.andThen(service)
You may notice the way to chain filter and service together is by using andThen method. Actually andThen method can not only chain filter with service but also chain multiple filters, like filter1 andThen filter2 andThen myservice
Client
This is the part that I like the most in finagle. Finagle http client is designed to maximize success and minimize latency. Each request will flow through various modules. These modules are logically separated into three stacks: Client stack, Endpoint stack, connection stack.
Client stack
manages name resolution and balances requests across multiple endpoints.
Endpoint stack
provides circuit breakers and connection pooling.
connection stack
provides connection life-cycle management and implements the wire protocol.
To use finagle http client is very simple. Define a client first and define a http request, then apply request on the client.
// create a http client
val client = Http.client.newService("example.com:80")
// create a http requestval
req = Request("/foo", ("my-query-string", "bar"))
// apply request on the client
val resp: Future[Response] = client(req)Note: client(req) is equal to client.apply(req)
What I want to emphasis here is the Load Balancer module. This module brings a lot of benefit for your application. It can simplify your application infstracture. Let’s compare it with traditional solution.
As you can see, the traditional solution highly rely on nginx as load balancer, once nginx dead your service is not reachable, in real production environment, you have master-slave nginx wiht keeplived installed on nginx machine for heartbeat detection. This looks really complex, what about if we can get rid of these nginx?Let’s have look at following code.
val name: Name = Name.bound(Address("localhost", 10010), Address("localhost", 10011), Address("localhost", 10012))
//define a client
val client: Service[http.Request, http.Response] = Http.newService(name, "client")
This means you supply three addresses and put it into finagle http client. Finagle client will dispatch the request to one of address based on certain load balance algorithmn. The default algorithmn is "Exponentially Weighted Moving Average (EWMA)". Now your infstracture architechture becomes like following
Pretty simple right. Your apis talk to each other directly.
Protocol-agnostic
Finagle is a protocol-agnostic RPC system. It means Finagle supports every protocol if people implement it. For example: finagle-thrift is using thrift protocol. finagle-mysql implements the mysql protocol.Now, let’s look at this scenario
We want to make a api count service to count how many times the web service has been called. In section Service and Filter. We send http request and put number as query parameter. It just feel strange that I just want to send a number to count server, to achieve that I have to send a http request. Because I don’t use any data from header, cookie and body. If the application is running on AWS, it those junk information cost money. So it’s ideal to just send a integer number to api count service. Let’s implement this by customize finagle protocol.First, we should tell finagle how to converts an scodec codec into a Netty encoder
import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
import org.jboss.netty.handler.codec.oneone.{OneToOneDecoder,OneToOneEncoder}
import scodec.Codec
import scodec.bits.BitVector
trait CodecConversions {
/**
* Converts an scodec codec into a Netty encoder.
*/
protected def encoder[A: Codec] = new OneToOneEncoder {
override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: Object) =
ChannelBuffers.wrappedBuffer(
Codec.encodeValid(msg.asInstanceOf[A]).toByteBuffer)
}
/**
* Converts an scodec codec into a Netty decoder.
*/
protected def decoder[A: Codec] = new OneToOneDecoder {
override def decode(ctx: ChannelHandlerContext, channel: Channel, msg: Object) =
msg match {
case cb: ChannelBuffer =>
Codec.decodeValidValue[A (BitVector(cb.toByteBuffer)).asInstanceOf[Object]
case other => other
}
}
}
And then channel pipeline and codec factories
trait Factories {
this: CodecConversions =>
import com.twitter.finagle.{Codec => FinagleCodec, CodecFactory}
import org.jboss.netty.channel.{ChannelPipelineFactory, Channels}
/**
* Creates a Netty channel pipeline factory given input and output types. */
private[this] def pipeline[I: Codec, O: Codec] = new ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("encoder", encoder[I])
pipeline.addLast("decoder", decoder[O])
pipeline
}
}
/**
* Creates a Finagle codec factory given input and output types. */
protected def codecFactory[I: Codec, O: Codec] = new CodecFactory[I, O] {
def server = Function.const {
new FinagleCodec[I, O] { def pipelineFactory = pipeline[O, I] }
}
def client = Function.const {
new FinagleCodec[I, O] { def pipelineFactory = pipeline[I, O] }
}
}
}
And then the code that actually creates our Finagle server and client
import java.net.InetSocketAddress
import com.twitter.conversions.time._
import com.twitter.finagle.Service
import com.twitter.finagle.builder.{ClientBuilder, ServerBuilder}
import com.twitter.util.{Duration, Future}
import scodec.Codec
object IntegerServerAndClient extends Factories with CodecConversions {
/**
* Creates a Finagle server from a service that we have scodec codecs
* for both the input and output types. */
def server[I, O](port: Int)(service: Service[I, O])(implicit ic: Codec[I], oc: Codec[O]) =
ServerBuilder()
.name("server")
.codec(codecFactory[I, O])
.bindTo(new InetSocketAddress(port))
.build(service)
/**
* Creates a Finagle client given input and output types with scodec codecs.
*/
def client[I, O](host: String, timeout: Duration = 3.second) (implicit ic: Codec[I], oc: Codec[O]) =
ClientBuilder()
.name("client")
.codec(codecFactory[I, O])
.hosts(host)
.timeout(timeout)
.build()
}
Define our simple service
import com.twitter.finagle.Service
import com.twitter.util.Future
class IntegerService extends Service[Int, Int] {
var count = 0
override def apply(request: Int): Future[Int] = {
Future.value(count + request)
}
}
Run a server
import com.twitter.finagle.Service
import com.twitter.util.Await
import scodec.codecs.implicits.{ implicitIntCodec => _, _ }
object Server {
def main(args: Array[String]): Unit = {
implicit val intgerCodec = scodec.codecs.uint8
val service: Service[Int, Int] = new IntegerService
val server = IntegerServerAndClient.server[Int, Int](9191)(service)
Await.ready(server)
}
}
Run a client
import com.twitter.finagle.Service
import com.twitter.util.Await
import scodec.codecs.implicits.{ implicitIntCodec => _, _ }
object Client {
def main(args: Array[String]): Unit = {
implicit val intgerCodec = scodec.codecs.uint8
//define a client
val client: Service[Int, Int] = IntegerServerAndClient.client[Int, Int]("localhost:9191")
//define a request
val request = 4
//apply request on the client
val response = client(request)
//print response
response.foreach(rep => println(s"This is response $rep"))
Await.result(response)
}
}
Conclusion
Finagle is a very flexible asychronous, protocol-agnostic RPC framework. It can help you to build high performance micro service with any protocol. It is worth to take a look at Finch the web framework based on Finagle. You can find more detail introduction from Twitter blog and more detailed example from Twitter scala school.