module Mongo::Operation::SessionsSupported

Shared behavior of operations that support a session.

@since 2.5.2

Constants

READ_COMMANDS
ZERO_TIMESTAMP

Private Instance Methods

add_write_concern!(sel) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 90
def add_write_concern!(sel)
  sel[:writeConcern] = write_concern.options if write_concern
end
apply_autocommit!(selector) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 94
def apply_autocommit!(selector)
  session.add_autocommit!(selector)
end
apply_causal_consistency!(selector, server) click to toggle source

Adds causal consistency document to the selector, if one can be constructed and the selector is for a startTransaction command.

When operations are performed in a transaction, only the first operation (the one which starts the transaction via startTransaction) is allowed to have a read concern, and with it the causal consistency document, specified.

# File lib/mongo/operation/shared/sessions_supported.rb, line 48
def apply_causal_consistency!(selector, server)
  return unless selector[:startTransaction]

  apply_causal_consistency_if_possible(selector, server)
end
apply_causal_consistency_if_possible(selector, server) click to toggle source

Adds causal consistency document to the selector, if one can be constructed.

In order for the causal consistency document to be constructed, causal consistency must be enabled for the session and the session must have the current operation time. Also, topology must be replica set or sharded cluster.

# File lib/mongo/operation/shared/sessions_supported.rb, line 61
def apply_causal_consistency_if_possible(selector, server)
  if !server.standalone?
    cc_doc = session.send(:causal_consistency_doc)
    if cc_doc
      selector[:readConcern] = (selector[:readConcern] || read_concern || {}).merge(cc_doc)
    end
  end
end
apply_cluster_time!(selector, server) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 74
def apply_cluster_time!(selector, server)
  if !server.standalone?
    cluster_time = [server.cluster_time, (session && session.cluster_time)].max_by do |doc|
      (doc && doc[Cluster::CLUSTER_TIME]) || ZERO_TIMESTAMP
    end

    if cluster_time && (cluster_time[Cluster::CLUSTER_TIME] > ZERO_TIMESTAMP)
      selector[CLUSTER_TIME] = cluster_time
    end
  end
end
apply_read_pref!(selector) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 110
def apply_read_pref!(selector)
  session.apply_read_pref!(selector) if read_command?(selector)
end
apply_session_id!(selector) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 98
def apply_session_id!(selector)
  session.add_id!(selector)
end
apply_start_transaction!(selector) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 102
def apply_start_transaction!(selector)
  session.add_start_transaction!(selector)
end
apply_txn_num!(selector) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 106
def apply_txn_num!(selector)
  session.add_txn_num!(selector)
end
apply_txn_opts!(selector) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 114
def apply_txn_opts!(selector)
  session.add_txn_opts!(selector, read_command?(selector))
end
command(server) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 130
def command(server)
  sel = selector(server).dup
  add_write_concern!(sel)
  sel[Protocol::Msg::DATABASE_IDENTIFIER] = db_name
  sel['$readPreference'] = read.to_doc if read

  if server.features.sessions_enabled?
    apply_cluster_time!(sel, server)
    if session && (acknowledged_write? || session.in_transaction?)
      sel[:txnNumber] = BSON::Int64.new(txn_num) if txn_num
      apply_session_id!(sel)
      apply_start_transaction!(sel)
      apply_causal_consistency!(sel, server)
      apply_autocommit!(sel)
      apply_txn_opts!(sel)
      suppress_read_write_concern!(sel)
      validate_read_preference!(sel)
      update_session_state!
      apply_txn_num!(sel)
    end
  elsif session && session.explicit?
    apply_cluster_time!(sel, server)
    sel[:txnNumber] = BSON::Int64.new(txn_num) if txn_num
    apply_session_id!(sel)
    apply_start_transaction!(sel)
    apply_causal_consistency!(sel, server)
    apply_autocommit!(sel)
    apply_txn_opts!(sel)
    suppress_read_write_concern!(sel)
    validate_read_preference!(sel)
    update_session_state!
    apply_txn_num!(sel)
  end

  sel
end
flags() click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 70
def flags
  acknowledged_write? ? [] : [:more_to_come]
end
read_command?(sel) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 86
def read_command?(sel)
  READ_COMMANDS.any? { |c| sel[c] }
end
suppress_read_write_concern!(selector) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 118
def suppress_read_write_concern!(selector)
  session.suppress_read_write_concern!(selector)
end
update_session_state!() click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 126
def update_session_state!
  session.update_state!
end
validate_read_preference!(selector) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 122
def validate_read_preference!(selector)
  session.validate_read_preference!(selector) if read_command?(selector)
end