diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 4c78009b..5dff0fb0 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -13,9 +13,9 @@ jobs: steps: - uses: actions/checkout@v2 - name: install debian-packaged dependencies - run: sudo apt install -y libgtest-dev libbenchmark-dev libfmt-dev tidy git python3 python3-dateutil python3-pip + run: sudo apt install -y libgtest-dev libbenchmark-dev libfmt-dev tidy git python3 python3-dateutil python3-pip inotify-tools - name: install pypi-packaged dependencies - run: sudo pip3 install pytest black conan + run: sudo pip3 install pytest black conan inotify # NOTE: since we later run "make" using the normal "builder" user, we must use Conan using that same user (so avoid the "sudo"!!) - name: install Conan-packaged dependencies diff --git a/Makefile b/Makefile index 56aeedcf..1091a9da 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ all: centos_install_prereq: # this is just the list present in "BuildRequires" field of the RPM spec file: - yum install gcc-c++ make gtest-devel fmt-devel git + yum install gcc-c++ make gtest-devel fmt-devel git inotify-tools test: $(MAKE) -C collector test diff --git a/README.md b/README.md index 620e6957..9f1b14cc 100644 --- a/README.md +++ b/README.md @@ -428,7 +428,6 @@ cmonitor_collector \ --output-filename=pod-performances.json ``` - ### Connecting with InfluxDB and Grafana The `cmonitor_collector` can be connected to an [InfluxDB](https://www.influxdata.com/) deployment to store collected data (this can happen @@ -469,6 +468,28 @@ cmonitor_collector \ The Prometheus instance can then be used as data source for graphing tools like [Grafana](https://grafana.com/) which allow you to create nice interactive dashboards (see examples in InfluxDB section). +### CMonitor helper tool: +cmonitor_launcher tool can be used to automate the monitoring the Kubernetes PODs. + +It will perform following steps: + + - Watch all files below a directory and notify an event for changes of a Pod restart or creation of a new Pod. + - Check the process name against the white-list given in the filter list. + - Execute command to launch CMonitor if the process name matches with the filter. + +``` +Example: + cmonitor_launcher --path /sys/fs/cgroup/memory/kubepods/burstable/ + --filter process_1 process_2 + --ip-port 172.0.0.1:9090 172.0.0.2:9090 + --command "./cmonitor_collector --num-samples=until-cgroup-alive + --deep-collect --collect=cgroup_threads,cgroup_cpu,cgroup_memory,cgroup_network + --score-threshold=0 --sampling-interval=3 --output-directory=/home + --allow-multiple-instances --remote prometheus" + --log /home + --timeout 20 +``` +In the above example, cmonitor_collector will be launched automatically for process_1 and process_2 with Prometheus instance at 172.0.0.1:9090 and 172.0.0.2:9090 respectively. ### Reference Manual diff --git a/tools/Makefile b/tools/Makefile index 465197b7..838f2015 100644 --- a/tools/Makefile +++ b/tools/Makefile @@ -12,11 +12,13 @@ include $(ROOT_DIR)/Constants.mk TOOLS = \ chart/cmonitor_chart.py \ filter/cmonitor_filter.py \ - statistics/cmonitor_statistics.py + statistics/cmonitor_statistics.py \ + launcher/cmonitor_launcher.py SYMLINKS = \ chart/cmonitor_chart \ filter/cmonitor_filter \ - statistics/cmonitor_statistics + statistics/cmonitor_statistics \ + launcher/cmonitor_launcher # cmonitor_version.py has to be listed explicitly because it does not exist yet # when the $(wilcard) runs (after a clean checkout) @@ -55,6 +57,7 @@ endif test: $(MAKE) -C filter test $(MAKE) -C statistics test + $(MAKE) -C launcher test # FIXME: # $(MAKE) -C chart test diff --git a/tools/common-code/cmonitor_watcher.py b/tools/common-code/cmonitor_watcher.py new file mode 100644 index 00000000..493811ca --- /dev/null +++ b/tools/common-code/cmonitor_watcher.py @@ -0,0 +1,196 @@ +#!/usr/bin/python3 + +# +# cmonitor_watcher.py +# +# Author: Satyabrata Bharati +# Created: April 2022 +# +import inotify.adapters +import queue +import os +import time +import logging +from datetime import datetime + +exit_flag = False +# ======================================================================================================= +# CgroupWatcher : Basic inotify class +# ======================================================================================================= +class CgroupWatcher: + """ + - Watch all files below a directory and notify an event for changes. + - Retrieves all the process and extract the process name "/proc//stat. + - check the process name against the white-list given in the filter list. + - store the events in Queue. + """ + + def __init__(self, path, filter, timeout): + """Initialize CgroupWatcher + Args: + path: path to watch for events. + filter: white-list against which the process-event is filtered. + + """ + self.path = path + self.filter = filter + self.timeout = timeout + self.myFileList = {} + + def __get_cgroup_version(self): + """ + Detect the cgroup version. + """ + proc_self_mount = "/proc/self/mounts" + ncgroup_v1 = 0 + ncgroup_v2 = 0 + with open(proc_self_mount) as file: + for line in file: + row = line.split() + fs_spec = row[0] + fs_file = row[1] + fs_vfstype = row[2] + if (fs_spec == "cgroup" or fs_spec == "cgroup2") and fs_vfstype == "cgroup2": + ncgroup_v2 += 1 + else: + ncgroup_v1 += 1 + + if ncgroup_v1 == 0 and ncgroup_v2 > 0: + cgroup_versopn = "v2" + return cgroup_version + else: + cgroup_version = "v1" + return cgroup_version + + def __get_process_name(self, pid): + """Returns the process name for the process id. + Args: + pid: process id. + + Returns: + The process name. + + """ + cgroup_version = self.__get_cgroup_version() + if cgroup_version == "v1": + proc_filename = "/proc" + "/" + pid + "/stat" + else: + proc_filename = "/proc" + "/" + pid + "/cgroup.procs" + with open(proc_filename) as file: + for line in file: + parts = line.split() + process_name = parts[1].strip("()") + return process_name + + def __get_pid_list(self, filename): + """Get the list of the process ids belong to a tasks file. + Args: + filename: the tasks file. + + Returns: + The list of PIDs within the tasks file. + + """ + list = [] + with open(filename) as file: + for line in file: + list.append(line.strip()) + return list + + def __get_list_of_files(self, dir): + """Returns the list of the files created for the event within the watched dir. + Args: + filename: dir to be watched. + + Returns: + The list of files created within the watched dir. + + """ + listOfFiles = os.listdir(dir) + allFiles = list() + for entry in listOfFiles: + fullpath = os.path.join(dir, entry) + if os.path.isdir(fullpath): + allFiles = allFiles + self.__get_list_of_files(fullpath) + else: + allFiles.append(fullpath) + + return allFiles + + def __process_task_files(self, dir): + """Process all the files for triggered-event within the watched dir. + Finds the process Ids and filter out the process name against the + provided white-list. If the process Id matches the whilte-listing + process from command-line , it store and return the file anlog with the process-name. + The process name later will be used to get the ip and port from the + command-line for the specific process. + Args: + dir: dir to be watched. + + Returns: + The file along with the process name which will be used to launch cmonitor. + + """ + # time.sleep(20) + time.sleep(self.timeout) + logging.info(f"watcher process file sleep: {self.timeout}") + allFiles = self.__get_list_of_files(dir) + for file in allFiles: + if file.endswith("tasks"): + list = self.__get_pid_list(file) + if list: + for pid in list: + process_name = self.__get_process_name(pid) + logging.info(f"processing task file: {file} with pid: {pid}, process name: {process_name}") + match = self.__check_filter(process_name) + if match is True: + logging.info(f"Found match: {process_name}") + self.myFileList = {file: process_name} + return self.myFileList + + def __check_filter(self, process_name): + """Check process name against the whilte-list. + Args: + process_name: process name to be matched against the whilte-list from command-line. + + Returns: + True if process_name matches with the white-list. + + """ + for e in self.filter: + if process_name in e: + return True + + def inotify_events(self, queue): + """Main thread function for notifying events. + Monitored events that match with the white-list provided will be stored in this queue. + The events from this queue will be processed by cMonitorLauncher threading function to + launch cMonitor with appropriate command input + Args: + queue: monitored events will be stored in this queue. + + Returns: + + """ + logging.info(f"CgroupWatcher calling inotify_event") + i = inotify.adapters.Inotify() + i.add_watch(self.path) + try: + for event in i.event_gen(): + if event is not None: + if "IN_CREATE" in event[1]: + (header, type_names, path, filename) = event + logging.info(f"CgroupWatcher event triggered:{path},{filename}") + dir = path + filename + logging.info(f"CgroupWatcher event created:{filename}") + fileList = self.__process_task_files(dir) + if fileList: + logging.info(f"CgroupWatcher event in Queue:{fileList}") + queue.put(fileList) + # global exit_flag + if exit_flag is True: + logging.info(f"CgroupWatcher exit_flag {exit_flag}") + exit(1) + + finally: + i.remove_watch(path) diff --git a/tools/launcher/Makefile b/tools/launcher/Makefile new file mode 100644 index 00000000..ea7a8768 --- /dev/null +++ b/tools/launcher/Makefile @@ -0,0 +1,11 @@ +ROOT_DIR:=$(shell readlink -f ../..) +PYTHON_COMMON_CODE=$(ROOT_DIR)/tools/common-code + +run: + export PYTHONPATH=$(PYTHON_COMMON_CODE) ; \ + ./cmonitor_launcher.py $(ARGS) + +test: + cd tests && \ + export PYTHONPATH=$(PYTHON_COMMON_CODE) && \ + pytest --capture=no -vv diff --git a/tools/launcher/cmonitor_launcher.py b/tools/launcher/cmonitor_launcher.py new file mode 100644 index 00000000..364c2486 --- /dev/null +++ b/tools/launcher/cmonitor_launcher.py @@ -0,0 +1,237 @@ +#!/usr/bin/python3 + +# +# cmonitor_launcher.py +# +# Author: Satyabrata Bharati +# Created: April 2022 +# + +import concurrent.futures +from concurrent.futures import ProcessPoolExecutor +import subprocess +from subprocess import Popen +import argparse +import queue +import os +import sys +import time +import logging +from datetime import datetime + +import signal + +import cmonitor_watcher +from cmonitor_watcher import CgroupWatcher +from argparse import RawTextHelpFormatter + +queue = queue.Queue() +logger = logging.getLogger(__name__) + +# default sleep timeout +default_sleep_timeout = 20 +# ======================================================================================================= +# CmonitorLauncher +# ======================================================================================================= +class CmonitorLauncher: + """ + - Retrieves all the events from the Queue. + - Lauch cMonitor with appropriate command. + """ + + def __init__(self, path, filter, ip, command, timeout): + self.path = path + self.filter = filter + self.ip = ip + self.command = command + self.timeout = timeout + self.process_host_dict = {} + self.monitored_processes = {} + + """ + Should add the list of IPs as key to the dictionary + """ + tmp_ip = self.ip + for key in self.filter: + for value in tmp_ip: + self.process_host_dict[key] = value + tmp_ip.remove(value) + # Printing resultant dictionary + print("Input [filter-host]: " + str(self.process_host_dict)) + logging.info(f"Input [filter-host] {str(self.process_host_dict)}") + + def process_events(self, event): + """Main thread function for processing input events from the queue. + Args: + event: events to read from this queue. + The events from this queue will be processed by this threading function to + launch cMonitor with appropriate command input. + + """ + try: + entry = 1 + while True: + if not event.empty(): + fileList = event.get() + for key, value in fileList.items(): + filename = key + process_name = value + logging.info(f"In processing event from the Queue - event: {entry},file: {filename},process_name: {process_name}") + logging.info(f"Launching cMonitor with: {filename} with IP :{self.process_host_dict.get(process_name)}") + # self.__launch_cmonitor(filename, self.process_host_dict.get(process_name)) + self.__launch_cmonitor(filename, process_name) + entry = entry + 1 + else: + time.sleep(self.timeout) + logging.info(f"In processing event Queue is empty - sleeping: {self.timeout} sec") + if cmonitor_watcher.exit_flag is True: + logging.info(f"In processing event_flag set to {cmonitor_watcher.exit_flag}") + exit(1) + + except event.Empty(): + pass + + def __launch_cmonitor(self, filename, process_name): + """ + - Lauch cMonitor with appropriate command. + """ + + for c in self.command: + cmd = c.strip() + ip = self.process_host_dict.get(process_name) + ip_port = ip.split(":") + ip = ip_port[0] + port = ip_port[1] + base_path = os.path.dirname(filename) + path = "/".join(base_path.split("/")[5:]) + monitor_cmd = f"{cmd} --cgroup-name={path} --remote-ip {ip} --remote-port {port}" + monitor_cmd = [ + f"{cmd}", + f"--cgroup-name={path}", + f"--remote-ip {ip}", + f"--remote-port {port}", + ] + print("Launch cMonitor with command:", monitor_cmd) + logging.info(f"Launch cMonitor with command: {monitor_cmd }") + # monitor_process = subprocess.Popen(monitor_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + monitor_process = subprocess.Popen(monitor_cmd, shell=True) + self.monitored_processes[process_name] = monitor_process + + def handler(self, signum, frame): + for process_name, monitor_process in self.monitored_processes.items(): + logging.info(f"Stopping cmonitor_collector from pid '{monitor_process.pid}' of container '{process_name}'") + # monitor_process is a subprocess.Popen object: + cmonitor_watcher.exit_flag = True + time.sleep(10) + monitor_process.terminate() + monitor_process.wait() + exit(1) + + +def parse_command_line(): + """Parses the command line and returns the configuration as dictionary object.""" + parser = argparse.ArgumentParser( + description="Utility to Lauch cMonitor with appropriate command.", + epilog=""" +Example: + cmonitor_launcher.py --path /sys/fs/cgroup/memory/kubepods/burstable/ + --filter process_1 process_2 + --ip-port 172.0.0.1:9090 172.0.0.2:9099 + --command "./cmonitor_collector --num-samples=until-cgroup-alive + --deep-collect --collect=cgroup_threads,cgroup_cpu,cgroup_memory,cgroup_network + --score-threshold=0 --sampling-interval=3 --output-directory=/home + --allow-multiple-instances --remote prometheus" + --log /home + --timeout 20 +""", + formatter_class=RawTextHelpFormatter, + ) + parser.add_argument("-p", "--path", help="path to watch", default=None) + parser.add_argument( + "-f", + "--filter", + nargs="*", + help="cmonitor triggers for matching pattern", + default=None, + ) + parser.add_argument( + "-c", + "--command", + nargs="*", + help="cmonitor input command parameters", + default=None, + ) + parser.add_argument("-i", "--ip-port", nargs="*", help="cmonitor input ", default=None) + parser.add_argument("-l", "--log", help="path to logfile") + parser.add_argument("-t", "--timeout", type=int, help="sleep time interval") + args = parser.parse_args() + + if args.path is None: + print("Please provide the input path to watch for iNotify events to be monitored") + parser.print_help() + sys.exit(os.EX_USAGE) + + if args.filter is None: + print("Please provide the input filter for white-listing events") + parser.print_help() + sys.exit(os.EX_USAGE) + + if args.command is None: + print("Please provide the input comamnd to launch cMonitor with") + parser.print_help() + sys.exit(os.EX_USAGE) + + return { + "input_path": args.path, + "input_filter": args.filter, + "input_command": args.command, + "input_ip": args.ip_port, + "input_log": args.log, + "input_timeout": args.timeout, + } + + +def main(): + config = parse_command_line() + # default sleep timeout + timeout = default_sleep_timeout + + # command line inputs + input_path = config["input_path"] + print("Input [path]:", input_path) + filter = config["input_filter"] + command = config["input_command"] + ip = config["input_ip"] + log_dir = config["input_log"] + timeout = config["input_timeout"] + + if log_dir: + log_file_name = os.path.join(log_dir, datetime.now().strftime("cmonitor_launcher_%Y%m%d_%H%M%S.log")) + else: + log_file_name = datetime.now().strftime("cmonitor_launcher_%Y%m%d_%H%M%S.log") + + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)-8s %(message)s", + datefmt="%a, %d %b %Y %H:%M:%S", + filename=log_file_name, + filemode="w", + ) + logging.info("Started cMonitor Launcher") + logging.info(f"timeout set for sleep: {timeout}") + + # flag has to be set in case inotify_events() needed to be unblocked + # default False : keep blocking + # exit_flag = False + + cGroupWatcher = CgroupWatcher(input_path, filter, timeout) + cMonitorLauncher = CmonitorLauncher(input_path, filter, ip, command, timeout) + + signal.signal(signal.SIGINT, cMonitorLauncher.handler) + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + executer1 = executor.submit(cGroupWatcher.inotify_events, queue) + executer2 = executor.submit(cMonitorLauncher.process_events, queue) + + +if __name__ == "__main__": + main() diff --git a/tools/launcher/tests/test_cgroup_watcher.py b/tools/launcher/tests/test_cgroup_watcher.py new file mode 100644 index 00000000..6c5bd1b3 --- /dev/null +++ b/tools/launcher/tests/test_cgroup_watcher.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 +""" + Author: Satyabrata Bharati + Verify the behavior of CmonitorWatcher InotifyEvent +""" + +import pytest +import queue +import os +import time +import sys +import subprocess +from subprocess import Popen +import concurrent.futures +from concurrent.futures import ProcessPoolExecutor + +# import cmonitor_watcher +import cmonitor_watcher +from cmonitor_watcher import CgroupWatcher + +# import dateutil.parser as datetime_parser + +queue = queue.Queue() +myDict = {} + +test_list = [ + # run0 + { + "expected_task_file": "/tmp/unit_test_cmonitor/cgroup_memory_kubepods/task/tasks", + "expected_process_name": "python3", + }, +] + + +def create_task_file(path, pid): + cmd1 = f"rm -rf {path}/task" + os.system(cmd1) + + cmd2 = f"mkdir {path}/task" + os.system(cmd2) + + # create the task file with the process id of the dummy process + # /tmp/unit_test_cmonitor/task/tasks + cmd3 = f"cd {path}/task;rm -rf tasks;echo {pid} >> tasks" + os.system(cmd3) + + filename = os.path.join(path, "task/tasks") + return filename + + +def process_task_file(path, queue): + # create the dummy process + cmd = "python3 -c 'time.sleep(5)'" + p = Popen(cmd.split()) + # process id of the dummy process + pid = p.pid + + filename = create_task_file(path, pid) + print(f"Created: task file {filename} with process Id: {pid}") + + d = queue.get() + for k, v in d.items(): + # print(k, v) + process_name = v + print(f"Read from Queue filename :{k}, process_name {process_name}") + + # store the task file and the process name in the dictionary + global myDict + myDict = d.copy() + if queue.empty(): + print("Queue is Empty") + cmonitor_watcher.exit_flag = True + # terminate the dummy process + p.terminate() + return + + +@pytest.mark.parametrize("testrun_idx", range(len(test_list))) +def test_outputCmonitorWatcherInotifyEvent(testrun_idx): + global test_list + testrun = test_list[testrun_idx] + + path = "/tmp/unit_test_cmonitor/cgroup_memory_kubepods/" + filter = ["python3"] + if not os.path.exists(path): + os.makedirs(path) + print("Directory '% s' created" % path) + + watcher = CgroupWatcher(path, filter, 10) + # flag = True + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + future1 = executor.submit(watcher.inotify_events, queue) + future2 = executor.submit(process_task_file, path, queue) + + # both threads completely executed + print("Done!") + + for k, v in myDict.items(): + # print(k, v) + assert k == testrun["expected_task_file"] + assert v == testrun["expected_process_name"] diff --git a/tools/spec/tools.spec b/tools/spec/tools.spec index f9393fca..c575a666 100644 --- a/tools/spec/tools.spec +++ b/tools/spec/tools.spec @@ -12,7 +12,7 @@ Source0: cmonitor-tools-__RPM_VERSION__.tar.gz # these are the requirements that we need on COPR builds: # IMPORTANT: python3-devel provide macros like %{python3_sitelib} -BuildRequires: gcc-c++, make, python3-devel +BuildRequires: gcc-c++, make, python3-devel, inotify-tools # cmonitor_filter uses dateutil library to parse dates.. of course to make our life easier the same python library # RPM has different names on different distro versions...