Обработка запросов json на Ruby + Ractor + Kafka
Довелось как-то пообщаться на Хабре с одним умником, который возомнил себя... уж не знаю кем возмнил, но сыпал терминами "GVL (GIL) Ruby", "CPU-bound", “IO-bound”. Таких "знатоков", к сожалению, полно, но это может произвести только лишь первое впечатление на незнающего работодателя. В общем, мне был брошен типа вызов:
Ну, давайте с относительно простой задачки начнём. Напишите тривиальный коллектор событий с фронта. Одна ручка, на которую можно прислать JSON (ориентировочно размером от 1 до 50 кб) и эти события надо сложить в Кафку с полным акком. Ориентировочная нагрузка: 10k событий в секунду.
Сомнения возникли в реальности такого механизма хотя бы в том, что уж больно какие-то нереальные (по нынешним временам - август 2025) нагрузки - обработка 10K x 50K ~= 500Mb/s. Пусть это так и останется чисто гипотетическим предположением о продобных нагрузках. И тем не менее, код сделан как раз с учётом больших нагрузок - Ractor, Kafka.
В общем, в результате изначально получился у меня вот такой код, но рабочий вариант чуть ниже. В этом коде всё хорошо, но проблема в том, что rdkafka
работает с переменными класса, что не позволяет ему корректно работать в параллельных изолированных потоках Ractor. Если когда-нибудь исправят в gem rdkafka
, то это будет вполне работоспособный код.
# encoding: utf-8 # frozen_string_literal: true # # @author ESV Corp. © 08.2025 require 'sinatra' require 'rdkafka' require 'json' require 'etc' require 'mutex_m' set :bind, '0.0.0.0' set :port, 4567 BROKERS = 'localhost:9092' TOPIC = 'events' WORKERS = Etc.nprocessors MAX_SIZE = 60_000 # Конфиг Kafka RDKAFKA_CONFIG = { 'bootstrap.servers' => BROKERS, 'acks' => 'all', 'queue.buffering.max.ms' => '5', 'batch.num.messages' => '10000' } # Счётчик обработанных сообщений rps_count = 0 rps_mutex = Mutex.new # Пул Ractor-ов worker_index = 0 worker_mutex = Mutex.new workers = Array.new(WORKERS) do | i | Ractor.new(i, RDKAFKA_CONFIG, TOPIC) do | wid, config, topic | producer = Rdkafka::Config.new(config).producer loop do msg = Ractor.receive break if msg == :shutdown # завершение работы begin # Парсим JSON с символами в ключах data = JSON.parse(msg, symbolize_names: true) # Фильтрация только разрешённых ключей # т.к. это всего лишь тестовый пример data.slice!(:type, :name, :a, :b) # Анализируем сообщения case data in { type: 'greeting', name: String => name } puts "[Ractor #{wid}] Greeting for #{name}" in { type: 'sum', a: Integer => a, b: Integer => b } puts "[Ractor #{wid}] Sum = #{a + b}" else puts "[Ractor #{wid}] Unrecognized event" end # Асинхронная отправка в Kafka без блокировки producer.produce_async(topic: topic, payload: msg) producer.poll(0) # Обработка колбэков rescue JSON::ParserError warn "[Ractor #{wid}] Invalid JSON" rescue => e warn "[Ractor #{wid}] Error: #{e.class} #{e.message}" end end end end # Выводим количество обработанных событий в секунду Thread.new do loop do sleep 1 count = rps_mutex.synchronize { rps_count.tap { rps_count = 0 } } puts "[RPS] #{count} events/sec" end end # HTTP endpoint для приёма событий post '/events' do body = request.body.read halt 413, 'Payload too large' if body.bytesize > MAX_SIZE halt 400, 'Empty payload' if body.empty? # Считаем количество запросов в секунду rps_mutex.synchronize { rps_count += 1 } # Используем round-robin для выбора работника worker_idx = worker_mutex.synchronize { (worker_index += 1) % WORKERS } workers[worker_idx].send(body) status 202 'Accepted' end # Завершаем работу всех Ractor'ов при выходе из программы at_exit do puts "[Main] Shutting down..." workers.each { | w | w.send(:shutdown) } workers.each(&:take) # ждём завершения puts "[Main] All workers stopped." end
Замена rdkafka на ruby-kafka
Вполне рабочий вариант, но, вероятно, будет работать медленнее, но это компенсируется использованием Ractor и YJIT в Ruby. Используется gem ruby-kafka
вместо gem rdkafka.
# encoding: utf-8 # frozen_string_literal: true # # @author ESV Corp. © 08.2025 require 'sinatra' require 'kafka' # используем gem ruby-kafka require 'json' require 'etc' require 'mutex_m' set :bind, '0.0.0.0' set :port, 4567 BROKERS = ['localhost:9092'] # ruby-kafka ожидает массив TOPIC = 'events' WORKERS = Etc.nprocessors MAX_SIZE = 60_000 # Счётчик обработанных сообщений rps_count = 0 rps_mutex = Mutex.new # Пул Ractor'ов worker_index = 0 worker_mutex = Mutex.new workers = Array.new(WORKERS) do | i | Ractor.new(i, BROKERS, TOPIC) do | wid, brokers, topic | # Создаём Kafka-клиент и producer внутри Ractor'а kafka = Kafka.new( seed_brokers: brokers, client_id: "collector_worker_#{wid}" ) # Синхронный продюсер (для полного контроля и ACK=all) producer = kafka.sync_producer( acks: :all, max_retries: 3, retry_backoff: 100 ) loop do msg = Ractor.receive break if msg == :shutdown begin # Парсим JSON с символами в ключах data = JSON.parse(msg, symbolize_names: true) # Фильтрация только разрешённых ключей data.slice!(:type, :name, :a, :b) # Анализируем сообщения case data in { type: 'greeting', name: String => name } puts "[Ractor #{wid}] Greeting for #{name}" in { type: 'sum', a: Integer => a, b: Integer => b } puts "[Ractor #{wid}] Sum = #{a + b}" else puts "[Ractor #{wid}] Unrecognized event" end # Синхронная отправка (для гарантии ACK=all) # Можно использовать async_producer, но sync проще для контроля producer.deliver_message(msg, topic: topic) rescue JSON::ParserError warn "[Ractor #{wid}] Invalid JSON" rescue => e warn "[Ractor #{wid}] Error: #{e.class} #{e.message}" ensure # Важно: закрыть продюсер при выходе producer&.close end end end end # RPS-мониторинг Thread.new do loop do sleep 1 count = rps_mutex.synchronize { rps_count.tap { rps_count = 0 } } puts "[RPS] #{count} events/sec" end end # HTTP endpoint для приёма событий post '/events' do body = request.body.read halt 413, 'Payload too large' if body.bytesize > MAX_SIZE halt 400, 'Empty payload' if body.empty? # Считаем RPS rps_mutex.synchronize { rps_count += 1 } # Round-robin выбор Ractor'а worker_idx = worker_mutex.synchronize { (worker_index += 1) % WORKERS } workers[worker_idx].send(body) status 202 'Accepted' end # Завершение работы at_exit do puts "[Main] Shutting down..." # Остановить Ractor'ы workers.each { | w | w.send(:shutdown) } workers.each(&:take) puts "[Main] All workers stopped." end
Рабочий вариант
...но оказалось, что rdkafka
использует переменные класса (что само по себе является небезопасной практикой - даже Matz упоминал об их нежелательном использовании в Ruby), поэтому было принято решение вынести работу с rdkafka
в основной поток:
# encoding: utf-8 # frozen_string_literal: true # # @author ESV Corp. © 08.2025 require 'sinatra' require 'rdkafka' require 'json' require 'etc' require 'thread' require 'mutex_m' set :bind, '0.0.0.0' set :port, 4567 BROKERS = 'localhost:9092' TOPIC = 'events' WORKERS = Etc.nprocessors MAX_SIZE = 60_000 MAX_QUEUE_SIZE = 10_000 # Конфиг Kafka RDKAFKA_CONFIG = { 'bootstrap.servers' => BROKERS, 'acks' => 'all', 'queue.buffering.max.ms' => '5', 'batch.num.messages' => '10000' } # Первая очередь: для передачи событий в Ractor'ы (backpressure) input_queue = SizedQueue.new(MAX_QUEUE_SIZE) # Вторая очередь: для отправки обработанных данных в Kafka events_queue = SizedQueue.new(MAX_QUEUE_SIZE) # Главный producer (в main потоке) producer = Rdkafka::Config.new(RDKAFKA_CONFIG).producer # Запускаем фоновую отправку из main потока # Поток отправки сообщений в Kafka (решает проблему Ractor::IsolationError в rdkafka) # потому как rdkafka какого-то чёрта использует переменные класса, хотя давно # всем известно, что их использование не рекомендует даже Matz # поэтому работа с kafka будет производится в главном потоке Thread.new do loop do msg = events_queue.pop break if msg == :flush begin producer.produce_async(topic: TOPIC, payload: msg) producer.poll(0) rescue => e warn "[Kafka Thread] Error: #{e.class} #{e.message}" end end end # Счётчик обработанных сообщений rps_count = 0 rps_mutex = Mutex.new # Пул Ractor'ов: читают из input_queue, обрабатывают, отправляют в events_queue workers = Array.new(WORKERS) do | i | Ractor.new(i, input_queue, events_queue) do | wid, input, output | loop do msg = input.pop # читаем из очереди ввода break if msg == :shutdown begin data = JSON.parse(msg, symbolize_names: true) data.slice!(:type, :name, :a, :b) case data in { type: 'greeting', name: String => name } puts "[Ractor #{wid}] Greeting for #{name}" in { type: 'sum', a: Integer => a, b: Integer => b } puts "[Ractor #{wid}] Sum = #{a + b}" else puts "[Ractor #{wid}] Unrecognized event" end output << msg # в очередь для отправки в kafka rescue JSON::ParserError warn "[Ractor #{wid}] Invalid JSON" rescue => e warn "[Ractor #{wid}] Error: #{e.class} #{e.message}" end end end end # RPS-мониторинг Thread.new do loop do sleep 1 count = rps_mutex.synchronize { rps_count.tap { rps_count = 0 } } puts "[RPS] #{count} events/sec" end end # HTTP endpoint для приёма событий post '/events' do body = request.body.read halt 413, 'Payload too large' if body.bytesize > MAX_SIZE halt 400, 'Empty payload' if body.empty? rps_mutex.synchronize { rps_count += 1 } begin # контроль переполнения input_queue.push(body, exception: true) rescue ThreadError halt 429, 'Too many requests' end status 202 'Accepted' end # Завершение работы at_exit do puts "[Main] Shutting down..." # 1. Отправить сигнал завершения всем Ractor'ам workers.each { | worker | worker.send(:shutdown) } # 2. Дождаться завершения всех Ractor'ов workers.each(&:take) # 3. Отправить сигнал завершения потоку Kafka events_queue << :flush # 4. Дождаться отправки всех сообщений producer&.flush(5000) puts "[Main] All workers stopped." end
И в дополнение к нему ещё и тест нагрузки:
# encoding: utf-8 # frozen_string_literal: true # # @author ESV Corp. © 08.2025 # test_load.rb require 'net/http' require 'json' require 'uri' require 'benchmark' # Конфигурация теста URL = URI.parse('http://localhost:4567/events') # Указываем правильный адрес сервера EVENT_SIZE = 2000 # Размер тела запроса (в байтах) - регулируем по нуждам TOTAL_REQUESTS = 10_000 # Количество запросов для теста CONCURRENCY = 100 # Число потоков, которые будут посылать запросы # Сгенерируем случайный JSON для запроса def generate_random_event { type: 'greeting', name: ('a'..'z').to_a.sample(10).join }.to_json end # Функция для отправки одного запроса def send_request(event) http = Net::HTTP.new(URL.host, URL.port) request = Net::HTTP::Post.new(URL.path, {'Content-Type' => 'application/json'}) request.body = event http.request(request) end # Тест нагрузки Benchmark.bm do | x | x.report("load test:") do # Используем потоки для параллельной отправки запросов threads = Array.new(CONCURRENCY) do Thread.new do TOTAL_REQUESTS / CONCURRENCY.times do event = generate_random_event send_request(event) end end end threads.each(&:join) # Ждём завершения всех потоков end end
Этот код – просто пример использвания Ruby с Ractor (параллельная обработка) и Kafka, но, мало ли, вдруг кому и пригодится для реальных проектов. Рекомендую использовать Ruby с YJIT для более быстрого исполнения программы.