Monday, November 28, 2022

Ruby concurrency is tough: how I became a Ruby on Rails contributor


For the past few weeks, I've been trying to fix a flaky spec in the Karafka integration suite. In the end, that work led me to become a Ruby on Rails micro-contributor and to submit similar fixes to several other popular projects in the Ruby ecosystem. This is my story of trying to make sense of my specs and of Ruby concurrency.

Ephemeral bug from a test-suite

Karafka is an efficient, multi-threaded Kafka processing framework for Ruby and Rails. To provide reliable multi-threaded OSS, I needed to run my test suite concurrently to simulate how Karafka operates. Because this was a specific use case, I created my own micro-framework.

Long story short: it runs end-to-end integration specs, each in a separate Ruby process. Each process starts Karafka, runs all the code in various configurations, connects to Kafka, checks assertions, and shuts down at the end.

This approach allowed me to ensure that the whole lifecycle of the process and its components work as expected. Specs are started under supervision, so if one hangs, it will be killed after 5 minutes.

Karafka itself also has an internal shutdown supervisor. When a user requests a shutdown and it takes longer than the defined expected time, Karafka stops even though things are still running. And that is what was happening with this single spec:

E, [2022-11-19T16:47:49.602718 #14843] ERROR -- : Forceful Karafka server stop
F, [2022-11-19T16:47:49.602825 #14843] FATAL -- : #<Karafka::Core::Monitoring::Event:0x0000562932d752b0 @id="error.occurred", @payload={:caller=>Karafka::Server, :error=>#<Karafka::Errors::ForcefulShutdownError: Karafka::Errors::ForcefulShutdownError>, :type=>"app.stopping.error"}>

This damn spec didn't want to stop!

Many things are working under the hood:

  • workers that process jobs, which could hang and force the process to wait
  • a jobs queue that is also linked to the polling thread (to poll more data when there is no work to be done)
  • listeners that poll data from Kafka, which could hang
  • consumer groups with multiple threads polling Kafka data, which might get stuck because of some underlying error
  • other bugs in the coordination of work and states.

One thing that certainly worked was the process supervision that forcefully killed it after 30 seconds.
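A forced stop like the one in the logs above boils down to waiting for threads with a deadline. Here is a minimal sketch of that idea, not Karafka's actual implementation: the `shutdown` helper and the top-level `ForcefulShutdownError` class are hypothetical stand-ins (Karafka has its own `Karafka::Errors::ForcefulShutdownError`).

```ruby
# Hypothetical stand-in for Karafka::Errors::ForcefulShutdownError
class ForcefulShutdownError < StandardError; end

# Waits up to `timeout` seconds in total for all threads to finish.
# Returns true on a graceful stop; otherwise kills the leftovers and raises.
def shutdown(threads, timeout:)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  threads.each do |thread|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # Thread#join returns nil if the thread is still alive when the time runs out
    next if thread.join([remaining, 0].max)

    still_running = threads.count(&:alive?)
    threads.each(&:kill)
    raise ForcefulShutdownError, "#{still_running} thread(s) still running"
  end

  true
end

# A thread that finishes quickly stops gracefully
shutdown([Thread.new { sleep(0.01) }], timeout: 1) # => true

# A thread that never finishes (like a stuck listener) forces the stop
begin
  shutdown([Thread.new { sleep }], timeout: 0.1)
rescue ForcefulShutdownError => e
  puts e.message # prints "1 thread(s) still running"
end
```

A single shared deadline, rather than a fresh timeout per thread, keeps the total wait bounded no matter how many threads there are.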

Process shutdown coordination

A graceful shutdown of such a process takes work. When you have many connections to Kafka, a poorly organized shutdown may trigger several rebalances that cause short-lived topic assignments. These cause nothing but friction and can potentially block the whole process.

To mitigate this, Karafka shuts down actively and gracefully. That is, until the very end, it claims ownership of the given topics and partitions while actively waiting for all the current work to be finished. This looks roughly like so:

Note: Consumer groups in Karafka are internally a bit different from Kafka consumer groups. Here, we focus on internal Karafka concepts.

Pinpointing the problem

After several failed attempts and fixes to other bugs, I added a lot of extra instrumentation to check what Karafka was hanging on. It was hanging because some listener threads were hanging!

As stated above, to close Karafka gracefully, all work from the jobs queue needs to be finished, and the listeners that poll data from Kafka need to be able to exit their polling loops. It is all coordinated using a jobs queue. The jobs queue we're using is pretty complex, with some blocking capabilities, and you can find it in the Karafka repository, but the interesting part of the code can be reduced to this:

@semaphores = Concurrent::Map.new { |h, k| h[k] = Queue.new }

These queues are used as semaphores in the polling loops until all the current work is done. Since each Queue is assigned to a different subscription group within its own thread and hidden behind a concurrent map, there should be no problem. Right?

Reproduction

Once I had my crazy suspicion, I decided to reduce it to a proof of concept:

require 'concurrent-ruby'
require 'set'
100.times do
  ids = Set.new
  semaphores = Concurrent::Hash.new { |h, k| h[k] = Queue.new }

  100.times.map do
    Thread.new do
      ids << semaphores['test'].object_id
    end
  end.each(&:join)

  raise "I expected 1 semaphore but got #{ids.size}" if ids.size != 1
end

Once executed, boom:

poc.rb:13:in `<main>': I expected 1 semaphore but got 2 (RuntimeError)

There's more than one semaphore for one listener! This caused Karafka to wait until it was forced to stop, because the worker thread would use a different semaphore than the listener thread.

But how is that even possible?

Well, Concurrent::Hash and Concurrent::Map initialization is indeed thread-safe, but not exactly in the way you would expect. The docs state that:

This version locks against the object itself for every method call, ensuring only one thread can be reading or writing at a time. This includes iteration methods like #each, which takes the lock repeatedly when reading an item.

“only one thread can be reading or writing at a time”. However, we're doing both, as separate steps: first a read that misses, then a write from the default block. Our code:

semaphores = Concurrent::Hash.new { |h, k| h[k] = Queue.new }

is actually equivalent to:

semaphores = Concurrent::Hash.new do |h, k|
  queue = Queue.new
  h[k] = queue
end

and the block content isn't fully locked. One thread's queue can overwrite another's if the Ruby scheduler stops the execution in the middle. Here's the flow of events in the form of a diagram:

Every now and then, the listener would receive a dangling queue object, effectively blocking the polling process.

Fixing the problem

This can be fixed either by replacing Concurrent::Hash with Concurrent::Map and using the #compute_if_absent method, or by introducing a lock inside the Concurrent::Hash initialization block:

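Concurrent::Map.new do |h, k|
  h.compute_if_absent(k) { [] }
end

mutex = Mutex.new

Concurrent::Hash.new do |h, k|
  mutex.synchronize do
    # Re-check under the lock: another thread may have set the key already.
    # #fetch does not trigger the default block, so there is no re-entrant locking.
    h.fetch(k) { h[k] = [] }
  end
end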

Okay, but what do Ruby on Rails and other projects have to do with all of this?

Fixing the world

I figured that if I had made this mistake, maybe others had too. I decided to check my local gems to find occurrences quickly. Inside my local gem cache, I ran the following commands:

fgrep -R 'Concurrent::Hash.new {' ./
fgrep -R 'Concurrent::Hash.new do' ./
fgrep -R 'Concurrent::Map.new {' ./
fgrep -R 'Concurrent::Map.new do' ./

and confirmed that I wasn't an isolated case. I wasn't alone!

Then, using Sourcegraph, I pinpointed several projects that could use fixes:

  • rails (activesupport and actionview)
  • i18n
  • dry-schema
  • finite_machine
  • graphql-ruby
  • rom-factory
  • Apache Whimsy
  • krane
  • puppet

I'm not a domain expert in any of these, and assessing the severity of each case was beyond my time constraints, but I decided to give it a shot.

Rails (ActiveSupport and ActionView)

Within Rails, this “pattern” was used twice: in ActiveSupport and in ActionView.

In ActionView, it was used within a cache:

PREFIXED_PARTIAL_NAMES = Concurrent::Map.new do |h, k|
  h[k] = Concurrent::Map.new
end

and assuming that the cached result is stateless (the same result every time for the same key), the issue could only cause extra computation during the first parallel requests to this cache.

In the case of ActiveSupport, none of the concurrency code was needed, so I simply replaced it with a plain Hash.

Luckily, neither case was that severe, though both were still worth fixing.

PR: https://github.com/rails/rails/pull/46536
PR: https://github.com/rails/rails/pull/46534

Both were merged, and that is how I became a Ruby on Rails contributor 🙂

i18n

This case was slightly more interesting because the concurrent cache stores all the translations. In theory, this could cause leakage similar to Karafka's, effectively losing a language by loading it into a different Concurrent::Hash:

require 'i18n'

100.times.map do
  Thread.new do
    I18n.backend.store_translations(rand.to_s, :foo => { :bar => 'bar', :baz => 'baz' })
  end
end.each(&:join)

I18n.available_locales.count #=> 1

This could lead to hard-to-debug problems. Every now and then, your system could raise something like this:

:en is not a valid locale (I18n::InvalidLocale)

without any apparent reason, and the problem would go away after a restart.

PR: https://github.com/ruby-i18n/i18n/pull/644

dry-schema

Another cache case, where the risk boils down to computing the same value twice.

PR: https://github.com/dry-rb/dry-schema/pull/440

rom-factory

This one is interesting! Let's first reduce the code to a smaller POC and see what happens under heavy threading:

require 'singleton'
require 'concurrent-ruby'

class Sequences
  include Singleton

  attr_reader :registry

  def initialize
    reset
  end

  def next(key)
    registry[key] += 1
  end

  def reset
    @registry = Concurrent::Map.new { |h, k| h[k] = 0 }
    self
  end
end

seq = Sequences.instance

loop do
  100.times.map do
    Thread.new { seq.next('boom') }
  end.each(&:join)

  size = seq.registry['boom']

  raise "Wanted 100 but got #{size}" unless size == 100

  seq.reset
end

poc.rb:37:in `block in <main>': Wanted 100 but got 1 (RuntimeError)

The counter value gets skewed. What's even more interesting is that making the map safe is not enough:

@registry = Concurrent::Map.new { |h, k| h.compute_if_absent(k) { 0 } }

poc.rb:36:in `block in <main>': Wanted 100 but got 55 (RuntimeError)

There's one more “unsafe” method:

def next(key)
  registry[key] += 1
end

This operation also isn't atomic, so it needs to be wrapped with a mutex:

def initialize
  @mutex = Mutex.new
  reset
end

def next(key)
  @mutex.synchronize do
    registry[key] += 1
  end
end

Only then is this code safe to use.

PR: https://github.com/rom-rb/rom-factory/pull/80

Different repositories

Summary

In my opinion, there are several takeaways from this story:

  • Karafka has a solid test suite!
  • If you're doing concurrency-related work, you'd better test it in a multi-threaded environment, and test it well.
  • Concurrency is hard for many of us (maybe that's because we're special 😉).
  • RTFM, and read it carefully 🙂
  • Don't be afraid to help others by submitting pull requests!

On the other hand, given how often this issue shows up, it may be worth opening a discussion about changing this behavior and making the initialization fully locked.

Afterword

Under CRuby, Concurrent::Hash is just a Hash (a plain ::Hash subclass with no extra locking). You can check it out in the concurrent-ruby source.


Cover photo by James Broad, licensed under Attribution-NonCommercial-ShareAlike 2.0 Generic (CC BY-NC-SA 2.0). The image has been cropped.
