class Mongo::Cursor

A cursor over query results. Returned objects are hashes.

Attributes

collection[R]
cursor_id[R]
fields[R]
full_collection_name[R]
hint[R]
options[R]
order[R]
selector[R]
show_disk_loc[R]
snapshot[R]
timeout[R]
transformer[R]

Public Class Methods

new(collection, opts={}) click to toggle source

Create a new cursor.

Note: cursors are created when executing queries using [Collection#find] and other similar methods. Application developers shouldn't have to create cursors manually.

@return [Cursor]

@core cursors constructor_details

# File lib/mongo/cursor.rb, line 38
def initialize(collection, opts={})
  @cursor_id  = nil

  @db         = collection.db
  @collection = collection
  @connection = @db.connection
  @logger     = @connection.logger

  # Query selector
  @selector   = opts[:selector] || {}

  # Special operators that form part of $query
  @order      = opts[:order]
  @explain    = opts[:explain]
  @hint       = opts[:hint]
  @snapshot   = opts[:snapshot]
  @max_scan   = opts.fetch(:max_scan, nil)
  @return_key = opts.fetch(:return_key, nil)
  @show_disk_loc = opts.fetch(:show_disk_loc, nil)

  # Wire-protocol settings
  @fields     = convert_fields_for_query(opts[:fields])
  @skip       = opts[:skip]     || 0
  @limit      = opts[:limit]    || 0
  @tailable   = opts[:tailable] || false
  @timeout    = opts.fetch(:timeout, true)
  @options    = 0

  # Use this socket for the query
  @socket     = opts[:socket]

  @closed       = false
  @query_run    = false

  @transformer = opts[:transformer]
  if value = opts[:read]
    Mongo::Support.validate_read_preference(value)
  else
    value = collection.read_preference
  end
  @read_preference = value.is_a?(Hash) ? value.dup : value
  batch_size(opts[:batch_size] || 0)

  @full_collection_name = "#{@collection.db.name}.#{@collection.name}"
  @cache        = []
  @returned     = 0

  if(!@timeout)
    add_option(OP_QUERY_NO_CURSOR_TIMEOUT)
  end
  if(@read_preference != :primary)
    add_option(OP_QUERY_SLAVE_OK)
  end
  if(@tailable)
    add_option(OP_QUERY_TAILABLE)
  end

  if @collection.name =~ /^\$cmd/ || @collection.name =~ /^system/
    @command = true
  else
    @command = false
  end

  @checkin_read_pool = false
  @checkin_connection = false
  @read_pool = nil
end

Public Instance Methods

add_option(opt) click to toggle source

Add an option to the query options bitfield.

@param opt a valid query option

@raise InvalidOperation if this method is run after the cursor has bee

iterated for the first time.

@return [Integer] the current value of the options bitfield for this cursor.

@see www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-Mongo::Constants::OPQUERY

# File lib/mongo/cursor.rb, line 376
def add_option(opt)
  check_modifiable

  @options |= opt
  @options
end
alive?() click to toggle source

Guess whether the cursor is alive on the server.

Note that this method only checks whether we have a cursor id. The cursor may still have timed out on the server. This will be indicated in the next call to #next.

@return [Boolean]

# File lib/mongo/cursor.rb, line 114
def alive?
  @cursor_id && @cursor_id != 0
end
batch_size(size=nil) click to toggle source

Set the batch size for server responses.

Note that the batch size will take effect only on queries where the number to be returned is greater than 100.

This can not override MongoDB's limit on the amount of data it will return to the client. Depending on server version this can be 4-16mb.

@param [Integer] size either 0 or some integer greater than 1. If 0,

the server will determine the batch size.

@return [Cursor]

# File lib/mongo/cursor.rb, line 266
def batch_size(size=nil)
  return @batch_size unless size
  check_modifiable
  if size < 0 || size == 1
    raise ArgumentError, "Invalid value for batch_size #{size}; must be 0 or > 1."
  else
    @batch_size = @limit != 0 && size > @limit ? @limit : size
  end

  self
end
close() click to toggle source

Close the cursor.

Note: if a cursor is read until exhausted (read until Mongo::Constants::OP_QUERY or Mongo::Constants::OP_GETMORE returns zero for the cursor id), there is no need to close it manually.

Note also: Mongo::Collection#find takes an optional block argument which can be used to ensure that your cursors get closed.

@return [True]

# File lib/mongo/cursor.rb, line 335
def close
  if @cursor_id && @cursor_id != 0
    message = BSON::ByteBuffer.new([0, 0, 0, 0])
    message.put_int(1)
    message.put_long(@cursor_id)
    log(:debug, "Cursor#close #{@cursor_id}")
    @connection.send_message(Mongo::Constants::OP_KILL_CURSORS, message, :connection => :reader)
  end
  @cursor_id = 0
  @closed    = true
end
closed?() click to toggle source

Is this cursor closed?

@return [Boolean]

# File lib/mongo/cursor.rb, line 350
def closed?
  @closed
end
count(skip_and_limit = false) click to toggle source

Get the size of the result set for this query.

@param [Boolean] whether of not to take notice of skip and limit

@return [Integer] the number of objects in the result set for this query.

@raise [OperationFailure] on a database error.

# File lib/mongo/cursor.rb, line 180
def count(skip_and_limit = false)
  command = BSON::OrderedHash["count",  @collection.name, "query",  @selector]

  if skip_and_limit
    command.merge!(BSON::OrderedHash["limit", @limit]) if @limit != 0
    command.merge!(BSON::OrderedHash["skip", @skip]) if @skip != 0
  end

  command.merge!(BSON::OrderedHash["fields", @fields])

  response = @db.command(command)
  return response['n'].to_i if Mongo::Support.ok?(response)
  return 0 if response['errmsg'] == "ns missing"
  raise OperationFailure.new("Count failed: #{response['errmsg']}", response['code'], response)
end
each() { |doc| ... } click to toggle source

Iterate over each document in this cursor, yielding it to the given block.

Iterating over an entire cursor will close it.

@yield passes each document to a block for processing.

@example if 'comments' represents a collection of comments:

comments.find.each do |doc|
  puts doc['user']
end
# File lib/mongo/cursor.rb, line 289
def each
  while doc = self.next
    yield doc
  end
end
explain() click to toggle source

Get the explain plan for this cursor.

@return [Hash] a document containing the explain plan for this cursor.

@core explain explain-instance_method

# File lib/mongo/cursor.rb, line 316
def explain
  c = Cursor.new(@collection,
    query_options_hash.merge(:limit => -@limit.abs, :explain => true))
  explanation = c.next_document
  c.close

  explanation
end
has_next?() click to toggle source

Determine whether this cursor has any remaining results.

@return [Boolean]

# File lib/mongo/cursor.rb, line 169
def has_next?
  num_remaining > 0
end
inspect() click to toggle source

Clean output for inspect.

# File lib/mongo/cursor.rb, line 418
def inspect
  "<Mongo::Cursor:0x#{object_id.to_s(16)} namespace='#{@db.name}.#{@collection.name}' " +
    "@selector=#{@selector.inspect} @cursor_id=#{@cursor_id}>"
end
limit(number_to_return=nil) click to toggle source

Limit the number of results to be returned by this cursor.

This method overrides any limit specified in the Mongo::Collection#find method, and only the last limit applied has an effect.

@return [Integer] the current number_to_return if no parameter is given.

@raise [InvalidOperation] if this cursor has already been used.

@core limit limit-instance_method

# File lib/mongo/cursor.rb, line 229
def limit(number_to_return=nil)
  return @limit unless number_to_return
  check_modifiable

  @limit = number_to_return
  self
end
next() click to toggle source

Get the next document specified the cursor options.

@return [Hash, Nil] the next document or Nil if no documents remain.

# File lib/mongo/cursor.rb, line 121
def next
  if @cache.length == 0
    if @query_run && (@options & OP_QUERY_EXHAUST != 0)
      close
      return nil
    else
      refresh
    end
  end
  doc = @cache.shift

  if doc && doc['$err']
    err = doc['$err']

    # If the server has stopped being the master (e.g., it's one of a
    # pair but it has died or something like that) then we close that
    # connection. The next request will re-open on master server.
    if err.include?("not master")
      @connection.close
      raise ConnectionFailure.new(err, doc['code'], doc)
    end

    raise OperationFailure.new(err, doc['code'], doc)
  end

  if @transformer.nil?
    doc
  else
    @transformer.call(doc) if doc
  end
end
Also aliased as: next_document
next_document()
Alias for: next
query_options_hash() click to toggle source

Get the query options for this Cursor.

@return [Hash]

# File lib/mongo/cursor.rb, line 403
def query_options_hash
  { :selector => @selector,
    :fields   => @fields,
    :skip     => @skip,
    :limit    => @limit,
    :order    => @order,
    :hint     => @hint,
    :snapshot => @snapshot,
    :timeout  => @timeout,
    :max_scan => @max_scan,
    :return_key => @return_key,
    :show_disk_loc => @show_disk_loc }
end
query_opts() click to toggle source

Returns an integer indicating which query options have been selected.

@return [Integer]

@see www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-Mongo::Constants::OPQUERY The MongoDB wire protocol.

# File lib/mongo/cursor.rb, line 360
def query_opts
  warn "The method Cursor#query_opts has been deprecated " +
    "and will removed in v2.0. Use Cursor#options instead."
  @options
end
remove_option(opt) click to toggle source

Remove an option from the query options bitfield.

@param opt a valid query option

@raise InvalidOperation if this method is run after the cursor has bee

iterated for the first time.

@return [Integer] the current value of the options bitfield for this cursor.

@see www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-Mongo::Constants::OPQUERY

# File lib/mongo/cursor.rb, line 393
def remove_option(opt)
  check_modifiable

  @options &= ~opt
  @options
end
rewind!() click to toggle source

Reset this cursor on the server. Cursor options, such as the query string and the values for skip and limit, are preserved.

# File lib/mongo/cursor.rb, line 156
def rewind!
  close
  @cache.clear
  @cursor_id  = nil
  @closed     = false
  @query_run  = false
  @n_received = nil
  true
end
skip(number_to_skip=nil) click to toggle source

Skips the first number_to_skip results of this cursor. Returns the current number_to_skip if no parameter is given.

This method overrides any skip specified in the Mongo::Collection#find method, and only the last skip applied has an effect.

@return [Integer]

@raise [InvalidOperation] if this cursor has already been used.

# File lib/mongo/cursor.rb, line 246
def skip(number_to_skip=nil)
  return @skip unless number_to_skip
  check_modifiable

  @skip = number_to_skip
  self
end
sort(order, direction=nil) click to toggle source

Sort this cursor's results.

This method overrides any sort order specified in the Mongo::Collection#find method, and only the last sort applied has an effect.

@param [Symbol, Array] key_or_list either 1) a key to sort by 2)

an array of [key, direction] pairs to sort by or 3) a hash of
field => direction pairs to sort by. Direction should be specified as 
Mongo::ASCENDING (or :ascending / :asc) or Mongo::DESCENDING
(or :descending / :desc)

@raise [InvalidOperation] if this cursor has already been used.

@raise [InvalidSortValueError] if the specified order is invalid.

# File lib/mongo/cursor.rb, line 210
def sort(order, direction=nil)
  check_modifiable

  order = [[order, direction]] unless direction.nil?

  @order = order
  self
end
to_a() click to toggle source

Receive all the documents from this cursor as an array of hashes.

Notes:

If you've already started iterating over the cursor, the array returned by this method contains only the remaining documents. See #rewind! if you need to reset the cursor.

Use of this method is discouraged - in most cases, it's much more efficient to retrieve documents as you need them by iterating over the cursor.

@return [Array] an array of documents.

Calls superclass method
# File lib/mongo/cursor.rb, line 307
def to_a
  super
end

Private Instance Methods

check_modifiable() click to toggle source
# File lib/mongo/cursor.rb, line 644
def check_modifiable
  if @query_run || @closed
    raise InvalidOperation, "Cannot modify the query once it has been run or closed."
  end
end
checkin_socket(sock) click to toggle source
# File lib/mongo/cursor.rb, line 569
def checkin_socket(sock)
  if @checkin_read_pool
    @read_pool.checkin(sock)
    @checkin_read_pool = false
  elsif @checkin_connection
    if @command || @read_preference == :primary
      @connection.checkin_writer(sock)
    else
      @connection.checkin_reader(sock)
    end
    @checkin_connection = false
  end
end
checkout_socket_for_op_get_more() click to toggle source
# File lib/mongo/cursor.rb, line 539
def checkout_socket_for_op_get_more
  if @read_pool && (@read_pool != @connection.read_pool)
    checkout_socket_from_read_pool
  else
    checkout_socket_from_connection
  end
end
checkout_socket_from_connection() click to toggle source
# File lib/mongo/cursor.rb, line 519
def checkout_socket_from_connection
  socket = nil
  begin
    @checkin_connection = true
    if @command || @read_preference == :primary
      socket = @connection.checkout_writer
    elsif @read_preference == :secondary_only
      socket = @connection.checkout_secondary
    else
      @read_pool = @connection.read_pool
      socket = @connection.checkout_reader
    end
  rescue SystemStackError, NoMemoryError, SystemCallError => ex
    @connection.close
    raise ex
  end

  socket
end
checkout_socket_from_read_pool() click to toggle source
# File lib/mongo/cursor.rb, line 547
def checkout_socket_from_read_pool
  new_pool = @connection.secondary_pools.detect do |pool|
    pool.host == @read_pool.host && pool.port == @read_pool.port
  end
  if new_pool
    sock = nil
    begin
      @read_pool = new_pool
      sock = new_pool.checkout
      @checkin_read_pool = true
    rescue SystemStackError, NoMemoryError, SystemCallError => ex
      @connection.close
      raise ex
    end
    return sock
  else
    raise Mongo::OperationFailure, "Failure to continue iterating " +
      "cursor because the the replica set member persisting this " +
      "cursor at #{@read_pool.host_string} cannot be found."
  end
end
close_cursor_if_query_complete() click to toggle source
# File lib/mongo/cursor.rb, line 638
def close_cursor_if_query_complete
  if @limit > 0 && @returned >= @limit
    close
  end
end
construct_query_message() click to toggle source
# File lib/mongo/cursor.rb, line 597
def construct_query_message
  message = BSON::ByteBuffer.new
  message.put_int(@options)
  BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{@collection.name}")
  message.put_int(@skip)
  message.put_int(@limit)
  spec = query_contains_special_fields? ? construct_query_spec : @selector
  message.put_binary(BSON::BSON_CODER.serialize(spec, false).to_s)
  message.put_binary(BSON::BSON_CODER.serialize(@fields, false).to_s) if @fields
  message
end
construct_query_spec() click to toggle source
# File lib/mongo/cursor.rb, line 618
def construct_query_spec
  return @selector if @selector.has_key?('$query')
  spec = BSON::OrderedHash.new
  spec['$query']    = @selector
  spec['$orderby']  = Mongo::Support.format_order_clause(@order) if @order
  spec['$hint']     = @hint if @hint && @hint.length > 0
  spec['$explain']  = true if @explain
  spec['$snapshot'] = true if @snapshot
  spec['$maxScan']  = @max_scan if @max_scan
  spec['$returnKey']   = true if @return_key
  spec['$showDiskLoc'] = true if @show_disk_loc
  spec
end
convert_fields_for_query(fields) click to toggle source

Convert the :fields parameter from a single field name or an array of fields names to a hash, with the field names for keys and '1' for each value.

# File lib/mongo/cursor.rb, line 428
def convert_fields_for_query(fields)
  case fields
    when String, Symbol
      {fields => 1}
    when Array
      return nil if fields.length.zero?
      fields.each_with_object({}) { |field, hash| hash[field] = 1 }
    when Hash
      return fields
  end
end
force_checkin_socket(sock) click to toggle source
# File lib/mongo/cursor.rb, line 583
def force_checkin_socket(sock)
  if @checkin_read_pool
    @read_pool.checkin(sock)
    @checkin_read_pool = false
  else
    if @command || @read_preference == :primary
      @connection.checkin_writer(sock)
    else
      @connection.checkin_reader(sock)
    end
    @checkin_connection = false
  end
end
instrument_payload() click to toggle source
# File lib/mongo/cursor.rb, line 609
def instrument_payload
  log = { :database => @db.name, :collection => @collection.name, :selector => selector }
  log[:fields] = @fields if @fields
  log[:skip]   = @skip   if @skip && (@skip != 0)
  log[:limit]  = @limit  if @limit && (@limit != 0)
  log[:order]  = @order  if @order
  log
end
num_remaining() click to toggle source

Return the number of documents remaining for this cursor.

# File lib/mongo/cursor.rb, line 441
def num_remaining
  if @cache.length == 0
    if @query_run && (@options & OP_QUERY_EXHAUST != 0)
      close
      return 0
    else
      refresh
    end
  end

  @cache.length
end
query_contains_special_fields?() click to toggle source

Returns true if the query contains order, explain, hint, or snapshot.

# File lib/mongo/cursor.rb, line 633
def query_contains_special_fields?
  @order || @explain || @hint || @snapshot || @show_disk_loc ||
    @max_scan || @return_key
end
refresh() click to toggle source

Refresh the documents in @cache. This means either sending the initial query or sending a GET_MORE operation.

# File lib/mongo/cursor.rb, line 456
def refresh
  if !@query_run
    send_initial_query
  elsif !@cursor_id.zero?
    send_get_more
  end
end
send_get_more() click to toggle source
# File lib/mongo/cursor.rb, line 484
def send_get_more
  message = BSON::ByteBuffer.new([0, 0, 0, 0])

  # DB name.
  BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{@collection.name}")

  # Number of results to return.
  if @limit > 0
    limit = @limit - @returned
    if @batch_size > 0
      limit = limit < @batch_size ? limit : @batch_size
    end
    message.put_int(limit)
  else
    message.put_int(@batch_size)
  end

  # Cursor id.
  message.put_long(@cursor_id)
  log(:debug, "cursor.refresh() for cursor #{@cursor_id}") if @logger
  sock = @socket || checkout_socket_for_op_get_more

  begin
  results, @n_received, @cursor_id = @connection.receive_message(
    Mongo::Constants::OP_GET_MORE, message, nil, sock, @command, nil)
  rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
    force_checkin_socket(sock)
    raise ex
  end
  checkin_socket(sock) unless @socket
  @returned += @n_received
  @cache += results
  close_cursor_if_query_complete
end
send_initial_query() click to toggle source
# File lib/mongo/cursor.rb, line 464
def send_initial_query
  message = construct_query_message
  sock    = @socket || checkout_socket_from_connection
  instrument(:find, instrument_payload) do
    begin
    results, @n_received, @cursor_id = @connection.receive_message(
      Mongo::Constants::OP_QUERY, message, nil, sock, @command,
      nil, @options & OP_QUERY_EXHAUST != 0)
    rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
      force_checkin_socket(sock) unless @socket
      raise ex
    end
    checkin_socket(sock) unless @socket
    @returned += @n_received
    @cache += results
    @query_run = true
    close_cursor_if_query_complete
  end
end