noteflakes

And I dreamed I saw the bombers
Riding shotgun in the sky
And they were turning into butterflies
Above our nation

We are stardust
Billion year old carbon
We are golden
Caught in the devil's bargain
And we've got to get ourselves
back to the garden

Joni Mitchell

20·10·2021

Explaining Ruby Fibers

Fibers have long been a neglected corner of the Ruby core API. Introduced in Ruby 1.9 as a coroutine abstraction, fibers have never really realized their promise of lightweight concurrency, and remain relatively little explored. In spite of attempts to employ them for achieving concurrency, most notably em-synchrony, fibers have not caught on. Hopefully, with the advent of the fiber scheduler interface introduced in Ruby 3.0, and libraries such as Async and Polyphony, this situation will change: fibers will become better known and understood, and Ruby developers will be able to use them to their full potential.

My aim in this article is to explain how fibers work from the point of view of a concurrent Ruby application written using Polyphony. I’ll give an overview of fibers as concurrency constructs, and discuss how Polyphony harnesses Ruby fibers in order to provide an idiomatic and performant solution for writing highly-concurrent Ruby apps. For the sake of simplicity, I’ll omit some details, which will be mentioned in footnotes towards the end of this article.

What’s a Fiber?

Most developers (hopefully) know about threads, but just what are fibers? Linguistically we can already tell they have some relation to threads, but what is the nature of that relation? Is it one of aggregation (as in “a thread is made of one or more fibers”,) or is it one of similitude (as in “a fiber is sort of like a thread but lighter/smaller/whatever”?) As we shall see, it’s a little bit of both.

A fiber is simply an independent execution context that can be paused and resumed programmatically. We can think of fibers as story lines in a book or a movie: there are multiple happenings involving different persons at different places all occurring at the same time, but we can only follow a single story line at a time: the one we’re currently reading or watching.

This is one of the most important insights I had personally when I started working with Ruby fibers: there’s always a currently active fiber. In fact, even if you don’t use fibers and don’t do any fiber-related work, your code is still running in the context of the main fiber for the current thread, which is created automatically by the Ruby runtime for each thread.
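
We can easily verify this. The following snippet (a minimal demonstration) shows that code always runs in the context of some fiber:

require 'fiber'

main = Fiber.current
puts main.inspect             # e.g. #<Fiber:0x... (resumed)>

f = Fiber.new { puts Fiber.current == main }
f.resume                      # prints "false": the block runs in a different fiber
puts Fiber.current == main    # prints "true": we're back in the main fiber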

So a fiber is an execution context, and in that regard it’s kind of like a thread: it keeps track of a sequence of operations, and as such it has its own stack and instruction pointer. Where fibers differ from threads is that they are not managed by the operating system, but instead are paused, resumed, and switched between by the userspace program itself. This programmatic way to switch between different execution contexts is also called “cooperative multitasking”, in contrast to the way threads are switched by the operating system, which is called “preemptive multitasking”.

Switching between Fibers

Here’s a short example of how switching between fibers is done¹. In this example we’re doing the fiber switching manually:

require 'fiber'

@f1 = Fiber.new do
  puts 'Hi from f1'
  @f2.transfer
  puts 'Hi again from f1'
end

@f2 = Fiber.new do
  puts 'Hi from f2'
  @f1.transfer
end

puts 'Hi from main fiber'
@f1.transfer
puts 'Bye from main fiber'

In the above example, we create two fibers. The main fiber then transfers control (“switches”) to fiber @f1, which then transfers control to @f2, which then returns control to @f1. When @f1 has finished running, control is returned automatically to the main fiber, and the program terminates. The program’s output will be:

Hi from main fiber
Hi from f1
Hi from f2
Hi again from f1
Bye from main fiber

One peculiarity of the way fiber switching is done is that the context switch is always initiated from the currently active fiber. In other words, the currently active fiber must voluntarily yield control to another fiber. This is also the reason why this model of concurrency is called “cooperative concurrency.”

Another important detail to remember is that a fiber created using Fiber.new always starts in a suspended state. It will not run unless you first switch to it using Fiber#transfer.
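
We can see this with a minimal snippet:

require 'fiber'

f = Fiber.new { puts 'Hi from f' }
puts 'f created'  # f hasn't run yet - nothing printed by f so far
f.transfer        # only now is "Hi from f" printed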

Controlling fiber state via the context switch

What I find really interesting about the design of Ruby fibers is that the act of pausing a fiber, then resuming it later, is seen as a normal method call from the point of view of the fiber being resumed: we make a call to Fiber#transfer, and that call will return only when the fiber that made it is itself resumed using a reciprocal call to Fiber#transfer. The return value will be the value given as an argument to the reciprocal call, as is demonstrated in the following example:

ping = Fiber.new do |peer|
  loop do
    msg = peer.transfer('ping')
    puts msg
  end
end

pong = Fiber.new do |msg|
  loop do
    puts msg
    msg = ping.transfer('pong')
  end
end

ping.transfer(pong)

The block provided to Fiber.new can take an argument, which will be set to the value given to Fiber#transfer the first time the fiber is resumed. We use this to set ping’s peer, and then use Fiber#transfer to pass messages between the ping and pong fibers.

This characteristic of Fiber#transfer has profound implications: it means that we can control a fiber’s state using the value we pass to it as we resume it. Here’s an example of how this could be done:

main = Fiber.current
f = Fiber.new do |msg|
  count = 0
  loop do
    case msg
    when :reset
      count = 0
    when :increment
      count += 1
    end
    puts "count = #{count}"
    msg = main.transfer
  end
end

3.times { |i| f.transfer(:increment) } # count = 3
f.transfer(:reset)                     # count = 0 
3.times { |i| f.transfer(:increment) } # count = 3

In the above example, the fiber f gets its state updated each time it is resumed. The main fiber can control f’s state by passing a special value when calling Fiber#transfer. As long as we have in place a well-defined convention for how the transferred value is interpreted by fibers in a given program, we can implement arbitrarily complex semantics for controlling our fibers’ state and life-cycle. Later on in this article we’ll see how Polyphony couples this mechanism with return values from blocking operations, as well as exceptions that permit us to cancel any long-running operation at any time.

Switching fibers on long-running operations

Now that we have a feel for how fibers can be paused and resumed, we can discuss how this can be used in conjunction with blocking operations. For the sake of our discussion, a blocking operation is any operation that potentially needs to wait for some external event to occur, such as a socket becoming readable or a timer elapsing. We want to be able to have multiple concurrent tasks, each proceeding at its own pace, and each yielding control whenever it needs to wait for some external event to occur.

In order to demonstrate how this might work, let’s imagine a program with two fibers: one that waits for data to arrive on STDIN and echoes it, and a second that prints the time once a second:

# note: STDIN.readable? and Timer are hypothetical APIs, imagined for this example
@echo_printer = Fiber.new do
  loop do
    if STDIN.readable?
      msg = STDIN.readpartial(1024)
      puts msg
    end
    @time_printer.transfer
  end
end

@time_printer = Fiber.new do
  timer = Timer.new(1)
  loop do
    if timer.elapsed?
      puts "Time: #{Time.now}"
      timer.reset
    end
    @echo_printer.transfer
  end
end

In each of the above fibers, we have a condition that tells us if the fiber can proceed: for @echo_printer the condition is STDIN.readable?; for @time_printer it’s timer.elapsed?. What’s notable about this example, though, is that the switching between fibers is done explicitly, and that each fiber needs to check its condition continually. Of course, this is not ideal, since the two fibers will just pass control between them until one of the conditions is met and actual work is done. If you run such a program, you’ll see one of your CPU cores saturated. But the main insight to draw here is that each fiber can yield control to another fiber when it cannot go on doing actual work.

Automatic fiber switching using an event reactor

Let’s see if we can avoid endlessly checking for readiness conditions, by using an event reactor - a piece of software that lets you subscribe to specific events, most importantly I/O readiness, and timers. In this case we’ll be using ever, a tiny Ruby gem that I wrote a few months ago, implementing an event reactor for Ruby apps based on libev. Here’s our rewritten example:

require 'ever'
require 'fiber'

evloop = Ever::Loop.new
@reactor = Fiber.current

@echo_printer = Fiber.new do
  loop do
    msg = STDIN.readpartial(1024)
    puts msg
    @reactor.transfer
  end
end

@time_printer = Fiber.new do
  loop do
    puts "Time: #{Time.now}"
    @reactor.transfer
  end
end

# register interest in events
evloop.watch_io(@echo_printer, STDIN, false, false)
evloop.watch_timer(@time_printer, 1, 1)

# run loop
evloop.each { |fiber| fiber.transfer }

As you can see, our fibers no longer need to check for readiness. The reactor, after registering interest in the respective event for each fiber, runs the event loop, and when an event becomes available, the corresponding fiber is resumed. Once resumed, each fiber does its work, then yields control back to the reactor.

This is already a great improvement, since our program does not need to endlessly check for readiness, and the code for each of the fibers doing actual work looks almost normal. The only sign we have of anything “weird” going on is that each fiber needs to yield control back to the reactor by calling @reactor.transfer.

If we look more closely at the above program, we can describe what’s actually happening as follows:

@echo_printer = Fiber.new do
  loop do
    msg = STDIN.readpartial(1024)
    puts msg

    wait_for_events if queued_events.empty?
    queued_events.each { |e| e.fiber.transfer }
  end
end

@time_printer = Fiber.new do
  loop do
    puts "Time: #{Time.now}"
    
    wait_for_events if queued_events.empty?
    queued_events.each { |e| e.fiber.transfer }
  end
end

...

For each of our fibers, at any point where the fiber needs to wait, we first look at our list of queued events. If there are none, we wait for events to occur. Finally, we proceed to handle those events by transferring control to each respective fiber. This needs to be done for each blocking or long-running operation: reading from a file, reading from a socket, writing to a socket, waiting for a time period to elapse, waiting for a process to terminate, etc. What if we had a tool that could automate this for us? This is where Polyphony enters the stage.

Using Polyphony for fiber-based concurrency

Polyphony automates all the different aspects of fiber switching, which boils down to knowing which fiber should be running at any given moment. Let’s see how Polyphony solves the problem of fiber switching by using it to rewrite the example above:

require 'polyphony'

echo_printer = spin do
  loop do
    msg = STDIN.readpartial(1024)
    puts msg
  end
end

time_printer = spin do
  loop do
    sleep 1
    puts "Time: #{Time.now}"
  end
end

Fiber.await(echo_printer, time_printer)

As our rewritten example shows, we’ve gotten rid of the calls to Fiber#transfer completely. The fiber switching is handled automatically and implicitly by Polyphony². Each fiber is an autonomous infinite loop made of a sequence of operations that’s simple to write and simple to read. All the details of knowing when STDIN is ready or when a second has elapsed, and which fiber to run at any moment, are conveniently taken care of by Polyphony.

Even more importantly, Polyphony offers a fluent, idiomatic API that mostly gets out of your way and feels like an integral part of the Ruby core API. Polyphony introduces the #spin global method, which creates a new fiber and schedules it for running as soon as possible (remember: fibers start their life in a suspended state.) The call to Fiber.await means that the main fiber, from which the two other fibers were spun, will wait for those two fibers to terminate. Because both echo_printer and time_printer are infinite loops, the main fiber will wait forever.

Now that we’ve seen how Polyphony handles all the details of switching between fibers behind the scenes, let’s examine how it actually does that.

The fiber switching dance

A fiber will spend its life in one of three states: :running, :waiting or :runnable³. As discussed above, only a single fiber can be :running at a given moment (for each thread). When a fiber needs to perform a blocking operation, such as reading from a file descriptor, it makes a call to the Polyphony backend associated with its thread, which performs the actual I/O operation. The backend will submit the I/O operation to the OS (using the io_uring interface⁴,) and will switch to the first fiber pulled from the runqueue, which will run until it too needs to perform a blocking operation, at which point another fiber switch will occur to the next fiber pulled from the runqueue.

Meanwhile, our original fiber has been put in the :waiting state. Eventually, the runqueue will be exhausted, which means that all fibers are waiting for some event to occur. At this point, the Polyphony backend will check whether any of the currently ongoing I/O operations have completed. For each completed operation, the corresponding fiber is “scheduled” by putting it on the runqueue, and the fiber transitions to the :runnable state. The runnable state means that the operation the fiber was waiting for has been completed (or cancelled), and the fiber can be resumed.

Once all completed I/O operations have been processed, the backend performs a fiber switch to the first fiber available on the runqueue, the fiber transitions back to the :running state, and the whole fiber-switching dance recommences.
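
To summarize the life-cycle described above:

# fiber state transitions (summary):
#   :running  -> :waiting   the fiber starts a blocking operation and yields control
#   :waiting  -> :runnable  the awaited event completes; the fiber is put on the runqueue
#   :runnable -> :running   the fiber is pulled off the runqueue and resumed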

What’s notable about this way of scheduling concurrent tasks is that Polyphony does not really have an event loop that wraps around your code. Your code does not run inside of a loop. Instead, whenever your code needs to perform some blocking operation, the Polyphony backend starts the operation, then switches to the next fiber that is :runnable, or ready to run.

The runqueue

The runqueue is simply a FIFO queue containing all the currently runnable fibers, that is, fibers that can be resumed. Let’s take our last example and examine how the contents of the runqueue change as the program executes:

runqueue #=> []

echo_printer = spin { ... }
runqueue #=> [echo_printer]

time_printer = spin { ... }
runqueue #=> [echo_printer, time_printer]

# at this point the two fibers have been put on the runqueue and will be resumed
# once the current (main) fiber yields control:
Fiber.await(echo_printer, time_printer)

# while the main fiber awaits, the two fibers are resumed. echo_printer will
# wait for STDIN to become readable. time_printer will wait for 1 second to
# elapse.

# The runqueue is empty.
runqueue #=> []

# Since there's no runnable fiber left, the Polyphony backend will wait for
# io_uring to generate at least one completion entry. A second has elapsed, and
# time_printer's completion has arrived. The fiber becomes runnable and is put
# back on the runqueue.
runqueue #=> [time_printer]

# Polyphony pulls the fiber from the runqueue and switches to it. The time is
# printed, and time_printer goes back to sleeping for 1 second. The runqueue is
# empty again:
runqueue #=> []

# The Polyphony backend waits again for completions to occur. The user types a
# line and hits RETURN. The completion for echo_printer is received, and
# echo_printer is put back on the runqueue:
runqueue #=> [echo_printer]

# Polyphony pulls the fiber from the runqueue and switches to it.
runqueue #=> []

...

While the runqueue was represented above as a simple array of runnable fibers, its design is actually much more sophisticated than that. First of all, the runqueue is implemented as a ring buffer in order to achieve optimal performance. The use of a ring buffer results in predictable performance characteristics for both adding and removing entries. When an entry is added to a runqueue that’s already full, the underlying ring buffer is reallocated to twice its previous size. For long-running apps, the runqueue size will eventually stabilize around a value that reflects the maximum number of currently runnable fibers in the process.
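
To illustrate the idea, here’s a minimal Ruby sketch of such a growable ring buffer (the actual Polyphony implementation is written in C; all names here are illustrative):

class RingBuffer
  def initialize(capacity = 16)
    @capacity = capacity
    @buffer = Array.new(capacity)
    @head = 0   # index of the oldest entry
    @count = 0  # number of entries currently stored
  end

  # remove and return the oldest entry, or nil if empty
  def shift
    return nil if @count == 0
    entry = @buffer[@head]
    @buffer[@head] = nil
    @head = (@head + 1) % @capacity
    @count -= 1
    entry
  end

  # add an entry, doubling the buffer if it's full
  def push(entry)
    grow if @count == @capacity
    @buffer[(@head + @count) % @capacity] = entry
    @count += 1
  end

  private

  def grow
    # copy existing entries in order into a buffer twice the size
    @buffer = @count.times.map { |i| @buffer[(@head + i) % @capacity] } + Array.new(@capacity)
    @head = 0
    @capacity *= 2
  end
end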

In addition, each entry in the runqueue contains not only the fiber, but also the value with which it will be resumed. If you recall, earlier we talked about the fact that we can control a fiber’s state whenever we resume it by passing it a value using Fiber#transfer. The fiber will receive this value as the return value of its own previous call to Fiber#transfer.

In Polyphony, each time a fiber is resumed, the return value is checked to see if it’s an exception. In case of an exception, it will be raised in the context of the resumed fiber. Here’s an excerpt from Polyphony’s io_uring backend that implements the global #sleep method:

VALUE Backend_sleep(VALUE self, VALUE duration) {
  Backend_t *backend;
  GetBackend(self, backend);

  VALUE resume_value = Qnil;
  io_uring_backend_submit_timeout_and_await(backend, NUM2DBL(duration), &resume_value);
  RAISE_IF_EXCEPTION(resume_value);
  RB_GC_GUARD(resume_value);
  return resume_value;
}

In the code above, we first submit an io_uring timeout entry, then yield control (by calling Fiber#transfer on the next runnable fiber) and await its completion. When the call to io_uring_backend_submit_timeout_and_await returns, our fiber has already been resumed, with resume_value holding the value returned from our call to Fiber#transfer. We use RAISE_IF_EXCEPTION to check whether resume_value is an exception, and raise it if it is.
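
In Ruby pseudocode, the same logic might look something like this (a simplified sketch; setup_timeout and next_runnable_fiber are illustrative stand-ins, not actual Polyphony methods):

def backend_sleep(duration)
  setup_timeout(duration)                      # submit a timeout to io_uring
  resume_value = next_runnable_fiber.transfer  # yield control until resumed
  # if we were resumed with an exception, raise it in this fiber's context
  raise resume_value if resume_value.is_a?(Exception)
  resume_value
end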

It’s also important to note that the resume values stored alongside fibers in the runqueue can, in effect, be used to control a fiber’s state, just like in bare-bones calls to Fiber#transfer. This can be done using the Polyphony-provided Fiber#schedule method, which puts the fiber on the runqueue, along with the provided resume value:

lazy = spin do
  loop do
    msg = suspend
    puts "Got #{msg}"
  end
end

every(1) { lazy.schedule('O hi!') }

In the example above, our lazy fiber suspends itself (using #suspend, which puts it in a :waiting state), and the main fiber schedules it once every second along with a message. The lazy fiber receives the message as the return value of the call to #suspend. One important difference between Fiber#schedule and Fiber#transfer is that Fiber#schedule does not perform a context switch. It simply puts the fiber on the runqueue along with its resume value. The fiber will be resumed as soon as all previous runnable fibers have been resumed and have yielded control.

Yielding control

As explained above, blocking operations involve submitting the operation to the io_uring interface, and then yielding (or #transferring) control to the next runnable fiber. A useful metaphor here is the relay race: only a single person runs at any given time (she holds the baton,) and eventually the runner will pass the baton to the next person who’s ready to run. Let’s examine what happens during this “passing of the baton” in a little more detail.

In effect, once the I/O operation has been submitted, the Polyphony backend calls the backend_base_switch_fiber function, which is responsible for this little ceremony, consisting of the following steps:

  1. Shift the first runqueue entry from the runqueue.
  2. If the entry is not nil:
    • Check if it’s time to do a non-blocking poll (in order to prevent starvation of I/O completions. See discussion below.)
    • proceed to step 4.
  3. Otherwise:
    • Perform a blocking poll.
    • Go back to step 1.
  4. Transfer control to the entry’s fiber with the entry’s resume value using Fiber#transfer.

All this happens in the context of the fiber that yields control, until a context switch is performed in step 4. To reuse our “relay race” metaphor, each time the current runner wishes to pass the baton to the next one, it’s as if she has a little gadget in her hand that holds the baton, performs all kinds of checks, finds out who the next runner is, and finally hands it over to the next runner.
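
In Ruby pseudocode, this ceremony might look something like this (a simplified sketch of the C implementation; runqueue, poll_for_completions and should_poll? are illustrative names):

def switch_to_next_fiber
  loop do
    entry = runqueue.shift
    if entry
      # occasionally do a non-blocking poll to prevent completion starvation
      poll_for_completions(blocking: false) if should_poll?
      # step 4: switch to the entry's fiber with its resume value
      return entry.fiber.transfer(entry.resume_value)
    else
      # runqueue is empty: wait for at least one completion to arrive
      poll_for_completions(blocking: true)
    end
  end
end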

Polling for I/O completions

When there are no runnable fibers left, Polyphony polls for at least one io_uring completion to arrive. For each received completion, the corresponding fiber is scheduled by putting it on the runqueue. Once the polling is done, the first fiber is pulled off the runqueue and is then resumed.

As we saw, polling for completions is only performed when the runqueue is empty. But what about situations where the runqueue is never empty? Consider the following example:

@f1 = spin { loop { @f2.schedule; puts 'f1' } }
@f2 = spin { loop { @f1.schedule; puts 'f2' } }

Fiber.await(@f1, @f2)

Each of the fibers above runs in an infinite loop, scheduling its peer and then printing a message. As shown above, the call to #puts, being an I/O operation, causes the Polyphony backend to submit the write operation to the io_uring interface, and then perform a context switch to the next runnable fiber. In order for the call to #puts to return, the Polyphony backend needs to poll for completions from the io_uring interface. But since both fibers keep scheduling each other, effectively adding each other back to the runqueue, the runqueue will never be empty!

In order to be able to deal with such circumstances, and prevent the “starvation” of completion processing, the Polyphony backend periodically performs a non-blocking check for any received completions. This mechanism ensures that even in situations where our application becomes CPU-bound (since there’s always some fiber running!) we’ll continue to process io_uring completions, and our entire process will continue to behave normally.
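
The “periodically” part can be as simple as a switch counter (a sketch continuing the illustrative names used above; the actual interval is an implementation detail):

def should_poll?
  @switch_count += 1
  (@switch_count % 128).zero?  # non-blocking poll once every 128 switches
end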

Implications for performance

The asynchronous nature of the io_uring interface has some interesting implications for the performance of Polyphony’s io_uring backend. As mentioned above, the submission of SQEs is deferred, performed either when a certain number of submissions have accumulated, or before polling for completions.

In Polyphony, runnable fibers are always prioritized, and polling for events is done mostly when there are no runnable fibers. Theoretically, this might have a negative impact on latency, but I have seen the io_uring backend achieve more than twice the throughput of the libev backend. Tipi, the Polyphony-based web server I’m currently developing, boasts a request rate of more than 138K requests/second at the time of this writing.

In fact, Polyphony provides multiple advantages over other concurrency solutions: the number of I/O-related syscalls is minimized (by using the io_uring interface.) In addition, the use of Fiber#transfer to transfer control directly to the next runnable fiber halves the number of context switches compared to using Fiber#resume/Fiber.yield.

Most importantly, by prioritizing runnable fibers over processing of I/O completions (with an anti-starvation mechanism as described above,) Polyphony lets a Ruby app switch easily between I/O-bound work and CPU-bound work. For example, when a Polyphony-based web server receives 1000 connections in the space of 1ms, it needs to perform a lot of work, setting up a fiber, a parser and associated state for each connection. The design of Polyphony’s scheduling system allows the web server to do this burst of hard work while deferring any I/O work for later submission. When this CPU-bound burst has completed, the web server fires all of its pending I/O submissions at once, and can proceed to check for completions.

Cancellation

Now that we have a general idea of how Polyphony performs fiber switching, let’s examine how cancellation works in Polyphony. We want to be able to cancel any ongoing operation at any time, and for any reason: a timeout has elapsed, an exception has occurred in a related fiber, or business logic requires cancelling a specific operation under specific circumstances. The mechanism for doing this is simple, as mentioned above: we use an exception as the resume value that will be transferred to the fiber when the context switch occurs.

Here’s a simple example that shows how cancellation looks from the app’s point of view:

def gets_with_timeout(io, timeout)
  move_on_after(timeout) { io.gets }
end

The #move_on_after global API sets up a timer, then runs the given block. If the timer elapses before the block has finished executing, the fiber is scheduled with a Polyphony::MoveOn exception, any ongoing I/O operation is cancelled by the backend, the exception is caught by #move_on_after, and an optional cancellation value is returned.
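
The cancellation value can be set like so (assuming the with_value: keyword option accepted by #move_on_after):

def gets_with_timeout(io, timeout)
  move_on_after(timeout, with_value: '<timeout>') { io.gets }
end

# returns the next line from io, or '<timeout>' if no line arrived in time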

If we want to generate an exception on timeout, we can instead use the #cancel_after API, which will schedule the fiber with a Polyphony::Cancel exception, and the exception will have to be caught by the app code:

def gets_with_timeout(io, timeout)
  cancel_after(timeout) { io.gets }
rescue Polyphony::Cancel
  "gets was cancelled!"
end

More ways to control fiber execution

In addition to the various APIs discussed above, Polyphony provides other methods for controlling fiber execution, such as Fiber#await, Fiber#stop, Fiber#raise, and Fiber#terminate.

Conclusion

Polyphony has been built to harness the full power of Ruby fibers, and provide a solid and joyful experience for those wishing to write highly-concurrent Ruby apps. There are many subtleties to designing a robust, well-behaving fiber-based concurrent environment. For example, there’s the problem of forking from arbitrary fibers, or the problem of correctly handling signals. Polyphony aims to take care of all these details, in order to be able to handle a broad range of applications and requirements.

I hope this article has helped clear up some of the mystery and misconceptions about Ruby fibers. Please let me know if you have specific questions about fibers in general or Polyphony in particular, and feel free to browse Polyphony’s source code. Contributions will be gladly accepted!

Footnotes

1. In this article we’ll confine ourselves to using the Fiber#transfer API for fiber switching, which is better suited for usage with symmetric coroutines. Using the Fiber.yield/Fiber#resume API implies an asymmetric usage which is better suited for fibers used as generators or iterators. Sadly, most articles dealing with Ruby fibers only discuss the latter, and make no mention of the former. Please note that in order to use Fiber#transfer we first need to require 'fiber'.

2. I’ve mentioned above the Fiber Scheduler interface introduced in Ruby 3.0, based on the work of Samuel Williams. This new feature, baked into the Ruby core, has roughly the same capabilities as Polyphony when it comes to automatically switching between fibers based on I/O readiness. As its name suggests, this is just a well-defined interface. In order to be able to employ it in your Ruby app, you’ll need to use an actual fiber scheduler. At the moment, the following fiber schedulers are available, in different states of production-readiness: evt, event, and my own libev-scheduler.

3. A fourth state, :dead, is used for fibers that have terminated. A fiber’s state can be interrogated using Fiber#state.

4. Polyphony also includes an alternative backend used on non-Linux OSes or older Linux kernels. Both backends have the same capabilities. In this article we’ll discuss only the io_uring backend.

14·10·2021

Embracing Infinite Loops with Ruby and Polyphony

In this article I’ll discuss the use of infinite loops as a major construct when writing concurrent apps in Ruby using Polyphony. I’ll show how infinite loops differ from normal, finite ones; how they can be used to express long-running tasks in a concurrent environment; and how they can be stopped.

Polyphony is a library for writing highly concurrent Ruby apps. Polyphony harnesses Ruby fibers and a powerful io_uring-based I/O runtime to provide a solid foundation for building high-performance concurrent Ruby apps.

In the last few months I’ve been slowly transitioning from working on Polyphony itself - designing APIs and adding functionality - to using it to develop actual applications, some of them open source, and others closed-source production apps for my clients.

In the process of actually using Polyphony as the basis for writing concurrent apps, I’ve discovered some patterns that I’d like to share. It’s really fascinating how the design of an API can impact the patterns that emerge in the application code. Take for example loops.

Developers who are used to asynchronous APIs will probably find the idea of writing loops in their app code anathema to asynchronous design: there’s only one loop - the main event loop - and it is that loop which drives your code. You just provide callbacks to be called at the right moment.

By contrast, with Polyphony the app code is written in a sequential style, and it is the app code that is in control. There is no event loop. Instead, you can create any number of fibers, all executing concurrently, with each of those fibers proceeding independently of the others.

But loops come into play when you want to launch autonomous long-running tasks, like listening for incoming connections on a TCP socket, pulling items from a queue and processing them, or periodically running some background task. Infinite loops are what makes it possible to “fire-and-forget” those concurrent processes.

Loops are everywhere!

Loops are one of the most useful ways to control execution. Loops are used anywhere you need to repeat an operation, and can be expressed in a variety of ways, from the lowly GOTO, through plain for and while loops, all the way to Ruby’s elegant #each and related methods, which take a block and apply it to items from some iterable object. While those don’t necessarily look like loops, they are, in fact, loops:

# this is a loop
while (item = queue.shift)
  item.process
end

# this is also a loop
queue.each { |i| i.process }

Infinite loops

Infinite loops are loops that run indefinitely. A loop can be inadvertently infinite if its logic is faulty, but loops can also be infinite by design. Infinite loops are made for running autonomous, long-lived tasks that can run for any number of iterations, and are not meant to be stopped conditionally. Here are some examples:

# Accept incoming connections:
loop do
  socket = server.accept
  handle_client_connection(socket)
end

# Process items from a queue:
loop do
  item = queue.shift
  process(item)
end

As the examples above show, Ruby provides the very useful #loop method, which lets us express infinite loops in a clear and concise manner. Looking at the code, we can immediately tell that we’re dealing with an infinite loop.

What’s important to note about infinite loops is that they can include a mechanism for breaking out of the loop if a certain condition is met. In fact, sometimes the distinction between a finite loop and an infinite one is not that clear.

Take for example a loop for handling an HTTP client connection. It needs to run for the lifetime of the connection, which can last for any duration and span any number of HTTP requests. This might look like an infinite loop, but it includes a conditional break:

# using h1p for parsing HTTP/1
def handle_client_connection(socket)
  parser = H1P::Parser.new(socket)
  loop do
    headers = parser.parse_headers # returns nil when socket is closed
    break unless headers
    
    body = parser.read_body
    handle_request(headers, body)
  end
end

Another way to express the same logic, which makes it look like a normal finite loop, is like this:

def handle_client_connection(socket)
  parser = H1P::Parser.new(socket)
  while (headers = parser.parse_headers)
    body = parser.read_body
    handle_request(headers, body)
  end
end

Concurrent infinite loops

What’s interesting about infinite loops is that once they start, theoretically they will go on forever! In Polyphony you can start any number of infinite loops, each running in its own fiber. Polyphony does the hard work of switching between all those fibers, letting each fiber proceed at its own pace once the operation it was waiting for has completed: reading from or writing to a socket, waiting for an item to become available on a queue, etc. To do this, we use the #spin global method provided by Polyphony, which spins up new fibers:

item_processor = spin do
  loop do
    item = item_queue.shift
    process(item)
  end
end

http_server = spin do
  server = TCPServer.new('0.0.0.0', 1234)
  loop do
    socket = server.accept
    # each client runs in its own fiber
    spin { handle_http_client(socket) }
  end
end

Fiber.await(item_processor, http_server)

In the above example, we start a fiber for processing items from a queue, and alongside it an HTTP server. Each of those is implemented using an infinite loop running on a separate fiber. Finally, the main fiber waits for those two fibers to terminate. While the main fiber waits, Polyphony takes care of running the item processor and the HTTP server, with each fiber proceeding at its own pace as items are pushed into the queue, and as incoming HTTP connections are accepted.

Interrupting an infinite loop

As we saw above, starting an infinite loop on a separate fiber is really easy, but how do you interrupt one? Polyphony provides us with some tools for interrupting fibers at any time. We can do that by scheduling the specific fiber with an exception, which might be a normal exception, or one of the special exceptions that Polyphony provides for controlling fibers.

In order to stop a fiber running an infinite loop, we can call Fiber#stop:

item_processor = spin do
  loop do
    item = item_queue.shift
    process(item)
  end
end

# tell the item_processor to stop
item_processor.stop

# then wait for it to terminate
item_processor.await

Under the hood, Fiber#stop schedules the fiber with a Polyphony::MoveOn exception, which means that the fiber should just terminate at the earliest occasion, without the exception bubbling further up the fiber hierarchy.

As the example above shows, telling a fiber to stop does not mean it will do so immediately. We also need to properly wait for it to terminate, which we do by calling item_processor.await or Fiber.await(item_processor). As discussed above, stopping a fiber is done by scheduling it with a special exception that tells it to terminate. The terminated fiber will then proceed to terminate any child fibers it has, and perform other cleanup. This also means that you can use normal ensure blocks in order to perform cleanup. Let’s rewrite our item processor to process items by sending them in JSON format over a TCP connection:

item_processor = spin do
  socket = TCPSocket.new(PROCESSOR_HOSTNAME, PROCESSOR_PORT)
  loop do
    item = item_queue.shift
    socket.puts item.to_json
  end
ensure
  socket.close
end

item_processor.await
# the socket is now guaranteed to be closed

When the item_processor fiber is terminated, the ensure block guarantees that the socket used for sending items is closed before the call to item_processor.await returns.

More ways to stop a fiber

In addition to the Fiber#stop method, Polyphony has other APIs that can be used to stop a fiber in a variety of ways, including raising an exception in the fiber’s context, and gracefully terminating a fiber. Terminating a fiber with an exception is done using Fiber#raise. This is especially useful when you need to implement your own error states:

my_fiber.raise(FooError, 'bar')
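
Expanding this into a complete, if minimal, sketch (FooError and the fiber’s work are made up for illustration):

class FooError < StandardError; end

my_fiber = spin do
  loop do
    sleep 1  # stands in for actual work
  end
rescue FooError => e
  puts "fiber stopped: #{e.message}"
end

my_fiber.raise(FooError, 'bar')
my_fiber.await  # returns once the fiber has handled the exception and terminated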

A graceful termination can be done using Fiber#terminate, which takes an optional boolean flag. This requires a bit more logic in the fiber itself:

item_processor = spin do
  socket = TCPSocket.new(PROCESSOR_HOSTNAME, PROCESSOR_PORT)
  loop do
    item = item_queue.shift
    socket.puts item.to_json
  end
ensure
  if Fiber.current.graceful_shutdown?
    move_on_after(10) do
      wait_for_inflight_items
    end
  end
  socket.close
end

# terminate gracefully
item_processor.terminate(true)

In the example above, we added logic in the ensure block that waits up to 10 seconds for all inflight items to be processed, then proceeds with closing the TCP socket.

(We’ll take a closer look at exception handling and fiber termination in a future article.)

Polyphony 💛 Loops

Polyphony not only makes it easy to start and stop concurrent infinite loops, but it also further embraces loops by providing a bunch of loop APIs, such as #every and #throttled_loop for running periodic and rate-limited loops, and I/O convenience methods such as IO#read_loop and Socket#accept_loop.

In the future Polyphony will include even more #xxx_loop APIs that will provide more concise ways to express loops along with better performance.

Polyphony is just plain Ruby

Looking at all the above examples, you will have noticed how the Polyphony API looks as if it were baked into the Ruby language, as if it were part of the Ruby core. One of my principal design goals was to minimize boilerplate code when expressing concurrent operations. There’s no instantiating of special objects, no weird mechanisms for controlling fibers or rescuing exceptions. It just looks like plain Ruby! This makes concurrent code easier to both write and read.

Conclusion

In this article I’ve shown you how infinite loops can be used to express long-running concurrent tasks using Polyphony. Polyphony provides all the tools needed for controlling the execution of concurrent fibers. For more information about Polyphony you can go to the Polyphony website. You can also browse the examples in the Polyphony repository.

05·10·2021

A Compositional Approach to Optimizing the Performance of Ruby Apps

Ruby has long been derided as a slow programming language. While this accusation has some element of truth to it, successive Ruby versions, released yearly, have made great strides in improving Ruby’s performance characteristics.

In addition to all the iterative performance improvements - Ruby 2.6 and 2.7 were especially impressive in terms of performance gains - recent versions have introduced bigger features aimed at improving performance, namely: a JIT compiler, the Ractor API for achieving parallelism, and the Fiber scheduler interface aimed at improving concurrency for I/O bound applications.

While those three big developments have yet to prove themselves in real-life Ruby apps, they represent great opportunities for improving the performance of Ruby-based apps. The next few years will tell if any of those new technologies will deliver on its promise for Ruby developers.

While Ruby developers (especially those working in and around Ruby on Rails) are still looking for the holy grail of Ruby fastness, I’d like to explore a different way of thinking about developing Ruby apps (and gems) that can be employed to achieve optimal performance.

Ruby Makes Developers Happy, Until it Comes to Performance…

Why do I love programming in Ruby? First of all, because it’s optimized for developer happiness. Ruby is famous for allowing you to express your ideas in any number of ways. You can write functional programs, or go all in on Java-like enterprise OOP patterns. You can build DSLs, and you can replace or extend whole chunks of the Ruby core functionality by redefining core class methods. And metaprogramming in Ruby lets you do nuclear stuff!

All this comes, of course, with a price tag in the form of reduced performance when compared to other, less magical, programming languages, such as Go, C or C++. But experienced Ruby developers will have already learned how to get the most “bang for the buck” out of Ruby, by carefully designing their code so as to minimize object allocations (and subsequent GC cycles), and picking core Ruby and Stdlib APIs that provide better performance.
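
Here’s a small illustration of the kind of allocation-conscious choice I mean (an illustrative snippet, not from any particular codebase):

Row = Struct.new(:id, :name)
rows = [Row.new(1, 'foo'), Row.new(2, 'bar')]

# allocation-heavy: += creates a brand new string on every iteration
output = ''
rows.each { |row| output += "#{row.id},#{row.name}\n" }

# allocation-conscious: << mutates a single preallocated buffer in place
buffer = String.new(capacity: 4096)
rows.each { |row| buffer << row.id.to_s << ',' << row.name << "\n" }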

Another significant way Ruby developers have been dealing with problematic performance is by using Ruby C-extensions, which implement specific functionalities in native compiled code that can be invoked from plain Ruby code.

Compositional Programming in Ruby

It has occurred to me during my work on Polyphony, a concurrency library for Ruby, that a C-extension API can be designed in such a way as to provide a small, task-specific execution layer for small programs composed of multiple steps that can be expressed as data structures using plain Ruby objects. Let me explain using an example.

Let’s say we are implementing an HTTP server, and we would like to implement sending a large response using chunked encoding. Here’s how we can do this in Ruby:

def send_chunked_encoding(data, chunk_size)
  idx = 0
  len = data.bytesize
  while idx < len
    chunk = data[idx...(idx += chunk_size)]
    @socket << "#{chunk.bytesize.to_s(16)}\r\n#{chunk}\r\n"
  end
  # send empty chunk
  @socket << "0\r\n\r\n"
end

This is pretty short and sweet, but look how we’re allocating a string for each chunk and doing index arithmetic in Ruby. This kind of code could surely be made more efficient by reimplementing it as a C-extension. But if we’re already going to the trouble of writing a C-extension, we might want to generalize this approach, so that we can implement sending chunked data over other protocols as well.

What if we could come up with a method, implemented in C, that takes a description of what we’re trying to do? Suppose we have a method with the following interface:

def send_data_in_chunks(
    data,
    chunk_size,
    chunk_head,
    chunk_tail
  )
end

We could then implement HTTP/1 chunked encoding by doing the following:

def send_chunked_encoding(data, chunk_size)
  @socket.send_data_in_chunks(
    data,
    chunk_size,
    ->(len) { "#{len.to_s(16)}\r\n" }, # chunk size + CRLF
    "\r\n"                             # trailing CRLF
  )
end

If the #send_data_in_chunks method is implemented in C, this means that Ruby code is not involved at all in the actual sending of the data. The C-extension code is responsible for looping and writing the data to the socket, and the Ruby code just provides instructions for what to send before and after each chunk.

Polyphony’s chunked splicing API

The above approach is actually how static file responses are generated in Tipi, the web server for Ruby I’m currently developing. One of Tipi’s distinguishing features is that it can send large files without ever loading them into memory, by using Polyphony’s Backend#splice_chunks API (Polyphony emulates splicing on non-Linux OSes). Here’s an excerpt from Tipi’s HTTP/1 adapter code:

def respond_from_io(request, io, headers, chunk_size = 2**14)
  formatted_headers = format_headers(headers, true, true)
  request.tx_incr(formatted_headers.bytesize)

  Thread.current.backend.splice_chunks(
    io,
    @conn,
    formatted_headers,
    "0\r\n\r\n",
    ->(len) { "#{len.to_s(16)}\r\n" },
    "\r\n",
    chunk_size
  )
end

The Backend#splice_chunks method is slightly more sophisticated than the previous example, as it also takes a string to send before all chunks (here it’s the HTTP headers), and a string to send after all chunks (the empty chunk string "0\r\n\r\n"). My non-scientific benchmarks have shown speed gains of up to 64% for multi-megabyte HTTP responses!

The main idea behind the #splice_chunks API is that the application provides a plan, or a program for what to do, and the underlying system “runs” that program.

Chaining multiple I/O operations in a single Ruby method call

A similar approach was also used to implement chaining of multiple I/O operations, a feature particularly useful when running on recent Linux kernels with io_uring (Polyphony automatically uses io_uring starting from Linux version 5.6.) Here again, the same idea is employed - the application provides a “program” expressed using plain Ruby objects. Here’s how chunked transfer encoding can be implemented using Backend#chain (when splicing a single chunk from an IO instance):

def send_chunk_from_io(io, chunk_size)
  r, w = IO.pipe
  len = w.splice(io, chunk_size)
  if len > 0
    Thread.current.backend.chain(
      [:write, @conn, "#{len.to_s(16)}\r\n"],
      [:splice, r, @conn, len],
      [:write, @conn, "\r\n"]
    )
  else
    @conn.write("0\r\n\r\n")
  end
  len
end

Let’s take a closer look at the call to #chain:

Thread.current.backend.chain(
  [:write, @conn, "#{len.to_s(16)}\r\n"],
  [:splice, r, @conn, len],
  [:write, @conn, "\r\n"]
)

The Backend#chain API takes one or more Ruby arrays, each describing an I/O operation. The currently supported operations are :write, :send and :splice. For each operation we provide the operation type followed by its arguments. The most interesting aspect of this API is that it allows us to reap the full benefits of using io_uring: the given operations are linked so that they will be performed by the kernel one after the other, without the Ruby code ever being involved! The #chain method returns control to the Ruby layer once all the operations have been performed by the kernel.

Designing Compositional APIs

This approach to API design might be called compositional APIs - the idea here is that the API provides a way to compose multiple tasks or operations by describing them using native data structures.

Interestingly enough, io_uring itself takes this approach: you describe I/O operations using SQEs (submission queue entries), which are nothing more than C data structures conforming to a standard interface. In addition, as mentioned above, with io_uring you can chain multiple operations to be performed one after another.

Future plans for io_uring include making it possible to submit eBPF programs for running arbitrary eBPF code kernel side. That way, we might be able to implement chunked encoding in eBPF code, and submit it to the kernel using io_uring.

A More General Approach to Chaining I/O operations

It has recently occurred to me that the compositional approach to designing APIs can be further enhanced and generalized, for example by providing the ability to express flow control. Here’s how the chunk splicing functionality might be expressed using such an API:

def respond_from_io(request, io, headers, chunk_size = 2**14)
  formatted_headers = format_headers(headers, true, true)
  r, w = IO.pipe

  Thread.backend.submit(
    [:write, @conn, formatted_headers],
    [:loop,
      [:splice, io, w, chunk_size],
      [:break_if_ret_eq, 0],
      [:store_ret, :len], # store the return value in :len
      [:write, @conn, ->(ret) { "#{ret.to_s(16)}\r\n" }],
      [:splice, r, @conn, :len], # use the stored :len value
      [:write, @conn, "\r\n"]
    ],
    [:write, @conn, "0\r\n\r\n"]
  )
end

Now there are clearly a few problems here: this kind of API can quickly run into the problem of Turing-completeness - will developers be able to express any kind of program using this API? Where are the boundaries and how do we define them?

Also, how can we avoid having to allocate all those arrays every time we call the #respond_from_io method? All those allocations put more pressure on the Ruby GC, and can themselves be costly in terms of performance. And that proc we provide - it’s still Ruby code that needs to be called for every iteration of the loop. That too can be costly to performance.

The answers to all those questions are still not clear to me, but one solution I thought about was to provide a “library” of operation types that is a bit higher-level than a simple write or splice. For example, we can come up with an operation to write the chunk header, which can look something like this:

Thread.backend.submit(
  [:write, @conn, formatted_headers],
  [:loop,
    [:splice, io, w, chunk_size],
    [:break_if_ret_eq, 0],
    [:store_ret, :len],
    [:write_cte_chunk_size, @conn, :len],
    [:splice, r, @conn, :len],
    [:write, @conn, "\r\n"]
  ],
  [:write, @conn, "0\r\n\r\n"]
)

Adding IO References

Another improvement we can make is to provide a way to reference IO instances and dynamic strings from our respond_from_io “program” using indexes. This will allow us to avoid allocating all those arrays on each invocation:

# program references:
# 0 - headers
# 1 - io
# 2 - @conn
# 3 - pipe r
# 4 - pipe w
# 5 - chunk_size
RESPOND_FROM_IO_PROGRAM = [
  [:write, 2, 0],
  [:loop,
    [:splice, 1, 4, 5],
    [:break_if_ret_eq, 0],
    [:store_ret, :len],
    [:write_cte_chunk_size, 2, :len],
    [:splice, 3, 2, :len],
    [:write, 2, "\r\n"]
  ],
  [:write, 2, "0\r\n\r\n"]
]

def respond_from_io(request, io, headers, chunk_size = 2**14)
  formatted_headers = format_headers(headers, true, true)
  r, w = IO.pipe
  Thread.backend.submit(RESPOND_FROM_IO_PROGRAM, formatted_headers, io, @conn, r, w)
end

Creating IO Programs Using a DSL

Eventually, we could provide a way for developers to express IO programs with a DSL, instead of with arrays. We could then also use symbols for representing IO indexes:

RESPOND_FROM_IO_PROGRAM = Polyphony.io_program(
  :headers, :io, :conn, :pipe_r, :pipe_w, :chunk_size
) do
  write :conn, :headers
  io_loop do
    splice :io, :pipe_w, :chunk_size
    break_if_ret_eq 0
    store_ret :len
    write_cte_chunk_size :conn, :len
    splice :pipe_r, :conn, :len
    write :conn, "\r\n"
  end
  write :conn, "0\r\n\r\n"
end

Does this look better? I’m not sure. Anyways, there are some rough edges here that will need to be smoothed out for this approach to work.

Implementing a Protocol Parser Using the Compositional Approach

It has occurred to me that this kind of approach - expressing a “program” using plain Ruby objects, to be executed by a C-extension - could also be applied to protocol parsing. I’ve recently released a blocking HTTP/1 parser for Ruby, called h1p, implemented as a Ruby C-extension, and I had some ideas about how this could be done.

We introduce an IO#parse method that accepts a program for parsing characters. The program consists of a set of rules, each one reading consecutive characters from the IO instance:

# for each part of the line we can express the valid range of lengths, delimiters, and invalid characters
REQUEST_LINE_RULES = [
  [:read, { delimiter: ' ', length: 1..40, invalid: ["\r", "\n"], consume_delimiter: true }],
  [:consume_whitespace],
  [:read, { delimiter: ' ', length: 1..2048, invalid: ["\r", "\n"], consume_delimiter: true }],
  [:consume_whitespace],
  [:read_to_eol, { consume_eol: true, length: 6..8 } ]
]

HEADER_RULES = [
  [:read_or_eol, { delimiter: ':', length: 1..128, consume_delimiter: true }],
  [:return_if_nil],
  [:consume_whitespace],
  [:read_to_eol, { consume_eol: true, length: 1..2048, consume_delimiter: true }]
]

def parse_http1_headers
  (method, request_path, protocol) = @conn.parse(REQUEST_LINE_RULES)
  headers = {
    ':method' => method,
    ':path' => request_path,
    ':protocol' => protocol
  }
  
  while true
    (key, value) = @conn.parse(HEADER_RULES)
    return headers if !key

    headers[key.downcase] = value
  end
end

Here too, we can imagine being able to express these parsing rules using a DSL:

REQUEST_LINE_RULES = Polyphony.parse_program do
  read delimiter: ' ', length: 1..40, invalid: ["\r", "\n"], consume_delimiter: true
  consume_whitespace
  read delimiter: ' ', length: 1..2048, invalid: ["\r", "\n"], consume_delimiter: true
  consume_whitespace
  read_to_eol consume_eol: true, length: 6..8
end

It remains to be seen where the limits of this approach lie: can we really express everything we need in order to parse any conceivable protocol? In addition, it is not yet clear whether this kind of solution provides actual performance benefits.

Summary

In this article I have presented an approach to optimizing the performance of Ruby apps by separating the program into two layers: a top layer written in Ruby, expressing low-level operations using Ruby data structures; and an implementation layer written in C, which executes those operations in an optimized manner. This approach is particularly interesting when dealing with long-running or complex operations: sending an HTTP response with chunked encoding, parsing incoming data, running I/O operations in loops, etc.

As I have mentioned above, this approach is similar to the one employed by io_uring on Linux. The idea is the same: we express (I/O) operations using data structures, then offload the execution to a lower-level optimized layer - in io_uring’s case it’s the kernel, in Ruby’s case it’s a C-extension.

This is definitely an avenue I intend to explore further, and I invite other Ruby developers to join me in this exploration. While we wait for all those exciting Ruby developments I mentioned at the beginning of this article to materialize (the new YJIT effort from Shopify looks especially promising), we can investigate other approaches that take advantage of Ruby’s expressivity while relying on native C code to execute lower-level code.

02·09·2021

How I Write Code: Pen & Paper

I am a self-taught programmer. I first started programming as a kid, and have never bothered to formally study this discipline. To me, programming is first of all a pleasant creative pursuit. It’s a bit like putting together and taking apart all kinds of machines, except these machines happen to be virtual constructions, and you get to keep your hands clean (well, mostly…) Incidentally, this pleasant activity is also how I support my family.

But even though I love programming, I try not to sit in front of a computer screen too much. I do not find staring at a screen all day beneficial, not for my physical health, nor for my mental health. In the last few years, I’ve started a habit of sketching my programming ideas using pen and paper. I’m not talking here about todo lists, or making diagrams. I’m talking about actually writing code using pen and paper. Let me explain.

Why pen & paper

A lot has been written about the advantages of handwriting vs typing. I will not enumerate all of them here, but I will tell you that since I started this practice I find I’m more productive, and I seem to produce better-quality code.

The mere fact that I can concentrate on a single problem without any distractions is already a big advantage, and it seems to me that through writing pages upon pages with a pen, scratching out bad ideas, rewriting small bits of code, I gain (as if by magic) a deeper understanding of my code.

What about debugging? What about testing?

When you write code on paper, you have no way to see if your code works. Maybe someday I’ll be able to handwrite code on an e-Ink device and then run it. Until that day, all that I have is my ideas, my intuition, my knowledge, and my “mental runtime” - a mental thought process that simulates some kind of computer runtime that goes through the code, evaluating it.

Of course, my mental process is not perfect. It will let slip through all kinds of bugs, things I’m not smart enough to detect before actually feeding the code to a computer. That’s why I try to concentrate on one small problem at a time. Nothing bigger than a page or two, and mostly short snippets that implement a specific algorithm.

Iterate, iterate, iterate

So once I start exploring a problem space with pen & paper, I just iterate on it until I feel I’ve found the best solution to the problem. Once I’ve achieved that, I can finally open my laptop and type the code into my favorite editor.

Sometimes it works like magic: I just type in the code and everything works. Sometimes it needs some additional effort in editing and elaborating. But still, even if my handwritten code turns out to be only partially correct, or needs some reworking to fit in with other components, I feel that spending time sketching code and reflecting on it before turning to the computer gives me a deeper understanding of the code.

A recent example

Here’s an example of coding I did completely with pen & paper: a few days ago I published a new open-source project called Ever - a libev-based event reactor for Ruby. The entire design was done with pen & paper. I did it over three evenings, taking a couple hours each evening to work through the design and different aspects of the implementation.

Finally, when I felt the design was solid and that I had resolved all the issues I could see, I opened my laptop, created a new Github repository, typed in the code, added some tests and wrote the README. It was probably all done in 4 or 5 hours of concentrated work.

At the end of the process, in addition to being able to create a small Ruby gem in a single day without too much fussing around looking for solutions to problems encountered during development, I feel like I have a deeper knowledge of the code, a deeper understanding of the implications of different choices, and a greater appreciation for writing the least amount of code.

When I hold a pen in my hand and set it to paper, I find it much easier to be “in the zone”, and I feel much closer to the ideas I’m exploring. Somehow, I never get this feeling of connectedness when laying my hands on a computer keyboard. It also empowers me to know that I don’t need a computer in order to create code. I’m not dependent in my creative pursuits on a machine that sometimes seems to disturb my creative process rather than facilitate it.

Steve Jobs once talked about computers being like a bicycle for our minds. To me it feels like a lot of times we expend lots of energy on bikeshedding rather than actually riding our bikes. And maybe we should also get off our bikes every once in a while, and just walk.

26·08·2021

What's new in Polyphony and Tipi - August 2021 edition

The summer is drawing to an end, and with it I bring another edition of Polyphony (and Tipi) news, this time on my own website, where I’ll be publishing periodically from now on.

Polyphony is a library for writing highly concurrent Ruby apps. Polyphony harnesses Ruby fibers and a powerful io_uring-based I/O runtime to provide a solid foundation for building high-performance concurrent Ruby apps.

Tipi is a new Polyphony-based web server for Ruby. Tipi provides out of the box support for HTTP/1, HTTP/2, and WebSocket. Tipi also provides SSL termination (support for HTTPS) with automatic certificate provisioning and automatic ALPN protocol selection.

From counterpoint to composition

In the last month I’ve been doing a lot more work on Tipi than on Polyphony, and most of my work on Polyphony has been just fixing bugs. For me this is a major milestone, as I’m transitioning from working on the low-level stuff to an actual application that can do something useful. To me this feels a bit like going from writing counterpoint exercises to composing an actual piece of music.

The Polyphony API is maturing nicely and I hope to be able to make a 1.0 release in the coming weeks. As for Tipi, there’s still a lot of work to do in order for it to be ready for public release, and I’ll discuss that towards the end of this post.

Changes in Polyphony

Support for splicing on non-Linux platforms

The libev backend now supports all splicing APIs by emulating the Linux splice system call. I’ve already written about splicing and the amazing things that can be done with these APIs. They can now be used cross-platform, even if the performance gains are only achievable on Linux.

Fiber supervision

One of the major advantages of using Polyphony as the basis for concurrent programs is that it implements structured concurrency, a programming paradigm that makes it easier to control fiber execution in a highly-concurrent environment. Just imagine writing a program that performs thousands of long-running tasks concurrently. How do you manage that complexity? How do you deal with failures? How can you control any of those concurrent tasks?

Polyphony deals with this problem by adhering to three principles:

  1. Fibers are arranged in a hierarchy: a fiber is considered the child of the fiber from which it was spun.
  2. A fiber’s lifetime is limited to that of its immediate parent. In other words, a fiber is guaranteed to terminate before its parent does.
  3. Any uncaught exception raised in a fiber will “bubble up” to its immediate parent, and potentially all the way up to the main fiber (which will cause the program to terminate with an exception, if not handled.)

Here’s an example to demonstrate these three principles in action:

# Kernel#spin starts a new fiber
@controller = spin do
  @worker = spin do
    loop do
      # Each fiber has a mailbox for receiving messages
      peer, op, x, y = receive
      result = x.send(op, y)
      # The result is sent back to the "client"
      peer << result
    end
  end
  # The controller fiber will block until the worker is done (but notice that
  # the worker runs an infinite loop.)
  @worker.await
end

def calc(op, x, y)
  # Send the job to the worker fiber...
  @worker << [Fiber.current, op, x, y]
  # ... and wait for the result
  receive
end

# wait for the controller to terminate
@controller.await

In the above example, we spin a controller fiber, which then spins a worker fiber. This creates the following hierarchy:

main
 |
 +- controller
     |
     +- worker

Now we can just call #calc to perform calculations inside the worker:

# from the main fiber
calc(:+, 2, 3) #=> 5

# or from another fiber:
f = spin { calc(:**, 2, 3) }
f.await #=> 8

But notice what happens when we send an operation that results in an exception:

calc(:+, 2, nil)
Traceback (most recent call last):
        5: from examples/core/calc.rb:7:in `<main>'
        4: from examples/core/calc.rb:8:in `block in <main>'
        3: from examples/core/calc.rb:9:in `block (2 levels) in <main>'
        2: from examples/core/calc.rb:9:in `loop'
        1: from examples/core/calc.rb:12:in `block (3 levels) in <main>'
examples/core/calc.rb:12:in `+': nil can't be coerced into Integer (TypeError)

The exception raised inside the worker fiber has bubbled up to the controller fiber. The controller, which was busy waiting for the worker to terminate, re-raised the exception, which bubbled up to the main fiber. The main fiber, which was waiting for the controller to terminate, re-raised the exception and finally exited with an error message and a backtrace (you can find the full example here).

The fact that unrescued exceptions bubble up through the fiber hierarchy allows us to control the lifetime of child fibers. Here’s one way we can deal with uncaught exceptions in the worker fiber:

@controller = spin do
  @worker = spin { ... }
  @worker.await
rescue => e
  puts "Uncaught exception in worker: #{e}. Restarting..."
  # Yes, in Polyphony fibers can be restarted!
  @worker.restart
end

Since the controller fiber can intercept any unrescued exception that occurred in its child, we add a rescue block, report the error and then restart the fiber.

Another possibility would be to handle the error at the level of the main fiber, or maybe to handle trivial errors locally and let more serious exceptions bubble up - it really depends on the circumstances. The point is that Polyphony allows us to control the lifetime of any fiber anywhere in the fiber hierarchy with a small set of tools that builds on the robustness of Ruby exceptions: putting rescue and ensure blocks in the right places will already do 99% of the work for us.

But what if we want to automate the handling of errors? What if we just want things to continue working without us needing to manually write rescue blocks? Enter fiber supervision.

Inspired by Erlang

The new fiber supervision mechanism in Polyphony is greatly inspired by Erlang supervision trees. While Erlang processes are not organised hierarchically, Erlang provides a supervisor behaviour that allows expressing hierarchical dependencies between processes.

While a lot of the functionality of Erlang supervision trees is already included in Polyphony by virtue of the structured concurrency paradigm, the Erlang supervisor behaviour also allows automating the handling of error conditions. This is what I set out to solve with the new fiber supervision API.

The new Kernel#supervise method can be used to supervise one or more fibers. By default, it does nothing more than wait for all supervised fibers to terminate. But it can also be used to automatically restart fibers once they have terminated, to restart them only when an exception occurred, or to perform other work when a fiber terminates (for example, writing to a log file).

Going back to our example, here’s how we can use the controller fiber to supervise the worker fiber:

@controller = spin do
  @worker = spin { ... }
  supervise(@worker, restart: :always)
end

The call to Kernel#supervise tells the controller fiber to monitor the worker fiber and to always restart it once it terminates, regardless of whether it terminated normally or with an exception. Alternatively, we can tell the controller to restart the worker only when an exception occurs:

supervise(@worker, restart: :on_error)

We can also define a custom behavior by passing a block that will be called when the worker fiber terminates:

supervise(@worker) do |fiber, result|
  log_exception(result) if result.is_a?(Exception)
  fiber.restart
end

Staying in the loop: the advantages of fiber supervision

In my work on Polyphony and on Tipi I have discovered a few programming patterns that I find very interesting:

If we look at Tipi, a Polyphony app that can be used to serve HTTP/S on multiple ports, we’ll have a separate fiber listening for incoming connections on each port. When a connection is accepted, we spin a new fiber in order to handle the new connection concurrently:

http_listener = spin do
  while (conn = http_server.accept)
    spin { handle_client(conn) }
  end
end

https_listener = spin do
  while (conn = https_server.accept)
    spin { handle_client(conn) }
  end
end

Since the client handling fibers are spun from the listener fibers (either http_listener or https_listener), they are considered the children of those fibers. If any exception is raised in a client handling fiber and is not rescued, it will bubble up to the listener fiber and will cause it to terminate with the exception.

In addition, the listeners themselves might raise exceptions when accepting connections - these can be system call errors, I/O errors, OpenSSL errors (for the HTTPS listener), etc. We’d like an easy way to catch these errors. One way would be to do this with a rescue block:

...

https_listener = spin do
  loop do
    conn = https_server.accept
    spin { handle_client(conn) }
  rescue => e
    puts "HTTPS accept error: #{e.inspect}"
  end
end

This is a possibility, but we’d need to do it manually for each fiber, and we risk scattering lots of rescue blocks (some of them perhaps for specific exception classes) all over the place - an error-prone approach that can become hard to maintain if overdone.

Instead, we can use the Kernel#supervise API provided by Polyphony to make sure our infinite loops (i.e. our listener fibers) continue running even when an exception occurs. Thus we can embrace the Erlang motto: “Let it crash.” We let it crash, and then we restart it. Here’s how we can employ this using fiber supervision:

http_listener = spin(:http) { ... }
https_listener = spin(:https) { ... }
# If specific fibers are not specified, #supervise will supervise all of the
# current fiber's children.
supervise do |fiber, result|
  if result.is_a?(Exception)
    puts "Fiber #{fiber.tag} terminated with exception: #{result}. Restarting..."
    fiber.restart
  end
end

In this way we ensure that any uncaught exception from one of the listeners or their children will not slip through and stop the server from functioning. Any listener that has stopped because of an exception will just be restarted. And applying this to our controller example above:

@controller = spin do
  @worker = spin do
    loop do
      peer, op, x, y = receive
      result = x.send(op, y)
      peer << result
    end
  end
  supervise(@worker, restart: :always)
end

def calc(op, x, y)
  # Send the job to the worker fiber...
  @worker << [Fiber.current, op, x, y]
  # ... and wait for the result
  receive
end

supervise(@controller, restart: :always)

Bug fixes and other changes

Here’s a list of other, smaller changes and fixes in Polyphony:

Changes in Tipi

The Tipi server is progressing nicely. I’ve been running it in production over the last few months, and while it’s still a long way from providing a stable, easy-to-use API for other developers, in terms of features and functionality it’s already got 90% of the features expected from a modern web server: support for HTTP/1 and HTTP/2, SSL termination, support for WebSocket and streaming responses, support for serving static files and of course running Rack apps. Tipi is also able to dynamically provision SSL certificates using an ACME provider (such as Let’s Encrypt), though this feature is still work in progress.

Following is a summary of the big changes in Tipi this month.

H1P - a new HTTP/1 parser

I’ve hinted before about writing an HTTP/1 parser made for Tipi. Well, the work is more or less done, and I’ve released the parser as a separate project called H1P. What sets this parser apart is that it is completely blocking. Other parsers (at least the ones I know of) provide a callback-based API: you register callbacks for different events, then feed the parser with data and wait for those callbacks to be invoked. H1P, by contrast, provides a blocking API that’s much easier to use:

conn = server.accept
parser = H1P::Parser.new(conn)
headers = parser.parse_headers
body = parser.read_body
handle_request(headers, body)

Yes, that’s it (for the most part). And the beauty of this parser is that you don’t even need Polyphony in order to use it. In fact, you can use it in a “normal” threaded server (spawning a thread for each connection), or in conjunction with the new fiber scheduler introduced in Ruby 3.0.
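
For instance, here’s a minimal sketch of a thread-per-connection server built on H1P. Note that handle_request stands in for hypothetical application logic, and I’m assuming #parse_headers returns nil once the client closes the connection:

require 'socket'
require 'h1p'

server = TCPServer.new('0.0.0.0', 1234)
loop do
  conn = server.accept
  # Handle each connection in its own thread. The parser simply blocks
  # until a complete request has arrived.
  Thread.new do
    parser = H1P::Parser.new(conn)
    while (headers = parser.parse_headers)
      body = parser.read_body
      handle_request(conn, headers, body)
    end
  rescue IOError, SystemCallError
    # client disconnected mid-request - nothing to do
  ensure
    conn.close
  end
end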

The H1P parser is implemented in less than 750 lines of C, has zero dependencies, supports chunked encoding and both LF and CRLF line breaks, puts hard limits on token length to minimize server abuse, and is transport agnostic - you can have it read from any source, even sources that are not IO objects:

data = ['GET ', '/foo', " HTTP/1.1\r\n", "\r\n"]
parser = H1P::Parser.new { data.shift }

parser.parse_headers
#=> { ':method' => 'GET', ':path' => '/foo', ... }

I intend to keep on working on H1P, notably on the following:

In addition to that, I also plan to implement a similar H2P project for handling HTTP/2 connections.

Automatic SSL certificate provisioning

If there’s one feature that can be a game changer for a Ruby web server, it’s automatic SSL certificate provisioning. Tipi already does SSL termination, and that makes it possible to use Tipi without any load balancer or reverse proxy in front of it, since it can deal with incoming HTTPS connections all by itself. But automatic SSL certificates take this autonomy to the next level: you don’t even have to provide a certificate for Tipi to use. Tipi will just take care of it all by itself, by dynamically provisioning a certificate from an ACME provider, such as Let’s Encrypt or ZeroSSL.

Imagine not having to set up Nginx, Apache or Caddy as a reverse proxy in order to run your web app. You just run Tipi (preferably with port-forwarding, so you don’t need to deal with binding to privileged ports) and point it at your Rack app. This is what I’m aiming to achieve in the near future.

So automatic certificates already work in Tipi. In fact, this very website, which I put together a few weekends ago, already uses automatic certificates. While it works, there are still a lot of details to take care of: testing, handling of failures, adding more ACME providers, and finally coming up with a simple API for configuring automatic certificates.

Other changes

In addition to the above big new features, I’ve also worked on the following:

What’s next for the Polyphony ecosystem?

In the last few years I’ve been creating a set of Ruby gems that I call Digital Fabric, with the motto: “Software for a better world.” I believe in empowering small developers to build lightweight, autonomous digital systems to solve specific needs. The Digital Fabric suite already includes tools for working with SQLite databases, creating HTML templates, and managing dependencies, in addition to Polyphony and Tipi.

I’m a long-time Ruby programmer, and to date my most substantial contribution to the Ruby community is Sequel, of which I’m the original author. The same spirit that guided me in creating Sequel is the one that’s currently guiding me in working on the Digital Fabric suite of tools: create simple and powerful APIs that make developers happy and that feel like natural extensions of the Ruby programming language. I believe Polyphony and Tipi have the potential to unleash a new wave of creativity in the Ruby community!

Here’s some of the things I intend to work on in the near future:

27·07·2021

What's new in Polyphony - July 2021 edition

Following last month’s update, here’s an update on the latest changes to Polyphony:

Redesigned tracing system

In previous versions, Polyphony included extensions to the core Ruby TracePoint API, so that events such as switching fibers, scheduling fibers, and polling for I/O completions could be traced using the same TracePoint API that’s used for tracing method calls, variable access, etc. In Polyphony 0.59 the tracing system was completely overhauled and separated from TracePoint. Polyphony backend events can now be traced by calling Backend#trace_proc:

Thread.backend.trace_proc = proc { |*event| p event }

Events are fed to the tracing proc as plain Ruby arrays, where the first member signifies the type of event. Currently the following events are emitted:

[:fiber_create, fiber]
[:fiber_schedule, fiber, resume_value, is_priority]
[:fiber_switchpoint, current_fiber]
[:fiber_run, fiber, resume_value]
[:fiber_event_poll_enter, current_fiber]
[:fiber_event_poll_leave, current_fiber]
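
As a quick illustration, here’s a minimal sketch that tallies events by type to get a rough picture of scheduler activity (the counts shown are indicative only):

require 'polyphony'

counts = Hash.new(0)
Thread.backend.trace_proc = proc { |*event| counts[event[0]] += 1 }

10.times { spin { sleep 0.1 } }
# let the child fibers run to completion
sleep 0.2

p counts #=> e.g. {:fiber_create=>10, :fiber_schedule=>30, :fiber_run=>30, ...}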

New methods for changing fiber ownership

Polyphony follows the structured concurrency paradigm, where the lifetime of each fiber is limited to that of the fiber from which it was spun. This mechanism, also called a parent-child relationship, permits developers to spin up thousands of fibers in a structured and controlled manner. Version 0.60 introduces two new methods which allow you to change the parent of a fiber.

Fiber#detach sets the fiber’s parent to the main fiber. This method could be useful if you need a fiber to outlive its parent:

parent = spin do
  child = spin do
    do_something
  end
  child.detach
  child.parent #=> Fiber.main
end

parent.await
# parent is dead, but child is still alive!

Fiber#attach lets you set the fiber’s parent to any other fiber, which might be useful if you start a fiber in some context but then need it to be limited by the lifetime of another fiber:

worker_parent = spin { sleep }

fiber_maker = spin_loop do
  job = receive
  worker = spin { perform(job) }
  worker.attach(worker_parent)
end

# at some point we want to stop all workers
worker_parent.terminate # boom, all workers are terminated

Support for appending to buffers when reading

Up until now, the backend read/recv APIs allowed you to provide a buffer and read into it, replacing any content it may hold. A major change introduced in version 0.60 allows reading to any position in the provided buffer, including appending to the buffer. The Backend#read and Backend#recv methods now accept a buffer_pos argument:

# append to a buffer
i, o = IO.pipe

spin do
  o << 'foo'
  o << 'bar'
  o << 'baz'
end

buffer = +''
# buffer_pos is the last argument. -1 denotes the end of the buffer
Polyphony.backend_read(i, buffer, 3, false, -1)
buffer #=> 'foo'
Polyphony.backend_read(i, buffer, 3, false, -1)
buffer #=> 'foobar'
Polyphony.backend_read(i, buffer, 3, false, -1)
buffer #=> 'foobarbaz'

This addition may seem minor, but what it allows us to do, beyond not needing to concatenate strings, is to write parsers that are completely blocking. I’m currently writing a custom HTTP/1 parser for Tipi that’s based on this unique feature and which promises to significantly improve both throughput and memory usage. (I’ll discuss this new parser in detail in another post.)
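
To give a feel for what this enables, here’s a contrived sketch (not the actual parser code) of a blocking line reader built on appending reads. I’m assuming here that Polyphony.backend_read returns nil on EOF:

require 'polyphony'

def read_crlf_line(io, buffer = +'')
  until (idx = buffer.index("\r\n"))
    # read up to 128 more bytes, appending at the end of the buffer (-1)
    return nil unless Polyphony.backend_read(io, buffer, 128, false, -1)
  end
  # remove the line (including its CRLF terminator) from the buffer
  buffer.slice!(0, idx + 2)
end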

Improved backend statistics

Polyphony version 0.61 has introduced streamlined and more comprehensive backend statistics, now accessible using Backend#stats. The statistics are returned as a hash with the following keys:

Improved control over reading with #read and #readpartial

Finally, Polyphony versions 0.63 and 0.64 have added optional arguments to IO#read and IO#readpartial in order to give developers more flexibility and let them use the new “append to buffer” feature discussed above. Here are the updated signatures for those methods (they apply also to all socket classes):

Note the raise_on_eof argument, which can be used to control whether #readpartial raises an EOFError when an EOF is encountered.
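
Here’s a sketch of how this might look (note that the exact positional argument order - length, buffer, buffer position, raise_on_eof - is my assumption, based on the features described above):

require 'polyphony'

i, o = IO.pipe
o << 'foobar'
o.close

buf = +''
# read up to 3 bytes, appending at the end of the buffer (buffer_pos -1)
i.readpartial(3, buf, -1)
buf #=> 'foo'
i.readpartial(3, buf, -1)
buf #=> 'foobar'
# at EOF, with raise_on_eof set to false, nil is returned instead of
# raising EOFError
i.readpartial(3, buf, -1, false) #=> nil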

What’s next for Polyphony

As I wrote above, I’m currently developing a custom HTTP/1 parser for Tipi, which already shows promising performance characteristics, reduces dependencies, and is completely synchronous (i.e. no callbacks are involved). I hope to switch Tipi over to the new parser in the coming weeks, have it battle-tested in one of my production projects, and then continue on to an HTTP/2 parser with a similar design.

25·06·2021

What's new in Polyphony - June 2021 edition

Polyphony 0.58 has just been released. Here’s a summary and discussion of the latest changes and improvements:

Improved functionality for OpenSSL sockets and servers

Following the work I’ve lately been doing on the Tipi server (you can read more about that towards the end of this post), I’ve made significant improvements to working with OpenSSL encrypted servers and client sockets. Here are some of the changes:

Fixes to the Mutex class

Following a bug report from @primeapple, who is trying to use Polyphony in a Rails project (!), some missing methods were added to the Polyphony implementation of the Mutex class: #owned? and #locked?.

It is still too early to tell if Polyphony can be used to drive a Rails app, and frankly I am not actively trying to make it happen, but I’d love to receive more feedback on how Polyphony interacts with different parts of the Rails ecosystem.

A redesigned event anti-starvation algorithm

Polyphony is at its core an efficient fiber scheduler that eschews the traditional non-blocking design that wraps the entire program in one big event loop. Instead, when a fiber needs to perform some long-running operation, it simply passes control to the next fiber on the run queue. If the run queue is empty, the backend is polled for completion of events or operations. This is a blocking call that will return only when one or more events are available for the backend to process. Each completed event or operation will cause the corresponding fiber to be scheduled, and to be eventually resumed.

Under certain circumstances, though, if the run queue is never empty (because fibers keep being scheduled regardless of I/O events), this will prevent Polyphony from polling for events, leading to event starvation.

In order to prevent this from happening, Polyphony includes a mechanism for periodically performing a non-blocking poll, which assures the processing of ready events even under such conditions. For the libev backend, this is done by calling ev_run(backend->ev_loop, EVRUN_NOWAIT), which will only process ready events, without waiting. For the io_uring backend, this is done by simply processing the available CQEs, without issuing an io_uring_enter system call.

Now, until Polyphony version 0.54, determining when to use this mechanism was problematic, and was based on false assumptions. In Polyphony 0.55 the algorithm for determining when to make a non-blocking poll was redesigned, and is now based on counting the number of times fibers have been switched, as well as keeping a high water mark for the number of fibers in the run queue:

If this tickles your interest, you can have a look at the code.

A new API for splicing to/from pipes

The Linux kernel includes a relatively little known system call called splice, which lets developers move data from one file descriptor to another (for example, from a file to a socket) without needing to copy data back and forth between userspace and the kernel, a costly operation, and in some cases even without copying data inside the kernel itself, by using pipes (which act as kernel buffers). To learn more about splice and what it’s good for, read this explanation by Linus Torvalds himself.

Starting from Polyphony 0.53, I’ve been gradually adding support for splicing to and from I/Os on both the libev and io_uring backends. The following APIs were added:

In addition, the corresponding methods have been added to the IO class:

So we know that to splice we need to use a pipe, either for the source or the destination or for both, but how do we use it in practice? Suppose we want to write the content of a file to a socket. Here’s one way we can do this with splice:

def write_file_to_socket(socket, path)
  r, w = IO.pipe
  File.open(path, 'r') do |f|
    spin do
      w.splice_to_eof(f)
    ensure
      w.close
    end
    socket.splice_to_eof(r)
  end
end

In the above example we create a pipe, and then spin up a separate fiber that splices from the file to the pipe, while the current fiber splices from the pipe to the socket. This technique can be used for files of arbitrary size (even GBs), without loading the file content into Ruby strings and putting pressure on the Ruby garbage collector. On top of this, the two splice operations run concurrently and with automatic backpressure (i.e. our socket will not get inundated with MBs of data.)

While the splice system call is only available on Linux, the libev backend emulates Backend#splice and Backend#splice_to_eof using plain read and write calls.
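
Conceptually, the emulation boils down to something like the following Ruby sketch (an illustration of the idea, not the actual C implementation):

# move up to maxlen bytes from src to dest, returning the number of
# bytes moved, or 0 on EOF
def emulated_splice(src, dest, maxlen)
  data = src.readpartial(maxlen)
  dest << data
  data.bytesize
rescue EOFError
  0
end

def emulated_splice_to_eof(src, dest, chunk_size = 8192)
  total = 0
  while (len = emulated_splice(src, dest, chunk_size)) > 0
    total += len
  end
  total
end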

In addition to the above new methods, Polyphony 0.57 also introduces the Backend#splice_chunks method, which can be used for splicing chunks to some arbitrary destination IO instance, interspersed with writing plain Ruby strings to it. The use case arose while working on the Tipi web server, trying to optimize the serving of static files without loading the file content into Ruby strings. The Tipi HTTP/1.1 adapter tries whenever possible to use chunked encoding. In HTTP/1.1, each chunk consists of a header including the chunk size, followed by the chunk itself, and finally a \r\n delimiter. In order to abstract away the creation of a pipe (for use with splicing), the looping, etc., I introduced the following method:

Backend#splice_chunks(src, dest, prefix, postfix, chunk_prefix, chunk_postfix, chunk_size)

… with the following arguments:

The chunk prefix and postfix can be a Proc that accepts the length of the current chunk, and returns a string to be written to the destination. Here’s how this new API is used in Tipi to serve big files:

# Edited for brevity
def respond_from_io(request, io, headers, chunk_size = 2**14)
  formatted_headers = format_headers(headers)
  Thread.current.backend.splice_chunks(
    io,
    @conn,
    # prefix: HTTP headers
    formatted_headers,
    # postfix: empty chunk denotes end of response 
    "0\r\n\r\n", 
    # dynamic chunk prefix with the chunk length
    ->(len) { "#{len.to_s(16)}\r\n" },
    # chunk delimiter 
    "\r\n", 
    chunk_size
  )
end

As the example demonstrates, this allows sending chunks from arbitrary IO instances (be it files or sockets or STDIO or pipes) without any of the data passing through the Ruby runtime, and the API is concise but also allows lots of flexibility. We can imagine using this API to send HTTP/2 data frames without much difficulty.

While the libev backend implementation is more or less straightforward - performing the splicing and writing sequentially, one operation after the other - the io_uring backend benefits from being able to issue multiple ordered I/O operations at once using the IOSQE_IO_LINK flag. This allows us to further minimize the number of system calls we make.

But what of the performance implications? Does using this technique result in any noticeable improvements? It’s still too early to tell how this technique will affect the numbers in a real-world situation, but preliminary benchmarks for serving static files with Tipi show a marked improvement for files bigger than 1MB:

File size | Normal (req/s) | Spliced (req/s) | Change
1KB       | 8300           | 7568            | -8%
64KB      | 7256           | 5702            | -21%
1MB       | 730            | 768             | +5%
4MB       | 130            | 189             | +45%
16MB      | 28             | 46              | +64%
64MB      | 9              | 12              | +33%
256MB     | 2              | 3               | +50%

This benchmark was done with the io_uring backend, using wrk with its stock settings (2 threads and 10 concurrent connections), on a lowly EC2 t3.xlarge machine.

A new API for chaining multiple I/O operations

Another big new feature introduced in Polyphony 0.55 is chaining of multiple ordered I/O operations using the Backend#chain API, which allows developers to specify multiple (outgoing) I/O operations in a single call to the backend, in order to minimize the overhead involved in going back and forth between the fiber issuing the I/O operations and the backend.

While Polyphony can already write multiple strings to the same file descriptor with a single method call (using writev), this new API allows developers to perform multiple I/O operations on different file descriptors in a single method call.

Here as well, the io_uring backend can reap additional performance benefits by issuing multiple ordered I/O operations with a single system call, without having to wake up the fiber after each I/O operation, in a similar fashion to the Backend#splice_chunks API we just discussed.

The Backend#chain method takes one or more operation specifications expressed using plain arrays. Here’s a simplified version of Backend#splice_chunks implemented using the #chain API:

def splice_chunks_in_ruby(src, dest, prefix, postfix, chunk_size)
  r, w = IO.pipe
  while true
    len = w.splice(src, chunk_size)
    break if len == 0

    chain(
      [:write, dest, prefix],
      [:splice, r, dest, len],
      [:write, dest, postfix]
    )
  end
end

The following operation specifications are currently supported:

New APIs for performing GC and other arbitrary work when idle

When running web servers in production, I’d like not only to maximize the server’s throughput (expressed in requests per second), but also to minimize latency. And when we talk about latency, we also need to talk about percentiles. One of the things that can really hurt those 99th percentile latency numbers in Ruby web servers is the fact that the Ruby runtime needs to perform garbage collection from time to time. Normally such a garbage collection event is slow (costing tens of milliseconds or even more), and it can come at any time, including while processing an incoming request.

In order to prevent garbage collection from happening while your server is busy preparing a response, a technique called out-of-band GC (or out-of-band processing) disables the garbage collector and manually runs a GC cycle when the server is otherwise idle (i.e. not busy serving requests.)

Polyphony 0.58 introduces new APIs that allow you to perform garbage collection or run any code only when the process is otherwise idle (i.e. when no fibers are scheduled.) Here are the new APIs:

Here’s how you can set up automatic GC when idle:

# Here we set the minimum interval between consecutive GC's done only when the
# thread is otherwise idle to 60 seconds:
Thread.current.idle_gc_period = 60

GC.disable

You can also run an arbitrary block of code when idle by passing a block to Thread#on_idle:

Thread.current.on_idle do
  do_something_really_unimportant
end

What’s next for Polyphony?

Polyphony has been in development for almost three years now, and its API is slowly stabilizing. I’d like to be able to release version 1.0 in a few months, but I still have some work left before we get there, including:

As for actual applications using Polyphony, I am continuing work on the Tipi web server, which is already used in production (in a closed-source product for one of my clients), and which already knows how to do HTTP/1.1, HTTP/2, WebSocket, and SSL termination.

I am currently working on two really exciting things: