Typing your actors step by step
Background
After some frustration with our project’s choice for a planning poker solution, I did what any reasonable developer would do. No, I didn’t search for a more suitable solution, I decided to write my own and do a blog post about it.
The main idea is to build a simple application that uses some features of the following core Akka libraries:
- Akka HTTP: REST API for Vue.js frontend
- Akka Streams: Websocket bi-directional data handling
- Akka Actors: Rooms management
Initially, I deliberately chose to use a classic actor implementation so that I could actually also talk about something that might be more useful: migrating an Akka Classic application to an Akka Typed version.
"Why would you want to do that?", you may ask… Well, for me it is the type safety of Akka Typed which means that an Actor’s behaviour isn’t just a function of Any ⇒ Unit
and its strong support for avoiding certain anti-patterns (more eloquently explained here), but feel free to read up further if this migration is something for your project.
Overview
Obviously, the observable behaviour should stay the same after the migration. The application consists of:
- A stream for each websocket connection, with its own wire data domain, that forwards all incoming data to the main application controller (Room Manager) and provides an interface for forwarding outgoing data.
- The Room Manager, which translates from the wire domain to the application domain, routes translated messages to the appropriate rooms and manages each room's lifecycle.
- Rooms, which represent individual planning poker sessions, hold all necessary information and make sure that each participant is up to date with it.
Step by step
First things first, we need to replace our old classical actor system with its typed counterpart:
implicit val system: ActorSystem[SpawnProtocol.Command] =
  ActorSystem(Behaviors.setup[SpawnProtocol.Command](_ => SpawnProtocol()), "pointing-poker")
Takeaways
- There is no default guardian actor; you need to provide one.
Tips
- Use SpawnProtocol; it makes it easy to create new actors.
Now that we have our actor system, we need to create our top level actors:
system ! SpawnProtocol.Spawn(RoomManager(), "room-manager", Props.empty, system.ignoreRef)
Takeaways
- There is no actorOf method on a typed ActorSystem.
- If you don’t care about or need the response, you can use system.ignoreRef.
I could have created the RoomManager during the Behaviors.setup when creating the actor system, but I also need the RoomManager reference in my HTTP API, so I created it from “outside” using SpawnProtocol.Spawn, which replies with the ActorRef once the actor is created. However, as you can see, I’m ignoring the ref for now, because to actually use it we need to talk about how to ask information out of the actor system:
import akka.actor.typed.scaladsl.AskPattern._ // brings system.ask into scope

implicit val timeout: Timeout = 3.seconds
val roomManagerFuture: Future[ActorRef[RoomManager.Command]] = system.ask { ref =>
  SpawnProtocol.Spawn(RoomManager(), "room-manager", Props.empty, ref)
}
implicit val ec: ExecutionContextExecutor = system.executionContext
roomManagerFuture.onComplete {
  case Success(roomManager) =>
    val api = API(roomManager, apiConfig)
    api.run()
  case Failure(exception) =>
    log.error("Error creating room manager {}", exception)
}
Takeaways
- Ask now provides a reference that can receive messages from any actor.
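For comparison, the alternative mentioned above (spawning the top-level actor inside the guardian's Behaviors.setup) could look roughly like the sketch below. It assumes Akka 2.6; the RoomManager here is a one-message stand-in, and Guardian and startApi are hypothetical names that are not part of the application.

```scala
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// Stand-in for the real RoomManager, reduced to a single message (assumption).
object RoomManager {
  sealed trait Command
  case object Ping extends Command

  def apply(): Behavior[Command] =
    Behaviors.receive[Command] { (context, message) =>
      context.log.info("Received {}", message)
      Behaviors.same
    }
}

// Hypothetical guardian: it spawns its children during setup and hands
// their references to whoever needs them through the startApi callback.
object Guardian {
  def apply(startApi: ActorRef[RoomManager.Command] => Unit): Behavior[Nothing] =
    Behaviors.setup[Nothing] { context =>
      val roomManager = context.spawn(RoomManager(), "room-manager")
      startApi(roomManager)
      Behaviors.empty
    }
}

object GuardianSketch extends App {
  // The callback stands in for "start the HTTP API with this reference".
  val system: ActorSystem[Nothing] =
    ActorSystem[Nothing](Guardian(roomManager => roomManager ! RoomManager.Ping), "pointing-poker")
}
```

With this layout no ask is needed to obtain the reference, at the price of wiring everything that depends on it into the guardian.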
Now we need to convert RoomManager and Room to typed versions. That can be accomplished either by extending AbstractBehavior (object-oriented style) or by defining functions that return a Behavior (functional style). I’ll be using the functional style (comparison):
RoomManager:
final case class RoomManagerData(rooms: Map[UUID, ActorRef[Room.Command]])
object RoomManagerData {
  val empty: RoomManagerData = RoomManagerData(rooms = Map.empty[UUID, ActorRef[Room.Command]])
}
def apply(): Behavior[Command] =
  Behaviors.setup[Command] { context =>
    val roomResponseActor: ActorRef[Room.Response] =
      context.messageAdapter(response => RoomResponseWrapper(response))
    receiveBehaviour(RoomManagerData.empty, roomResponseActor)
  }
private[actors] def receiveBehaviour(data: RoomManagerData, roomResponseWrapper: ActorRef[Room.Response]): Behavior[Command] =
  Behaviors
    .receive[Command] { (context, message) =>
      message match {
        case CreateRoom(replyTo) =>
          //handle message
          receiveBehaviour(newData, roomResponseWrapper)
        case ConnectToRoom(message, user) =>
          //handle message
          Behaviors.same
        case IncomeWSMessage(message) =>
          //handle message
          Behaviors.same
        case UnsupportedWSMessage =>
          //handle message
          Behaviors.same
        case WSCompleted(roomId, userId) =>
          //handle message
          Behaviors.same
        case WSFailure(t) =>
          //handle message
          Behaviors.same
      }
    }
    .receiveSignal {
      case (_, Terminated(ref)) =>
        //handle message
        receiveBehaviour(newData, roomResponseWrapper)
    }
Room:
final case class RoomData(
  users: List[User],
  currentIssue: String,
  issueLastEditBy: Option[UUID]
) {
  // Implementations omitted to save space
  def joinUser(user: User): RoomData = ???
  def vote(userId: UUID, estimation: String): RoomData = ???
  def clear(): RoomData = ???
  def leave(userId: UUID): RoomData = ???
  def editIssue(issue: String, userId: UUID): RoomData = ???
}
object RoomData {
  val empty: RoomData = RoomData(List.empty[User], "", Option.empty[UUID])
}
def apply(roomId: UUID): Behavior[Command] =
  Behaviors.setup[Command] { _ =>
    receiveBehaviour(roomId, RoomData.empty)
  }
private[actors] def receiveBehaviour(roomId: UUID, data: RoomData): Behavior[Command] =
  Behaviors.receive[Command] { (context, message) =>
    message match {
      case Join(user) =>
        // Handling message using data transformation defined in case class
        receiveBehaviour(roomId, newData)
      case Vote(userId, estimation) =>
        // Handling message using data transformation defined in case class
        receiveBehaviour(roomId, newData)
      case ClearVotes(userId) =>
        // Handling message using data transformation defined in case class
        receiveBehaviour(roomId, newData)
      case ShowVotes(userId) =>
        // Handling message using data transformation defined in case class
        Behaviors.same
      case Leave(userId) =>
        // Handling message using data transformation defined in case class
        Behaviors.same
      case EditIssue(userId, issue) =>
        // Handling message using data transformation defined in case class
        receiveBehaviour(roomId, newData)
    }
  }
Takeaways
- Internal state is now immutable.
- The sender reference (sender()) is no longer available, which forced a change to CreateRoom: the reply address is now part of the message itself (replyTo).
- I was already using an actor DSL, but now the compiler checks it as well (it found a missing extends on EditIssue in Room).
- The compiler will issue a warning if a message defined in the actor’s protocol is not handled.
- Lifecycle messages are handled in a separate function.
- context.stop only works on child actors.
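The last two takeaways can be illustrated with a small sketch, assuming Akka 2.6. Worker and Supervisor are hypothetical actors invented for this example: the supervisor may stop its own child with context.stop, the worker stops itself by returning Behaviors.stopped, and the resulting Terminated signal arrives in receiveSignal rather than in the message handler.

```scala
import akka.actor.typed.{ActorRef, Behavior, Terminated}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical child actor.
object Worker {
  sealed trait Command
  case object Shutdown extends Command

  def apply(): Behavior[Command] =
    Behaviors.receiveMessage[Command] {
      // An actor stops itself by returning Behaviors.stopped.
      case Shutdown => Behaviors.stopped
    }
}

// Hypothetical parent actor.
object Supervisor {
  sealed trait Command
  case object StopWorker extends Command

  def apply(): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      val worker = context.spawn(Worker(), "worker")
      context.watch(worker) // ask to be notified with Terminated
      running(worker)
    }

  private def running(worker: ActorRef[Worker.Command]): Behavior[Command] =
    Behaviors
      .receive[Command] { (context, message) =>
        message match {
          case StopWorker =>
            context.stop(worker) // allowed because worker is a direct child
            Behaviors.same
        }
      }
      .receiveSignal {
        // Lifecycle signals are handled separately from protocol messages.
        case (context, Terminated(ref)) =>
          context.log.info("Child {} terminated", ref.path.name)
          Behaviors.same
      }
}
```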
Tips
- Think of your actor as a finite state machine.
- Define data transformations as functions on the case class. This makes message handling more readable and lets you unit test the transformations.
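As an illustration of the second tip, here is a minimal sketch of such transformations and a check that runs without any actor. The User type is simplified (the real one also carries an actor reference), its field names are assumptions, and the method bodies are one possible implementation, not the ones from the application.

```scala
import java.util.UUID

// Simplified stand-in for the application's User (field names are assumptions).
final case class User(id: UUID, name: String, voted: Boolean, estimation: String)

final case class RoomData(
  users: List[User],
  currentIssue: String,
  issueLastEditBy: Option[UUID]
) {
  // Record a vote for one user, leaving everyone else untouched.
  def vote(userId: UUID, estimation: String): RoomData =
    copy(users = users.map { user =>
      if (user.id == userId) user.copy(voted = true, estimation = estimation) else user
    })

  // Reset all votes.
  def clear(): RoomData =
    copy(users = users.map(_.copy(voted = false, estimation = "")))

  // Change the issue and remember who edited it last.
  def editIssue(issue: String, userId: UUID): RoomData =
    copy(currentIssue = issue, issueLastEditBy = Some(userId))
}

object RoomDataCheck extends App {
  val alice = User(UUID.randomUUID(), "alice", voted = false, estimation = "")
  val bob   = User(UUID.randomUUID(), "bob", voted = false, estimation = "")
  val initial = RoomData(List(alice, bob), "", None)

  val afterVote = initial.vote(alice.id, "5")
  assert(afterVote.users.head.estimation == "5")
  assert(!afterVote.users(1).voted)
  assert(afterVote.clear() == initial)
  assert(initial.editIssue("Issue 1", bob.id).issueLastEditBy.contains(bob.id))
}
```

Because each transformation returns a new RoomData, the message handler only has to pick the right one and pass the result to the next behaviour.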
Previously we had to request (ask) information out of one actor. When one actor sends a request to another actor, however, the response needs to be translated, because typed actors can only handle messages defined in their own domain. This translation is done using an adapter.
object Room {
  sealed trait Response
  final case class Running(roomId: UUID) extends Response
  final case class Stopped(roomId: UUID) extends Response
}
object RoomManager {
  final case class RoomResponseWrapper(response: Room.Response) extends Command
  def apply(): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      val roomResponseActor: ActorRef[Room.Response] =
        context.messageAdapter(response => RoomResponseWrapper(response))
      receiveBehaviour(RoomManagerData.empty, roomResponseActor)
    }
}
Takeaways
- When expecting a response from another actor, you will need an adapter to convert it to the proper message type.
- If the translation in the adapter fails, the actor will be stopped.
- There is only one adapter per message type; any new adapter will override the previous one.
Tip
- Use simple conversion on adapters, leave the actual message handling to the actor’s behaviour.
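A complete round trip through an adapter might look like the following sketch, assuming Akka 2.6. Child, Parent and their messages are hypothetical and much smaller than the Room and RoomManager protocols; the point is that the adapter only wraps the response, while the decision about what to do with it stays in the behaviour.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical actor with its own request and response protocol.
object Child {
  sealed trait Command
  final case class Start(replyTo: ActorRef[Response]) extends Command

  sealed trait Response
  case object Started extends Response

  def apply(): Behavior[Command] =
    Behaviors.receiveMessage[Command] {
      case Start(replyTo) =>
        replyTo ! Started
        Behaviors.same
    }
}

// Hypothetical actor that talks to Child and receives its responses.
object Parent {
  sealed trait Command
  case object Boot extends Command
  // Wrapper that brings Child.Response into Parent's own protocol.
  private final case class ChildResponseWrapper(response: Child.Response) extends Command

  def apply(): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      // The adapter only wraps; it contains no handling logic.
      val adapter: ActorRef[Child.Response] =
        context.messageAdapter(response => ChildResponseWrapper(response))
      val child = context.spawn(Child(), "child")

      Behaviors.receiveMessage[Command] {
        case Boot =>
          child ! Child.Start(adapter)
          Behaviors.same
        case ChildResponseWrapper(Child.Started) =>
          context.log.info("Child reported that it started")
          Behaviors.same
      }
    }
}
```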
Now that the actors are typed, the remaining parts of the system need to be adapted to interact with them.
Since Akka HTTP 10.2.x, it is no longer necessary to convert to an untyped (classic) actor system to start your HTTP server. Now it looks like:
Http().newServerAt(apiConfig.host, apiConfig.port).bind(route)
Takeaways
- Before 10.2.x you would need to change to untyped system.
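Put together, a minimal server started directly from a typed actor system could look like this sketch, assuming Akka HTTP 10.2.x on Akka 2.6. The health route, host and port are placeholders chosen for the example, not values from the application.

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

import scala.concurrent.ExecutionContextExecutor

object HttpServerSketch extends App {
  // A typed system with an empty guardian is enough to host the server.
  implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "http-sketch")
  implicit val ec: ExecutionContextExecutor = system.executionContext

  // Placeholder route (assumption): GET /health answers "ok".
  val route: Route =
    path("health") {
      get {
        complete("ok")
      }
    }

  // Http() picks up the typed system implicitly; no conversion is needed.
  Http()
    .newServerAt("localhost", 8080)
    .bind(route)
    .foreach(binding => system.log.info("Server online at {}", binding.localAddress))
}
```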
As for the websocket stream, sadly I couldn’t find anything that would integrate streams with typed actors. So instead of changing the whole websocket stream code, I decided to use the coexistence features, which allow me to do:
import akka.actor.typed.scaladsl.adapter._
handleWebSocketMessages(
  WS.handler(
    roomId,
    URLDecoder.decode(encodedName, StandardCharsets.UTF_8.name()),
    roomManager.toClassic
  )
)
Takeaways
- You are not bound to use only typed actors, both can coexist.
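To show the coexistence adapters in isolation, here is a small sketch assuming Akka 2.6. Greeter (typed) and Forwarder (classic) are hypothetical actors: the classic actor holds a classic reference obtained with toClassic and sends messages to the typed actor through it.

```scala
import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.{actor => classic}

// Hypothetical typed actor.
object Greeter {
  final case class Greet(name: String)

  def apply(): Behavior[Greet] =
    Behaviors.receive[Greet] { (context, message) =>
      context.log.info("Hello, {}", message.name)
      Behaviors.same
    }
}

// Hypothetical classic actor that forwards names to the typed Greeter.
class Forwarder(target: classic.ActorRef) extends classic.Actor {
  def receive: Receive = {
    case name: String => target ! Greeter.Greet(name)
  }
}

object CoexistenceSketch extends App {
  val guardian: Behavior[Nothing] =
    Behaviors.setup[Nothing] { context =>
      val greeter = context.spawn(Greeter(), "greeter")
      // The adapter import adds actorOf to the typed context and toClassic to typed refs.
      val forwarder =
        context.actorOf(classic.Props(new Forwarder(greeter.toClassic)), "forwarder")
      forwarder ! "Akka"
      Behaviors.empty
    }

  ActorSystem[Nothing](guardian, "coexistence-sketch")
}
```

The classic reference is untyped, so the compiler no longer checks what is sent through it; that is the trade-off of keeping the existing stream code unchanged.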
Now that all actors, services and connecting parts are migrated, the tests need to be adjusted. The Actor TestKit provides two utilities for testing: ActorTestKit for asynchronous testing and BehaviorTestKit for synchronous testing.
RoomManagerSpec:
"connect user to room" in {
  val behaviorTestKit = BehaviorTestKit(RoomManager())
  val roomId = UUID.randomUUID()
  val user1Probe = TestProbe()(testKit.system.classicSystem)
  val user2Probe = TestProbe()(testKit.system.classicSystem)
  val user1 = Room.User(UUID.randomUUID(), "user 1", false, "", user1Probe.ref)
  val user2 = Room.User(UUID.randomUUID(), "user 2", false, "", user2Probe.ref)
  behaviorTestKit.run(
    RoomManager
      .ConnectToRoom(WSMessage(MessageType.Join, roomId, user1.id, user1.name), user1Probe.ref)
  )
  behaviorTestKit.run(
    RoomManager
      .ConnectToRoom(WSMessage(MessageType.Join, roomId, user2.id, user2.name), user2Probe.ref)
  )
  val childInbox = behaviorTestKit.childInbox[Room.Command](roomId.toString)
  childInbox.expectMessage(Room.Join(user1))
  childInbox.expectMessage(Room.Join(user2))
}
"handle an IncomeWSMessage that generates an outcome" in {
  val roomId = UUID.randomUUID()
  val roomProbe = testKit.createTestProbe[Room.Command]()
  // receiveBehaviour also takes the Room.Response adapter reference; a probe stands in for it here
  val roomResponseProbe = testKit.createTestProbe[Room.Response]()
  val managerRef = testKit.spawn(
    RoomManager.receiveBehaviour(RoomManagerData(Map(roomId -> roomProbe.ref)), roomResponseProbe.ref)
  )
  val userId = UUID.randomUUID()
  managerRef ! RoomManager.IncomeWSMessage(WSMessage(MessageType.Vote, roomId, userId, "5"))
  managerRef ! RoomManager.IncomeWSMessage(
    WSMessage(MessageType.EditIssue, roomId, userId, "issue name")
  )
  managerRef ! RoomManager.IncomeWSMessage(WSMessage(MessageType.Show, roomId, userId, ""))
  managerRef ! RoomManager.IncomeWSMessage(WSMessage(MessageType.Clear, roomId, userId, ""))
  roomProbe.expectMessage(Room.Vote(userId, "5"))
  roomProbe.expectMessage(Room.EditIssue(userId, "issue name"))
  roomProbe.expectMessage(Room.ShowVotes(userId))
  roomProbe.expectMessage(Room.ClearVotes(userId))
}
RoomSpec:
"Room Actor" should {
"update current issue and broadcast it" in {
    val issue = "Issue test 1"
    val (user, userProbe) = createUser(UUID.randomUUID(), "user1", false, "")
    val (user2, user2Probe) = createUser(UUID.randomUUID(), "user2", false, "")
    val dataProbe = testKit.createTestProbe[Room.Response]()
    val actingUserId = UUID.randomUUID()
    val (roomId, roomRef) = createRoom(
      UUID.randomUUID(),
      RoomData.empty.copy(users = List(user, user2))
    )
    val expectedMessage = WSMessage(MessageType.EditIssue, roomId, actingUserId, issue)
    val expectedData = Room.DataStatus(data =
      RoomData(
        users = List(user, user2),
        currentIssue = issue,
        issueLastEditBy = Option(actingUserId)
      )
    )
    roomRef ! Room.EditIssue(actingUserId, issue)
    roomRef ! Room.GetData(dataProbe.ref)
    userProbe.expectMsg(expectedMessage)
    user2Probe.expectMsg(expectedMessage)
    dataProbe.expectMessage(expectedData)
}
}
Takeaways
- Since actors are Behavior functions, overriding internal functions for testing, as you might do with classic actors, is not recommended.
- BehaviorTestKit is better at dealing with an actor’s children.
- ActorTestKit provides a simple way to create and use probes.
- There isn’t a clear way to test communication between more than two actors (if they aren’t related).
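The difference between the two utilities is easier to see on a tiny actor. The sketch below assumes the akka-actor-testkit-typed module for Akka 2.6 and uses a hypothetical Echo actor; the same behaviour is exercised once synchronously with BehaviorTestKit and a TestInbox, and once asynchronously with ActorTestKit and a probe.

```scala
import akka.actor.testkit.typed.scaladsl.{ActorTestKit, BehaviorTestKit, TestInbox}
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical actor that answers every Ping with a Pong.
object Echo {
  final case class Ping(text: String, replyTo: ActorRef[Pong])
  final case class Pong(text: String)

  def apply(): Behavior[Ping] =
    Behaviors.receiveMessage[Ping] {
      case Ping(text, replyTo) =>
        replyTo ! Pong(text)
        Behaviors.same
    }
}

object EchoChecks extends App {
  // Synchronous: no actor system, the behaviour runs on the calling thread.
  val inbox = TestInbox[Echo.Pong]()
  val behaviorTestKit = BehaviorTestKit(Echo())
  behaviorTestKit.run(Echo.Ping("sync", inbox.ref))
  inbox.expectMessage(Echo.Pong("sync"))

  // Asynchronous: a real actor system, with a probe waiting for the reply.
  val testKit = ActorTestKit()
  try {
    val probe = testKit.createTestProbe[Echo.Pong]()
    val echo = testKit.spawn(Echo())
    echo ! Echo.Ping("async", probe.ref)
    probe.expectMessage(Echo.Pong("async"))
  } finally testKit.shutdownTestKit()
}
```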
Tips
- Normally I would only expose the initial behavior of my actor, but I changed the access of the other behavior to package-private, so now I can set the behavior data as I need before the test.
private[actors] def receiveBehaviour(roomId: UUID, data: RoomData): Behavior[Command] = ???
- Include a message that allows you to inspect the internal state; again, access is package-private.
private[actors] final case class GetData(replyTo: ActorRef[Response]) extends Command
sealed trait Response
final case class DataStatus(data: RoomData) extends Response
Closing thoughts
The migration went smoother than I was expecting. Once I started it I couldn’t run the application until it was complete, and after changing the whole engine of the app I only faced two compilation errors, which were quickly solved, and the application was running again. Those errors were actually already present in the application before the migration; they had gone unnoticed for lack of the checks that typed actors bring, so the migration already proved useful.
Some of the smoothness I experienced while migrating might be explained by the fact that I already followed good practices when using classic actors, specifically having a DSL already defined. If you’re having more trouble when migrating, I would suggest taking a step back and refining your actors' DSL and transitions (how they communicate with each other and the states they pass through during their lifecycle).
If you want to see more of what can be done with typed actors and Scala 3.0, these two blog posts may be interesting to read: Using Scala 3 Union types with Akka Typed - part 1 and Using Scala 3 Union types with Akka Typed - part 2.
Helpful links: