From 66dcbbe502aa3438b2a1b107d146e25de058f82c Mon Sep 17 00:00:00 2001 From: Srikanth Viswanathan Date: Sat, 17 Mar 2018 00:08:35 -0400 Subject: [PATCH 1/2] Reap previously launched logviewer tasks Reap previously launched logviewer tasks if `mesos.logviewer.sidecar.enabled` is now `false` --- storm/src/main/storm/mesos/MesosNimbus.java | 92 +++++++++---------- .../storm/mesos/NimbusMesosScheduler.java | 55 ++++++++--- 2 files changed, 87 insertions(+), 60 deletions(-) diff --git a/storm/src/main/storm/mesos/MesosNimbus.java b/storm/src/main/storm/mesos/MesosNimbus.java index edabaa386..bd755ed88 100644 --- a/storm/src/main/storm/mesos/MesosNimbus.java +++ b/storm/src/main/storm/mesos/MesosNimbus.java @@ -43,11 +43,11 @@ import org.apache.mesos.Protos.Resource; import org.apache.mesos.Protos.TaskID; import org.apache.mesos.Protos.TaskInfo; +import org.apache.mesos.Protos.TaskState; import org.apache.mesos.Protos.TaskStatus; import org.apache.mesos.Protos.Value.Range; import org.apache.mesos.Protos.Value.Ranges; import org.apache.mesos.Protos.Value.Scalar; -import org.apache.mesos.Protos.TaskState; import org.apache.mesos.SchedulerDriver; import org.json.simple.JSONValue; import org.slf4j.Logger; @@ -60,8 +60,8 @@ import storm.mesos.resources.ResourceEntry; import storm.mesos.resources.ResourceNotAvailableException; import storm.mesos.resources.ResourceType; -import storm.mesos.schedulers.StormSchedulerImpl; import storm.mesos.schedulers.IMesosStormScheduler; +import storm.mesos.schedulers.StormSchedulerImpl; import storm.mesos.shims.CommandLineShimFactory; import storm.mesos.shims.ICommandLineShim; import storm.mesos.shims.LocalStateShim; @@ -91,8 +91,8 @@ import java.util.TimerTask; import static storm.mesos.util.PrettyProtobuf.offerIDListToString; -import static storm.mesos.util.PrettyProtobuf.offerToString; import static storm.mesos.util.PrettyProtobuf.offerMapToString; +import static storm.mesos.util.PrettyProtobuf.offerToString; import static storm.mesos.util.PrettyProtobuf.taskInfoListToString; import static storm.mesos.util.PrettyProtobuf.taskStatusListToTaskIDsString; @@ -222,22 +222,9 @@ void initializeMesosStormConf(Map conf, String localDir) { _disallowedHosts = listIntoSet((List) conf.get(CONF_MESOS_DISALLOWED_HOSTS)); _enabledLogviewerSidecar = MesosCommon.enabledLogviewerSidecar(conf); - if (_enabledLogviewerSidecar) { - Set zkServerSet = listIntoSet((List) conf.get(Config.STORM_ZOOKEEPER_SERVERS)); - String zkPort = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_PORT)); - _logviewerZkDir = Optional.fromNullable((String) conf.get(Config.STORM_ZOOKEEPER_ROOT)).or("") + "/storm-mesos/logviewers"; - LOG.info("Logviewer information will be stored under {}", _logviewerZkDir); - - if (zkPort == null || zkServerSet == null) { - throw new RuntimeException("ZooKeeper configs are not found in storm.yaml: " + Config.STORM_ZOOKEEPER_SERVERS + ", " + Config.STORM_ZOOKEEPER_PORT); - } else { - List zkConnectionList = new ArrayList<>(); - for (String server : zkServerSet) { - zkConnectionList.add(String.format("%s:%s", server, zkPort)); - } - _zkClient = new ZKClient(StringUtils.join(zkConnectionList, ',')); - } - } + initializeZkClient(conf); + _logviewerZkDir = Optional.fromNullable((String) conf.get(Config.STORM_ZOOKEEPER_ROOT)).or("") + "/storm-mesos/logviewers"; + LOG.info("Logviewer ZK path: {}", _logviewerZkDir); Boolean preferReservedResources = (Boolean) conf.get(CONF_MESOS_PREFER_RESERVED_RESOURCES); if (preferReservedResources != null) { @@ -245,7 +232,7 @@ void initializeMesosStormConf(Map conf, String localDir) { } _container = Optional.fromNullable((String) conf.get(CONF_MESOS_CONTAINER_DOCKER_IMAGE)); - _mesosScheduler = new NimbusMesosScheduler(this, _zkClient, _logviewerZkDir); + _mesosScheduler = new NimbusMesosScheduler(this, _zkClient, _logviewerZkDir, _enabledLogviewerSidecar); // Generate YAML to be served up to clients _generatedConfPath = Paths.get( @@ -272,6 +259,21 @@ void initializeMesosStormConf(Map conf, String localDir) { } } + private void initializeZkClient(Map conf) { + Set zkServerSet = listIntoSet((List) conf.get(Config.STORM_ZOOKEEPER_SERVERS)); + String zkPort = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_PORT)); + + if (zkPort == null || zkServerSet == null) { + throw new RuntimeException("ZooKeeper configs are not found in storm.yaml: " + Config.STORM_ZOOKEEPER_SERVERS + ", " + Config.STORM_ZOOKEEPER_PORT); + } else { + List zkConnectionList = new ArrayList<>(); + for (String server : zkServerSet) { + zkConnectionList.add(String.format("%s:%s", server, zkPort)); + } + _zkClient = new ZKClient(StringUtils.join(zkConnectionList, ',')); + } + } + @SuppressWarnings("unchecked") protected void startLocalHttpServer() throws Exception { createLocalServerPort(); @@ -286,34 +288,32 @@ public void doRegistration(final SchedulerDriver driver, Protos.FrameworkID id) _state.put(FRAMEWORK_ID, id.getValue()); _offers = new HashMap(); - if (_enabledLogviewerSidecar) { - - _timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - // performing "explicit" reconciliation; master will respond with the latest state for all logviewer tasks - // in the framework scheduler's statusUpdate() method - List taskStatuses = new ArrayList(); - List logviewerPaths = _zkClient.getChildren(_logviewerZkDir); - if (logviewerPaths == null) { - _driver.reconcileTasks(taskStatuses); - return; - } - for (String path : logviewerPaths) { - TaskID logviewerTaskId = TaskID.newBuilder() - .setValue(new String(_zkClient.getNodeData(String.format("%s/%s", _logviewerZkDir, path)))) - .build(); - TaskStatus logviewerTaskStatus = TaskStatus.newBuilder() - .setTaskId(logviewerTaskId) - .setState(TaskState.TASK_RUNNING) - .build(); - taskStatuses.add(logviewerTaskStatus); - } + _timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + // performing "explicit" reconciliation; master will respond with the latest state for all logviewer tasks + // in the framework scheduler's statusUpdate() method + List taskStatuses = new ArrayList(); + List logviewerPaths = _zkClient.getChildren(_logviewerZkDir); + if (logviewerPaths == null || !_enabledLogviewerSidecar) { _driver.reconcileTasks(taskStatuses); - LOG.info("Performing task reconciliation between scheduler and master on following tasks: {}", taskStatusListToTaskIDsString(taskStatuses)); + return; } - }, 0, TASK_RECONCILIATION_INTERVAL); // reconciliation performed every 5 minutes - } + + for (String path : logviewerPaths) { + TaskID logviewerTaskId = TaskID.newBuilder() + .setValue(new String(_zkClient.getNodeData(String.format("%s/%s", _logviewerZkDir, path)))) + .build(); + TaskStatus logviewerTaskStatus = TaskStatus.newBuilder() + .setTaskId(logviewerTaskId) + .setState(TaskState.TASK_RUNNING) + .build(); + taskStatuses.add(logviewerTaskStatus); + } + _driver.reconcileTasks(taskStatuses); + LOG.info("Performing task reconciliation between scheduler and master on following tasks: {}", taskStatusListToTaskIDsString(taskStatuses)); + } + }, 0, TASK_RECONCILIATION_INTERVAL); // reconciliation performed every 5 minutes } public void shutdown() throws Exception { diff --git a/storm/src/main/storm/mesos/NimbusMesosScheduler.java b/storm/src/main/storm/mesos/NimbusMesosScheduler.java index 61c74a7e3..531107c38 100644 --- a/storm/src/main/storm/mesos/NimbusMesosScheduler.java +++ b/storm/src/main/storm/mesos/NimbusMesosScheduler.java @@ -17,13 +17,7 @@ */ package storm.mesos; -import org.apache.mesos.Protos.ExecutorID; -import org.apache.mesos.Protos.FrameworkID; -import org.apache.mesos.Protos.MasterInfo; -import org.apache.mesos.Protos.Offer; -import org.apache.mesos.Protos.OfferID; -import org.apache.mesos.Protos.SlaveID; -import org.apache.mesos.Protos.TaskStatus; +import org.apache.mesos.Protos.*; import org.apache.mesos.Scheduler; import org.apache.mesos.SchedulerDriver; import org.slf4j.Logger; @@ -42,12 +36,14 @@ public class NimbusMesosScheduler implements Scheduler { private ZKClient zkClient; private String logviewerZkDir; private CountDownLatch _registeredLatch = new CountDownLatch(1); + private boolean enableLogViewers; public static final Logger LOG = LoggerFactory.getLogger(MesosNimbus.class); - public NimbusMesosScheduler(MesosNimbus mesosNimbus, ZKClient zkClient, String logviewerZkDir) { + public NimbusMesosScheduler(MesosNimbus mesosNimbus, ZKClient zkClient, String logviewerZkDir, boolean enableLogViewers) { this.mesosNimbus = mesosNimbus; this.zkClient = zkClient; this.logviewerZkDir = logviewerZkDir; + this.enableLogViewers = enableLogViewers; } public void waitUntilRegistered() throws InterruptedException { @@ -98,6 +94,7 @@ public void statusUpdate(SchedulerDriver driver, TaskStatus status) { if (status.getTaskId().getValue().contains("logviewer")) { updateLogviewerState(status); } + switch (status.getState()) { case TASK_STAGING: case TASK_STARTING: @@ -125,25 +122,34 @@ private void updateLogviewerState(TaskStatus status) { LOG.error("updateLogviewerState: taskId for logviewer, {}, isn't formatted correctly so ignoring task update", taskId); return; } + String nodeId = taskId.split("\\" + MesosCommon.MESOS_COMPONENT_ID_DELIMITER)[1]; String logviewerZKPath = String.format("%s/%s", logviewerZkDir, nodeId); + + if (!enableLogViewers) { + LOG.info("Logviewers are disabled. Reaping existing logviewer task {}", taskId); + reapLogviewerTask(logviewerZKPath, status); + return; + } + switch (status.getState()) { case TASK_STAGING: - checkRunningLogviewerState(logviewerZKPath); + ensureZNodeExists(logviewerZKPath); return; case TASK_STARTING: - checkRunningLogviewerState(logviewerZKPath); + ensureZNodeExists(logviewerZKPath); return; case TASK_RUNNING: - checkRunningLogviewerState(logviewerZKPath); + ensureZNodeExists(logviewerZKPath); return; case TASK_LOST: // this status update can be triggered by the explicit kill and isn't terminal, do not kill again break; default: - // explicitly kill the logviewer task to ensure logviewer is terminated + // explicitly kill the logviewer task to ensure it is terminated mesosNimbus._driver.killTask(status.getTaskId()); } + // if it gets to this point it means logviewer terminated; update ZK with new logviewer state if (zkClient.nodeExists(logviewerZKPath)) { LOG.info("updateLogviewerState: Remove logviewer state in zk at {} for logviewer task {}", logviewerZKPath, taskId); @@ -154,13 +160,34 @@ private void updateLogviewerState(TaskStatus status) { } } - private void checkRunningLogviewerState(String logviewerZKPath) { + private void ensureZNodeExists(String logviewerZKPath) { if (!zkClient.nodeExists(logviewerZKPath)) { - LOG.error("checkRunningLogviewerState: Running mesos logviewer task exists for logviewer that isn't tracked in ZooKeeper"); + LOG.warn("ensureZNodeExists: Running mesos logviewer task exists for logviewer that isn't tracked in ZooKeeper"); zkClient.createNode(logviewerZKPath); } } + private void reapLogviewerTask(String logviewerZKPath, TaskStatus status) { + String taskId = status.getTaskId().getValue(); + if (zkClient.nodeExists(logviewerZKPath)) { + LOG.info("reapLogviewerTask: Remove logviewer state in zk at {} for logviewer task {}", logviewerZKPath, taskId); + zkClient.deleteNode(logviewerZKPath); + } + + switch (status.getState()) { + case TASK_FAILED: + case TASK_FINISHED: + case TASK_KILLED: + case TASK_LOST: + // terminal states + break; + default: + // explicitly kill the logviewer task to ensure it is terminated + LOG.info("reapLogviewerTask: Killing logviewer mesos task {}", logviewerZKPath, taskId); + mesosNimbus._driver.killTask(status.getTaskId()); + } + } + @Override public void frameworkMessage(SchedulerDriver driver, ExecutorID executorId, SlaveID slaveId, byte[] data) { } From 8fd4ba71d789abc89c475deaa0bc4f424deec093 Mon Sep 17 00:00:00 2001 From: Srikanth Viswanathan Date: Tue, 20 Mar 2018 23:54:54 -0400 Subject: [PATCH 2/2] Address review feedback --- .../storm/mesos/NimbusMesosScheduler.java | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/storm/src/main/storm/mesos/NimbusMesosScheduler.java b/storm/src/main/storm/mesos/NimbusMesosScheduler.java index 531107c38..6eed505a8 100644 --- a/storm/src/main/storm/mesos/NimbusMesosScheduler.java +++ b/storm/src/main/storm/mesos/NimbusMesosScheduler.java @@ -17,7 +17,13 @@ */ package storm.mesos; -import org.apache.mesos.Protos.*; +import org.apache.mesos.Protos.ExecutorID; +import org.apache.mesos.Protos.FrameworkID; +import org.apache.mesos.Protos.MasterInfo; +import org.apache.mesos.Protos.Offer; +import org.apache.mesos.Protos.OfferID; +import org.apache.mesos.Protos.SlaveID; +import org.apache.mesos.Protos.TaskStatus; import org.apache.mesos.Scheduler; import org.apache.mesos.SchedulerDriver; import org.slf4j.Logger; @@ -94,7 +100,6 @@ public void statusUpdate(SchedulerDriver driver, TaskStatus status) { if (status.getTaskId().getValue().contains("logviewer")) { updateLogviewerState(status); } - switch (status.getState()) { case TASK_STAGING: case TASK_STARTING: @@ -122,25 +127,22 @@ private void updateLogviewerState(TaskStatus status) { LOG.error("updateLogviewerState: taskId for logviewer, {}, isn't formatted correctly so ignoring task update", taskId); return; } - String nodeId = taskId.split("\\" + MesosCommon.MESOS_COMPONENT_ID_DELIMITER)[1]; String logviewerZKPath = String.format("%s/%s", logviewerZkDir, nodeId); - if (!enableLogViewers) { LOG.info("Logviewers are disabled. Reaping existing logviewer task {}", taskId); reapLogviewerTask(logviewerZKPath, status); return; } - switch (status.getState()) { case TASK_STAGING: - ensureZNodeExists(logviewerZKPath); + ensureLogviewerZNodeExists(logviewerZKPath); return; case TASK_STARTING: - ensureZNodeExists(logviewerZKPath); + ensureLogviewerZNodeExists(logviewerZKPath); return; case TASK_RUNNING: - ensureZNodeExists(logviewerZKPath); + ensureLogviewerZNodeExists(logviewerZKPath); return; case TASK_LOST: // this status update can be triggered by the explicit kill and isn't terminal, do not kill again @@ -160,9 +162,9 @@ private void updateLogviewerState(TaskStatus status) { } } - private void ensureZNodeExists(String logviewerZKPath) { + private void ensureLogviewerZNodeExists(String logviewerZKPath) { if (!zkClient.nodeExists(logviewerZKPath)) { - LOG.warn("ensureZNodeExists: Running mesos logviewer task exists for logviewer that isn't tracked in ZooKeeper"); + LOG.warn("ensureLogviewerZNodeExists: Running mesos logviewer task exists for logviewer that isn't tracked in ZooKeeper"); zkClient.createNode(logviewerZKPath); } }