Обработка запросов json на Ruby + Ractor + Kafka
Довелось как-то пообщаться на Хабре с одним умником, который возомнил себя... уж не знаю кем возмнил, но сыпал терминами "GVL (GIL) Ruby", "CPU-bound", “IO-bound”. Таких "знатоков", к сожалению, полно, но это может произвести только лишь первое впечатление на незнающего работодателя. В общем, мне был брошен типа вызов:
Ну, давайте с относительно простой задачки начнём. Напишите тривиальный коллектор событий с фронта. Одна ручка, на которую можно прислать JSON (ориентировочно размером от 1 до 50 кб) и эти события надо сложить в Кафку с полным акком. Ориентировочная нагрузка: 10k событий в секунду.
Сомнения возникли в реальности такого механизма хотя бы в том, что уж больно какие-то нереальные (по нынешним временам - август 2025) нагрузки - обработка 10K x 50K ~= 500Mb/s. Пусть это так и останется чисто гипотетическим предположением о продобных нагрузках. И тем не менее, код сделан как раз с учётом больших нагрузок - Ractor, Kafka.
В общем, в результате изначально получился у меня вот такой код, но рабочий вариант чуть ниже. В этом коде всё хорошо, но проблема в том, что rdkafka работает с переменными класса, что не позволяет ему корректно работать в параллельных изолированных потоках Ractor. Если когда-нибудь исправят в gem rdkafka, то это будет вполне работоспособный код.
# encoding: utf-8
# frozen_string_literal: true
#
# @author ESV Corp. © 08.2025
require 'sinatra'
require 'rdkafka'
require 'json'
require 'etc'
require 'mutex_m'
set :bind, '0.0.0.0'
set :port, 4567
BROKERS = 'localhost:9092'
TOPIC = 'events'
WORKERS = Etc.nprocessors
MAX_SIZE = 60_000
# Конфиг Kafka
RDKAFKA_CONFIG = {
'bootstrap.servers' => BROKERS,
'acks' => 'all',
'queue.buffering.max.ms' => '5',
'batch.num.messages' => '10000'
}
# Счётчик обработанных сообщений
rps_count = 0
rps_mutex = Mutex.new
# Пул Ractor-ов
worker_index = 0
worker_mutex = Mutex.new
workers = Array.new(WORKERS) do | i |
Ractor.new(i, RDKAFKA_CONFIG, TOPIC) do | wid, config, topic |
producer = Rdkafka::Config.new(config).producer
loop do
msg = Ractor.receive
break if msg == :shutdown # завершение работы
begin
# Парсим JSON с символами в ключах
data = JSON.parse(msg, symbolize_names: true)
# Фильтрация только разрешённых ключей
# т.к. это всего лишь тестовый пример
data.slice!(:type, :name, :a, :b)
# Анализируем сообщения
case data
in { type: 'greeting', name: String => name }
puts "[Ractor #{wid}] Greeting for #{name}"
in { type: 'sum', a: Integer => a, b: Integer => b }
puts "[Ractor #{wid}] Sum = #{a + b}"
else
puts "[Ractor #{wid}] Unrecognized event"
end
# Асинхронная отправка в Kafka без блокировки
producer.produce_async(topic: topic, payload: msg)
producer.poll(0) # Обработка колбэков
rescue JSON::ParserError
warn "[Ractor #{wid}] Invalid JSON"
rescue => e
warn "[Ractor #{wid}] Error: #{e.class} #{e.message}"
end
end
end
end
# Выводим количество обработанных событий в секунду
Thread.new do
loop do
sleep 1
count = rps_mutex.synchronize { rps_count.tap { rps_count = 0 } }
puts "[RPS] #{count} events/sec"
end
end
# HTTP endpoint для приёма событий
post '/events' do
body = request.body.read
halt 413, 'Payload too large' if body.bytesize > MAX_SIZE
halt 400, 'Empty payload' if body.empty?
# Считаем количество запросов в секунду
rps_mutex.synchronize { rps_count += 1 }
# Используем round-robin для выбора работника
worker_idx = worker_mutex.synchronize { (worker_index += 1) % WORKERS }
workers[worker_idx].send(body)
status 202
'Accepted'
end
# Завершаем работу всех Ractor'ов при выходе из программы
at_exit do
puts "[Main] Shutting down..."
workers.each { | w | w.send(:shutdown) }
workers.each(&:take) # ждём завершения
puts "[Main] All workers stopped."
end
Замена rdkafka на ruby-kafka
Вполне рабочий вариант, но, вероятно, будет работать медленнее, но это компенсируется использованием Ractor и YJIT в Ruby. Используется gem ruby-kafka вместо gem rdkafka.
# encoding: utf-8
# frozen_string_literal: true
#
# @author ESV Corp. © 08.2025
require 'sinatra'
require 'kafka' # используем gem ruby-kafka
require 'json'
require 'etc'
require 'mutex_m'
set :bind, '0.0.0.0'
set :port, 4567
BROKERS = ['localhost:9092'] # ruby-kafka ожидает массив
TOPIC = 'events'
WORKERS = Etc.nprocessors
MAX_SIZE = 60_000
# Счётчик обработанных сообщений
rps_count = 0
rps_mutex = Mutex.new
# Пул Ractor'ов
worker_index = 0
worker_mutex = Mutex.new
workers = Array.new(WORKERS) do | i |
Ractor.new(i, BROKERS, TOPIC) do | wid, brokers, topic |
# Создаём Kafka-клиент и producer внутри Ractor'а
kafka = Kafka.new(
seed_brokers: brokers,
client_id: "collector_worker_#{wid}"
)
# Синхронный продюсер (для полного контроля и ACK=all)
producer = kafka.sync_producer(
acks: :all,
max_retries: 3,
retry_backoff: 100
)
loop do
msg = Ractor.receive
break if msg == :shutdown
begin
# Парсим JSON с символами в ключах
data = JSON.parse(msg, symbolize_names: true)
# Фильтрация только разрешённых ключей
data.slice!(:type, :name, :a, :b)
# Анализируем сообщения
case data
in { type: 'greeting', name: String => name }
puts "[Ractor #{wid}] Greeting for #{name}"
in { type: 'sum', a: Integer => a, b: Integer => b }
puts "[Ractor #{wid}] Sum = #{a + b}"
else
puts "[Ractor #{wid}] Unrecognized event"
end
# Синхронная отправка (для гарантии ACK=all)
# Можно использовать async_producer, но sync проще для контроля
producer.deliver_message(msg, topic: topic)
rescue JSON::ParserError
warn "[Ractor #{wid}] Invalid JSON"
rescue => e
warn "[Ractor #{wid}] Error: #{e.class} #{e.message}"
ensure
# Важно: закрыть продюсер при выходе
producer&.close
end
end
end
end
# RPS-мониторинг
Thread.new do
loop do
sleep 1
count = rps_mutex.synchronize { rps_count.tap { rps_count = 0 } }
puts "[RPS] #{count} events/sec"
end
end
# HTTP endpoint для приёма событий
post '/events' do
body = request.body.read
halt 413, 'Payload too large' if body.bytesize > MAX_SIZE
halt 400, 'Empty payload' if body.empty?
# Считаем RPS
rps_mutex.synchronize { rps_count += 1 }
# Round-robin выбор Ractor'а
worker_idx = worker_mutex.synchronize { (worker_index += 1) % WORKERS }
workers[worker_idx].send(body)
status 202
'Accepted'
end
# Завершение работы
at_exit do
puts "[Main] Shutting down..."
# Остановить Ractor'ы
workers.each { | w | w.send(:shutdown) }
workers.each(&:take)
puts "[Main] All workers stopped."
end
Рабочий вариант
...но оказалось, что rdkafka использует переменные класса (что само по себе является небезопасной практикой - даже Matz упоминал об их нежелательном использовании в Ruby), поэтому было принято решение вынести работу с rdkafka в основной поток:
# encoding: utf-8
# frozen_string_literal: true
#
# @author ESV Corp. © 08.2025
require 'sinatra'
require 'rdkafka'
require 'json'
require 'etc'
require 'thread'
require 'mutex_m'
set :bind, '0.0.0.0'
set :port, 4567
BROKERS = 'localhost:9092'
TOPIC = 'events'
WORKERS = Etc.nprocessors
MAX_SIZE = 60_000
MAX_QUEUE_SIZE = 10_000
# Конфиг Kafka
RDKAFKA_CONFIG = {
'bootstrap.servers' => BROKERS,
'acks' => 'all',
'queue.buffering.max.ms' => '5',
'batch.num.messages' => '10000'
}
# Первая очередь: для передачи событий в Ractor'ы (backpressure)
input_queue = SizedQueue.new(MAX_QUEUE_SIZE)
# Вторая очередь: для отправки обработанных данных в Kafka
events_queue = SizedQueue.new(MAX_QUEUE_SIZE)
# Главный producer (в main потоке)
producer = Rdkafka::Config.new(RDKAFKA_CONFIG).producer
# Запускаем фоновую отправку из main потока
# Поток отправки сообщений в Kafka (решает проблему Ractor::IsolationError в rdkafka)
# потому как rdkafka какого-то чёрта использует переменные класса, хотя давно
# всем известно, что их использование не рекомендует даже Matz
# поэтому работа с kafka будет производится в главном потоке
Thread.new do
loop do
msg = events_queue.pop
break if msg == :flush
begin
producer.produce_async(topic: TOPIC, payload: msg)
producer.poll(0)
rescue => e
warn "[Kafka Thread] Error: #{e.class} #{e.message}"
end
end
end
# Счётчик обработанных сообщений
rps_count = 0
rps_mutex = Mutex.new
# Пул Ractor'ов: читают из input_queue, обрабатывают, отправляют в events_queue
workers = Array.new(WORKERS) do | i |
Ractor.new(i, input_queue, events_queue) do | wid, input, output |
loop do
msg = input.pop # читаем из очереди ввода
break if msg == :shutdown
begin
data = JSON.parse(msg, symbolize_names: true)
data.slice!(:type, :name, :a, :b)
case data
in { type: 'greeting', name: String => name }
puts "[Ractor #{wid}] Greeting for #{name}"
in { type: 'sum', a: Integer => a, b: Integer => b }
puts "[Ractor #{wid}] Sum = #{a + b}"
else
puts "[Ractor #{wid}] Unrecognized event"
end
output << msg # в очередь для отправки в kafka
rescue JSON::ParserError
warn "[Ractor #{wid}] Invalid JSON"
rescue => e
warn "[Ractor #{wid}] Error: #{e.class} #{e.message}"
end
end
end
end
# RPS-мониторинг
Thread.new do
loop do
sleep 1
count = rps_mutex.synchronize { rps_count.tap { rps_count = 0 } }
puts "[RPS] #{count} events/sec"
end
end
# HTTP endpoint для приёма событий
post '/events' do
body = request.body.read
halt 413, 'Payload too large' if body.bytesize > MAX_SIZE
halt 400, 'Empty payload' if body.empty?
rps_mutex.synchronize { rps_count += 1 }
begin
# контроль переполнения
input_queue.push(body, exception: true)
rescue ThreadError
halt 429, 'Too many requests'
end
status 202
'Accepted'
end
# Завершение работы
at_exit do
puts "[Main] Shutting down..."
# 1. Отправить сигнал завершения всем Ractor'ам
workers.each { | worker | worker.send(:shutdown) }
# 2. Дождаться завершения всех Ractor'ов
workers.each(&:take)
# 3. Отправить сигнал завершения потоку Kafka
events_queue << :flush
# 4. Дождаться отправки всех сообщений
producer&.flush(5000)
puts "[Main] All workers stopped."
end
И в дополнение к нему ещё и тест нагрузки:
# encoding: utf-8
# frozen_string_literal: true
#
# @author ESV Corp. © 08.2025
# test_load.rb
require 'net/http'
require 'json'
require 'uri'
require 'benchmark'
# Конфигурация теста
URL = URI.parse('http://localhost:4567/events') # Указываем правильный адрес сервера
EVENT_SIZE = 2000 # Размер тела запроса (в байтах) - регулируем по нуждам
TOTAL_REQUESTS = 10_000 # Количество запросов для теста
CONCURRENCY = 100 # Число потоков, которые будут посылать запросы
# Сгенерируем случайный JSON для запроса
def generate_random_event
{
type: 'greeting',
name: ('a'..'z').to_a.sample(10).join
}.to_json
end
# Функция для отправки одного запроса
def send_request(event)
http = Net::HTTP.new(URL.host, URL.port)
request = Net::HTTP::Post.new(URL.path, {'Content-Type' => 'application/json'})
request.body = event
http.request(request)
end
# Тест нагрузки
Benchmark.bm do | x |
x.report("load test:") do
# Используем потоки для параллельной отправки запросов
threads = Array.new(CONCURRENCY) do
Thread.new do
TOTAL_REQUESTS / CONCURRENCY.times do
event = generate_random_event
send_request(event)
end
end
end
threads.each(&:join) # Ждём завершения всех потоков
end
end
Этот код – просто пример использвания Ruby с Ractor (параллельная обработка) и Kafka, но, мало ли, вдруг кому и пригодится для реальных проектов. Рекомендую использовать Ruby с YJIT для более быстрого исполнения программы.