noteflakes

Real-world Concurrency with Ruby and Polyphony: a Telnet Chat App

13·11·2021

Recently there has been a lot of renewed interest in fibers as the building blocks for writing concurrent Ruby apps. Most of the articles written lately (my own included) have tried to explain what are fibers, how they can be used for writing concurrent apps, and how fiber scheduling works. While this obviously is great, I feel there’s also a need for developers to get a feel for how a real-world fiber-based app looks, and how writing such an app differs from using, say, EventMachine or some other Ruby library providing a different concurrency model.

In this article I’ll walk you through implementing a bare-bones Telnet chat app using Polyphony. Along the way, I’ll demonstrate how Polyphony lets us write concurrent programs in a natural, idiomatic style, and show how fiber messaging, one of Polyphony’s unique features, allows us to design a concurrent app as a collection of simple, autonomous entities, each having a single responsibility.

The source code for the chat app is available as a gist.

Designing our chat app

The chat app we’re going to implement today will have the following requirements:

Now that we have our requirements, let’s concentrate on the design of our program: what are the different moving parts and how do they connect? One of the biggest advantages of using fibers is that, fibers being so cheap to create (in terms of computer resources,) we can implement any entity in our program as a fiber. If we take the problem of a chat app, we have rooms, we have users, and we have TCP connections. As I’ll show below, each of these can be modeled as an independent fiber.

Fiber messaging

In order for all those different fibers to communicate with each other, we can use fiber messaging, a feature that is unique to Polyphony, and is greatly inspired by message-passing in Erlang, which essentially permits Erlang processes to behave as concurrent actors.

In Polyphony, each fiber has a mailbox, and can receive messages by calling Kernel#receive. A message can be any Ruby object. To send a message to a fiber, we call Fiber#send or Fiber#<<. Receiving a message is a blocking operation, if the fiber’s mailbox is empty, the call to #receive will block until a message is sent to the fiber. The call to #send, however, is not blocking (except if the fiber’s mailbox is capped and filled to capacity. By default fiber mailboxes are not capped.)

Here’s a simple example to show how fiber messaging works:

require 'polyphony'

receiver = Fiber.current
spin do
  sleep 1
  receiver << "hello"
end

puts "Waiting for message..."
message = receive
puts "Got #{message.inspect}"

In the above example, we spin up a fiber that will sleep for 1 second, then send a message to the main fiber, which meanwhile waits for a message to be received. This apparently simple mechanism for asynchronous communication between fibers has profound implications for how we can structure our concurrent programs. Since fibers can behave as actors (just like Erlang processes,) they can basically have the same capabilities as custom Ruby objects. Think about it: when we call a method on a Ruby object, we basically send it a message. If fibers can send and receive messages, we can use them instead of plain Ruby objects. And just like custom Ruby objects which hold state (stored in instance variables,) fibers can hold state in local variables.

In order to see how a fiber can hold state and receive “method calls”, let’s take the simple example of a calculator with a memory. Our calculator can do arythmetic operations on the last retained value. Here’s how we’ll implement such a calculator using a normal Ruby class definition:

class Calculator
  def initialize
    @value = 0
  end

  def add(x)
    @value += x
  end

  def mul(x)
    @value *= x
  end
end

calculator = Calculator.new
calculator.add(3) #=> 3
calculator.mul(2) #=> 6

Now let’s see how we can do the same thing with a fiber:

require 'polyphony'

calculator = spin do
  value = 0
  loop do
    peer, op, x = receive
    case op
    when :add
      value += x
      peer << value
    when :mul
      value *= x
      peer << value
    end
  end
end

calculator << [Fiber.current, :add, 3]
receive #=> 3
calculator << [Fiber.current, :mul, 2]
receive #=> 6

The calculator fiber loops infinitely, waiting for messages to be received in its mailbox. Each message, having been destructured, is processed by updating the state and sending the updated state to the peer fiber which originated the message. Notice that in the fiber-based version, in order to get the result of the arythmetic operation, we need to provide the calculator fiber with the current fiber, to which it will send the result of the operation. In effect, our calculator fiber can be said to be a sort of server: it receives requests, handles them, and sends back a reply.

This might seem like a a much more complicated way of doing things, but in fact look at the stuff we don’t need to worry about: we don’t need to define a custom class, and our state is safely stored as a local variable and cannot be accessed or tampered with from the outside. Finally, since our calculator fiber is doing one thing at a time we are basically guaranteed to not have any race conditions when making “calls” to our calculator. Compare this to the “normal” implementation above, which will fail miserably once we try to call methods from multiple threads at once.

If we want to make the fiber’s interface a bit more like what we’re used to with our normal Ruby method calls, we can wrap our calculator fiber implementation with something akin to Erlang’s GenServer (generic server) behavior, as shown in the Polyphony repository. Our fiber-based calculator would then look something like this:

module Calculator
  module_function

  def initial_state
    0
  end

  def add(state, value)
    state += value
    # The first value is the return value, the second is the mutated state. In
    # our case, they are the same.
    [state, state]
  end

  def mul(state, value)
    state *= value
    [state, state]
  end
end

# start server with initial state
calculator = GenServer.start(Calculator)
calculator.add(3) #=> 3
calculator.mul(2) #=> 6

One important detail to understand about fiber messaging is that like with any API, the actual messages sent and received between fibers (which, if you recall, can be any Ruby object) need to be well defined. An abstraction such as the GenServer example shown above, can help with making those interfaces more convenient to use, but it is in no way obligatory. We can get by explicitly sending and receiving fiber messages.

Using fibers to encapsulate state - and fiber messaging to communicate between fibers - has an additional ramification: it guides the developer towards a more functional style of programming (the example above is a case in point.) You stop thinking in classes and objects, and think more in terms of methods and message passing. While Ruby is pretty good at doing both, in the last few years I’ve been personally gravitating towards a more functional programming style, and Polyphony does facilitate moving in that direction.

But let’s go back to our chat app. We’d like to implement the different entities in our program as fibers, and make them interact using fiber messaging. As noted above, if we want to use fiber messaging, we’ll need to have defined the different messages that are going to be sent between the different fibers, in other words the different interfaces those fibers will have. Before starting to write our implementation, let’s first define those.

Defining fiber interfaces

As we said, we have three kinds of entities: Telnet session, user, and room. Let’s figure out the responsibilities of each entity, and how those entities interact:

Let’s now define the shape of the different messages our chat entities should be able to handle. A room fiber needs to be able to handle the following events:

A user fiber should handle the following events:

The Telnet session fiber does not need to handle incoming messages, as its job is only to wait for lines of text to arrive on the socket, and send them to the corresponding user. The distinction between session and user is important, since those two entities have different responsibilities. The user fiber implements the business logic from the point of view of the user, dealing with notifications coming either from the room or the Telnet session. The Telnet session deals exclusively with receiving data on the corresponding TCP socket.

Now that we have defined the interactions and messages sent between the different parts of our app, let’s start writing code!

The Telnet session

We start writing our code with a straightforward implementation of a TCP server:

server = spin do
  server_socket = TCPServer.new('0.0.0.0', 1234)
  server_socket.accept_loop do |s|
    spin { handle_session(s) }
  end
end

We start by spinning up a server fiber that will run the TCP server. The server fiber creates a TCPServer instance for accepting connections. The #accept_loop method runs an infinite loop, waiting for connections to be accepted. For each accepted connection, we spin a separate fiber, calling #handle_session with the accepted connection. Let’s look at how #handle_session is implemented:

def handle_session(socket)
  socket << 'Please enter your name: '
  name = socket.gets.chomp
  socket.puts "Hello, #{name}!"
  user_fiber = spin { run_user(name, socket) }
  while (line = socket.gets.chomp)
    user_fiber << [:input, line]
  end
ensure
  user_fiber << [:close]
end

We start by asking the user for their name, then setup a fiber for the user, calling #run_user. Finally, we run a loop waiting for lines to arrive on our socket, and send each line to the user fiber.

The user fiber

Our user fiber will run a loop, waiting for and processing incoming messages:

def run_user(name, socket)
  current_room = nil
  loop do
    event, message = receive
    case event
    when :close
      break
    when :input
      case message
      when /\:enter\s+(.+)/
        leave_room(current_room, name) if current_room
        current_room = enter_room($1, name)
      when ':leave'
        leave_room(current_room, name) if current_room
      else
        say(current_room, name, message)
      end
    when :message
      socket.puts message
    end
  end
ensure
  leave_room(current_room, name) if current_room
end

We destructure incoming messages (received as an Array of the form [event, message]), then take the correct action according to the message received. Here are the rest of the user’s business logic, which consist of sending messages to the room the user has entered or left:

def leave_room(room_fiber, name)
  room_fiber << [:leave, name, Fiber.current]
end

def enter_room(room_name, name)
  room_fiber = find_room(room_name)
  room_fiber << [:enter, name, Fiber.current]
  room_fiber
end

def say(room_fiber, name, message)
  room_fiber << [:say, name, message]
end

The room

Finally, we get to the room entity, which manages a list of users and takes care of broadcasting messages received from individual users in the room. Let’s start with the #find_room method, which is used by users to find the fiber for the room they want to enter:

@room_fibers = {}
@main_fiber = Fiber.current

def find_room(room_name)
  @room_fibers[room_name] ||= @main_fiber.spin { run_room(room_name) }
end

Since #find_room is called in the context of the user fiber, we need to be careful about how we spin up the room fiber. We want our room fiber to not be limited to the lifetime of the user fiber (which will terminate when the user’s Telnet session closes,) and that means we cannot spin it directly from the user fiber. Instead, we spin it from the main fiber. Notice that the user fiber itself is spun from the Telnet session fiber, but since the user fiber should not outlive its Telnet session that is just fine.

(In a future article I’ll show a better way to manage fibers by organizing them into supervision trees, but for the sake of the present discussion the above solution is good enough).

Lets continue with the room implementation:

def run_room(room_name)
  @users = {}
  loop do
    event, *args = receive
    case event
    when :leave
      name, fiber = args
      @users.delete(args[1])
      broadcast(@users.keys, "#{args[0]} has left the room.")
      break if @users.empty?
    when :enter
      @users[args[1]] = true
      broadcast(@users.keys, "#{args[0]} has entered the room.")
    when :say
      broadcast(@users.keys, "#{args[0]}: #{args[1]}")
    end
  end
ensure
  @room_fibers.delete(room_name)
end

def broadcast(fibers, message)
  fibers.each { |f| f << [:message, message] }
end

The room fiber is very similar to the user fiber, in that it runs a loop waiting for events to be received. The different events are processed by updating the list of users and broadcasting the corresponding messages to all users.

Tying it all together

Now that we have implemented the different parts of the application, all that’s left is for the main fiber to wait for the server fiber to terminate (which will never arrive). We do that by calling Fiber#await:

server.await

Now that our program is complete, let’s run it (we can run two separate Telnet sessions from separate terminal windows):

sharon@nf1:~$ Telnet localhost 1234
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Please enter your name: sharon
Hello, sharon!
:enter foo
sharon has entered the room.
hi
sharon: hi
sylvain has entered the room.
sylvain: hi there!
hello
sharon: hello
...

Conclusion

We now have a fully functioning bare-bones chat app able to handle hundreds or even thousands of concurrent users, implemented in about 85 lines of code, and including a total of 8 methods: 1 for Telnet sessions, 4 for users, 3 for rooms. Our code is compact, easy to understand, and does not include any class definitions.

Furthermore, any state we need to keep track of (the current room for the user, and the list of users for each room) is conveniently held as local variables inside the relevant methods. As discussed above, we could have encapsulated our different entities (namely, users and rooms) as GenServer interfaces, but I’ll leave that as an exercise to the reader.

Also, note how fluid and idiomatic our code looks. Spinning up fibers takes no effort, and neither does fiber messaging. We just sprinkle our code with a bunch of spin receive and fiber << message and everything works concurrently.

There’s a lot to be said for designing concurrent programs as collections of autonomous actors, interacting using messages. Programming in this way requires a shift in how we think about the different entities in our program, and in how we get them to interact. I’ll continue exploring this subject in more detail in future articles.

You can find the complete code to the chat app here. Please feel free to contact me if you have any questions about this article or Polyphony in general.