Class: Carnivore::Source Abstract

Inherits:
Object
Includes:
Utils::Logging, Celluloid
Defined in:
lib/carnivore/source.rb,
lib/carnivore/spec_helper.rb,
lib/carnivore/source/test.rb,
lib/carnivore/source_container.rb

Overview

This class is abstract.

Message source

Direct Known Subclasses

Spec, Test

Defined Under Namespace

Classes: SourceContainer, Spec, Test

Constructor Details

- (Source) initialize(args = {})

Create new Source

Parameters:

  • args (Hash) (defaults to: {})

Options Hash (args):

  • :name (String, Symbol)

    name of source

  • :auto_process (TrueClass, FalseClass)

    start processing on initialization

  • :auto_confirm (TrueClass, FalseClass)

    confirm messages automatically on receive

  • :orphan_callback (Proc)

    execute block when no callbacks are valid for message

  • :prevent_duplicates (TrueClass, FalseClass)

    setup and use message registry

  • :callbacks (Hash)

    callbacks to register on this source, given as name and block (or callback class) pairs



# File 'lib/carnivore/source.rb', line 135

def initialize(args={})
  @args = Smash.new(args)
  @callbacks = []
  @message_loop = Queue.new
  @message_remote = Queue.new
  @callback_names = {}
  @auto_process = !!args.fetch(:auto_process, true)
  @run_process = true
  @auto_confirm = !!args[:auto_confirm]
  @callback_supervisor = Carnivore::Supervisor.create!.last
  if(args[:orphan_callback])
    unless(args[:orphan_callback].is_a?(Proc))
      raise TypeError.new("Expected `Proc` type for `orphan_callback` but received `#{args[:orphan_callback].class}`")
    end
    define_singleton_method(:orphan_callback, &args[:orphan_callback])
  end
  if(args[:prevent_duplicates])
    init_registry
  end
  @processing = false
  @name = args[:name] || Celluloid.uuid
  if(args[:callbacks])
    args[:callbacks].each do |name, block|
      add_callback(name, block)
    end
  end
  setup(args)
  connect
  if(auto_process && !callbacks.empty?)
    async.process
  end
rescue => e
  debug "Failed to initialize: #{self} - #{e.class}: #{e}\n#{e.backtrace.join("\n")}"
  raise
end
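
The `:orphan_callback` handling in the constructor can be tried in isolation. The following is a minimal sketch in plain Ruby, assuming nothing from Carnivore itself: `OrphanSketchSource` is a hypothetical stand-in class that models only the type check and the `define_singleton_method` call shown above.

```ruby
# Hypothetical stand-in for a source; only the orphan_callback handling
# from the constructor is modeled here.
class OrphanSketchSource
  def initialize(args = {})
    handler = args[:orphan_callback]
    return unless handler
    unless handler.is_a?(Proc)
      raise TypeError, "Expected `Proc` type for `orphan_callback` but received `#{handler.class}`"
    end
    # Install the proc as a method on this one instance only.
    define_singleton_method(:orphan_callback, &handler)
  end
end

with_handler = OrphanSketchSource.new(
  orphan_callback: ->(source, msg) { "orphaned: #{msg}" }
)
without_handler = OrphanSketchSource.new

with_handler.respond_to?(:orphan_callback)    # => true
without_handler.respond_to?(:orphan_callback) # => false
```

Because the method exists only on instances that received the option, a `respond_to?(:orphan_callback)` check is enough to tell whether an orphan handler was configured.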

Instance Attribute Details

- (TrueClass, FalseClass) auto_confirm (readonly)

Returns whether received messages are confirmed automatically

Returns:

  • (TrueClass, FalseClass)

    whether received messages are confirmed automatically



# File 'lib/carnivore/source.rb', line 110

def auto_confirm
  @auto_confirm
end

- (TrueClass, FalseClass) auto_process (readonly)

Returns whether source processing starts on initialization

Returns:

  • (TrueClass, FalseClass)

    whether source processing starts on initialization



# File 'lib/carnivore/source.rb', line 112

def auto_process
  @auto_process
end

- (Carnivore::Supervisor) callback_supervisor (readonly)

Returns supervisor maintaining callback instances

Returns:

  • (Carnivore::Supervisor)

    supervisor maintaining callback instances



# File 'lib/carnivore/source.rb', line 116

def callback_supervisor
  @callback_supervisor
end

- (Array<Callback>) callbacks (readonly)

Returns registered callbacks

Returns:

  • (Array<Callback>)

    registered callbacks



# File 'lib/carnivore/source.rb', line 108

def callbacks
  @callbacks
end

- (Queue) message_loop (readonly)

Returns local loop message queue

Returns:

  • (Queue)

    local loop message queue



# File 'lib/carnivore/source.rb', line 120

def message_loop
  @message_loop
end

- (Hash) message_registry (readonly)

Returns registry of processed messages

Returns:

  • (Hash)

    registry of processed messages



# File 'lib/carnivore/source.rb', line 118

def message_registry
  @message_registry
end

- (Queue) message_remote (readonly)

Returns remote message queue

Returns:

  • (Queue)

    remote message queue



# File 'lib/carnivore/source.rb', line 122

def message_remote
  @message_remote
end

- (String, Symbol) name (readonly)

Returns name of source

Returns:

  • (String, Symbol)

    name of source



# File 'lib/carnivore/source.rb', line 106

def name
  @name
end

- (TrueClass, FalseClass) processing (readonly)

Returns currently processing a message

Returns:

  • (TrueClass, FalseClass)

    currently processing a message



# File 'lib/carnivore/source.rb', line 124

def processing
  @processing
end

- (TrueClass, FalseClass) run_process (readonly)

Returns message processing control switch

Returns:

  • (TrueClass, FalseClass)

    message processing control switch



# File 'lib/carnivore/source.rb', line 114

def run_process
  @run_process
end

Class Method Details

+ (SourceContainer) build(args = {})

Builds a source container

Parameters:

  • args (Hash) (defaults to: {})

    source configuration

Options Hash (args):

  • :type (String, Symbol)

    type of source to build

  • :args (Hash)

    configuration hash for source initialization

Returns:

  • (SourceContainer)

    container for the built source



# File 'lib/carnivore/source.rb', line 20

def build(args={})
  [:args, :type].each do |key|
    unless(args.has_key?(key))
      raise ArgumentError.new("Missing required parameter `:#{key}`")
    end
  end
  require Source.require_path(args[:type]) || "carnivore/source/#{args[:type]}"
  klass = args[:type].to_s.split('_').map(&:capitalize).join
  klass = Source.const_get(klass)
  args[:args][:name] ||= Celluloid.uuid
  inst = SourceContainer.new(klass, args[:args])
  register(args[:args][:name], inst)
  inst
end
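
The class lookup in `build` depends on turning the `:type` value into a constant name. A small sketch of that conversion follows; `source_class_name` is a hypothetical helper that exists only for illustration and repeats the expression used above.

```ruby
# Hypothetical helper mirroring the conversion inside `build`:
# split on underscores, capitalize each part, join the parts.
def source_class_name(type)
  type.to_s.split('_').map(&:capitalize).join
end

source_class_name(:test)        # => "Test"
source_class_name('http_paths') # => "HttpPaths"
source_class_name(:SQS)         # => "Sqs"
```

Note that `String#capitalize` lowercases everything after the first character, so a type written in upper case still resolves to a constant with only its first letter capitalized.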

+ (TrueClass) provide(type, require_path)

Register a new source type

Parameters:

  • type (Symbol)

    name of source type

  • require_path (String)

    path to require when requested

Returns:

  • (TrueClass)


# File 'lib/carnivore/source.rb', line 40

def provide(type, require_path)
  @source_klass ||= Smash.new
  @source_klass[type] = require_path
  true
end

+ (TrueClass) register(name, inst)

Register the container

Parameters:

  • name (String, Symbol)

    name of source

  • inst (SourceContainer)

    source container

Returns:

  • (TrueClass)


# File 'lib/carnivore/source.rb', line 60

def register(name, inst)
  @sources ||= Smash.new
  @sources[name] = inst
  true
end

+ (String, NilClass) require_path(type)

Registered path for given source type

Parameters:

  • type (String, Symbol)

    name of source type

Returns:

  • (String, NilClass)


# File 'lib/carnivore/source.rb', line 50

def require_path(type)
  @source_klass ||= Smash.new
  @source_klass[type]
end
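
Taken together, `provide` and `require_path` form a small type-to-path registry, and `build` falls back to a conventional path when nothing is registered. The sketch below models that interplay in plain Ruby. `SketchTypeRegistry` and its `resolve` method are hypothetical, a plain `Hash` stands in for `Smash`, and normalizing keys with `to_sym` is an assumption made so that strings and symbols address the same entry.

```ruby
# Hypothetical registry; a plain Hash stands in for the Smash used above.
class SketchTypeRegistry
  def initialize
    @paths = {}
  end

  # Register the path to require for a source type.
  def provide(type, require_path)
    @paths[type.to_sym] = require_path
    true
  end

  # Registered path, or nil when the type is unknown.
  def require_path(type)
    @paths[type.to_sym]
  end

  # Same fallback expression that `build` passes to `require`.
  def resolve(type)
    require_path(type) || "carnivore/source/#{type}"
  end
end

registry = SketchTypeRegistry.new
registry.provide(:my_queue, 'my_gem/sources/my_queue')

registry.resolve(:my_queue) # => "my_gem/sources/my_queue"
registry.resolve(:test)     # => "carnivore/source/test"
```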

+ (Object) reset_comms!

Reset communication methods within class



# File 'lib/carnivore/source.rb', line 85

def reset_comms!
  self.class_eval do
    unless(method_defined?(:reset_communications?))
      alias_method :custom_transmit, :transmit
      alias_method :transmit, :_transmit
      def reset_communications?
        true
      end
    end
  end
end
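
The aliasing in `reset_comms!` is easier to follow on a toy class. The sketch below assumes a hypothetical `CommsSketchSource` whose `transmit` and `_transmit` return strings rather than sending anything; only the two `alias_method` calls and the guard are taken from the method above.

```ruby
# Hypothetical class used to demonstrate the alias swap.
class CommsSketchSource
  # Stands in for a subclass's custom transmit.
  def transmit(payload)
    "remote:#{payload}"
  end

  # Wrapper that decides between loopback and the custom transmit.
  def _transmit(payload)
    loopback? ? "loop:#{payload}" : custom_transmit(payload)
  end

  def loopback?
    false
  end

  def self.reset_comms!
    class_eval do
      # The guard makes a second call a no-op. Without it, the second call
      # would alias custom_transmit to the wrapper and recurse forever.
      unless method_defined?(:reset_communications?)
        alias_method :custom_transmit, :transmit
        alias_method :transmit, :_transmit
        def reset_communications?
          true
        end
      end
    end
  end
end

CommsSketchSource.reset_comms!
CommsSketchSource.reset_comms! # safe to repeat

source = CommsSketchSource.new
source.transmit('ping')        # => "remote:ping" (routed through _transmit)
source.custom_transmit('ping') # => "remote:ping" (original implementation)
```

After the swap, callers keep using `transmit` while the original implementation stays reachable as `custom_transmit`.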

+ (SourceContainer) source(name)

Source container with given name

Parameters:

  • name (String, Symbol)

    name of source

Returns:

  • (SourceContainer)

    container registered under the given name



# File 'lib/carnivore/source.rb', line 70

def source(name)
  if(@sources && @sources[name.to_sym])
    @sources[name.to_sym]
  else
    Celluloid.logger.error "Source lookup failed (name: #{name})"
    raise KeyError.new("Requested named source is not registered: #{name}")
  end
end

+ (Array<SourceContainer>) sources

Returns registered source containers

Returns:

  • (Array<SourceContainer>)

    registered source containers



# File 'lib/carnivore/source.rb', line 80

def sources
  @sources ? @sources.values : []
end
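
The `register`, `source`, and `sources` class methods behave like a named container registry. A rough sketch of that behavior in plain Ruby follows. `SketchSourceRegistry` is hypothetical, a plain `Hash` replaces `Smash`, keys are normalized with `to_s` as an assumption, and the failed lookup raises directly instead of logging first.

```ruby
# Hypothetical registry keyed by source name.
class SketchSourceRegistry
  def initialize
    @sources = {}
  end

  def register(name, inst)
    @sources[name.to_s] = inst
    true
  end

  # Fetch a registered container or fail loudly.
  def source(name)
    @sources.fetch(name.to_s) do
      raise KeyError, "Requested named source is not registered: #{name}"
    end
  end

  def sources
    @sources.values
  end
end

sources = SketchSourceRegistry.new
sources.register(:inbox, 'inbox container')

sources.source('inbox') # => "inbox container"
sources.sources         # => ["inbox container"]
```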

Instance Method Details

- (TrueClass) _transmit(*args)

Send to local loop if processing otherwise use regular transmit

Parameters:

  • args (Object)

    argument list

Returns:

  • (TrueClass)


# File 'lib/carnivore/source.rb', line 395

def _transmit(*args)
  if(loop_enabled? && processing)
    loop_transmit(*args)
  else
    custom_transmit(*args)
  end
  true
end

- (self) add_callback(callback_name, block_or_class)

Adds the given callback to the source for message processing

Parameters:

  • callback_name (String, Symbol)

    name of callback

  • block_or_class (Carnivore::Callback, Proc)

Returns:

  • (self)


# File 'lib/carnivore/source.rb', line 236

def add_callback(callback_name, block_or_class)
  name = "#{self.name}:#{callback_name}"
  if(block_or_class.is_a?(Class))
    size = block_or_class.workers || 1
    if(size < 1)
      warn "Callback class (#{block_or_class}) defined no workers. Skipping."
      return self
    elsif(size == 1)
      debug "Adding callback class (#{block_or_class}) under supervision. Name: #{callback_name(name)}"
      callback_supervisor.supervise_as callback_name(name), block_or_class, name, current_actor
    else
      debug "Adding callback class (#{block_or_class}) under supervision pool (#{size} workers). Name: #{callback_name(name)}"
      callback_supervisor.pool block_or_class, as: callback_name(name), size: size, args: [name, current_actor]
    end
  else
    debug "Adding custom callback class  from block (#{block_or_class}) under supervision. Name: #{callback_name(name)}"
    callback_supervisor.supervise_as callback_name(name), Callback, name, current_actor, block_or_class
  end
  callbacks.push(name).uniq!
  self
end
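
For callback classes, `add_callback` takes one of three branches depending on the worker count. The sketch below only names those branches; `supervision_mode` is a hypothetical helper and performs no supervision.

```ruby
# Hypothetical helper naming the branch taken for a callback class.
def supervision_mode(workers)
  size = workers || 1 # a nil worker count is treated as one worker
  if size < 1
    :skipped     # warned about and not registered
  elsif size == 1
    :supervised  # one supervised instance
  else
    :pooled      # supervised pool of `size` workers
  end
end

supervision_mode(nil) # => :supervised
supervision_mode(0)   # => :skipped
supervision_mode(4)   # => :pooled
```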

- (TrueClass, FalseClass) auto_confirm?

Returns whether automatic message confirmation is enabled

Returns:

  • (TrueClass, FalseClass)

    whether automatic message confirmation is enabled



# File 'lib/carnivore/source.rb', line 178

def auto_confirm?
  @auto_confirm
end

- (Symbol) callback_name(name)

Returns namespaced name (prefixed with source name and instance id)

Parameters:

  • name (String, Symbol)

    name of callback

Returns:

  • (Symbol)

    namespaced callback name



# File 'lib/carnivore/source.rb', line 275

def callback_name(name)
  unless(@callback_names[name])
    @callback_names[name] = [@name, self.object_id, name].join(':').to_sym
  end
  @callback_names[name]
end
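
The namespaced key is built once per callback name and then reused. A minimal sketch of the same memoization follows, assuming a hypothetical `NamedSketchSource` class that carries only a name and the cache.

```ruby
# Hypothetical class modeling only the name cache.
class NamedSketchSource
  def initialize(name)
    @name = name
    @callback_names = {}
  end

  # Build "<source name>:<object id>:<callback name>" once, then reuse it.
  def callback_name(name)
    @callback_names[name] ||= [@name, object_id, name].join(':').to_sym
  end
end

source = NamedSketchSource.new(:inbox)
key = source.callback_name('inbox:printer')

key.is_a?(Symbol)                          # => true
key.to_s.end_with?(':inbox:printer')       # => true
source.callback_name('inbox:printer') == key # => true
```

Including `object_id` keeps keys distinct when two source instances share a name.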

- (Object) confirm(message)

Confirm receipt of the message on source

Parameters:

  • message (Carnivore::Message)

    message to confirm



# File 'lib/carnivore/source.rb', line 227

def confirm(message)
  debug 'No custom confirm declared'
end

- (Object) connect

Connection hook for sources requiring customized connect



# File 'lib/carnivore/source.rb', line 202

def connect
  debug 'No custom connect declared'
end

- (Carnivore::Message) format(msg)

Create new Message from received payload

Parameters:

  • msg (Object)

    received payload

Returns:

  • (Carnivore::Message)

    message wrapping the received payload



# File 'lib/carnivore/source.rb', line 286

def format(msg)
  actor = Carnivore::Supervisor.supervisor[name]
  if(actor)
    Message.new(
      :message => msg,
      :source => actor.current_actor
    )
  else
    abort "Failed to locate self in registry (#{name})"
  end
end

- (MessageRegistry) init_registry

Load and initialize the message registry

Returns:

  • (MessageRegistry)

    new registry



# File 'lib/carnivore/source.rb', line 415

def init_registry
  require 'carnivore/message_registry'
  @message_registry = MessageRegistry.new
end

- (String) inspect

Returns inspection formatted string

Returns:

  • (String)

    inspection formatted string



# File 'lib/carnivore/source.rb', line 183

def inspect
  "<#{self.class.name}:#{object_id} @name=#{name} @callbacks=#{Hash[*callbacks.map{|k,v| [k,v.object_id]}.flatten]}>"
end

- (NilClass) log(*args) Originally defined in module Utils::Logging

Log message

Parameters:

  • args (Object)

    argument list

Returns:

  • (NilClass)

- (TrueClass, FalseClass) loop_enabled?

Whether local message loopback is enabled (false by default). Custom sources should override this method to allow loopback delivery if desired.

Returns:

  • (TrueClass, FalseClass)


# File 'lib/carnivore/source.rb', line 408

def loop_enabled?
  false
end

- (Carnivore::Message, NilClass) loop_receive(*args)

Get received message on local loopback

Parameters:

  • args (Object)

    argument list (unused)

Returns:

  • (Carnivore::Message, NilClass)

    next message on the local loopback queue



# File 'lib/carnivore/source.rb', line 375

def loop_receive(*args)
  message_loop.shift
end

- (TrueClass) loop_transmit(message, original_message = nil, args = {})

Push message onto internal loop queue

Parameters:

  • message (Carnivore::Message)
  • original_message (Object) (defaults to: nil)

    unused

  • args (Hash) (defaults to: {})

    unused

Returns:

  • (TrueClass)


# File 'lib/carnivore/source.rb', line 385

def loop_transmit(message, original_message=nil, args={})
  message_loop.push message
  signal(:messages_available)
  true
end
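
The loopback pair (`loop_transmit` and `loop_receive`) rests on Ruby's thread-safe `Queue`. The sketch below shows the queue behavior on its own, without the Celluloid `signal` call; the variable names are illustrative only.

```ruby
require 'thread' # Queue is built in on modern Rubies; the require is harmless

message_loop = Queue.new
message_loop.push('first')
message_loop.push('second')

first = message_loop.shift    # FIFO: the oldest entry comes out first
remaining = message_loop.size # one entry is still queued

# Drain the last entry, then show that a non-blocking pop on an empty
# queue raises ThreadError instead of waiting.
message_loop.shift
begin
  message_loop.pop(true)
  drained = false
rescue ThreadError
  drained = true
end
```

A plain `shift` with no argument blocks while the queue is empty, which is why `process` checks `empty?` before popping.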

- (TrueClass) process(*args)

Process incoming messages from this source

Parameters:

  • args (Object)

    list of arguments

Returns:

  • (TrueClass)


# File 'lib/carnivore/source.rb', line 321

def process(*args)
  begin
    while(run_process && !callbacks.empty?)
      @processing = true
      async.receive_messages
      if(message_loop.empty? && message_remote.empty?)
        wait(:messages_available)
      end
      msgs = []
      msgs.push message_loop.pop unless message_loop.empty?
      msgs.push message_remote.pop unless message_remote.empty?
      msgs = [msgs].flatten.compact.map do |m|
        if(valid_message?(m))
          format(m)
        end
      end.compact
      msgs.each do |msg|
        if(respond_to?(:orphan_callback))
          valid_callbacks = callbacks.find_all do |name|
            callback_supervisor[callback_name(name)].valid?(msg)
          end
        else
          valid_callbacks = callbacks
        end
        valid_callbacks.each do |name|
          debug "Dispatching message<#{msg[:message].object_id}> to callback<#{name} (#{callback_name(name)})>"
          callback_supervisor[callback_name(name)].async.call(msg)
        end
        if(valid_callbacks.empty?)
          warn "Received message was not processed through any callbacks on this source: #{msg}"
          orphan_callback(current_actor, msg) if respond_to?(:orphan_callback)
        end
      end
    end
  ensure
    @processing = false
  end
  true
end
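
The dispatch rules inside `process` can be summarized as: filter callbacks with `valid?` only when an orphan handler exists, deliver to every remaining callback, and hand the message to the orphan handler when none remain. Below is a simplified, synchronous sketch of those rules. `SketchCallback` and `dispatch` are hypothetical, and the real method delivers asynchronously through the callback supervisor.

```ruby
# Hypothetical callback: a name plus a predicate deciding validity.
SketchCallback = Struct.new(:name, :matcher) do
  def valid?(msg)
    matcher.call(msg)
  end

  def call(msg)
    "#{name} handled #{msg}"
  end
end

# Synchronous stand-in for the dispatch portion of `process`.
def dispatch(msg, callbacks, orphan_callback = nil)
  # Filtering only happens when an orphan handler exists.
  valid = orphan_callback ? callbacks.select { |cb| cb.valid?(msg) } : callbacks
  results = valid.map { |cb| cb.call(msg) }
  results << orphan_callback.call(msg) if valid.empty? && orphan_callback
  results
end

callbacks = [
  SketchCallback.new(:evens, ->(m) { m.even? }),
  SketchCallback.new(:big, ->(m) { m > 100 })
]
orphan = ->(m) { "orphaned #{m}" }

dispatch(4, callbacks, orphan) # => ["evens handled 4"]
dispatch(3, callbacks, orphan) # => ["orphaned 3"]
dispatch(3, callbacks)         # => ["evens handled 3", "big handled 3"]
```

The last call shows the consequence of the `respond_to?(:orphan_callback)` branch: without an orphan handler, every callback receives every message and is expected to do its own filtering.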

- (Object, Array<Object>) receive(n = 1)

This method is abstract.

Receive messages from source

Parameters:

  • n (Integer) (defaults to: 1)

    number of messages

Returns:

  • (Object, Array<Object>)

    payload or array of payloads

Raises:

  • (NotImplementedError)


# File 'lib/carnivore/source.rb', line 211

def receive(n=1)
  raise NotImplementedError.new('Abstract method not valid for runtime')
end
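
A concrete source is expected to override `receive`. The following sketch shows the pattern with stand-in classes and no Carnivore dependency: `AbstractSketchSource` and `ArraySketchSource` are hypothetical, and the array-backed payload store is an assumption chosen to keep the example self-contained.

```ruby
# Hypothetical abstract base mirroring the method above.
class AbstractSketchSource
  def receive(n = 1)
    raise NotImplementedError, 'Abstract method not valid for runtime'
  end
end

# Hypothetical concrete source fed from an in-memory array.
class ArraySketchSource < AbstractSketchSource
  def initialize(payloads)
    @payloads = payloads.dup
  end

  # Returns one payload for n == 1, otherwise an array of up to n payloads.
  def receive(n = 1)
    n == 1 ? @payloads.shift : @payloads.shift(n)
  end
end

source = ArraySketchSource.new(%w[a b c])
source.receive    # => "a"
source.receive(2) # => ["b", "c"]
```

`NotImplementedError` descends from `ScriptError` rather than `StandardError`, so a bare `rescue` will not catch a call to the abstract method; it has to be rescued by name.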

- (TrueClass) receive_messages

Receive messages from source

Returns:

  • (TrueClass)


# File 'lib/carnivore/source.rb', line 363

def receive_messages
  loop do
    message_remote.push receive
    signal(:messages_available)
  end
  true
end

- (self) remove_callback(name)

Remove the named callback from the source

Parameters:

  • name (String, Symbol)

Returns:

  • (self)


# File 'lib/carnivore/source.rb', line 262

def remove_callback(name)
  unless(@callbacks.include?(callback_name(name)))
    abort NameError.new("Failed to locate callback named: #{name}")
  end
  actors[callback_name(name)].terminate
  @callbacks.delete(name)
  self
end

- (Object) setup(args = {})

Setup hook for source requiring customized setup

Parameters:

  • args (Hash) (defaults to: {})

    initialization hash



# File 'lib/carnivore/source.rb', line 195

def setup(args={})
  debug 'No custom setup declared'
end

- (Object) teardown_cleanup

Ensure we clean up our internal supervisor before bailing out



# File 'lib/carnivore/source.rb', line 172

def teardown_cleanup
  warn 'Termination request received. Tearing down!'
  callback_supervisor.terminate
end

- (String) to_s

Returns stringified instance

Returns:

  • (String)

    stringified instance



# File 'lib/carnivore/source.rb', line 188

def to_s
  "<#{self.class.name}:#{object_id} @name=#{name}>"
end

- (Object) transmit(message, original_message = nil, args = {})

Send payload to source

Parameters:

  • message (Object)

    payload

  • original_message (Carnivore::Message) (defaults to: nil)

    original message if reply to extract optional metadata

  • args (Hash) (defaults to: {})

    optional extra arguments

Raises:

  • (NotImplementedError)


# File 'lib/carnivore/source.rb', line 220

def transmit(message, original_message=nil, args={})
  raise NotImplementedError.new('Abstract method not valid for runtime')
end

- (TrueClass, FalseClass) valid_message?(m)

Validate message is allowed before processing. This is currently only used when the message registry is enabled to prevent duplicate message processing.

Parameters:

  • m (Object)

    received payload to validate

Returns:

  • (TrueClass, FalseClass)


# File 'lib/carnivore/source.rb', line 304

def valid_message?(m)
  if(message_registry)
    if(message_registry.valid?(m))
      true
    else
      warn "Message was already received. Discarding: #{m.inspect}"
      false
    end
  else
    true
  end
end
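
The duplicate check relies on the registry answering `valid?` with true only the first time a payload is seen. The sketch below illustrates that contract with a `Set`. `SketchMessageRegistry` is hypothetical and is not the library's `MessageRegistry`, whose internals are not shown on this page.

```ruby
require 'set'

# Hypothetical registry: remembers every payload it has been asked about.
class SketchMessageRegistry
  def initialize
    @seen = Set.new
  end

  # True the first time a payload is seen, false on every repeat.
  # Set#add? returns nil when the element is already present.
  def valid?(payload)
    !@seen.add?(payload).nil?
  end
end

registry = SketchMessageRegistry.new
registry.valid?('order-1') # => true
registry.valid?('order-1') # => false
registry.valid?('order-2') # => true
```

A registry like this grows without bound, so a real implementation would need some eviction policy; that concern is outside what this sketch models.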