Cайт веб-разработчика, программиста Ruby on Rails ESV Corp. Екатеринбург, Москва, Санкт-Петербург, Новосибирск, Первоуральск

Обработка запросов json на Ruby + Ractor + Kafka

Довелось как-то пообщаться на Хабре с одним умником, который возомнил себя... уж не знаю кем возмнил, но сыпал терминами "GVL (GIL) Ruby", "CPU-bound", “IO-bound”. Таких "знатоков", к сожалению, полно, но это может произвести только лишь первое впечатление на незнающего работодателя. В общем, мне был брошен типа вызов:

Ну, давайте с относительно простой задачки начнём. Напишите тривиальный коллектор событий с фронта. Одна ручка, на которую можно прислать JSON (ориентировочно размером от 1 до 50 кб) и эти события надо сложить в Кафку с полным акком. Ориентировочная нагрузка: 10k событий в секунду.

Сомнения возникли в реальности такого механизма хотя бы в том, что уж больно какие-то нереальные (по нынешним временам - август 2025) нагрузки - обработка 10K x 50K ~= 500Mb/s. Пусть это так и останется чисто гипотетическим предположением о продобных нагрузках. И тем не менее, код сделан как раз с учётом больших нагрузок - Ractor, Kafka.

В общем, в результате изначально получился у меня вот такой код, но рабочий вариант чуть ниже. В этом коде всё хорошо, но проблема в том, что rdkafka работает с переменными класса, что не позволяет ему корректно работать в параллельных изолированных потоках Ractor. Если когда-нибудь исправят в gem rdkafka, то это будет вполне работоспособный код.

# encoding: utf-8
# frozen_string_literal: true
#
# @author ESV Corp. © 08.2025

require 'sinatra'
require 'rdkafka'
require 'json'
require 'etc'
require 'mutex_m'

set :bind, '0.0.0.0'
set :port, 4567

BROKERS = 'localhost:9092'
TOPIC = 'events'
WORKERS = Etc.nprocessors
MAX_SIZE = 60_000

# Конфиг Kafka
RDKAFKA_CONFIG = {
  'bootstrap.servers'      => BROKERS,
  'acks'                   => 'all',
  'queue.buffering.max.ms' => '5',
  'batch.num.messages'     => '10000'
}

# Счётчик обработанных сообщений
rps_count = 0
rps_mutex = Mutex.new

# Пул Ractor-ов
worker_index = 0
worker_mutex = Mutex.new

workers = Array.new(WORKERS) do | i |

  Ractor.new(i, RDKAFKA_CONFIG, TOPIC) do | wid, config, topic |

    producer = Rdkafka::Config.new(config).producer

    loop do

      msg = Ractor.receive

      break if msg == :shutdown # завершение работы

      begin

        # Парсим JSON с символами в ключах
        data = JSON.parse(msg, symbolize_names: true)

        # Фильтрация только разрешённых ключей
        # т.к. это всего лишь тестовый пример
        data.slice!(:type, :name, :a, :b)

        # Анализируем сообщения
        case data
        in { type: 'greeting', name: String => name }
          puts "[Ractor #{wid}] Greeting for #{name}"

        in { type: 'sum', a: Integer => a, b: Integer => b }
          puts "[Ractor #{wid}] Sum = #{a + b}"

        else
          puts "[Ractor #{wid}] Unrecognized event"

        end

        # Асинхронная отправка в Kafka без блокировки
        producer.produce_async(topic: topic, payload: msg)
        producer.poll(0) # Обработка колбэков

      rescue JSON::ParserError
        warn "[Ractor #{wid}] Invalid JSON"

      rescue => e
        warn "[Ractor #{wid}] Error: #{e.class} #{e.message}"

      end

    end
  end
end

# Выводим количество обработанных событий в секунду
Thread.new do
  loop do
    sleep 1
    count = rps_mutex.synchronize { rps_count.tap { rps_count = 0 } }
    puts "[RPS] #{count} events/sec"
  end
end

# HTTP endpoint для приёма событий
post '/events' do

  body = request.body.read

  halt 413, 'Payload too large' if body.bytesize > MAX_SIZE
  halt 400, 'Empty payload' if body.empty?

  # Считаем количество запросов в секунду
  rps_mutex.synchronize { rps_count += 1 }

  # Используем round-robin для выбора работника
  worker_idx = worker_mutex.synchronize { (worker_index += 1) % WORKERS }
  workers[worker_idx].send(body)

  status 202
  'Accepted'

end

# Завершаем работу всех Ractor'ов при выходе из программы
at_exit do
  puts "[Main] Shutting down..."
  workers.each { | w | w.send(:shutdown) }
  workers.each(&:take) # ждём завершения
  puts "[Main] All workers stopped."
end

Замена rdkafka на ruby-kafka

Вполне рабочий вариант, но, вероятно, будет работать медленнее, но это компенсируется использованием Ractor и YJIT в Ruby. Используется gem ruby-kafka вместо gem rdkafka.

# encoding: utf-8
# frozen_string_literal: true
#
# @author ESV Corp. © 08.2025

require 'sinatra'
require 'kafka' # используем gem ruby-kafka
require 'json'
require 'etc'
require 'mutex_m'

set :bind, '0.0.0.0'
set :port, 4567

BROKERS = ['localhost:9092'] # ruby-kafka ожидает массив
TOPIC = 'events'
WORKERS = Etc.nprocessors
MAX_SIZE = 60_000

# Счётчик обработанных сообщений
rps_count = 0
rps_mutex = Mutex.new

# Пул Ractor'ов
worker_index = 0
worker_mutex = Mutex.new

workers = Array.new(WORKERS) do | i |
  Ractor.new(i, BROKERS, TOPIC) do | wid, brokers, topic |

    # Создаём Kafka-клиент и producer внутри Ractor'а
    kafka = Kafka.new(
      seed_brokers: brokers,
      client_id: "collector_worker_#{wid}"
    )

    # Синхронный продюсер (для полного контроля и ACK=all)
    producer = kafka.sync_producer(
      acks: :all,
      max_retries: 3,
      retry_backoff: 100
    )

    loop do

      msg = Ractor.receive

      break if msg == :shutdown

      begin

        # Парсим JSON с символами в ключах
        data = JSON.parse(msg, symbolize_names: true)

        # Фильтрация только разрешённых ключей
        data.slice!(:type, :name, :a, :b)

        # Анализируем сообщения
        case data
        in { type: 'greeting', name: String => name }
          puts "[Ractor #{wid}] Greeting for #{name}"

        in { type: 'sum', a: Integer => a, b: Integer => b }
          puts "[Ractor #{wid}] Sum = #{a + b}"

        else
          puts "[Ractor #{wid}] Unrecognized event"

        end

        # Синхронная отправка (для гарантии ACK=all)
        # Можно использовать async_producer, но sync проще для контроля
        producer.deliver_message(msg, topic: topic)

      rescue JSON::ParserError
        warn "[Ractor #{wid}] Invalid JSON"

      rescue => e
        warn "[Ractor #{wid}] Error: #{e.class} #{e.message}"

      ensure
        # Важно: закрыть продюсер при выходе
        producer&.close

      end

    end

  end
end

# RPS-мониторинг
Thread.new do
  loop do
    sleep 1
    count = rps_mutex.synchronize { rps_count.tap { rps_count = 0 } }
    puts "[RPS] #{count} events/sec"
  end
end

# HTTP endpoint для приёма событий
post '/events' do

  body = request.body.read

  halt 413, 'Payload too large' if body.bytesize > MAX_SIZE
  halt 400, 'Empty payload' if body.empty?

  # Считаем RPS
  rps_mutex.synchronize { rps_count += 1 }

  # Round-robin выбор Ractor'а
  worker_idx = worker_mutex.synchronize { (worker_index += 1) % WORKERS }
  workers[worker_idx].send(body)

  status 202
  'Accepted'

end

# Завершение работы
at_exit do

  puts "[Main] Shutting down..."

  # Остановить Ractor'ы
  workers.each { | w | w.send(:shutdown) }
  workers.each(&:take)

  puts "[Main] All workers stopped."

end

Рабочий вариант

...но оказалось, что rdkafka использует переменные класса (что само по себе является небезопасной практикой - даже Matz упоминал об их нежелательном использовании в Ruby), поэтому было принято решение вынести работу с rdkafka в основной поток:

# encoding: utf-8
# frozen_string_literal: true
#
# @author ESV Corp. © 08.2025

require 'sinatra'
require 'rdkafka'
require 'json'
require 'etc'
require 'thread'
require 'mutex_m'

set :bind, '0.0.0.0'
set :port, 4567

BROKERS = 'localhost:9092'
TOPIC = 'events'
WORKERS = Etc.nprocessors
MAX_SIZE = 60_000
MAX_QUEUE_SIZE = 10_000

# Конфиг Kafka
RDKAFKA_CONFIG = {
  'bootstrap.servers'      => BROKERS,
  'acks'                   => 'all',
  'queue.buffering.max.ms' => '5',
  'batch.num.messages'     => '10000'
}

# Первая очередь: для передачи событий в Ractor'ы (backpressure)
input_queue = SizedQueue.new(MAX_QUEUE_SIZE)

# Вторая очередь: для отправки обработанных данных в Kafka
events_queue = SizedQueue.new(MAX_QUEUE_SIZE)

# Главный producer (в main потоке)
producer = Rdkafka::Config.new(RDKAFKA_CONFIG).producer

# Запускаем фоновую отправку из main потока
# Поток отправки сообщений в Kafka (решает проблему Ractor::IsolationError в rdkafka)
# потому как rdkafka какого-то чёрта использует переменные класса, хотя давно
# всем известно, что их использование не рекомендует даже Matz
# поэтому работа с kafka будет производится в главном потоке
Thread.new do
  loop do

    msg = events_queue.pop

    break if msg == :flush

    begin
      producer.produce_async(topic: TOPIC, payload: msg)
      producer.poll(0)
    rescue => e
      warn "[Kafka Thread] Error: #{e.class} #{e.message}"
    end

  end
end

# Счётчик обработанных сообщений
rps_count = 0
rps_mutex = Mutex.new

# Пул Ractor'ов: читают из input_queue, обрабатывают, отправляют в events_queue
workers = Array.new(WORKERS) do | i |
  Ractor.new(i, input_queue, events_queue) do | wid, input, output |

    loop do

      msg = input.pop # читаем из очереди ввода

      break if msg == :shutdown

      begin

        data = JSON.parse(msg, symbolize_names: true)

        data.slice!(:type, :name, :a, :b)

        case data
        in { type: 'greeting', name: String => name }
          puts "[Ractor #{wid}] Greeting for #{name}"

        in { type: 'sum', a: Integer => a, b: Integer => b }
          puts "[Ractor #{wid}] Sum = #{a + b}"

        else
          puts "[Ractor #{wid}] Unrecognized event"

        end

        output << msg # в очередь для отправки в kafka

      rescue JSON::ParserError
        warn "[Ractor #{wid}] Invalid JSON"
      rescue => e
        warn "[Ractor #{wid}] Error: #{e.class} #{e.message}"
      end

    end

  end
end

# RPS-мониторинг
Thread.new do
  loop do
    sleep 1
    count = rps_mutex.synchronize { rps_count.tap { rps_count = 0 } }
    puts "[RPS] #{count} events/sec"
  end
end

# HTTP endpoint для приёма событий
post '/events' do

  body = request.body.read

  halt 413, 'Payload too large' if body.bytesize > MAX_SIZE
  halt 400, 'Empty payload' if body.empty?

  rps_mutex.synchronize { rps_count += 1 }

  begin
    # контроль переполнения
    input_queue.push(body, exception: true)
  rescue ThreadError
    halt 429, 'Too many requests'
  end

  status 202
  'Accepted'

end

# Завершение работы
at_exit do

  puts "[Main] Shutting down..."

  # 1. Отправить сигнал завершения всем Ractor'ам
  workers.each { | worker | worker.send(:shutdown) }

  # 2. Дождаться завершения всех Ractor'ов
  workers.each(&:take)

  # 3. Отправить сигнал завершения потоку Kafka
  events_queue << :flush

  # 4. Дождаться отправки всех сообщений
  producer&.flush(5000)

  puts "[Main] All workers stopped."

end

И в дополнение к нему ещё и тест нагрузки:

# encoding: utf-8
# frozen_string_literal: true
#
# @author ESV Corp. © 08.2025

# test_load.rb

require 'net/http'
require 'json'
require 'uri'
require 'benchmark'

# Конфигурация теста
URL = URI.parse('http://localhost:4567/events') # Указываем правильный адрес сервера
EVENT_SIZE = 2000                               # Размер тела запроса (в байтах) - регулируем по нуждам
TOTAL_REQUESTS = 10_000                         # Количество запросов для теста
CONCURRENCY = 100                               # Число потоков, которые будут посылать запросы

# Сгенерируем случайный JSON для запроса
def generate_random_event
  {
    type: 'greeting',
    name: ('a'..'z').to_a.sample(10).join
  }.to_json
end

# Функция для отправки одного запроса
def send_request(event)
  http = Net::HTTP.new(URL.host, URL.port)
  request = Net::HTTP::Post.new(URL.path, {'Content-Type' => 'application/json'})
  request.body = event
  http.request(request)
end

# Тест нагрузки
Benchmark.bm do | x |

  x.report("load test:") do

    # Используем потоки для параллельной отправки запросов
    threads = Array.new(CONCURRENCY) do
      Thread.new do
        TOTAL_REQUESTS / CONCURRENCY.times do
          event = generate_random_event
          send_request(event)
        end
      end
    end

    threads.each(&:join) # Ждём завершения всех потоков

  end
end

Этот код – просто пример использвания Ruby с Ractor (параллельная обработка) и Kafka, но, мало ли, вдруг кому и пригодится для реальных проектов. Рекомендую использовать Ruby с YJIT для более быстрого исполнения программы.