From 0b9eb15c1e3f4b88963cc5a037d5cb5cedc36d38 Mon Sep 17 00:00:00 2001 From: "Glover, Rene (rg9975)" Date: Fri, 16 Jan 2026 11:48:43 -0600 Subject: [PATCH 1/4] fixes for agent.send peer disconnect issues during command execution --- .../cloud/agent/manager/AgentManagerImpl.java | 166 +++++++++++++++++- .../agent/manager/ClusteredAgentAttache.java | 3 +- .../manager/ClusteredAgentManagerImpl.java | 69 +++++++- 3 files changed, 231 insertions(+), 7 deletions(-) diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java index 3d398ca5dd95..a1ba2632dc97 100644 --- a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java @@ -655,7 +655,7 @@ public Answer[] send(final Long hostId, final Commands commands, int timeout) th final Command[] cmds = checkForCommandsAndTag(commands); //check what agent is returned. 
- final AgentAttache agent = getAttache(hostId); + AgentAttache agent = getAttache(hostId); if (agent == null || agent.isClosed()) { throw new AgentUnavailableException("agent not logged into this management server", hostId); } @@ -665,7 +665,41 @@ public Answer[] send(final Long hostId, final Commands commands, int timeout) th reconcileCommandService.persistReconcileCommands(hostId, req.getSequence(), cmds); - final Answer[] answers = agent.send(req, wait); + final int retries = Math.max(0, getAgentSendRetryCount()); + final int intervalMs = Math.max(0, getAgentSendRetryIntervalMs()); + AgentUnavailableException last = null; + Answer[] answers = null; + + for (int attempt = 0; attempt <= retries; attempt++) { + if (attempt > 0 && intervalMs > 0) { + sleepRetry(intervalMs); + } + + try { + agent = resolveAttacheForRetry(hostId, agent, attempt > 0); + } catch (AgentUnavailableException e) { + last = e; + continue; + } + + if (isForwardWithoutPeer(agent, hostId)) { + last = new AgentUnavailableException("Unable to find peer", hostId); + agent = null; + continue; + } + + try { + answers = agent.send(req, wait); + break; + } catch (AgentUnavailableException e) { + last = e; + agent = null; + } + } + + if (answers == null) { + throw (last != null) ? 
last : new AgentUnavailableException("agent not logged into this management server", hostId); + } reconcileCommandService.processAnswers(req.getSequence(), cmds, answers); @@ -705,7 +739,7 @@ protected AgentAttache getAttache(final Long hostId) throws AgentUnavailableExce @Override public long send(final Long hostId, final Commands commands, final Listener listener) throws AgentUnavailableException { - final AgentAttache agent = getAttache(hostId); + AgentAttache agent = getAttache(hostId); if (agent.isClosed()) { throw new AgentUnavailableException(String.format( "Agent [id: %d, name: %s] is closed", @@ -717,10 +751,134 @@ public long send(final Long hostId, final Commands commands, final Listener list final Request req = new Request(hostId, agent.getName(), _nodeId, cmds, commands.stopOnError(), true); req.setSequence(agent.getNextSequence()); - agent.send(req, listener); + final int retries = Math.max(0, getAgentSendRetryCount()); + final int intervalMs = Math.max(0, getAgentSendRetryIntervalMs()); + AgentUnavailableException last = null; + + for (int attempt = 0; attempt <= retries; attempt++) { + if (attempt > 0 && intervalMs > 0) { + sleepRetry(intervalMs); + } + + try { + agent = resolveAttacheForRetry(hostId, agent, attempt > 0); + } catch (AgentUnavailableException e) { + last = e; + continue; + } + + if (isForwardWithoutPeer(agent, hostId)) { + last = new AgentUnavailableException("Unable to find peer", hostId); + agent = null; + continue; + } + + try { + agent.send(req, listener); + last = null; + break; + } catch (AgentUnavailableException e) { + last = e; + agent = null; + } + } + + if (last != null) { + throw last; + } return req.getSequence(); } + protected int getAgentSendRetryCount() { + return 0; + } + + protected int getAgentSendRetryIntervalMs() { + return 0; + } + + protected AgentAttache resolveAttacheForRetry(final Long hostId, final AgentAttache current, final boolean forceReload) + throws AgentUnavailableException { + AgentAttache agent = 
current; + if (forceReload || agent == null || agent.isClosed()) { + agent = findAttache(hostId); + } + if (agent == null || agent.isClosed()) { + agent = getAttache(hostId); + } + return agent; + } + + protected boolean isForwardWithoutPeer(final AgentAttache agent, final Long hostId) { + if (agent == null || hostId == null) { + return false; + } + if (!(this instanceof ClusteredAgentManagerImpl) || !agent.forForward()) { + return false; + } + final ClusteredAgentManagerImpl clusteredMgr = (ClusteredAgentManagerImpl)this; + return clusteredMgr.getPeerName(hostId) == null; + } + + protected void sleepRetry(final int intervalMs) { + try { + Thread.sleep(intervalMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + + protected AgentAttache resolveAttacheForSend(final Long hostId, final AgentAttache agent) throws AgentUnavailableException { + if (hostId == null || agent == null) { + return agent; + } + + // Only clustered forwarding attaches need peer resolution. + if (!(this instanceof ClusteredAgentManagerImpl) || !agent.forForward()) { + return agent; + } + + final ClusteredAgentManagerImpl clusteredMgr = (ClusteredAgentManagerImpl)this; + if (clusteredMgr.getPeerName(hostId) != null) { + return agent; + } + + final int retries = Math.max(0, clusteredMgr.getPeerLookupRetryCount()); + final int intervalMs = Math.max(0, clusteredMgr.getPeerLookupRetryIntervalMs()); + if (retries <= 0) { + return agent; + } + + for (int attempt = 1; attempt <= retries; attempt++) { + if (intervalMs > 0) { + try { + Thread.sleep(intervalMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + + final AgentAttache current = findAttache(hostId); + if (current == null || current.isClosed()) { + continue; + } + + // If the agent reconnected locally while we were retrying, use it. 
+ if (!current.forForward()) { + return current; + } + + // If the host ownership mapping updated to a remote MS, forwarding can proceed. + if (clusteredMgr.getPeerName(hostId) != null) { + return current; + } + } + + // Preserve the original error semantics, but fail before we persist reconcile commands. + throw new AgentUnavailableException("Unable to find peer", hostId); + } + public void removeAgent(final AgentAttache attache, final Status nextState) { if (attache == null) { return; diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentAttache.java b/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentAttache.java index 5e4ccfa67c6f..5c67b8b03b98 100644 --- a/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentAttache.java +++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentAttache.java @@ -158,7 +158,8 @@ public void send(final Request req, final Listener listener) throws AgentUnavail boolean error = true; try { while (i++ < 5) { - String peerName = s_clusteredAgentMgr.findPeer(_id); + final String peerName = s_clusteredAgentMgr.findPeer(_id); + if (peerName == null) { throw new AgentUnavailableException("Unable to find peer", _id); } diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index c64489828033..d6ffb8bf9d5c 100644 --- a/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -162,6 +162,14 @@ protected ClusteredAgentManagerImpl() { protected final ConfigKey ScanInterval = new ConfigKey<>(Integer.class, "direct.agent.scan.interval", "Advanced", "90", "Interval between scans to load direct agents", false, ConfigKey.Scope.Global, 1000); + protected final ConfigKey 
PeerLookupRetryCount = new ConfigKey<>(Integer.class, + "cluster.agent.peer.lookup.retry.count", "Advanced", "0", + "Number of retries (in addition to the initial attempt) to resolve the peer management server for a host when forwarding agent commands in a management server cluster.", true); + + protected final ConfigKey PeerLookupRetryIntervalMs = new ConfigKey<>(Integer.class, + "cluster.agent.peer.lookup.retry.interval.ms", "Advanced", "200", + "Sleep interval in milliseconds between peer lookup retries when forwarding agent commands in a management server cluster.", true); + @Override public boolean configure(final String name, final Map xmlParams) throws ConfigurationException { _peers = new HashMap<>(7); @@ -467,6 +475,14 @@ public String findPeer(final long hostId) { return getPeerName(hostId); } + protected int getPeerLookupRetryCount() { + return PeerLookupRetryCount.value(); + } + + protected int getPeerLookupRetryIntervalMs() { + return PeerLookupRetryIntervalMs.value(); + } + public SSLEngine getSSLEngine(final String peerName) { return _sslEngines.get(peerName); } @@ -594,6 +610,16 @@ protected AgentAttache getAttache(final Long hostId) throws AgentUnavailableExce return agent; } + @Override + protected int getAgentSendRetryCount() { + return Math.max(0, getPeerLookupRetryCount()); + } + + @Override + protected int getAgentSendRetryIntervalMs() { + return Math.max(0, getPeerLookupRetryIntervalMs()); + } + @Override public boolean stop() { if (_peers != null) { @@ -679,13 +705,13 @@ protected void doTask(final Task task) throws TaskExecutionException { // But we have the serialize the control commands here so we have // to deserialize this and send it through the agent attache. 
final Request req = Request.parse(data); - agent.send(req, null); + sendToAgentWithRetry(hostId, agent, req); } else { if (agent instanceof Routable) { final Routable cluster = (Routable) agent; cluster.routeToAgent(data); } else { - agent.send(Request.parse(data)); + sendToAgentWithRetry(hostId, agent, Request.parse(data)); } return; } @@ -733,6 +759,43 @@ protected void doTask(final Task task) throws TaskExecutionException { } } + private void sendToAgentWithRetry(final long hostId, final AgentAttache initialAgent, final Request req) throws AgentUnavailableException { + final int retries = Math.max(0, getAgentSendRetryCount()); + final int intervalMs = Math.max(0, getAgentSendRetryIntervalMs()); + + AgentAttache agent = initialAgent; + AgentUnavailableException last = null; + + for (int attempt = 0; attempt <= retries; attempt++) { + if (attempt > 0 && intervalMs > 0) { + sleepRetry(intervalMs); + } + + try { + agent = resolveAttacheForRetry(hostId, agent, attempt > 0); + } catch (AgentUnavailableException e) { + last = e; + continue; + } + + if (isForwardWithoutPeer(agent, hostId)) { + last = new AgentUnavailableException("Unable to find peer", hostId); + agent = null; + continue; + } + + try { + agent.send(req, null); + return; + } catch (AgentUnavailableException e) { + last = e; + agent = null; + } + } + + throw (last != null) ? 
last : new AgentUnavailableException("agent not logged into this management server", hostId); + } + @Override public void onManagementNodeJoined(final List nodeList, final long selfNodeId) { } @@ -1613,6 +1676,8 @@ public ConfigKey[] getConfigKeys() { keysLst.add(ConnectedAgentThreshold); keysLst.add(LoadSize); keysLst.add(ScanInterval); + keysLst.add(PeerLookupRetryCount); + keysLst.add(PeerLookupRetryIntervalMs); return keysLst.toArray(new ConfigKey[keysLst.size()]); } } From 2d05bd99978f390f620772f61cf2f6d7ecde5eb5 Mon Sep 17 00:00:00 2001 From: "Glover, Rene (rg9975)" Date: Fri, 16 Jan 2026 12:05:08 -0600 Subject: [PATCH 2/4] fixes for agent.send peer disconnect issues during command execution --- .../cloud/agent/manager/AgentManagerImpl.java | 16 +++++++++++++--- .../agent/manager/ClusteredAgentManagerImpl.java | 10 ---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java index a1ba2632dc97..c323d41aeae7 100644 --- a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java @@ -238,6 +238,14 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl false); protected final ConfigKey RemoteAgentNewConnectionsMonitorInterval = new ConfigKey<>("Advanced", Integer.class, "agent.connections.monitor.interval", "1800", "Time in seconds to monitor the new agent connections and cleanup the expired connections.", false); + + protected final ConfigKey PeerLookupRetryCount = new ConfigKey<>(Integer.class, + "cluster.agent.peer.lookup.retry.count", "Advanced", "1", + "Number of retries (in addition to the initial attempt) to resolve the peer management server for a host when forwarding agent commands in a management server cluster.", true); + + protected final 
ConfigKey PeerLookupRetryIntervalMs = new ConfigKey<>(Integer.class, + "cluster.agent.peer.lookup.retry.interval.ms", "Advanced", "200", + "Sleep interval in milliseconds between peer lookup retries when forwarding agent commands in a management server cluster.", true); protected final ConfigKey AlertWait = new ConfigKey<>("Advanced", Integer.class, "alert.wait", "1800", "Seconds to wait before alerting on a disconnected agent", true); protected final ConfigKey DirectAgentLoadSize = new ConfigKey<>("Advanced", Integer.class, "direct.agent.load.size", "16", @@ -790,11 +798,13 @@ public long send(final Long hostId, final Commands commands, final Listener list } protected int getAgentSendRetryCount() { - return 0; + final String value = _configDao != null ? _configDao.getValue("cluster.agent.peer.lookup.retry.count") : null; + return NumbersUtil.parseInt(value, 1); } protected int getAgentSendRetryIntervalMs() { - return 0; + final String value = _configDao != null ? _configDao.getValue("cluster.agent.peer.lookup.retry.interval.ms") : null; + return NumbersUtil.parseInt(value, 200); } protected AgentAttache resolveAttacheForRetry(final Long hostId, final AgentAttache current, final boolean forceReload) @@ -2271,7 +2281,7 @@ public ConfigKey[] getConfigKeys() { return new ConfigKey[] { CheckTxnBeforeSending, Workers, Port, Wait, AlertWait, DirectAgentLoadSize, DirectAgentPoolSize, DirectAgentThreadCap, EnableKVMAutoEnableDisable, ReadyCommandWait, GranularWaitTimeForCommands, RemoteAgentSslHandshakeTimeout, RemoteAgentMaxConcurrentNewConnections, - RemoteAgentNewConnectionsMonitorInterval }; + RemoteAgentNewConnectionsMonitorInterval, PeerLookupRetryCount, PeerLookupRetryIntervalMs }; } protected class SetHostParamsListener implements Listener { diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index d6ffb8bf9d5c..fc3b9e164d93 
100644 --- a/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -162,14 +162,6 @@ protected ClusteredAgentManagerImpl() { protected final ConfigKey ScanInterval = new ConfigKey<>(Integer.class, "direct.agent.scan.interval", "Advanced", "90", "Interval between scans to load direct agents", false, ConfigKey.Scope.Global, 1000); - protected final ConfigKey PeerLookupRetryCount = new ConfigKey<>(Integer.class, - "cluster.agent.peer.lookup.retry.count", "Advanced", "0", - "Number of retries (in addition to the initial attempt) to resolve the peer management server for a host when forwarding agent commands in a management server cluster.", true); - - protected final ConfigKey PeerLookupRetryIntervalMs = new ConfigKey<>(Integer.class, - "cluster.agent.peer.lookup.retry.interval.ms", "Advanced", "200", - "Sleep interval in milliseconds between peer lookup retries when forwarding agent commands in a management server cluster.", true); - @Override public boolean configure(final String name, final Map xmlParams) throws ConfigurationException { _peers = new HashMap<>(7); @@ -1676,8 +1668,6 @@ public ConfigKey[] getConfigKeys() { keysLst.add(ConnectedAgentThreshold); keysLst.add(LoadSize); keysLst.add(ScanInterval); - keysLst.add(PeerLookupRetryCount); - keysLst.add(PeerLookupRetryIntervalMs); return keysLst.toArray(new ConfigKey[keysLst.size()]); } } From 32ce1c20a205d30feabf6c1b9c45d4851c0df3f0 Mon Sep 17 00:00:00 2001 From: "Glover, Rene (rg9975)" Date: Fri, 16 Jan 2026 12:10:04 -0600 Subject: [PATCH 3/4] fixes for agent.send peer disconnect issues during command execution --- .../java/com/cloud/agent/manager/AgentManagerImpl.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java 
b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java index c323d41aeae7..fe5bc39fa18a 100644 --- a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java @@ -239,12 +239,12 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl protected final ConfigKey RemoteAgentNewConnectionsMonitorInterval = new ConfigKey<>("Advanced", Integer.class, "agent.connections.monitor.interval", "1800", "Time in seconds to monitor the new agent connections and cleanup the expired connections.", false); - protected final ConfigKey PeerLookupRetryCount = new ConfigKey<>(Integer.class, - "cluster.agent.peer.lookup.retry.count", "Advanced", "1", + protected final ConfigKey PeerLookupRetryCount = new ConfigKey<>("Advanced", Integer.class, + "agent.peer.lookup.retry.count", "1", "Number of retries (in addition to the initial attempt) to resolve the peer management server for a host when forwarding agent commands in a management server cluster.", true); - protected final ConfigKey PeerLookupRetryIntervalMs = new ConfigKey<>(Integer.class, - "cluster.agent.peer.lookup.retry.interval.ms", "Advanced", "200", + protected final ConfigKey PeerLookupRetryIntervalMs = new ConfigKey<>("Advanced", Integer.class, + "agent.peer.lookup.retry.interval.ms", "200", "Sleep interval in milliseconds between peer lookup retries when forwarding agent commands in a management server cluster.", true); protected final ConfigKey AlertWait = new ConfigKey<>("Advanced", Integer.class, "alert.wait", "1800", "Seconds to wait before alerting on a disconnected agent", true); From 714c00aa20f1e77efbc884c2f93b429b2c5a1255 Mon Sep 17 00:00:00 2001 From: "Glover, Rene (rg9975)" Date: Fri, 16 Jan 2026 14:39:19 -0600 Subject: [PATCH 4/4] fixes for agent.send peer disconnect issues during command execution --- 
.../main/java/com/cloud/agent/manager/AgentManagerImpl.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java index fe5bc39fa18a..f256b8d11185 100644 --- a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java @@ -798,13 +798,13 @@ public long send(final Long hostId, final Commands commands, final Listener list } protected int getAgentSendRetryCount() { - final String value = _configDao != null ? _configDao.getValue("cluster.agent.peer.lookup.retry.count") : null; - return NumbersUtil.parseInt(value, 1); + // Number of extra send attempts after the first one; callers clamp negative values to 0. + return PeerLookupRetryCount.value(); } protected int getAgentSendRetryIntervalMs() { - final String value = _configDao != null ? _configDao.getValue("cluster.agent.peer.lookup.retry.interval.ms") : null; - return NumbersUtil.parseInt(value, 200); + // Pause in milliseconds between send attempts; callers clamp negative values to 0. + return PeerLookupRetryIntervalMs.value(); } protected AgentAttache resolveAttacheForRetry(final Long hostId, final AgentAttache current, final boolean forceReload)