public class UNICAST2 extends Protocol implements AgeOutCache.Handler<Address>
Unlike UNICAST, this protocol doesn't send acks for every message. Instead, it sends 'acks' after receiving max_bytes and/or periodically (stable_interval).

Modifier and Type | Class and Description |
---|---|
protected class | UNICAST2.ConnectionReaper |
protected class | UNICAST2.ReceiverEntry |
protected class | UNICAST2.RetransmitTask - Retransmitter task which periodically (every xmit_interval ms) looks at all the retransmit tables and sends retransmit requests to all members from which we have missing messages |
protected class | UNICAST2.SenderEntry |
static class | UNICAST2.Unicast2Header - The following types and fields are serialized: |
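The class description says that 'acks' are sent after max_bytes have been received and/or periodically (stable_interval). A minimal sketch of such a trigger follows; it is not the JGroups implementation. The class StableTrigger, its method names, and the choice of >= for the threshold are assumptions made for illustration only.

```java
// Hypothetical sketch, not JGroups code: decides when a receiver should send an 'ack'.
class StableTrigger {
    private final long maxBytes;     // plays the role of the max_bytes field
    private long bytesSinceLastAck;  // bytes received since the last 'ack'

    StableTrigger(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Records a received message; returns true if an 'ack' is now due. */
    synchronized boolean onReceive(int messageLength) {
        bytesSinceLastAck += messageLength;
        if (bytesSinceLastAck >= maxBytes) { // assumption: threshold is inclusive
            bytesSinceLastAck = 0;           // start counting again after the 'ack'
            return true;
        }
        return false;
    }

    /** Meant to be called by a periodic task (every stable_interval ms). */
    synchronized boolean onTimer() {
        boolean pending = bytesSinceLastAck > 0; // assumption: skip the 'ack' if nothing arrived
        bytesSinceLastAck = 0;
        return pending;
    }

    public static void main(String[] args) {
        StableTrigger trigger = new StableTrigger(1000);
        System.out.println(trigger.onReceive(400)); // false
        System.out.println(trigger.onReceive(700)); // true (1100 >= 1000)
        System.out.println(trigger.onReceive(10));  // false
        System.out.println(trigger.onTimer());      // true, 10 bytes were pending
    }
}
```

The point of the design is that the number of 'acks' grows with traffic volume and elapsed time rather than with the number of messages.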
Modifier and Type | Field and Description |
---|---|
protected AgeOutCache<Address> | cache |
protected long | conn_expiry_timeout |
protected Future<?> | connection_reaper |
static long | DEFAULT_FIRST_SEQNO |
protected int | exponential_backoff - Deprecated. |
protected short | last_conn_id |
protected Address | local_addr |
protected boolean | log_not_found_msgs |
protected long | max_bytes |
protected int | max_msg_batch_size |
protected long | max_retransmit_time |
protected int | max_stable_msgs |
protected List<Address> | members |
protected int | num_messages_received |
protected int | num_messages_sent |
protected ConcurrentMap<Address,UNICAST2.ReceiverEntry> | recv_table |
protected ReentrantLock | recv_table_lock |
protected boolean | running |
protected ConcurrentMap<Address,UNICAST2.SenderEntry> | send_table |
protected long | stable_interval |
protected Future<?> | stable_task_future |
protected int[] | timeout - Deprecated. |
protected TimeScheduler | timer |
protected boolean | use_range_based_retransmitter |
protected long | xmit_interval |
protected AtomicLong | xmit_reqs_received |
protected AtomicLong | xmit_reqs_sent |
protected AtomicLong | xmit_rsps_sent |
protected boolean | xmit_table_automatic_purging - Deprecated. |
protected long | xmit_table_max_compaction_time |
protected int | xmit_table_msgs_per_row |
protected int | xmit_table_num_rows |
protected double | xmit_table_resize_factor |
protected Future<?> | xmit_task - RetransmitTask running every xmit_interval ms |
protected Map<Address,Long> | xmit_task_map - Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.jboss.org/browse/JGRP-1539) |
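The xmit_task field refers to the RetransmitTask, which asks senders to retransmit messages that are missing. As a rough illustration of what "missing" could mean, the sketch below lists the seqnos between a highest delivered and a highest received seqno that have not arrived. The class MissingSeqnos and its method are hypothetical, and the real retransmit tables are more involved than a plain set.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch, not JGroups code: finds gaps in the seqnos received from one sender.
class MissingSeqnos {
    /** Returns the seqnos in (highestDelivered, highestReceived] that are absent from 'received'. */
    static List<Long> find(long highestDelivered, long highestReceived, Set<Long> received) {
        List<Long> missing = new ArrayList<>();
        for (long seqno = highestDelivered + 1; seqno <= highestReceived; seqno++) {
            if (!received.contains(seqno))
                missing.add(seqno); // candidate for a retransmit request
        }
        return missing;
    }

    public static void main(String[] args) {
        Set<Long> received = new TreeSet<>(Arrays.asList(6L, 7L, 10L));
        System.out.println(find(5, 10, received)); // [8, 9]
    }
}
```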
Constructor and Description |
---|
UNICAST2() |
Modifier and Type | Method and Description |
---|---|
boolean | connectionEstablished(Address target) - Is the send connection to target established |
Object | down(Event evt) - An event is to be sent down the stack. |
void | expired(Address key) - Called by AgeOutCache, to remove expired connections |
AgeOutCache<Address> | getAgeOutCache() |
int | getAgeOutCacheSize() |
String | getLocalAddress() |
long | getMaxRetransmitTime() |
String | getMembers() |
protected short | getNewConnectionId() |
int | getNumConnections() |
int | getNumReceiveConnections() |
int | getNumSendConnections() |
protected UNICAST2.ReceiverEntry | getOrCreateReceiverEntry(Address sender, long seqno, short conn_id) |
protected UNICAST2.ReceiverEntry | getReceiverEntry(Address sender, long seqno, boolean first, short conn_id) |
int[] | getTimeout() - Deprecated. |
TimeScheduler | getTimer() |
long | getXmitTableMissingMessages() |
int | getXmitTableNumCompactions() |
int | getXmitTableNumMoves() |
int | getXmitTableNumPurges() |
int | getXmitTableNumResizes() |
long | getXmitTableUndeliveredMessages() |
protected void | handleBatchReceived(Address sender, Map<Short,List<Message>> map) |
protected boolean | handleDataReceived(Address sender, long seqno, short conn_id, boolean first, Message msg, Event evt) - Check whether the hashmap contains an entry e for sender (create if not). |
protected void | handleResendingOfFirstMessage(Address sender, long seqno) - We need to resend our first message with our conn_id |
protected void | handleXmitRequest(Address sender, SeqnoList missing) |
boolean | hasSendConnectionTo(Address dest) - Used for testing only |
void | init() - Called after instance has been created (null constructor) and before protocol is started. |
boolean | isConnectionReaperRunning() |
boolean | isXmitTaskRunning() |
String | printAgeOutCache() |
String | printConnections() |
protected String | printMessageList(List<Message> list) |
String | printReceiveWindowMessages() |
String | printSendWindowMessages() |
void | reapIdleConnections() |
void | removeAllConnections() - This method is public only so it can be invoked by unit testing, but should not otherwise be used! |
protected void | removeAndPassUp(Table<Message> win, Address sender) - Try to remove as many messages as possible and pass them up. |
void | removeConnection(Address mbr) - Removes and resets from connection table (which is already locked). |
void | removeReceiveConnection(Address mbr) |
void | removeSendConnection(Address mbr) |
void | resetStats() |
void | retransmit(SeqnoList missing, Address sender) |
protected void | sendAck(Address dest, long seqno, short conn_id) |
protected void | sendRequestForFirstSeqno(Address dest, long seqno_received) |
protected void | sendStableMessage(Address dest, short conn_id, long hd, long hr) |
void | sendStableMessages() |
void | setMaxMessageBatchSize(int size) |
void | setMaxRetransmitTime(long max_retransmit_time) |
void | setTimeout(int[] val) - Deprecated. |
void | setTimer(TimeScheduler timer) - Only used for unit tests, don't use! |
protected void | stable(Address sender, short conn_id, long hd, long hr) - Purge all messages in window for local_addr, which are <= low. |
void | start() - This method is called on a Channel.connect(String). |
protected void | startConnectionReaper() |
protected void | startRetransmitTask() |
protected void | startStableTask() |
void | stop() - This method is called on a Channel.disconnect(). |
protected void | stopConnectionReaper() |
protected void | stopRetransmitTask() |
protected void | stopStableTask() |
void | triggerXmit() |
Object | up(Event evt) - An event was received from the layer below. |
void | up(MessageBatch batch) - Sends up multiple messages in a MessageBatch. |
Methods inherited from class Protocol: accept, destroy, dumpStats, enableStats, getConfigurableObjects, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getName, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, level, printStats, providedDownServices, providedUpServices, requiredDownServices, requiredUpServices, resetStatistics, setDownProtocol, setErgonomics, setId, setLevel, setProtocolStack, setSocketFactory, setUpProtocol, setValue, setValues, statsEnabled
public static final long DEFAULT_FIRST_SEQNO
@Deprecated protected int[] timeout
@Deprecated protected int exponential_backoff
protected int max_msg_batch_size
protected long max_bytes
protected long stable_interval
protected int max_stable_msgs
protected int xmit_table_num_rows
protected int xmit_table_msgs_per_row
protected double xmit_table_resize_factor
protected long xmit_table_max_compaction_time
@Deprecated protected boolean xmit_table_automatic_purging
protected boolean use_range_based_retransmitter
protected boolean log_not_found_msgs
protected long conn_expiry_timeout
protected long max_retransmit_time
protected long xmit_interval
protected int num_messages_sent
protected int num_messages_received
protected final ConcurrentMap<Address,UNICAST2.SenderEntry> send_table
protected final ConcurrentMap<Address,UNICAST2.ReceiverEntry> recv_table
protected Future<?> xmit_task
protected final Map<Address,Long> xmit_task_map
protected final ReentrantLock recv_table_lock
protected Address local_addr
protected TimeScheduler timer
protected volatile boolean running
protected short last_conn_id
protected AgeOutCache<Address> cache
protected Future<?> stable_task_future
protected Future<?> connection_reaper
protected final AtomicLong xmit_reqs_received
protected final AtomicLong xmit_reqs_sent
protected final AtomicLong xmit_rsps_sent
@Deprecated public int[] getTimeout()
@Deprecated public void setTimeout(int[] val)
public void setMaxMessageBatchSize(int size)
public String getLocalAddress()
public String getMembers()
public int getNumSendConnections()
public int getNumReceiveConnections()
public int getNumConnections()
public String printConnections()
public boolean connectionEstablished(Address target)
public boolean isConnectionReaperRunning()
public long getXmitTableUndeliveredMessages()
public long getXmitTableMissingMessages()
public int getXmitTableNumCompactions()
public int getXmitTableNumMoves()
public int getXmitTableNumResizes()
public int getXmitTableNumPurges()
public String printReceiveWindowMessages()
public String printSendWindowMessages()
public boolean isXmitTaskRunning()
public long getMaxRetransmitTime()
public void setMaxRetransmitTime(long max_retransmit_time)
public int getAgeOutCacheSize()
public String printAgeOutCache()
public AgeOutCache<Address> getAgeOutCache()
public boolean hasSendConnectionTo(Address dest)
public void resetStats()
Overrides: resetStats in class Protocol
public TimeScheduler getTimer()
public void setTimer(TimeScheduler timer)
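Parameters: timer
public void init() throws Exception
Description copied from class: Protocol
Called after instance has been created (null constructor) and before protocol is started.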
public void start() throws Exception
Description copied from class: Protocol
This method is called on a Channel.connect(String). Starts work. Protocols are connected and queues are ready to receive events. Will be called from bottom to top. This call will replace the START and START_OK events.
Overrides: start in class Protocol
Throws: Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so Channel.connect(String) will throw an exception.
public void stop()
Description copied from class: Protocol
This method is called on a Channel.disconnect(). Stops work (e.g. by closing multicast socket). Will be called from top to bottom. This means that at the time of the method invocation the neighbor protocol below is still working. This method will replace the STOP, STOP_OK, CLEANUP and CLEANUP_OK events. The ProtocolStack guarantees that when this method is called all messages in the down queue will have been flushed.
public Object up(Event evt)
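Description copied from class: Protocol
An event was received from the layer below. After examining it, the layer may send an event down the stack using down_prot.down(), or send the event (or another event) up the stack using up_prot.up().
public void up(MessageBatch batch)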
Description copied from class: Protocol
Sends up multiple messages in a MessageBatch. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages.
The default processing below sends messages up the stack individually, based on a matching criterion (calling Protocol.accept(org.jgroups.Message)), and - if true - calls Protocol.up(org.jgroups.Event) for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.
Subclasses should check if there are any messages destined for them (e.g. using MessageBatch.getMatchingMessages(short,boolean)), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.
public Object down(Event evt)
Description copied from class: Protocol
An event is to be sent down the stack, typically using down_prot.down(). In case of a GET_ADDRESS event (which tries to retrieve the stack's address from one of the bottom layers), the layer may need to send a new response event back up the stack using up_prot.up().
protected void stable(Address sender, short conn_id, long hd, long hr)
Purge all messages in window for local_addr, which are <= low.
Parameters:
sender
hd - Highest delivered seqno
hr - Highest received seqno
public void sendStableMessages()
protected void sendStableMessage(Address dest, short conn_id, long hd, long hr)
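The stable(...) method is described as purging all messages in the window up to a given seqno. A minimal sketch of that kind of purge is shown below. It assumes the bound is the highest delivered seqno (hd), and the class SendWindowSketch with its String messages is hypothetical; the real protocol keeps its messages in a Table<Message>.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch, not JGroups code: a sender-side window keyed by seqno.
class SendWindowSketch {
    private final ConcurrentSkipListMap<Long, String> unacked = new ConcurrentSkipListMap<>();

    void add(long seqno, String msg) {
        unacked.put(seqno, msg);
    }

    /** Drops every message with seqno <= hd and returns how many were purged. */
    int stable(long hd) {
        // headMap is a live view, so clearing it removes the entries from the window itself
        ConcurrentNavigableMap<Long, String> delivered = unacked.headMap(hd, true);
        int purged = delivered.size();
        delivered.clear();
        return purged;
    }

    int size() {
        return unacked.size();
    }

    public static void main(String[] args) {
        SendWindowSketch window = new SendWindowSketch();
        for (long seqno = 1; seqno <= 5; seqno++)
            window.add(seqno, "msg-" + seqno);
        System.out.println(window.stable(3)); // 3
        System.out.println(window.size());    // 2
    }
}
```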
protected void startStableTask()
protected void stopStableTask()
protected void startConnectionReaper()
protected void stopConnectionReaper()
public void removeConnection(Address mbr)
public void removeSendConnection(Address mbr)
public void removeReceiveConnection(Address mbr)
public void removeAllConnections()
public void expired(Address key)
Called by AgeOutCache, to remove expired connections
Specified by: expired in interface AgeOutCache.Handler<Address>
Parameters: key
protected boolean handleDataReceived(Address sender, long seqno, short conn_id, boolean first, Message msg, Event evt)
Check whether the hashmap contains an entry e for sender (create if not). If e.received_msgs is null and first is true: create a new AckReceiverWindow(seqno) and add message. Set e.received_msgs to the new window. Else just add the message.
protected void handleBatchReceived(Address sender, Map<Short,List<Message>> map)
protected void removeAndPassUp(Table<Message> win, Address sender)
protected UNICAST2.ReceiverEntry getReceiverEntry(Address sender, long seqno, boolean first, short conn_id)
protected UNICAST2.ReceiverEntry getOrCreateReceiverEntry(Address sender, long seqno, short conn_id)
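The lookup described for handleDataReceived (find the entry for the sender, create it if absent, create the window on the first message, otherwise just add) can be sketched roughly as follows. This is not the JGroups code: ReceiverTableSketch, its Entry class, and the use of String for addresses and messages are stand-ins, and dropping a non-first message when no window exists yet is an assumption.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch, not JGroups code: per-sender receive windows.
class ReceiverTableSketch {
    /** Stand-in for UNICAST2.ReceiverEntry. */
    static final class Entry {
        ConcurrentSkipListMap<Long, String> receivedMsgs; // null until the first message arrives
    }

    private final ConcurrentMap<String, Entry> recvTable = new ConcurrentHashMap<>();

    /** Returns true if the message was stored, false if no window exists for it yet. */
    boolean handleDataReceived(String sender, long seqno, boolean first, String msg) {
        Entry e = recvTable.computeIfAbsent(sender, k -> new Entry()); // create if not present
        synchronized (e) {
            if (e.receivedMsgs == null) {
                if (!first)
                    return false; // assumption: wait for the sender's first message
                e.receivedMsgs = new ConcurrentSkipListMap<>(); // new window on first message
            }
            e.receivedMsgs.put(seqno, msg); // otherwise just add the message
            return true;
        }
    }

    int windowSize(String sender) {
        Entry e = recvTable.get(sender);
        return (e == null || e.receivedMsgs == null) ? 0 : e.receivedMsgs.size();
    }

    public static void main(String[] args) {
        ReceiverTableSketch table = new ReceiverTableSketch();
        System.out.println(table.handleDataReceived("A", 2, false, "m2")); // false
        System.out.println(table.handleDataReceived("A", 1, true, "m1"));  // true
        System.out.println(table.handleDataReceived("A", 2, false, "m2")); // true
        System.out.println(table.windowSize("A"));                         // 2
    }
}
```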
protected void handleResendingOfFirstMessage(Address sender, long seqno)
We need to resend our first message with our conn_id
Parameters:
sender
seqno - Resend the non null messages in the range [lowest .. seqno]
protected void startRetransmitTask()
protected void stopRetransmitTask()
protected short getNewConnectionId()
protected void sendRequestForFirstSeqno(Address dest, long seqno_received)
protected void sendAck(Address dest, long seqno, short conn_id)
public void reapIdleConnections()
public void triggerXmit()
Copyright © 2014 JBoss, a division of Red Hat. All rights reserved.