Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[STORM-3639] Replace asserts in daemon code. #3274

Merged
merged 5 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -828,9 +828,15 @@ private static Map<String, Map<WorkerSlot, WorkerResources>> computeTopoToNodePo
return ret;
}

/**
* Check new assignments with existing assignments and determine difference is any.
*
* @param existingAssignments non-null map of topology-id to existing assignments.
* @param newAssignments non-null map of topology-id to new assignments.
* @return true if there is a change in assignments, false otherwise.
*/
private boolean auditAssignmentChanges(Map<String, Assignment> existingAssignments,
Map<String, Assignment> newAssignments) {
assert existingAssignments != null && newAssignments != null;
agresch marked this conversation as resolved.
Show resolved Hide resolved
boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty();
long numRemovedExec = 0;
long numRemovedSlot = 0;
Expand Down Expand Up @@ -1966,8 +1972,12 @@ private void waitForDesiredCodeReplication(Map<String, Object> topoConf, String

private TopologyDetails readTopologyDetails(String topoId, StormBase base) throws KeyNotFoundException,
AuthorizationException, IOException, InvalidTopologyException {
assert (base != null);
assert (topoId != null);
if (base == null) {
throw new InvalidTopologyException("Cannot readTopologyDetails: StormBase parameter value is null");
}
if (topoId == null) {
throw new InvalidTopologyException("Cannot readTopologyDetails: topoId parameter value is null");
}

Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, topoCache);
StormTopology topo = readStormTopologyAsNimbus(topoId, topoCache);
Expand Down Expand Up @@ -2067,7 +2077,9 @@ private List<List<Integer>> computeExecutors(StormBase base, Map<String, Object>
StormTopology topology)
throws InvalidTopologyException {

assert (base != null);
if (base == null) {
throw new InvalidTopologyException("Cannot computeExecutors: StormBase parameter value is null");
}

Map<String, Integer> compToExecutors = base.get_component_executors();
List<List<Integer>> ret = new ArrayList<>();
Expand Down Expand Up @@ -2677,7 +2689,9 @@ private int getTopologyLaunchHeartbeatTimeoutSec(String topoId) {
private void startTopology(String topoName, String topoId, TopologyStatus initStatus, String owner,
String principal, Map<String, Object> topoConf, StormTopology stormTopology)
throws InvalidTopologyException {
assert (TopologyStatus.ACTIVE == initStatus || TopologyStatus.INACTIVE == initStatus);
if (TopologyStatus.ACTIVE != initStatus && TopologyStatus.INACTIVE != initStatus) {
throw new InvalidTopologyException("Cannot startTopology: initStatus should be ACTIVE or INACTIVE, not " + initStatus.name());
}
Map<String, Integer> numExecutors = new HashMap<>();
StormTopology topology = StormCommon.systemTopology(topoConf, stormTopology);
for (Entry<String, Object> entry : StormCommon.allComponents(topology).entrySet()) {
Expand Down Expand Up @@ -3195,7 +3209,9 @@ public void submitTopologyWithOpts(String topoName, String uploadedJarLocation,
try {
submitTopologyWithOptsCalls.mark();
assertIsLeader();
assert (options != null);
if (options == null) {
throw new InvalidTopologyException("Cannot submitTopologyWithOpts: SubmitOptions parameter value is null");
}
validateTopologyName(topoName);
checkAuthorization(topoName, null, "submitTopology");
assertTopoActive(topoName, false);
Expand Down Expand Up @@ -5321,10 +5337,12 @@ private class ClusterSummaryMetricSet implements Runnable {
ClusterSummaryMetricSet(StormMetricsRegistry metricsRegistry) {
this.metricsRegistry = metricsRegistry;
//Break the code if out of sync to thrift protocol
assert ClusterSummary._Fields.values().length == 3
&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;
if (ClusterSummary._Fields.values().length != 3
|| ClusterSummary._Fields.findByName("supervisors") != ClusterSummary._Fields.SUPERVISORS
|| ClusterSummary._Fields.findByName("topologies") != ClusterSummary._Fields.TOPOLOGIES
|| ClusterSummary._Fields.findByName("nimbuses") != ClusterSummary._Fields.NIMBUSES) {
throw new AssertionError("Out of sync with thrift protocol");
}

final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ public BasicContainer(ContainerType type, Map<String, Object> conf, String super
AdvancedFSOps ops, String profileCmd) throws IOException {
super(type, conf, supervisorId, supervisorPort, port, assignment,
resourceIsolationManager, workerId, topoConf, ops, metricsRegistry, containerMemoryTracker);
assert (localState != null);
if (localState == null) {
throw new IOException("LocalState parameter value is null");
}
this.localState = localState;

if (type.isRecovery() && !type.isOnlyKillable()) {
Expand Down Expand Up @@ -209,7 +211,11 @@ public static String getStormVersionFor(final Map<String, Object> conf, final St
*/
protected void createNewWorkerId() {
type.assertFull();
assert (workerId == null);
if (workerId != null) {
String err = "Incorrect usage of createNewWorkerId(), current workerId is " + workerId + ", expecting null";
LOG.error(err);
throw new AssertionError(err);
}
synchronized (localState) {
workerId = Utils.uuid();
Map<String, Integer> workerToPort = localState.getApprovedWorkers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,15 @@ protected Container(ContainerType type, Map<String, Object> conf, String supervi
int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
String workerId, Map<String, Object> topoConf, AdvancedFSOps ops,
StormMetricsRegistry metricsRegistry, ContainerMemoryTracker containerMemoryTracker) throws IOException {
assert (type != null);
assert (conf != null);
assert (supervisorId != null);
if (type == null) {
throw new IOException("ContainerType parameter is null");
}
if (conf == null) {
throw new IOException("conf parameter value is null");
}
if (supervisorId == null) {
throw new IOException("SupervisorId parameter value is null");
}

symlinksDisabled = (boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false);

Expand All @@ -137,14 +143,24 @@ protected Container(ContainerType type, Map<String, Object> conf, String supervi
}

if (this.type.isOnlyKillable()) {
assert (this.assignment == null);
assert (this.port <= 0);
assert (this.workerId != null);
if (this.assignment != null) {
throw new IOException("With ContainerType==OnlyKillable, expecting LocalAssignment member variable to be null");
}
if (this.port > 0) {
throw new IOException("With ContainerType==OnlyKillable, expecting port member variable <=0 but found " + this.port);
}
if (this.workerId == null) {
throw new IOException("With ContainerType==OnlyKillable, expecting WorkerId member variable to be assigned");
}
topologyId = null;
this.topoConf = null;
} else {
assert (assignment != null);
assert (port > 0);
if (this.assignment == null) {
throw new IOException("With ContainerType!=OnlyKillable, expecting LocalAssignment member variable to be assigned");
}
if (this.port <= 0) {
throw new IOException("With ContainerType!=OnlyKillable, expecting port member variable >0 but found " + this.port);
}
topologyId = assignment.get_topology_id();
if (!this.ops.doRequiredTopoFilesExist(this.conf, topologyId)) {
LOG.info(
Expand Down Expand Up @@ -175,7 +191,9 @@ public String toString() {
}

protected Map<String, Object> readTopoConf() throws IOException {
assert (topologyId != null);
if (topologyId == null) {
throw new IOException("Cannot readTopoConf: member variable topologyId is null");
}
return ConfigUtils.readSupervisorStormConf(conf, topologyId);
}

Expand Down
Loading