Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions lib/splitclient-rb/cache/repositories/segments_repository.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,19 @@ def add_to_segment(segment)
name = segment[:name]

@adapter.initialize_set(segment_data(name)) unless @adapter.exists?(segment_data(name))

add_keys(name, segment[:added])
remove_keys(name, segment[:removed])
@internal_events_queue.push(
SplitIoClient::Engine::Models::SdkInternalEventNotification.new(
SplitIoClient::Engine::Models::SdkInternalEvent::SEGMENTS_UPDATED,
SplitIoClient::Engine::Models::EventsMetadata.new(
SplitIoClient::Engine::Models::SdkEventType::SEGMENTS_UPDATE,
[]
if segment[:added].length > 0 || segment[:removed].length > 0
@internal_events_queue.push(
SplitIoClient::Engine::Models::SdkInternalEventNotification.new(
SplitIoClient::Engine::Models::SdkInternalEvent::SEGMENTS_UPDATED,
SplitIoClient::Engine::Models::EventsMetadata.new(
SplitIoClient::Engine::Models::SdkEventType::SEGMENTS_UPDATE,
[]
)
)
)
)
end
end

def get_segment_keys(name)
Expand Down
4 changes: 2 additions & 2 deletions lib/splitclient-rb/clients/split_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ def destroy
@config.logger.info('Split client shutdown started...') if @config.debug_enabled
if !@config.cache_adapter.is_a?(SplitIoClient::Cache::Adapters::RedisAdapter) && @config.impressions_mode != :none &&
(!@impressions_repository.empty? || !@events_repository.empty?)
@config.logger.debug("Impressions and/or Events cache is not empty")
@config.logger.debug("Impressions and/or Events cache is not empty") if @config.debug_enabled
# Adding small delay to ensure sender threads are fully running
sleep(0.1)
if !@config.threads.key?(:impressions_sender) || !@config.threads.key?(:events_sender)
@config.logger.debug("Periodic data recording thread has not started yet, waiting for service startup.")
@config.logger.debug("Periodic data recording thread has not started yet, waiting for service startup.") if @config.debug_enabled
@config.threads[:start_sdk].join(5) if @config.threads.key?(:start_sdk)
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/splitclient-rb/engine/api/splits.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def since(since, since_rbs, fetch_options = { cache_control_headers: false, till

if check_last_proxy_check_timestamp
@spec_version = SplitIoClient::Spec::FeatureFlags::SPEC_VERSION
@config.logger.debug("Switching to new Feature flag spec #{@spec_version} and fetching.")
@config.logger.debug("Switching to new Feature flag spec #{@spec_version} and fetching.") if @config.debug_enabled
@old_spec_since = since
since = -1
since_rbs = -1
Expand All @@ -41,7 +41,7 @@ def since(since, since_rbs, fetch_options = { cache_control_headers: false, till

params[:sets] = @flag_sets_filter.join(",") unless @flag_sets_filter.empty?
params[:till] = fetch_options[:till] unless fetch_options[:till].nil?
@config.logger.debug("Fetching from splitChanges with #{params}: ")
@config.logger.debug("Fetching from splitChanges with #{params}: ") if @config.debug_enabled
response = get_api("#{@config.base_uri}/splitChanges", @api_key, params, fetch_options[:cache_control_headers])
if response.status == 414
@config.logger.error("Error fetching feature flags; the amount of flag sets provided are too big, causing uri length error.")
Expand Down
10 changes: 7 additions & 3 deletions lib/splitclient-rb/engine/auth_api_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ def authenticate(api_key)
return process_error(response) if response.status >= 400 && response.status < 500

@telemetry_runtime_producer.record_sync_error(Telemetry::Domain::Constants::TOKEN_SYNC, response.status.to_i)
@config.logger.debug("Error connecting to: #{@config.auth_service_url}. Response status: #{response.status}")
if @config.debug_enabled
@config.logger.debug("Error connecting to: #{@config.auth_service_url}. Response status: #{response.status}")
end
{ push_enabled: false, retry: true }
rescue StandardError => e
@config.logger.debug("AuthApiClient error: #{e.inspect}.")
@config.logger.debug("AuthApiClient error: #{e.inspect}.") if @config.debug_enabled
{ push_enabled: false, retry: false }
end

Expand All @@ -51,7 +53,9 @@ def decode_token(token)
end

def process_error(response)
@config.logger.debug("Error connecting to: #{@config.auth_service_url}. Response status: #{response.status}")
if @config.debug_enabled
@config.logger.debug("Error connecting to: #{@config.auth_service_url}. Response status: #{response.status}")
end
@telemetry_runtime_producer.record_auth_rejections if response.status == 401

{ push_enabled: false, retry: false }
Expand Down
22 changes: 14 additions & 8 deletions lib/splitclient-rb/engine/events/events_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ def register(sdk_event, event_handler)

@mutex.synchronize do
# SDK ready already fired
if sdk_event == SplitIoClient::Engine::Models::SdkEvent::SDK_READY && event_already_triggered(sdk_event)
@active_subscriptions[sdk_event] = SplitIoClient::Engine::Models::EventActiveSubscriptions.new(true, event_handler)
if sdk_event == Engine::Models::SdkEvent::SDK_READY && event_already_triggered(sdk_event)
@active_subscriptions[sdk_event] = Engine::Models::EventActiveSubscriptions.new(true, event_handler)
@config.logger.debug('EventsManager: Firing SDK_READY event for new subscription') if @config.debug_enabled
fire_sdk_event(sdk_event, nil)
return
end

@config.logger.debug("EventsManager: Register event: #{sdk_event}") if @config.debug_enabled
@active_subscriptions[sdk_event] = SplitIoClient::Engine::Models::EventActiveSubscriptions.new(false, event_handler)
@active_subscriptions[sdk_event] = Engine::Models::EventActiveSubscriptions.new(false, event_handler)
end
end

Expand All @@ -48,7 +48,7 @@ def notify_internal_event(sdk_internal_event, event_metadata)
end

# if client is not subscribed to SDK_READY
if sorted_event == SplitIoClient::Engine::Models::SdkEvent::SDK_READY && get_event_handler(sorted_event).nil?
if check_if_register_needed(sorted_event)
@config.logger.debug('EventsManager: Registering SDK_READY event as fired') if @config.debug_enabled
@active_subscriptions[Engine::Models::SdkEvent::SDK_READY] = Engine::Models::EventActiveSubscriptions.new(true, nil)
end
Expand All @@ -65,6 +65,12 @@ def destroy

private

def check_if_register_needed(sorted_event)
sorted_event == Engine::Models::SdkEvent::SDK_READY &&
get_event_handler(sorted_event).nil? &&
!@active_subscriptions.include?(sorted_event)
end

def fire_sdk_event(sdk_event, event_metadata)
@config.logger.debug("EventsManager: Firing Sdk event: #{sdk_event}") if @config.debug_enabled
@config.threads[:sdk_event_notify] = Thread.new do
Expand Down Expand Up @@ -104,15 +110,15 @@ def get_event_handler(sdk_event)
end

def get_sdk_event_if_applicable(sdk_internal_event)
final_sdk_event = SplitIoClient::Engine::Models::ValidSdkEvent.new(nil, false)
final_sdk_event = Engine::Models::ValidSdkEvent.new(nil, false)

events_to_fire = []
require_any_sdk_event = check_require_any(sdk_internal_event)
if require_any_sdk_event.valid
if (!event_already_triggered(require_any_sdk_event.sdk_event) &&
execution_limit(require_any_sdk_event.sdk_event) == 1) ||
execution_limit(require_any_sdk_event.sdk_event) == -1
final_sdk_event = SplitIoClient::Engine::Models::ValidSdkEvent.new(
final_sdk_event = Engine::Models::ValidSdkEvent.new(
require_any_sdk_event.sdk_event,
check_prerequisites(require_any_sdk_event.sdk_event) &&
check_suppressed_by(require_any_sdk_event.sdk_event)
Expand Down Expand Up @@ -172,10 +178,10 @@ def execution_limit(sdk_event)
end

def check_require_any(sdk_internal_event)
valid_sdk_event = SplitIoClient::Engine::Models::ValidSdkEvent.new(nil, false)
valid_sdk_event = Engine::Models::ValidSdkEvent.new(nil, false)
@manager_config.require_any.each do |name, val|
if val.include?(sdk_internal_event)
valid_sdk_event = SplitIoClient::Engine::Models::ValidSdkEvent.new(name, true)
valid_sdk_event = Engine::Models::ValidSdkEvent.new(name, true)
return valid_sdk_event
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# frozen_string_literal: false

module SplitIoClient
module Engine::Models
class SdkInternalEventNotification
attr_reader :internal_event, :metadata
module Engine
module Models
class SdkInternalEventNotification
attr_reader :internal_event, :metadata

def initialize(internal_event, metadata)
@internal_event = internal_event
@metadata = metadata
def initialize(internal_event, metadata)
@internal_event = internal_event
@metadata = metadata
end
end
end
end
Expand Down
16 changes: 10 additions & 6 deletions lib/splitclient-rb/engine/sync_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ def start_thread
connected = false

if @config.streaming_enabled
@config.logger.debug('Starting Streaming mode ...')
@config.logger.debug('Starting Streaming mode ...') if @config.debug_enabled
start_push_status_monitor
connected = @push_manager.start_sse
end

unless connected
@config.logger.debug('Starting Polling mode ...')
@config.logger.debug('Starting Polling mode ...') if @config.debug_enabled
@synchronizer.start_periodic_fetch
record_telemetry(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING)
end
Expand Down Expand Up @@ -92,7 +92,7 @@ def process_push_shutdown

def process_connected
if @sse_connected.value
@config.logger.debug('Streaming already connected.')
@config.logger.debug('Streaming already connected.') if @config.debug_enabled
return
end

Expand All @@ -107,7 +107,7 @@ def process_connected

def process_forced_stop
unless @sse_connected.value
@config.logger.debug('Streaming already disconnected.')
@config.logger.debug('Streaming already disconnected.') if @config.debug_enabled
return
end

Expand All @@ -120,7 +120,7 @@ def process_forced_stop

def process_disconnect(reconnect)
unless @sse_connected.value
@config.logger.debug('Streaming already disconnected.')
@config.logger.debug('Streaming already disconnected.') if @config.debug_enabled
return
end

Expand Down Expand Up @@ -169,12 +169,16 @@ def incoming_push_status_handler
when Constants::PUSH_SUBSYSTEM_OFF
process_push_shutdown
else
@config.logger.debug('Incorrect push status type.')
log_if_debug('Incorrect push status type.')
end
end
rescue StandardError => e
@config.logger.error("Push status handler error: #{e.inspect}")
end
end

def log_if_debug(msg)
@config.logger.debug(msg) if @config.debug_enabled
end
end
end
27 changes: 13 additions & 14 deletions lib/splitclient-rb/sse/event_source/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,23 @@ def initialize(config,

def close(status = nil)
unless connected?
@config.logger.debug('SSEClient already disconected.')
@config.logger.debug('SSEClient already disconected.') if @config.debug_enabled
return
end
@config.logger.debug("Closing SSEClient socket")
@config.logger.debug("Closing SSEClient socket") if @config.debug_enabled

push_status(status)
@connected.make_false
@socket.sync_close = true if @socket.is_a? OpenSSL::SSL::SSLSocket
@socket.close
@config.logger.debug("SSEClient socket state #{@socket.state}") if @socket.is_a? OpenSSL::SSL::SSLSocket
@config.logger.debug("SSEClient socket state #{@socket.state}") if @socket.is_a? OpenSSL::SSL::SSLSocket && @config.debug_enabled
rescue StandardError => e
@config.logger.error("SSEClient close Error: #{e.inspect}")
end

def start(url)
if connected?
@config.logger.debug('SSEClient already running.')
@config.logger.debug('SSEClient already running.') if @config.debug_enabled
return true
end

Expand Down Expand Up @@ -96,18 +96,17 @@ def connect_stream(latch)

raise 'eof exception' if partial_data == :eof
rescue IO::WaitReadable => e
@config.logger.debug("SSE client IO::WaitReadable transient error: #{e.inspect}")
@config.logger.debug("SSE client IO::WaitReadable transient error: #{e.inspect}") if @config.debug_enabled
IO.select([@socket], nil, nil, @read_timeout)
retry
rescue Errno::EAGAIN => e
@config.logger.debug("SSE client transient error: #{e.inspect}")
@config.logger.debug("SSE client transient error: #{e.inspect}") if @config.debug_enabled
IO.select([@socket], nil, nil, @read_timeout)
retry
rescue Errno::ETIMEDOUT => e
@config.logger.error("SSE read operation timed out!: #{e.inspect}")
return Constants::PUSH_RETRYABLE_ERROR
rescue EOFError => e
puts "SSE read operation EOF Exception!: #{e.inspect}"
@config.logger.error("SSE read operation EOF Exception!: #{e.inspect}")
raise 'eof exception'
rescue Errno::EBADF, IOError => e
Expand All @@ -125,12 +124,12 @@ def connect_stream(latch)
return Constants::PUSH_RETRYABLE_ERROR
end
rescue Errno::EBADF
@config.logger.debug("SSE socket is not connected (Errno::EBADF)")
@config.logger.debug("SSE socket is not connected (Errno::EBADF)") if @config.debug_enabled
break
rescue RuntimeError
raise 'eof exception'
rescue Exception => e
@config.logger.debug("SSE socket is not connected: #{e.inspect}")
@config.logger.debug("SSE socket is not connected: #{e.inspect}") if @config.debug_enabled
break
end

Expand All @@ -156,7 +155,7 @@ def read_first_event(data, latch)
return unless @first_event.value

response_code = @event_parser.first_event(data)
@config.logger.debug("SSE client first event code: #{response_code}")
@config.logger.debug("SSE client first event code: #{response_code}") if @config.debug_enabled

error_event = false
events = @event_parser.parse(data)
Expand All @@ -165,7 +164,7 @@ def read_first_event(data, latch)

if response_code == OK_CODE && !error_event
@connected.make_true
@config.logger.debug("SSE client first event Connected is true")
@config.logger.debug("SSE client first event Connected is true") if @config.debug_enabled
@telemetry_runtime_producer.record_streaming_event(Telemetry::Domain::Constants::SSE_CONNECTION_ESTABLISHED, nil)
push_status(Constants::PUSH_CONNECTED)
end
Expand Down Expand Up @@ -202,7 +201,7 @@ def socket_connect
end

def process_data(partial_data)
@config.logger.debug("Event partial data: #{partial_data}")
@config.logger.debug("Event partial data: #{partial_data}") if @config.debug_enabled
return if partial_data.nil? || partial_data == KEEP_ALIVE_RESPONSE

events = @event_parser.parse(partial_data)
Expand All @@ -220,7 +219,7 @@ def build_request(uri)
req << "SplitSDKMachineName: #{@config.machine_name}\r\n"
req << "SplitSDKClientKey: #{@api_key.split(//).last(4).join}\r\n" unless @api_key.nil?
req << "Cache-Control: no-cache\r\n\r\n"
@config.logger.debug("Request info: #{req}")
@config.logger.debug("Request info: #{req}") if @config.debug_enabled
req
end

Expand Down Expand Up @@ -255,7 +254,7 @@ def dispatch_event(event)
def push_status(status)
return if status.nil?

@config.logger.debug("Pushing new sse status: #{status}")
@config.logger.debug("Pushing new sse status: #{status}") if @config.debug_enabled
@status_queue.push(status)
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/splitclient-rb/sse/event_source/event_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ def parse(raw_event)

events
rescue StandardError => e
@config.logger.debug("Error during parsing a event: #{e.inspect}")
@config.logger.debug("Error during parsing a event: #{e.inspect}") if @config.debug_enabled
[]
end

def first_event(raw_data)
raw_data.split("\n")[0].split(' ')[1].to_i
rescue StandardError => e
@config.logger.debug("Error parsing first event: #{e.inspect}")
@config.logger.error("Error parsing first event: #{e.inspect}")
BAD_REQUEST_CODE
end

Expand Down
6 changes: 3 additions & 3 deletions lib/splitclient-rb/sse/notification_manager_keeper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ def process_event_control(type)
@telemetry_runtime_producer.record_streaming_event(Telemetry::Domain::Constants::STREAMING_STATUS, DISABLED)
push_status(Constants::PUSH_SUBSYSTEM_OFF)
else
@config.logger.error("Incorrect event type: #{incoming_notification}")
@config.logger.error("Incorrect event type: #{incoming_notification}") if @config.debug_enabled
end
end

def process_event_occupancy(channel, publishers)
@config.logger.debug("Processed occupancy event with #{publishers} publishers. Channel: #{channel}")
@config.logger.debug("Processed occupancy event with #{publishers} publishers. Channel: #{channel}") if @config.debug_enabled

update_publishers(channel, publishers)

Expand Down Expand Up @@ -76,7 +76,7 @@ def are_publishers_available?
end

def push_status(status)
@config.logger.debug("Pushing occupancy status: #{status}")
@config.logger.debug("Pushing occupancy status: #{status}") if @config.debug_enabled
@status_queue.push(status)
end
end
Expand Down
Loading