Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,14 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
false);
protected final ConfigKey<Integer> RemoteAgentNewConnectionsMonitorInterval = new ConfigKey<>("Advanced", Integer.class, "agent.connections.monitor.interval", "1800",
"Time in seconds to monitor the new agent connections and cleanup the expired connections.", false);

protected final ConfigKey<Integer> PeerLookupRetryCount = new ConfigKey<>("Advanced", Integer.class,
"agent.peer.lookup.retry.count", "1",
"Number of retries (in addition to the initial attempt) to resolve the peer management server for a host when forwarding agent commands in a management server cluster.", true);

protected final ConfigKey<Integer> PeerLookupRetryIntervalMs = new ConfigKey<>("Advanced", Integer.class,
"agent.peer.lookup.retry.interval.ms", "200",
"Sleep interval in milliseconds between peer lookup retries when forwarding agent commands in a management server cluster.", true);
protected final ConfigKey<Integer> AlertWait = new ConfigKey<>("Advanced", Integer.class, "alert.wait", "1800",
"Seconds to wait before alerting on a disconnected agent", true);
protected final ConfigKey<Integer> DirectAgentLoadSize = new ConfigKey<>("Advanced", Integer.class, "direct.agent.load.size", "16",
Expand Down Expand Up @@ -655,7 +663,7 @@ public Answer[] send(final Long hostId, final Commands commands, int timeout) th
final Command[] cmds = checkForCommandsAndTag(commands);

//check what agent is returned.
final AgentAttache agent = getAttache(hostId);
AgentAttache agent = getAttache(hostId);
if (agent == null || agent.isClosed()) {
throw new AgentUnavailableException("agent not logged into this management server", hostId);
}
Expand All @@ -665,7 +673,41 @@ public Answer[] send(final Long hostId, final Commands commands, int timeout) th

reconcileCommandService.persistReconcileCommands(hostId, req.getSequence(), cmds);

final Answer[] answers = agent.send(req, wait);
final int retries = Math.max(0, getAgentSendRetryCount());
final int intervalMs = Math.max(0, getAgentSendRetryIntervalMs());
AgentUnavailableException last = null;
Answer[] answers = null;

for (int attempt = 0; attempt <= retries; attempt++) {
if (attempt > 0 && intervalMs > 0) {
sleepRetry(intervalMs);
}

try {
agent = resolveAttacheForRetry(hostId, agent, attempt > 0);
} catch (AgentUnavailableException e) {
last = e;
continue;
}

if (isForwardWithoutPeer(agent, hostId)) {
last = new AgentUnavailableException("Unable to find peer", hostId);
agent = null;
continue;
}

try {
answers = agent.send(req, wait);
break;
} catch (AgentUnavailableException e) {
last = e;
agent = null;
}
}

if (answers == null) {
throw (last != null) ? last : new AgentUnavailableException("agent not logged into this management server", hostId);
}

reconcileCommandService.processAnswers(req.getSequence(), cmds, answers);

Expand Down Expand Up @@ -705,7 +747,7 @@ protected AgentAttache getAttache(final Long hostId) throws AgentUnavailableExce

@Override
public long send(final Long hostId, final Commands commands, final Listener listener) throws AgentUnavailableException {
final AgentAttache agent = getAttache(hostId);
AgentAttache agent = getAttache(hostId);
if (agent.isClosed()) {
throw new AgentUnavailableException(String.format(
"Agent [id: %d, name: %s] is closed",
Expand All @@ -717,10 +759,134 @@ public long send(final Long hostId, final Commands commands, final Listener list
final Request req = new Request(hostId, agent.getName(), _nodeId, cmds, commands.stopOnError(), true);
req.setSequence(agent.getNextSequence());

agent.send(req, listener);
final int retries = Math.max(0, getAgentSendRetryCount());
final int intervalMs = Math.max(0, getAgentSendRetryIntervalMs());
AgentUnavailableException last = null;

for (int attempt = 0; attempt <= retries; attempt++) {
if (attempt > 0 && intervalMs > 0) {
sleepRetry(intervalMs);
}

try {
agent = resolveAttacheForRetry(hostId, agent, attempt > 0);
} catch (AgentUnavailableException e) {
last = e;
continue;
}

if (isForwardWithoutPeer(agent, hostId)) {
last = new AgentUnavailableException("Unable to find peer", hostId);
agent = null;
continue;
}

try {
agent.send(req, listener);
last = null;
break;
} catch (AgentUnavailableException e) {
last = e;
agent = null;
}
}

if (last != null) {
throw last;
}
return req.getSequence();
}

protected int getAgentSendRetryCount() {
PeerLookupRetryCount.value();
}

protected int getAgentSendRetryIntervalMs() {
PeerLookupRetryIntervalMs.value();
}

protected AgentAttache resolveAttacheForRetry(final Long hostId, final AgentAttache current, final boolean forceReload)
throws AgentUnavailableException {
AgentAttache agent = current;
if (forceReload || agent == null || agent.isClosed()) {
agent = findAttache(hostId);
}
if (agent == null || agent.isClosed()) {
agent = getAttache(hostId);
}
return agent;
}

protected boolean isForwardWithoutPeer(final AgentAttache agent, final Long hostId) {
if (agent == null || hostId == null) {
return false;
}
if (!(this instanceof ClusteredAgentManagerImpl) || !agent.forForward()) {
return false;
}
final ClusteredAgentManagerImpl clusteredMgr = (ClusteredAgentManagerImpl)this;
return clusteredMgr.getPeerName(hostId) == null;
}

protected void sleepRetry(final int intervalMs) {
try {
Thread.sleep(intervalMs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}

protected AgentAttache resolveAttacheForSend(final Long hostId, final AgentAttache agent) throws AgentUnavailableException {
if (hostId == null || agent == null) {
return agent;
}

// Only clustered forwarding attaches need peer resolution.
if (!(this instanceof ClusteredAgentManagerImpl) || !agent.forForward()) {
return agent;
}

final ClusteredAgentManagerImpl clusteredMgr = (ClusteredAgentManagerImpl)this;
if (clusteredMgr.getPeerName(hostId) != null) {
return agent;
}

final int retries = Math.max(0, clusteredMgr.getPeerLookupRetryCount());
final int intervalMs = Math.max(0, clusteredMgr.getPeerLookupRetryIntervalMs());
if (retries <= 0) {
return agent;
}

for (int attempt = 1; attempt <= retries; attempt++) {
if (intervalMs > 0) {
try {
Thread.sleep(intervalMs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}

final AgentAttache current = findAttache(hostId);
if (current == null || current.isClosed()) {
continue;
}

// If the agent reconnected locally while we were retrying, use it.
if (!current.forForward()) {
return current;
}

// If the host ownership mapping updated to a remote MS, forwarding can proceed.
if (clusteredMgr.getPeerName(hostId) != null) {
return current;
}
}

// Preserve the original error semantics, but fail before we persist reconcile commands.
throw new AgentUnavailableException("Unable to find peer", hostId);
}

public void removeAgent(final AgentAttache attache, final Status nextState) {
if (attache == null) {
return;
Expand Down Expand Up @@ -2113,7 +2279,7 @@ public ConfigKey<?>[] getConfigKeys() {
return new ConfigKey<?>[] { CheckTxnBeforeSending, Workers, Port, Wait, AlertWait, DirectAgentLoadSize,
DirectAgentPoolSize, DirectAgentThreadCap, EnableKVMAutoEnableDisable, ReadyCommandWait,
GranularWaitTimeForCommands, RemoteAgentSslHandshakeTimeout, RemoteAgentMaxConcurrentNewConnections,
RemoteAgentNewConnectionsMonitorInterval };
RemoteAgentNewConnectionsMonitorInterval, PeerLookupRetryCount, PeerLookupRetryIntervalMs };
}

protected class SetHostParamsListener implements Listener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ public void send(final Request req, final Listener listener) throws AgentUnavail
boolean error = true;
try {
while (i++ < 5) {
String peerName = s_clusteredAgentMgr.findPeer(_id);
final String peerName = s_clusteredAgentMgr.findPeer(_id);

if (peerName == null) {
throw new AgentUnavailableException("Unable to find peer", _id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,14 @@ public String findPeer(final long hostId) {
return getPeerName(hostId);
}

protected int getPeerLookupRetryCount() {
return PeerLookupRetryCount.value();
}

protected int getPeerLookupRetryIntervalMs() {
return PeerLookupRetryIntervalMs.value();
}

public SSLEngine getSSLEngine(final String peerName) {
return _sslEngines.get(peerName);
}
Expand Down Expand Up @@ -594,6 +602,16 @@ protected AgentAttache getAttache(final Long hostId) throws AgentUnavailableExce
return agent;
}

@Override
protected int getAgentSendRetryCount() {
return Math.max(0, getPeerLookupRetryCount());
}

@Override
protected int getAgentSendRetryIntervalMs() {
return Math.max(0, getPeerLookupRetryIntervalMs());
}

@Override
public boolean stop() {
if (_peers != null) {
Expand Down Expand Up @@ -679,13 +697,13 @@ protected void doTask(final Task task) throws TaskExecutionException {
// But we have the serialize the control commands here so we have
// to deserialize this and send it through the agent attache.
final Request req = Request.parse(data);
agent.send(req, null);
sendToAgentWithRetry(hostId, agent, req);
} else {
if (agent instanceof Routable) {
final Routable cluster = (Routable) agent;
cluster.routeToAgent(data);
} else {
agent.send(Request.parse(data));
sendToAgentWithRetry(hostId, agent, Request.parse(data));
}
return;
}
Expand Down Expand Up @@ -733,6 +751,43 @@ protected void doTask(final Task task) throws TaskExecutionException {
}
}

private void sendToAgentWithRetry(final long hostId, final AgentAttache initialAgent, final Request req) throws AgentUnavailableException {
final int retries = Math.max(0, getAgentSendRetryCount());
final int intervalMs = Math.max(0, getAgentSendRetryIntervalMs());

AgentAttache agent = initialAgent;
AgentUnavailableException last = null;

for (int attempt = 0; attempt <= retries; attempt++) {
if (attempt > 0 && intervalMs > 0) {
sleepRetry(intervalMs);
}

try {
agent = resolveAttacheForRetry(hostId, agent, attempt > 0);
} catch (AgentUnavailableException e) {
last = e;
continue;
}

if (isForwardWithoutPeer(agent, hostId)) {
last = new AgentUnavailableException("Unable to find peer", hostId);
agent = null;
continue;
}

try {
agent.send(req, null);
return;
} catch (AgentUnavailableException e) {
last = e;
agent = null;
}
}

throw (last != null) ? last : new AgentUnavailableException("agent not logged into this management server", hostId);
}

@Override
public void onManagementNodeJoined(final List<? extends ManagementServerHost> nodeList, final long selfNodeId) {
}
Expand Down
Loading