taskHelper.js

"use strict";

var lib = require("requirefrom")("lib");
var Mesos = require("./mesos")().getMesos();
var Builder = lib("builder");

/**
 * Represents a TaskHelper object
 * @constructor
 * @param {object} scheduler - The scheduler object.
 */
function TaskHelper(scheduler) {
    if (!(this instanceof TaskHelper)) {
        return new TaskHelper(scheduler);
    }
    var self = this;
    self.zkClient = scheduler.zkClient;
    self.scheduler = scheduler;
    self.logger = scheduler.logger;
    self.zkServicePath = self.scheduler.zkServicePath;
}

/**
 * Load the task nodes belonging to the framework from ZooKeeper.
 */
TaskHelper.prototype.loadTasks = function() {
    var self = this;
    self.zkClient.getChildren(self.zkServicePath + "/tasks", function (error, children, stat) {
        if (error) {
            self.logger.error("Could not load task information.");
            // We're ready to subscribe
            self.scheduler.emit("ready");
        } else if (children && children.length) {
            var childStates = {};
            children.forEach(function (child) {
                self.zkClient.getData(self.zkServicePath + "/tasks/" + child, function (error, data, stat) {
                    if (error || !data) {
                        self.logger.error("Could not load task information for " + child);
                        if (!error) {
                            self.deleteTask(child);
                        }
                        childStates[child] = {'loaded': false};
                        self.logger.debug("childStates length " + Object.keys(childStates).length.toString() + " children.length " + children.length.toString());
                        if (Object.keys(childStates).length === children.length) {
                            // We're ready to subscribe
                            self.scheduler.emit("ready");
                        }
                        return;
                    }
                    var pending = self.scheduler.pendingTasks;
                    self.scheduler.pendingTasks = [];
                    var task = JSON.parse(data.toString());
                    self.logger.debug("Loading task: " + JSON.stringify(task));
                    var found = false;
                    var i = 0;
                    var pendingTask;
                    function addVars(variable) {
                        // Check if variable name is either HOST or PORT# -> Set by this framework when starting a task - copy it to the loaded task
                        if (variable.name.match(/^HOST$/) !== null || variable.name.match(/^PORT[0-9]+/) !== null) {
                            // Add all matching (non-user-defined) environment variables
                            pendingTask.commandInfo.environment.variables.push(variable);
                        }
                    }
                    for (i = 0; i < pending.length; i += 1) {
                        pendingTask = pending[i];
                        self.logger.debug("Pending task: \"" + JSON.stringify(pendingTask) + "\"");
                        if (pendingTask.name === task.name) {
                            if (task.runtimeInfo && task.runtimeInfo.agentId && (task.runtimeInfo.state === "TASK_RUNNING" || task.runtimeInfo.state === "TASK_STAGING")) {
                                pendingTask.runtimeInfo = task.runtimeInfo;
                                pendingTask.taskId = task.taskId;
                                if (task.commandInfo && task.commandInfo.environment && task.commandInfo.environment.variables && task.commandInfo.environment.variables.length > 0) {
                                    if (!pendingTask.commandInfo) {
                                        pendingTask.commandInfo = new Builder("mesos.CommandInfo")
                                            .setEnvironment(new Mesos.Environment([]))
                                            .setShell(false);
                                    }
                                    if (!pendingTask.commandInfo.environment) {
                                        pendingTask.commandInfo.environment = new Mesos.Environment([]);
                                    }
                                    // Iterate over all environment variables
                                    task.commandInfo.environment.variables.forEach(addVars);
                                }
                                self.scheduler.launchedTasks.push(pendingTask);
                                pending.splice(i, 1);
                                self.scheduler.reconcileTasks.push(pendingTask);
                            } else {
                                self.deleteTask(task.taskId);
                            }
                            found = true;
                            break;
                        }
                    }
                    if (!found) {
                        self.logger.info("Setting task ID " + task.taskId + " to be killed");
                        self.scheduler.killTasks.push(task);
                    }
                    self.scheduler.pendingTasks = pending;
                    childStates[child] = {'loaded': true};
                    self.logger.debug("childStates length " + Object.keys(childStates).length.toString() + " children.length " + children.length.toString());
                    if (Object.keys(childStates).length === children.length) {
                        // We're ready to subscribe
                        self.scheduler.emit("ready");
                    }
                });
            });
        } else {
            // We're ready to subscribe - no tasks
            self.scheduler.emit("ready");
        }
    });
};

/**
 * Save task nodes from ZooKeeper.
 * @param {object} task - The task object which should be persisted to ZooKeeper.
 */
TaskHelper.prototype.saveTask = function (task) {
    var self = this;
    var data = new Buffer(JSON.stringify(task));
    // Seperating path creation from data save due to various client bugs.
    self.zkClient.mkdirp(self.zkServicePath+"/tasks/" + task.taskId, function (error, stat){
        if (error) {
            self.logger.error("Got error when creating task node in ZK " + task.name + " ID " + task.taskId + " data: " + error);
            return;
        }
        self.zkClient.setData(self.zkServicePath+"/tasks/" + task.taskId, data, function (error, stat) {
            if (error) {
                self.logger.error("Got error when saving task " + task.name + " ID " + task.taskId + " data: " + error);
                return;
            }
            self.logger.debug("Saved task " + task.name + " ID " + task.taskId);
        });
    });
};

/**
 * Delete task nodes from ZooKeeper.
 * @param {string} taskId - The id of the task which should be deleted from ZooKeeper.
 */
TaskHelper.prototype.deleteTask = function (taskId) {
    var self = this;
    self.zkClient.remove(self.zkServicePath + "/tasks/" + taskId, function (error) {
        if (error) {
            self.logger.error("Error deleting task ID " + taskId + " from zookeeper");
        } else {
            self.logger.debug("Deleted task " + taskId + " from zookeeper");
        }
    });
};

module.exports = TaskHelper;