Methods
- A
- L
- N
- R
- S
Class Public methods
new(adapter, config_options, executor)
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 79
#
# Prepares the listener's bookkeeping state. Nothing is connected and no
# thread is started here: @subscribed_client and @thread both begin as nil.
#
# adapter        - kept in @adapter for later use.
# config_options - object responding to #fetch; only :reconnect_attempts is
#                  read (defaults to 1 when absent).
# executor       - handed to the superclass initializer.
def initialize(adapter, config_options, executor)
  super(executor)

  @adapter = adapter
  @thread = nil
  @subscribed_client = nil

  # Blocks queued here are run by #listen once a pub/sub client exists.
  @when_connected = []
  @subscription_lock = Mutex.new
  # Each channel lazily gets its own array of pending on_success callbacks.
  @subscribe_callbacks = Hash.new { |callbacks, channel| callbacks[channel] = [] }

  # Use the same config as used by Redis conn
  attempts = config_options.fetch(:reconnect_attempts, 1)
  # An Integer N is expanded to an array of N zeros; any other value is stored
  # as given (presumably a list of per-attempt delays -- confirm against the
  # reconnect logic, which is outside this view).
  @reconnect_attempts = attempts.is_a?(Integer) ? Array.new(attempts, 0) : attempts
  @reconnect_attempt = 0
end
Instance Public methods
add_channel(channel, on_success)
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 144
#
# Records +on_success+ as a pending callback for +channel+ and requests a
# "subscribe" command for that channel, all while holding @subscription_lock.
#
# channel    - channel name passed to the "subscribe" command.
# on_success - callable appended to the channel's pending-callback list.
def add_channel(channel, on_success)
  @subscription_lock.synchronize do
    # The listener is started before anything is queued for it.
    ensure_listener_running

    pending = @subscribe_callbacks[channel]
    pending.push(on_success)

    # NOTE(review): when_connected is defined outside this view; presumably it
    # runs the block immediately if a client is connected and otherwise defers
    # it until #listen drains @when_connected -- confirm.
    when_connected do
      @subscribed_client.call("subscribe", channel)
    end
  end
end
listen(conn)
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 99
#
# Runs the pub/sub event loop for +conn+. It publishes the pub/sub client as
# @subscribed_client, resets the reconnect counter, runs every block queued
# in @when_connected, then processes events until an unsubscribe event
# arrives whose third element equals 0.
#
# conn - object responding to #pubsub; the returned client must respond to
#        #next_event.
def listen(conn)
  client = conn.pubsub
  @reconnect_attempt = 0
  @subscribed_client = client

  # Run everything that was deferred while no client was available.
  @when_connected.shift.call until @when_connected.empty?

  loop do
    # 60 is presumably a timeout in seconds -- confirm against the client API.
    # An event matching none of the branches below is simply ignored.
    event, channel, payload = client.next_event(60)

    if event == "subscribe" || event == "psubscribe"
      pending = @subscribe_callbacks[channel]
      if pending
        # One confirmation consumes one pending callback for the channel.
        on_success = pending.shift
        @executor.post(&on_success) if on_success
        @subscribe_callbacks.delete(channel) if pending.empty?
      end
    elsif event == "message" || event == "pmessage"
      broadcast(channel, payload)
    elsif event == "unsubscribe" || event == "punsubscribe"
      # presumably payload is the count of remaining subscriptions -- confirm;
      # only a value equal to 0 ends the loop.
      next unless payload == 0

      @subscription_lock.synchronize { @subscribed_client = nil }
      break
    end
  end
end