UPPDATERAD EDIT AT END:Visar fungerande kod. Huvudmodulen oförändrad förutom felsökningskoden. Obs! Jag upplevde problemet som jag redan noterat angående behovet av att avsluta prenumerationen före uppsägning.
Koden ser korrekt ut. Jag skulle vilja se hur du instansierar det.
I config/application.rb har du förmodligen åtminstone något som:
require 'ws_communication'
config.middleware.use WsCommunication
Sedan, i din JavaScript-klient, bör du ha något sånt här:
var ws = new WebSocket(uri);
Instantierar du en annan instans av WsCommunication? Det skulle ställa @clients till en tom array och kan visa dina symtom. Något sådant här skulle vara felaktigt:
var ws = new WsCommunication;
Det skulle hjälpa oss om du visar klienten och kanske config/application.rb om det här inlägget inte hjälper.
Förresten, jag håller med kommentaren att @klienter ska skyddas av en mutex på alla uppdateringar, om inte läser också. Det är en dynamisk struktur som kan förändras när som helst i ett händelsestyrt system. redis-mutex är ett bra alternativ. (Hoppas att länken är korrekt eftersom Github verkar slänga 500 fel på allt för tillfället.)
Du kanske också noterar att $redis.publish returnerar ett heltalsvärde av antalet klienter som tog emot meddelandet.
Slutligen kanske du upptäcker att du måste se till att din kanal avslutas innan prenumerationen avslutas. Jag har haft situationer där jag har skickat varje meddelande flera, till och med många, gånger på grund av tidigare prenumerationer på samma kanal som inte städats. Eftersom du prenumererar på kanalen i en tråd, måste du avsluta prenumerationen inom samma tråd, annars kommer processen bara att "hänga" i väntan på att rätt tråd ska dyka upp på ett magiskt sätt. Jag hanterar den situationen genom att sätta en "unsubscribe"-flagga och sedan skicka ett meddelande. Sedan, inom on.message-blocket, testar jag för flaggan för att avbryta prenumerationen och utfärdar avregistreringen där.
Modulen du tillhandahöll, med endast mindre felsökningsändringar:
require 'faye/websocket'
require 'redis'
class WsCommunication
KEEPALIVE_TIME = 15 #seconds
CHANNEL = 'vip-deck'
def initialize(app)
@app = app
@clients = []
uri = URI.parse(ENV['REDISCLOUD_URL'])
$redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
Thread.new do
redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
redis_sub.subscribe(CHANNEL) do |on|
on.message do |channel, msg|
puts "Message event. Clients receiving:#{@clients.count};"
@clients.each { |ws| ws.send(msg) }
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
ws.on :open do |event|
@clients << ws
puts "Open event. Clients open:#{@clients.count};"
end
ws.on :message do |event|
receivers = $redis.publish(CHANNEL, event.data)
puts "Message published:#{event.data}; Receivers:#{receivers};"
end
ws.on :close do |event|
@clients.delete(ws)
puts "Close event. Clients open:#{@clients.count};"
ws = nil
end
ws.rack_response
else
@app.call(env)
end
end
end
Testprenumerationskoden jag angav:
# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
ws = WebSocket::Client::Simple.connect url
ws.on :message do |msg|
puts msg
end
ws.on :open do
puts "-- Subscriber open (#{ws.url})"
end
ws.on :close do |e|
puts "-- Subscriber close (#{e.inspect})"
exit 1
end
ws.on :error do |e|
puts "-- Subscriber error (#{e.inspect})"
end
end
Testpubliceringskoden jag angav. Publisher och Prenumerant kan lätt kombineras, eftersom dessa bara är tester:
# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
count ||= 0
timer = EventMachine.add_periodic_timer(5+rand(5)) do
count += 1
send({"MESSAGE": "COUNT:#{count};"})
end
@ws = WebSocket::Client::Simple.connect url
@ws.on :message do |msg|
puts msg
end
@ws.on :open do
puts "-- Publisher open"
end
@ws.on :close do |e|
puts "-- Publisher close (#{e.inspect})"
exit 1
end
@ws.on :error do |e|
puts "-- Publisher error (#{e.inspect})"
@ws.close
end
def self.send message
payload = message.is_a?(Hash) ? message : {payload: message}
@ws.send(payload.to_json)
end
end
Ett exempel på config.ru som kör allt detta i rackmellanvarulagret:
require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new
Det här är Main. Jag tog bort det från min körversion så det kan behöva justeras om du använder det:
%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
class Main < Sinatra::Base
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
get "/" do
erb :"index.html"
end
get "/assets/js/application.js" do
content_type :js
@scheme = env == "production" ? "wss://" : "ws://"
erb :"application.js"
end
end