Motr  M0
motr_mini_prov.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2021 Seagate Technology LLC and/or its Affiliates
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17 # For any questions about this software or licensing,
18 # please email opensource@seagate.com or cortx-questions@seagate.com.
19 #
20 import sys
21 import errno
22 import os
23 import re
24 import subprocess
25 import logging
26 import glob
27 import time
28 import yaml
29 import psutil
30 from typing import List, Dict, Any
31 from cortx.utils.conf_store import Conf
32 from cortx.utils.cortx import Const
33 
# Helper scripts installed by the cortx-motr packages.
MOTR_SERVER_SCRIPT_PATH = "/usr/libexec/cortx-motr/motr-start"
MOTR_MKFS_SCRIPT_PATH = "/usr/libexec/cortx-motr/motr-mkfs"
MOTR_FSM_SCRIPT_PATH = "/usr/libexec/cortx-motr/motr-free-space-monitor"
MOTR_CONFIG_SCRIPT = "/opt/seagate/cortx/motr/libexec/motr_cfg.sh"
MOTR_MINI_PROV_LOGROTATE_SCRIPT = "/opt/seagate/cortx/motr/libexec/motr_mini_prov_logrotate.sh"

# System configuration files and directories.
CROND_DIR = "/etc/cron.hourly"
LNET_CONF_FILE = "/etc/modprobe.d/lnet.conf"
LIBFAB_CONF_FILE = "/etc/libfab.conf"
SYS_CLASS_NET_DIR = "/sys/class/net/"
MOTR_SYS_CFG = "/etc/sysconfig/motr"
MOTR_WORKLOAD_DIR = "/opt/seagate/cortx/motr/workload"
FSTAB = "/etc/fstab"

# Logging locations and the logger name.
LOGFILE = "/var/log/seagate/motr/mini_provisioner"
LOGDIR = "/var/log/seagate/motr"
LOGGER = "mini_provisioner"
IVT_DIR = "/var/log/seagate/motr/ivt"
MOTR_LOG_DIR = "/var/motr"

# Default per-attempt timeout (seconds) of the execute_command* helpers.
TIMEOUT_SECS = 120
MACHINE_ID_LEN = 32
MOTR_LOG_DIRS = [LOGDIR, MOTR_LOG_DIR]
BE_LOG_SZ = 4*1024*1024*1024 #4G
BE_SEG0_SZ = 128 * 1024 *1024 #128M
# Granularity (bytes) passed to align_val() when rounding sizes down.
ALLIGN_SIZE = 4096
MACHINE_ID_FILE = "/etc/machine-id"
TEMP_FID_FILE = "/opt/seagate/cortx/motr/conf/service_fid.yaml"
# Default number of attempts made by execute_command_verbose().
CMD_RETRY_COUNT = 5
# set_setup_size() selects the "large" setup when the configured minimum
# memory of a service exceeds this many bytes (4G).
MEM_THRESHOLD = 4*1024*1024*1024
# Key under which a node entry stores its number of cvgs.
CVG_COUNT_KEY = "num_cvg"
62 
class MotrError(Exception):
    """Generic exception carrying an error code and a description.

    Args:
        rc: Numeric error code (an errno value or a command's exit status).
        message: Description; may contain %-style placeholders.
        *args: Values substituted into ``message``.

    The description is %-formatted only when ``args`` are supplied.  Callers
    in this file routinely pass pre-built f-strings that embed shell commands
    such as ``lvcreate ... -l 51%VG``; formatting those unconditionally would
    raise ValueError/TypeError instead of reporting the real failure.
    """

    def __init__(self, rc, message, *args):
        self._rc = rc
        self._desc = message % args if args else message

    def __str__(self):
        return f"error[{self._rc}]: {self._desc}"
72 
def execute_command_without_log(cmd, timeout_secs = TIMEOUT_SECS,
    verbose = False, retries = 1, stdin = None, logging=False):
    """Run *cmd* through the shell once, without writing anything to a logger.

    Args:
        cmd: Shell command line.
        timeout_secs: Seconds to wait for the command to finish.
        verbose, retries, logging: Accepted for signature parity with
            execute_command(); not used by this function.
        stdin: Optional text fed to the command's standard input.

    Raises:
        MotrError: The command exited with a non-zero status.
        subprocess.TimeoutExpired: The command did not finish in time
            (the child is killed before the exception propagates).
    """
    ps = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                          shell=True)
    # Hand the input to communicate() rather than writing to the pipe
    # directly: it feeds stdin and drains stdout together, so neither side
    # can block on a full pipe.
    payload = stdin.encode() if stdin else None
    try:
        ps.communicate(input=payload, timeout=timeout_secs)
    except subprocess.TimeoutExpired:
        # Do not leave the child running behind our back.
        ps.kill()
        ps.communicate()
        raise

    # NOTE(review): unconditional one-second pause kept from the original
    # implementation; its purpose is not evident from this file.
    time.sleep(1)
    if ps.returncode != 0:
        raise MotrError(ps.returncode, f"\"{cmd}\" command execution failed")
86 
# TODO: the logger setup (config_log) takes only self as an argument, so it
# is not configurable. Make the logger configurable (formatter, etc.) and
# remove the duplicated setup code below.
def execute_command_console(self, command):
    """Run *command* through the shell and stream its output to the log.

    Output lines are logged through the "console" logger, which writes to
    ``self.logfile`` and to the console.

    Args:
        command: Shell command line to execute.

    Returns:
        The command's exit status, or None if the process could not be
        started.

    Raises:
        MotrError: The log directory or log file could not be created.
    """
    logger = logging.getLogger("console")
    try:
        os.makedirs(LOGDIR, exist_ok=True)
        if not os.path.exists(self.logfile):
            with open(self.logfile, 'w'):
                pass
    except OSError as err:
        raise MotrError(errno.EINVAL, f"{self.logfile} creation failed\n") from err

    logger.setLevel(logging.DEBUG)
    # getLogger() returns the same object on every call, so attach the
    # handlers only once; otherwise each call would duplicate every line.
    if not logger.handlers:
        formatter = logging.Formatter('%(asctime)s - %(message)s')
        # File handler: records everything from DEBUG up in the log file.
        fh = logging.FileHandler(self.logfile)
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(formatter)
        # Console handler: shows messages of level INFO and above.
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)
        ch.setFormatter(formatter)
        logger.addHandler(fh)
        logger.addHandler(ch)

    logger.info(f"executing command {command}")
    try:
        process = subprocess.Popen(command, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                   shell=True)
    except Exception as e:
        # Python 3 exceptions have no .message attribute; format the
        # exception object itself.
        logger.error("ERROR {} when running {} with exception {}".format(sys.exc_info()[1],
                     command, e))
        return None

    # Read until EOF so that output produced just before the process exits
    # is not lost, then collect the exit status.
    for line in process.stdout:
        logger.info(line.strip().decode())
    return process.wait()
133 
134 
def execute_command(self, cmd, timeout_secs = TIMEOUT_SECS, verbose = False,
                    retries = 1, stdin = None, logging=True):
    """Run *cmd* through the shell, retrying until it succeeds.

    Args:
        cmd: Shell command line.
        timeout_secs: Seconds to wait for each attempt.
        verbose: Also log the command, its output and its status at DEBUG
            level (requires ``logging`` to be true).
        retries: Number of attempts; values below 1 are treated as 1.
        stdin: Optional text fed to the command's standard input.
        logging: Set to False when the command runs before the logger has
            been configured; nothing is logged in that case.

    Returns:
        Tuple ``(stdout, returncode)``; stderr is merged into stdout.

    Raises:
        MotrError: The last attempt exited with a non-zero status.
        subprocess.TimeoutExpired: An attempt did not finish in time (the
            child is killed before the exception propagates).
    """
    payload = stdin.encode() if stdin else None
    attempts = max(1, retries)

    for i in range(attempts):
        if logging:
            self.logger.info(f"Retry: {i}. Executing cmd: '{cmd}'")

        ps = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                              shell=True)
        # communicate() feeds stdin and drains stdout together, which
        # avoids blocking on a full pipe.
        try:
            stdout, _ = ps.communicate(input=payload, timeout=timeout_secs)
        except subprocess.TimeoutExpired:
            # Reap the child instead of leaving it running.
            ps.kill()
            ps.communicate()
            raise
        stdout = str(stdout, 'utf-8')

        if logging:
            self.logger.info(f"ret={ps.returncode}\n")

        if (self._debug or verbose) and logging:
            self.logger.debug(f"[CMD] {cmd}\n")
            self.logger.debug(f"[OUT]\n{stdout}\n")
            self.logger.debug(f"[RET] {ps.returncode}\n")
        if ps.returncode == 0:
            break
        # Pause only when another attempt follows.
        if i + 1 < attempts:
            time.sleep(1)
    if ps.returncode != 0:
        raise MotrError(ps.returncode, f"\"{cmd}\" command execution failed")
    return stdout, ps.returncode
167 
# For a normal command, we execute the command up to CMD_RETRY_COUNT (default 5) times; each attempt has a timeout of TIMEOUT_SECS (default 120s).
# For a daemon (e.g. m0d services), pass set_timeout=False: the timeout is disabled and the retry count is 1, so the daemon command is executed only once, without a timeout.
def execute_command_verbose(self, cmd, timeout_secs = TIMEOUT_SECS, verbose = False, set_timeout=True, retry_count = CMD_RETRY_COUNT):
    """Run *cmd* through the shell, logging stdout, stderr and exit status.

    Args:
        cmd: Shell command line.
        timeout_secs: Per-attempt timeout in seconds.
        verbose: Unused.
        set_timeout: When False the command runs exactly once with no
            timeout.
        retry_count: Maximum number of attempts.

    Returns:
        ``(stdout, returncode)`` from the first successful attempt, or None
        when every attempt exits with a non-zero status.
    """
    self.logger.info(f"Executing cmd : '{cmd}' \n")
    if set_timeout == False:
        # Command without a timeout: single attempt.
        timeout_secs, retry_count = None, 1
    pause_secs = 1
    for attempt in range(retry_count):
        ps = subprocess.run(cmd, stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, timeout=timeout_secs,
                            stderr=subprocess.PIPE, shell=True)
        self.logger.info(f"ret={ps.returncode}")
        self.logger.debug(f"Executing {attempt} time")
        stdout = ps.stdout.decode('utf-8')
        self.logger.debug(f"[OUT]{stdout}")
        self.logger.debug(f"[ERR]{ps.stderr.decode('utf-8')}")
        self.logger.debug(f"[RET] {ps.returncode}")
        if ps.returncode == 0:
            return stdout, ps.returncode
        time.sleep(pause_secs)
    return None
192 
def execute_command_without_exception(self, cmd, timeout_secs = TIMEOUT_SECS, retries = 1):
    """Run *cmd* (split on single spaces, no shell) and return its exit status.

    The command is attempted up to ``retries`` times, pausing one second
    after each failed attempt.  A non-zero status is returned to the caller
    rather than raised.
    """
    for attempt in range(retries):
        self.logger.info(f"Retry: {attempt}. Executing cmd : '{cmd}'\n")
        ps = subprocess.run(cmd.split(' '), timeout=timeout_secs)
        self.logger.info(f"ret={ps.returncode}\n")
        if ps.returncode != 0:
            time.sleep(1)
            continue
        break
    return ps.returncode
202 
def check_type(var, vtype, msg):
    """Validate that *var* is a non-empty instance of *vtype*.

    Args:
        var: Value to validate.
        vtype: Expected type (anything accepted by isinstance()).
        msg: Name of the value, used in the error text.

    Raises:
        MotrError: *var* has the wrong type, or is empty/falsy.
    """
    if isinstance(var, vtype):
        if var:
            return
        raise MotrError(errno.EINVAL, f"Empty {msg}.")
    raise MotrError(errno.EINVAL, f"Invalid {msg} type. Expected: {vtype}")
208 
def configure_machine_id(self, phase):
    """Record the ConfStore machine id and reconcile it with the id file.

    If MACHINE_ID_FILE is absent it is written, but only in the "start"
    phase.  If it is present its content must equal the ConfStore id.

    Raises:
        MotrError: ConfStore has no machine id, or the file content differs.
    """
    if not Conf.machine_id:
        raise MotrError(errno.ENOENT, "machine id not available in conf")

    self.machine_id = Conf.machine_id
    if os.path.exists(f"{MACHINE_ID_FILE}"):
        # Logging may not be configured yet, hence logging=False.
        out = execute_command(self, f"cat {MACHINE_ID_FILE}", logging=False)
        op = out[0].strip("\n")
        if op != self.machine_id:
            raise MotrError(errno.EINVAL, "machine id does not match")
        return

    if phase == "start":
        with open(f"{MACHINE_ID_FILE}", "w") as fp:
            fp.write(f"{self.machine_id}\n")
222 
def get_server_node(self):
    """Get the current node's entry from ConfStore using the machine-id.

    Returns:
        The non-empty dict stored under ``node`` for ``self.machine_id``.

    Raises:
        MotrError: The machine id is not present in ConfStore, or the entry
            is not a non-empty dict.
    """
    # Resolve the id outside the try block so that the error message below
    # can always name it, even when the attribute itself is missing.
    machine_id = getattr(self, 'machine_id', None)
    try:
        server_node = Conf.get(self._index, 'node')[machine_id]
    except Exception as err:
        raise MotrError(errno.EINVAL,
                        f"MACHINE_ID {machine_id} does not exist in ConfStore") from err

    check_type(server_node, dict, "server_node")
    return server_node
233 
def calc_size(self, sz):
    """Convert a size string with a binary unit suffix into bytes.

    Accepted form: a non-negative integer followed by one of
    K, Ki, Kib, KiB, M, Mi, Mib, MiB, G, Gi, Gib, GiB (all powers of 1024),
    e.g. ``"128Mi"`` or ``"4G"``.

    Args:
        sz: Size string.

    Returns:
        The size in bytes, or -1 (after logging an error) when *sz* does not
        have the accepted form.  A bare number without suffix is rejected
        here; set_setup_size() handles purely numeric values itself.
    """
    unit_bytes = {"K": 1024, "M": 1024 * 1024, "G": 1024 * 1024 * 1024}

    # Parse the whole string strictly.  Merely stripping non-digit characters
    # would turn "1.5Gi" into 15Gi and crash on a bare "Gi".
    match = re.fullmatch(r'\s*(\d+)\s*([KMG])(?:i[bB]?)?\s*', str(sz))
    if match is None:
        self.logger.error(f"Invalid format of mem limit: {sz}\n")
        self.logger.error("Please use valid format Ex: 1024, 1Ki, 1Mi, 1Gi etc..\n")
        return -1

    # Ex: for "128Mi", number is "128" and unit is "M" (1024*1024).
    number, unit = match.groups()
    return int(number) * unit_bytes[unit]
254 
def set_setup_size(self, service):
    """Set ``self.setup_size`` ("small" or "large") for *service*.

    For io and confd services the size is derived from the minimum memory
    limit configured under ``cortx>motr>limits``: above MEM_THRESHOLD the
    setup is "large", otherwise "small".  Every other service gets "small".

    Returns:
        True once the setup size has been set.

    Raises:
        MotrError: No limit entry matches the service, or its minimum memory
            value has an invalid format.
    """
    limits = Conf.get(self._index, 'cortx>motr>limits')['services']

    # Default setup size.
    self.setup_size = "small"

    if service not in ["ioservice", "ios", "io", "all", "confd"]:
        self.logger.info(f"service is {service}. So seting setup size to {self.setup_size}\n")
        return True

    # Provisioner passes "io" to motr_setup, e.g.
    #   motr_setup config --config yaml:///etc/cortx/cluster.conf --services io
    # while cluster.conf names the same service "ios".
    svc = "ios" if service in ["io", "ioservice"] else service

    entry = next((item for item in limits if item['name'] == svc), None)
    sz = -1
    if entry is not None:
        min_mem = entry['memory']['min']
        sz = int(min_mem) if min_mem.isnumeric() else calc_size(self, min_mem)
        self.logger.info(f"mem limit in config is {min_mem} i.e. {sz}\n")
        # A negative size means the limit could not be parsed.
        if sz >= 0:
            self.setup_size = "large" if sz > MEM_THRESHOLD else "small"
            self.logger.info(f"setup_size set to {self.setup_size}\n")

    if entry is None or sz < 0:
        raise MotrError(errno.EINVAL, f"Setup size is not set properly for service {service}."
                                      f"Please update valid mem limits for {service}")

    self.logger.info(f"service={service} and setup_size={self.setup_size}\n")
    return True
309 
def get_value(self, key, key_type):
    """Fetch *key* from ConfStore and validate its type.

    Args:
        key: ConfStore key to read from ``self._index``.
        key_type: Expected type of the value.

    Returns:
        The value stored under *key*.

    Raises:
        MotrError: The lookup failed, or the value is empty or not an
            instance of *key_type*.
    """
    try:
        val = Conf.get(self._index, key)
    except Exception as err:
        # The message must be an f-string, otherwise the literal text
        # "{key}" is reported instead of the key name.
        raise MotrError(errno.EINVAL, f"{key} does not exist in ConfStore") from err

    check_type(val, key_type, key)
    return val
319 
321  """Get logical_node_class."""
322  try:
323  logical_node_class = self.cluster['logical_node_class']
324  except:
325  raise MotrError(errno.EINVAL, f"{logical_node_class} does not exist in ConfStore")
326  check_type(logical_node_class, list, "logical_node_class")
327  return logical_node_class
328 
def restart_services(self, services):
    """Stop, start and then query the status of each systemd service.

    Any failing systemctl call raises MotrError via execute_command().
    """
    for service in services:
        self.logger.info(f"Restarting {service} service\n")
        for action in ("stop", "start", "status"):
            execute_command(self, f"systemctl {action} {service}")
338 
def validate_file(file):
    """Raise MotrError(ENOENT) unless the path *file* exists."""
    if os.path.exists(file):
        return
    raise MotrError(errno.ENOENT, f"{file} does not exist")
342 
343 # Check if file paths are valid
def validate_files(files):
    """Raise MotrError(ENOENT) for the first path in *files* that is missing."""
    missing = next((path for path in files if not os.path.exists(path)), None)
    if missing is not None:
        raise MotrError(errno.ENOENT, f"{missing} does not exist")
348 
349 # Create directories
def create_dirs(self, dirs):
    """Create every directory in *dirs*, including missing parents.

    Paths that already exist are left untouched.  Directories are created
    with os.makedirs() rather than an unquoted ``mkdir -p`` shell command,
    so paths containing spaces or shell metacharacters are handled.

    Args:
        dirs: Iterable of directory paths.

    Raises:
        MotrError: A directory could not be created.
    """
    for entry in dirs:
        if os.path.exists(entry):
            continue
        try:
            # exist_ok covers the directory appearing between the check
            # above and this call.
            os.makedirs(entry, exist_ok=True)
        except OSError as err:
            raise MotrError(err.errno or errno.EINVAL,
                            f"Failed to create directory {entry}") from err
355 
def is_hw_node(self):
    """Return True when the current node's type is "HW".

    Raises:
        MotrError: ``self.server_node`` has no usable 'type' entry, or the
            entry is empty or not a string.
    """
    try:
        node_type = self.server_node['type']
    except (AttributeError, KeyError, TypeError) as err:
        raise MotrError(errno.EINVAL, "node_type not found") from err

    check_type(node_type, str, "node type")
    return node_type == "HW"
367 
369  '''
370  1. check m0tr.ko exists in current kernel modules
371  2. check /etc/sysconfig/motr
372  '''
373  cmd = "uname -r"
374  cmd_res = execute_command(self, cmd)
375  op = cmd_res[0]
376  kernel_ver = op.replace('\n', '')
377  check_type(kernel_ver, str, "kernel version")
378 
379  kernel_module = f"/lib/modules/{kernel_ver}/kernel/fs/motr/m0tr.ko"
380  self.logger.info(f"Checking for {kernel_module}\n")
381  validate_file(kernel_module)
382 
383  self.logger.info(f"Checking for {MOTR_SYS_CFG}\n")
384  validate_file(MOTR_SYS_CFG)
385 
def update_config_file(self, fname, kv_list):
    """Set ``KEY=VALUE`` entries in the config file *fname*.

    For each ``(key, value)`` pair, the first line starting with ``key=`` is
    replaced; if there is none, ``key=value`` is appended.

    Args:
        fname: Path of the file to update.  The file named here is the one
            read and written.
        kv_list: Iterable of ``(key, value)`` pairs.
    """
    with open(fname, "r") as fp:
        lines = fp.readlines()
    self.logger.info(f"Before update, in file {fname}, num_lines={len(lines)}\n")

    for (k, v) in kv_list:
        entry = f"{k}={v}\n"
        for lno, line in enumerate(lines):
            # If found, update in place.
            if line.startswith(f"{k}="):
                lines[lno] = entry
                break
        else:
            # Not found: append.  Terminate an unterminated last line first
            # so that the new entry does not get glued onto it.
            if lines and not lines[-1].endswith("\n"):
                lines[-1] += "\n"
            lines.append(entry)

    self.logger.info(f"After update, in file {fname}, num_lines={len(lines)}\n")

    with open(fname, "w") as fp:
        fp.writelines(lines)
416 
418  local_path = self.local_path
419  log_path = self.log_path
420  machine_id = self.machine_id
421  validate_files([MOTR_SYS_CFG, local_path, log_path])
422  MOTR_M0D_DATA_DIR = f"{local_path}/motr"
423  if not os.path.exists(MOTR_M0D_DATA_DIR):
424  create_dirs(self, [f"{MOTR_M0D_DATA_DIR}"])
425  MOTR_LOCAL_SYSCONFIG_DIR = f"{MOTR_M0D_DATA_DIR}/sysconfig"
426  if not os.path.exists(MOTR_LOCAL_SYSCONFIG_DIR):
427  create_dirs(self, [f"{MOTR_LOCAL_SYSCONFIG_DIR}"])
428 
429  MOTR_M0D_CONF_DIR = f"{MOTR_LOCAL_SYSCONFIG_DIR}/{machine_id}"
430  MOTR_M0D_CONF_XC = f"{MOTR_M0D_CONF_DIR}/confd.xc"
431  MOTR_M0D_ADDB_STOB_DIR = f"{log_path}/motr/{machine_id}/addb"
432  MOTR_M0D_TRACE_DIR = f"{log_path}/motr/{machine_id}/trace"
433  # Skip MOTR_CONF_XC
434  dirs = [MOTR_M0D_DATA_DIR, MOTR_M0D_ADDB_STOB_DIR, MOTR_M0D_TRACE_DIR, MOTR_M0D_CONF_DIR]
435  create_dirs(self, dirs)
436 
437  # Update new config keys to config file /etc/sysconfig/motr
438  config_kvs = [("MOTR_M0D_CONF_DIR", f"{MOTR_M0D_CONF_DIR}"),
439  ("MOTR_M0D_DATA_DIR", f"{MOTR_M0D_DATA_DIR}"),
440  ("MOTR_M0D_CONF_XC", f"{MOTR_M0D_CONF_XC}"),
441  ("MOTR_M0D_ADDB_STOB_DIR", f"{MOTR_M0D_ADDB_STOB_DIR}"),
442  ("MOTR_M0D_TRACE_DIR", f"{MOTR_M0D_TRACE_DIR}")]
443 
444  update_config_file(self, f"{MOTR_SYS_CFG}", config_kvs)
445 
446  # Copy config file to new path
447  cmd = f"cp {MOTR_SYS_CFG} {MOTR_M0D_CONF_DIR}"
448  execute_command(self, cmd)
449 
450 # Get lists of metadata disks from Confstore of all cvgs
451 # Input: node_info
452 # Output: [['/dev/sdc'], ['/dev/sdf']]
453 # where ['/dev/sdc'] is list of metadata disks of cvg[0]
454 # ['/dev/sdf'] is list of metadata disks of cvg[1]
def get_md_disks_lists(self, node_info):
    """Collect the metadata disk list of every cvg of a node.

    Args:
        node_info: Node entry holding the cvg count under CVG_COUNT_KEY and
            the cvg list under 'cvg'.

    Returns:
        A list with one entry per cvg that has metadata disks, e.g.
        ``[['/dev/sdc'], ['/dev/sdf']]``.
    """
    # The rest of this file validates the cvg count as a string and converts
    # it with int(); int() accepts both a string and an integer count here.
    cvg_count = int(node_info[CVG_COUNT_KEY])
    cvg = node_info['cvg']

    md_disks_lists = []
    for i in range(cvg_count):
        metadata = cvg[i]['devices']['metadata']
        if metadata:
            md_disks_lists.append(metadata)
    self.logger.info(f"md_disks lists on node = {md_disks_lists}\n")
    return md_disks_lists
465 
# Get metadata disks from the list of lists of metadata disks of
# different cvgs of a node
# Input: [['/dev/sdc'], ['/dev/sdf']]
# where ['/dev/sdc'] is the list of metadata disks of cvg[0]
# ['/dev/sdf'] is the list of metadata disks of cvg[1]
# Output: ['/dev/sdc', '/dev/sdf']
def get_mdisks_from_list(self, md_lists):
    """Flatten per-cvg metadata disk lists into one list, preserving order."""
    md_disks = [disk for per_cvg in md_lists for disk in per_cvg]
    self.logger.info(f"md_disks on node = {md_disks}\n")
    return md_disks
481 
482 # Update metadata disk entries to motr-hare confstore
def update_to_file(self, index, url, machine_id, md_disks):
    """Store each metadata disk under its cvg/m0d key in the given ConfStore index.

    Args:
        index: ConfStore index to update.
        url: Location of that index; used only in the log message.
        machine_id: Machine id forming part of the key.
        md_disks: Per-cvg lists of metadata disks.
    """
    for cvg_idx, cvg_disks in enumerate(md_disks):
        for m0d_idx, md_disk in enumerate(cvg_disks):
            key = f"server>{machine_id}>cvg[{cvg_idx}]>m0d[{m0d_idx}]>md_seg1"
            self.logger.info(f"setting key {key}"
                             f" with value {md_disk} in {url}")
            Conf.set(index, key, f"{md_disk}")
            # Persist after every entry.
            Conf.save(index)
494 
495 # populate self.storage_nodes with machine_id for all storage_nodes
def get_data_nodes(self):
    """Return the machine ids of all nodes that run the motr IO service.

    A node qualifies when its machine id occurs in at least one of the
    ConfStore search results for the motr IO service; other nodes (control,
    HA and server pods) are skipped.
    """
    machines: Dict[str, Any] = self.nodes
    services = Conf.search(self._index, 'node', 'services', Const.SERVICE_MOTR_IO.value)
    storage_nodes: List[str] = [
        machine_id
        for machine_id in machines.keys()
        if any(machine_id in svc for svc in services)
    ]
    return storage_nodes
506 
def update_motr_hare_keys(self, nodes):
    """Write the metadata disks of every storage node to the motr-hare ConfStore.

    Args:
        nodes: Mapping of machine id to node info.
    """
    for machine_id in self.storage_nodes:
        disks_per_cvg = get_md_disks_lists(self, nodes.get(machine_id))
        update_to_file(self, self._index_motr_hare, self._url_motr_hare,
                       machine_id, disks_per_cvg)
513 
# Write the content below to the mini_prov_logrotate.conf file so that the mini_provisioner
# log file is rotated hourly, keeping at most the 4 most recent files. Max size of the log file is 10M.
516 
517 # Content:
518 # /etc/cortx/log/motr/<machine-id>/mini_provisioner {
519 # hourly
520 # size 10M
521 # rotate 4
522 # delaycompress
523 # copytruncate
524 # }
526  MOTR_M0D_DATA_DIR = f"{self.local_path}/motr"
527  validate_files([MOTR_M0D_DATA_DIR])
528  mini_prov_conf_file = f"{MOTR_M0D_DATA_DIR}/mini_prov_logrotate.conf"
529 
530  indent=' '*4
531  lines=["{a} {b}\n".format(a=self.logfile, b='{'),
532  f"{indent}hourly\n",
533  f"{indent}size 10M\n",
534  f"{indent}rotate 4\n",
535  f"{indent}delaycompress\n",
536  f"{indent}copytruncate\n",
537  "{a}\n".format(a='}')]
538  with open(f"{mini_prov_conf_file}", 'w+') as fp:
539  for line in lines:
540  fp.write(line)
541 
542 def motr_config_k8(self):
543  if not verify_libfabric(self):
544  raise MotrError(errno.EINVAL, "libfabric is not up.")
545 
546  # To rotate mini_provisioner log file
548 
549  if self.machine_id not in self.storage_nodes:
550  # Modify motr config file
552  return
553 
554  # If setup_size is large i.e.HW, read the (key,val)
555  # from /opt/seagate/cortx/motr/conf/motr.conf and
556  # update to /etc/sysconfig/motr
557  if self.setup_size == "large":
558  cmd = "{} {}".format(MOTR_CONFIG_SCRIPT, " -c")
559  execute_command(self, cmd, verbose = True)
560 
561  update_motr_hare_keys(self, self.nodes)
562  execute_command(self, MOTR_CONFIG_SCRIPT, verbose = True)
563 
564  # Update be_seg size only for storage node
565  update_bseg_size(self)
566 
567  # Modify motr config file
569  return
570 
def motr_config(self):
    """Verify the data transport and, on hardware nodes, run the motr config script.

    Raises:
        MotrError: The transport type is missing or invalid, the configured
            transport (lnet or libfabric) is not up, or the config script
            fails.
    """
    # Check that the configured transport is working properly.
    try:
        transport_type = self.server_node['network']['data']['transport_type']
    except (AttributeError, KeyError, TypeError) as err:
        raise MotrError(errno.EINVAL, "transport_type not found") from err

    check_type(transport_type, str, "transport_type")

    if transport_type == "lnet":
        if not verify_lnet(self):
            raise MotrError(errno.EINVAL, "lnet is not up.")
    elif transport_type == "libfabric":
        if not verify_libfabric(self):
            raise MotrError(errno.EINVAL, "libfabric is not up.")

    if is_hw_node(self):
        self.logger.info(f"Executing {MOTR_CONFIG_SCRIPT}")
        execute_command(self, MOTR_CONFIG_SCRIPT, verbose = True)
591 
def configure_net(self):
    """Configure the data transport named by ``cortx>motr>transport_type``.

    Dispatches to configure_lnet() for "lnet" and to configure_libfabric()
    for "libfab".

    Raises:
        MotrError: The transport type cannot be read, is not a non-empty
            string, or is not one of the supported values.
    """
    try:
        transport_type = Conf.get(self._index, 'cortx>motr>transport_type')
    except Exception as err:
        raise MotrError(errno.EINVAL, "transport_type not found") from err

    check_type(transport_type, str, "transport_type")

    if transport_type == "lnet":
        configure_lnet(self)
    elif transport_type == "libfab":
        configure_libfabric(self)
    else:
        raise MotrError(errno.EINVAL, "Unknown data transport type\n")
607 
def configure_lnet(self):
    '''
       Get iface and /etc/modprobe.d/lnet.conf params from
       conf store. Configure lnet. Start lnet service.

       Raises MotrError when the interface or interface type is missing
       from the node configuration, when a command fails, or when the
       lnet self ping fails.
    '''
    try:
        iface = self.server_node['network']['data']['private_interfaces'][0]
    except (AttributeError, IndexError, KeyError, TypeError) as err:
        raise MotrError(errno.EINVAL, "private_interfaces[0] not found\n") from err

    self.logger.info(f"Validate private_interfaces[0]: {iface}\n")
    cmd = f"ip addr show {iface}"
    execute_command(self, cmd)

    try:
        iface_type = self.server_node['network']['data']['interface_type']
    except (AttributeError, KeyError, TypeError) as err:
        raise MotrError(errno.EINVAL, "interface_type not found\n") from err

    lnet_config = (f"options lnet networks={iface_type}({iface}) "
                   f"config_on_load=1  lnet_peer_discovery_disabled=1\n")
    self.logger.info(f"lnet config: {lnet_config}")

    with open(LNET_CONF_FILE, "w") as fp:
        fp.write(lnet_config)

    execute_command(self, "systemctl enable lnet")
    restart_services(self, ["lnet"])
    # Pause briefly after the restart before pinging.
    time.sleep(2)
    # Ping to nid
    self.logger.info("Doing ping to nids\n")
    ret = lnet_self_ping(self)
    if not ret:
        raise MotrError(errno.EINVAL, "lnet self ping failed\n")
642 
644  cmd = "fi_info"
645  execute_command(self, cmd, verbose=True)
646 
648  cmd = "fi_info"
649  execute_command(self, cmd)
650  return True
651 
def swap_on(self):
    """Enable the swap devices known to the system (``swapon -a``)."""
    execute_command(self, "swapon -a")
655 
def swap_off(self):
    """Disable all swap devices (``swapoff -a``), trying up to three times."""
    execute_command(self, "swapoff -a", retries=3)
659 
def add_swap_fstab(self, dev_name):
    '''
        1. check swap entry found in /etc/fstab
        2. if found, do nothing
        3. if not found, add swap entry in /etc/fstab

        Swap is switched off while the file is inspected and switched on
        again afterwards, whether or not the update succeeded.
        Raises MotrError when /etc/fstab cannot be read or appended to.
    '''
    swap_entry = f"{dev_name} swap  swap    defaults        0 0\n"
    swap_off(self)

    try:
        try:
            with open(FSTAB, "r") as fp:
                # Compare the whole device field, not a prefix: otherwise
                # "/dev/x1" would be mistaken for an entry of "/dev/x".
                swap_found = any(line.split()[:1] == [dev_name] for line in fp)
        except OSError as err:
            raise MotrError(errno.EINVAL, f"Cant read {FSTAB}\n") from err

        if swap_found:
            self.logger.info(f"Swap entry found: {swap_entry}\n")
            return

        try:
            with open(FSTAB, "a") as fp:
                fp.write(swap_entry)
        except OSError as err:
            raise MotrError(errno.EINVAL, f"Cant append {FSTAB}\n") from err
        self.logger.info(f"Swap entry added: {swap_entry}\n")
    finally:
        swap_on(self)
691 
def del_swap_fstab_by_vg_name(self, vg_name):
    """Remove every /etc/fstab line that mentions the volume group *vg_name*.

    Swap is switched off for the duration of the edit and switched back on
    afterwards, even if the edit fails, matching add_swap_fstab().

    Raises:
        MotrError: One of the underlying commands failed.
    """
    swap_off(self)
    try:
        cmd = f"sed -i '/{vg_name}/d' {FSTAB}"
        execute_command(self, cmd)
    finally:
        # Never leave the node without swap because sed failed.
        swap_on(self)
699 
def create_swap(self, swap_dev):
    """Format *swap_dev* as swap, check that it exists and register it in fstab.

    Raises:
        MotrError: Any of the underlying commands failed.
    """
    steps = (
        (f"Make swap of {swap_dev}\n", f"mkswap -f {swap_dev}"),
        (f"Test {swap_dev} swap device\n", f"test -e {swap_dev}"),
    )
    for note, command in steps:
        self.logger.info(note)
        execute_command(self, command)

    self.logger.info(f"Adding {swap_dev} swap device to {FSTAB}\n")
    add_swap_fstab(self, swap_dev)
711 
712 
def create_lvm(self, index, metadata_dev):
    '''
       1. validate /etc/fstab
       2. validate metadata device file
       3. check requested volume group exist
       4. if exist, remove volume group and swap related with it.
          because if user request same volume group with different device.
       5. If not exist, create volume group and lvm
       6. create swap from lvm

       index is the zero-based number of the device; names derived from it
       use index + 1.
       Returns False when physical volumes already exist on the device
       (nothing is changed), True after the volumes and swap were created.
       Raises MotrError when a required command fails or the total swap
       size did not grow.
    '''
    # If a second partition exists on the device, use that partition.
    try:
        cmd = f"fdisk -l {metadata_dev}2"
        execute_command(self, cmd)
    except MotrError:
        pass
    else:
        metadata_dev = f"{metadata_dev}2"

    # pvdisplay succeeding means the device already is a physical volume.
    try:
        cmd = f"pvdisplay {metadata_dev}"
        out = execute_command(self, cmd)
    except MotrError:
        pass
    else:
        self.logger.warning(f"Volumes are already created on {metadata_dev}\n{out[0]}\n")
        return False

    index = index + 1
    node_name = self.server_node['name']
    vg_name = f"vg_{node_name}_md{index}"
    lv_swap_name = f"lv_main_swap{index}"
    lv_md_name = f"lv_raw_md{index}"
    swap_dev = f"/dev/{vg_name}/{lv_swap_name}"

    self.logger.info(f"metadata device: {metadata_dev}\n")

    self.logger.info(f"Checking for {FSTAB}\n")
    validate_file(FSTAB)

    self.logger.info(f"Checking for {metadata_dev}\n")
    validate_file(metadata_dev)

    cmd = f"fdisk -l {metadata_dev}"
    execute_command(self, cmd)

    # A volume group of the same name may be left over; remove it together
    # with its fstab swap entries before creating the new one.
    try:
        cmd = f"vgs {vg_name}"
        execute_command(self, cmd)
    except MotrError:
        pass
    else:
        self.logger.info(f"Removing {vg_name} volume group\n")

        del_swap_fstab_by_vg_name(self, vg_name)

        cmd = f"vgchange -an {vg_name}"
        execute_command(self, cmd)

        cmd = f"vgremove {vg_name} -ff"
        execute_command(self, cmd)

    self.logger.info(f"Creating physical volume from {metadata_dev}\n")
    cmd = f"pvcreate {metadata_dev} --yes"
    execute_command(self, cmd)

    self.logger.info(f"Creating {vg_name} volume group from {metadata_dev}\n")
    cmd = f"vgcreate {vg_name} {metadata_dev}"
    execute_command(self, cmd)

    self.logger.info(f"Adding {node_name} tag to {vg_name} volume group\n")
    cmd = f"vgchange --addtag {node_name} {vg_name}"
    execute_command(self, cmd)

    self.logger.info("Scanning volume group\n")
    cmd = "vgscan --cache"
    execute_command(self, cmd)

    # The swap volume takes 51% of the volume group ...
    self.logger.info(f"Creating {lv_swap_name} lvm from {vg_name}\n")
    cmd = f"lvcreate -n {lv_swap_name} {vg_name} -l 51%VG --yes"
    execute_command(self, cmd)

    # ... and the metadata volume takes whatever is left.
    self.logger.info(f"Creating {lv_md_name} lvm from {vg_name}\n")
    cmd = f"lvcreate -n {lv_md_name} {vg_name} -l 100%FREE --yes"
    execute_command(self, cmd)

    # Compare the total swap size (MiB, as reported by `free -m`) before and
    # after adding the new swap device to confirm that it became active.
    swap_check_cmd = "free -m | grep Swap | awk '{print $2}'"
    free_swap_op = execute_command(self, swap_check_cmd)
    allocated_swap_size_before = int(float(free_swap_op[0].strip(' \n')))
    create_swap(self, swap_dev)
    allocated_swap_op = execute_command(self, swap_check_cmd)
    allocated_swap_size_after = int(float(allocated_swap_op[0].strip(' \n')))
    if allocated_swap_size_before >= allocated_swap_size_after:
        raise MotrError(errno.EINVAL, f"swap size before allocation"
                        f"({allocated_swap_size_before}M) must be less than "
                        f"swap size after allocation({allocated_swap_size_after}M)\n")
    else:
        self.logger.info(f"swap size before allocation ={allocated_swap_size_before}M\n")
        self.logger.info(f"swap_size after allocation ={allocated_swap_size_after}M\n")
    return True
812 
def calc_lvm_min_size(self, lv_path, lvm_min_size):
    """Return the smaller of *lvm_min_size* and the size of *lv_path* in bytes.

    Args:
        lv_path: Device path whose size is read with lsblk.
        lvm_min_size: Running minimum so far, or None when there is none yet.
    """
    cmd = f"lsblk --noheadings --bytes {lv_path} | " "awk '{print $4}'"
    output = execute_command(self, cmd)[0]
    lv_size = int(output.rstrip("\n"))
    self.logger.info(f"{lv_path} size = {lv_size} \n")
    return lv_size if lvm_min_size is None else min(lv_size, lvm_min_size)
824 
826  try:
827  cvg_cnt = self.server_node[CVG_COUNT_KEY]
828  except:
829  raise MotrError(errno.EINVAL, "cvg_cnt not found\n")
830 
831  check_type(cvg_cnt, str, CVG_COUNT_KEY)
832 
833  try:
834  cvg = self.server_node['cvg']
835  except:
836  raise MotrError(errno.EINVAL, "cvg not found\n")
837 
838  # Check if cvg type is list
839  check_type(cvg, list, "cvg")
840 
841  # Check if cvg is non empty
842  if not cvg:
843  raise MotrError(errno.EINVAL, "cvg is empty\n")
844  return cvg_cnt, cvg
845 
847  check_type(storage, list, "storage")
848  for elem in storage:
849  check_type(elem, dict, "storage element")
850  for key, val in elem.items():
851  if key=="name":
852  val_type=str
853  check_type(val, val_type, key)
854  if key=="type":
855  val_type=str
856  check_type(val, val_type, key)
857  if key=="metadata_devices":
858  val_type=list
859  check_type(val, val_type, key)
860  sz = len(val)
861  for i in range(sz):
862  check_type(val[i], str, f"metadata_devices[{i}]")
863  if key=="data_devices":
864  val_type=list
865  check_type(val, val_type, key)
866  sz = len(val)
867  for i in range(sz):
868  check_type(val[i], str, f"data_devices[{i}]")
869 
def align_val(val, size):
    """Round *val* towards zero to a multiple of *size*.

    Integer inputs are handled with integer arithmetic, so the result is
    exact for arbitrarily large values; going through float division loses
    precision above 2**53.

    Args:
        val: Value to align.
        size: Alignment granularity; must be non-zero.

    Returns:
        The largest-magnitude multiple of *size* that does not exceed *val*
        in magnitude.
    """
    if isinstance(val, int) and isinstance(size, int):
        quotient = abs(val) // abs(size)
        # Keep truncation towards zero, as int(val / size) does.
        if (val < 0) != (size < 0):
            quotient = -quotient
        return quotient * size
    return int(val / size) * size
872 
874  dev_count = 0
875  lvm_min_size = None
876 
877  md_disks_list = get_md_disks_lists(self, self.node)
878  md_disks = get_mdisks_from_list(self, md_disks_list)
879  md_len = len(md_disks)
880  for i in range(md_len):
881  lvm_min_size = calc_lvm_min_size(self, md_disks[i], lvm_min_size)
882  if lvm_min_size:
883  align_val(lvm_min_size, ALLIGN_SIZE)
884  self.logger.info(f"setting MOTR_M0D_IOS_BESEG_SIZE to {lvm_min_size}\n")
885  cmd = f'sed -i "/MOTR_M0D_IOS_BESEG_SIZE/s/.*/MOTR_M0D_IOS_BESEG_SIZE={lvm_min_size}/" {MOTR_SYS_CFG}'
886  execute_command(self, cmd)
887  return
888 
def config_lvm(self):
    """Create LVM volumes on all metadata devices and record the smallest size.

    For every metadata device of every cvg, create_lvm() is called; devices
    that already carry volumes are skipped.  The size of the smallest newly
    created ``lv_raw_md<N>`` volume is then written to MOTR_SYS_CFG as
    MOTR_M0D_IOS_BESEG_SIZE.

    Raises:
        MotrError: A cvg has no usable 'metadata_devices' entry, or an
            underlying command fails.
    """
    dev_count = 0
    lvm_min_size = None

    cvg_cnt, cvg = get_cvg_cnt_and_cvg(self)
    for i in range(int(cvg_cnt)):
        cvg_item = cvg[i]
        try:
            metadata_devices = cvg_item["metadata_devices"]
        except (KeyError, TypeError) as err:
            raise MotrError(errno.EINVAL, "metadata devices not found\n") from err
        check_type(metadata_devices, list, "metadata_devices")
        self.logger.info(f"\nlvm metadata_devices: {metadata_devices}\n\n")

        for device in metadata_devices:
            # create_lvm() numbers its volumes dev_count + 1; the counter
            # advances only for devices on which volumes were created.
            if not create_lvm(self, dev_count, device):
                continue
            dev_count += 1
            lv_md_name = f"lv_raw_md{dev_count}"
            cmd = f"lvs -o lv_path | grep {lv_md_name}"
            res = execute_command(self, cmd)
            lv_path = res[0].rstrip("\n")
            lvm_min_size = calc_lvm_min_size(self, lv_path, lvm_min_size)

    if lvm_min_size:
        self.logger.info(f"setting MOTR_M0D_IOS_BESEG_SIZE to {lvm_min_size}\n")
        cmd = f'sed -i "/MOTR_M0D_IOS_BESEG_SIZE/s/.*/MOTR_M0D_IOS_BESEG_SIZE={lvm_min_size}/" {MOTR_SYS_CFG}'
        execute_command(self, cmd)
918 
def get_lnet_xface() -> str:
    """Get the lnet interface name configured in LNET_CONF_FILE.

    The file is scanned for the first non-blank line that splits (on runs of
    non-word characters) into more than four tokens; the fifth token is
    taken as the interface name.  For a line such as
    ``options lnet networks=tcp(eth0) ...`` that token is ``eth0``.

    Raises:
        MotrError: The file cannot be read, contains no such line, or names
            an interface that is not listed in SYS_CLASS_NET_DIR.
    """
    lnet_xface = None
    try:
        with open(LNET_CONF_FILE, 'r') as f:
            # Obtain interface name
            for line in f:
                if not line.strip():
                    continue
                tokens = re.split(r'\W+', line)
                if len(tokens) > 4:
                    lnet_xface = tokens[4]
                    break
    except (OSError, ValueError) as err:
        # ValueError covers undecodable file content.
        raise MotrError(errno.EINVAL, f"Cant parse {LNET_CONF_FILE}") from err

    if lnet_xface is None:
        raise MotrError(errno.EINVAL,
                        f"Cant obtain iface details from {LNET_CONF_FILE}")
    if lnet_xface not in os.listdir(SYS_CLASS_NET_DIR):
        raise MotrError(errno.EINVAL,
                        f"Invalid iface {lnet_xface} in lnet.conf")
    return lnet_xface
941 
def check_pkgs(self, pkgs):
    """Verify with `rpm -q` that every package in pkgs is installed.

    Raises:
        MotrError: ENOENT for the first package whose query fails or exits
            with a non-zero status.
    """
    for rpm_name in pkgs:
        query = f"rpm -q {rpm_name}"
        try:
            status = execute_command(self, query)[1]
        except MotrError:
            # a failed query is reported below as a missing rpm
            status = 1

        if status != 0:
            raise MotrError(errno.ENOENT, f"Missing rpm: {rpm_name}")
        self.logger.info(f"rpm found: {rpm_name}\n")
958 
def get_nids(self, nodes):
    """Collect the `lctl list_nids` output of each node in nodes.

    The command runs locally for this node's own hostname and over ssh for
    any other node. Returns one output string per node, trailing newlines
    removed.
    """
    local_host = self.server_node["hostname"]
    collected = []

    for node in nodes:
        if node == local_host:
            list_cmd = "lctl list_nids"
        else:
            list_cmd = f"ssh {node} lctl list_nids"
        output = execute_command(self, list_cmd)
        collected.append(output[0].rstrip("\n"))

    return collected
975 
def get_nodes(self):
    """Return the "hostname" of every entry stored under 'server_node'."""
    server_nodes = Conf.get(self._index, 'server_node')
    return [entry["hostname"] for entry in server_nodes.values()]
982 
def lnet_ping(self):
    """Run `lctl ping` against the nids reported by every node in the cluster."""
    # get_nodes() yields hostnames; get_nids() turns them into nids
    cluster_nids = get_nids(self, get_nodes(self))
    self.logger.info("lnet pinging on all nodes in cluster\n")
    for target in cluster_nids:
        ping_cmd = f"lctl ping {target}"
        self.logger.info(f"lctl ping on: {target}\n")
        execute_command(self, ping_cmd)
993 
def test_lnet(self):
    '''
    1. check lustre rpm
    2. validate lnet interface which was configured in init
    3. ping on lnet interface
    4. lctl ping on all nodes in cluster. motr_setup post_install and prepare
       MUST be performed on all nodes before executing this step.

    Raises MotrError (EINVAL) when no "inet <addr>/<prefix>" entry can be
    extracted from the `ip addr show` output of the lnet interface.
    '''
    self.logger.info("post_install and prepare phases MUST be performed "
                     "on all nodes before executing test phase\n")
    search_lnet_pkgs = ["kmod-lustre-client", "lustre-client"]
    check_pkgs(self, search_lnet_pkgs)

    lnet_xface = get_lnet_xface()
    self.logger.info(f"lnet interface found: {lnet_xface}\n")

    cmd = f"ip addr show {lnet_xface}"
    cmd_res = execute_command(self, cmd)
    ip_addr = cmd_res[0]

    try:
        # text between the first "inet " and the following "/"
        ip_addr = ip_addr.split("inet ")[1].split("/")[0]
    except (IndexError, AttributeError) as err:
        raise MotrError(errno.EINVAL, f"Cant parse {lnet_xface} ip addr") from err
    self.logger.info(f"lnet interface ip: {ip_addr}\n")

    self.logger.info(f"ping on: {ip_addr}\n")
    cmd = f"ping -c 3 {ip_addr}"
    execute_command(self, cmd)

    lnet_ping(self)
1025 
def test_libfabric(self):
    """Check that the libfabric rpm is installed (see check_pkgs)."""
    check_pkgs(self, ["libfabric"])
1029 
def get_metadata_disks_count(self):
    """Return the total number of metadata devices across all cvg entries.

    Raises:
        MotrError: EINVAL when a cvg entry has no "metadata_devices" item.
    """
    dev_count = 0
    cvg_cnt, cvg = get_cvg_cnt_and_cvg(self)
    for i in range(int(cvg_cnt)):
        cvg_item = cvg[i]
        try:
            metadata_devices = cvg_item["metadata_devices"]
        except (LookupError, TypeError) as err:
            raise MotrError(errno.EINVAL, "metadata devices not found\n") from err
        check_type(metadata_devices, list, "metadata_devices")
        self.logger.info(f"\nlvm metadata_devices: {metadata_devices}\n\n")

        dev_count += len(metadata_devices)
    return dev_count
1045 
def lvm_exist(self):
    """Tell whether the md and swap logical volumes exist for every metadata disk.

    Returns False (after logging a warning) at the first missing volume,
    True when all expected volumes are listed by lvdisplay.
    """
    disk_total = get_metadata_disks_count(self)
    node_name = self.server_node['name']

    # Fetch lvm paths of existing lvm's e.g. /dev/vg_srvnode-1_md1/lv_raw_md1
    listing = execute_command(self, "lvdisplay | grep \"LV Path\" | awk \'{ print $3 }\'")[0]
    # drop the last element produced by split
    present = listing.split('\n')[:-1]

    for idx in range(1, disk_total + 1):
        md_lv_path = f'/dev/vg_{node_name}_md{idx}/lv_raw_md{idx}'
        swap_lv_path = f'/dev/vg_{node_name}_md{idx}/lv_main_swap{idx}'

        if md_lv_path not in present:
            self.logger.warning(f"{md_lv_path} does not exist. Need to create lvm\n")
            return False
        if swap_lv_path not in present:
            self.logger.warning(f"{swap_lv_path} does not exist. Need to create lvm\n")
            return False
    return True
1070 
def cluster_up(self):
    """Return True when `/usr/bin/hctl status` yields status 0, else False."""
    status_cmd = '/usr/bin/hctl status'
    self.logger.info(f"Executing cmd : '{status_cmd}'\n")
    status = execute_command_without_exception(self, status_cmd)
    return status == 0
1079 
def pkg_installed(self, pkg):
    """Return True when `yum list installed <pkg>` yields status 0, else False."""
    status = execute_command_without_exception(self, f'/usr/bin/yum list installed {pkg}')
    found = status == 0
    if found:
        self.logger.info(f"{pkg} is installed\n")
    else:
        self.logger.info(f"{pkg} is not installed\n")
    return found
1089 
def test_io(self):
    """Run the m0workload mix workload when all workload files are present.

    Logs an error instead when the m0workload file or either YAML file is
    missing from MOTR_WORKLOAD_DIR.
    """
    workload_spec = f"{MOTR_WORKLOAD_DIR}/mix_workload.yaml"
    runner = f"{MOTR_WORKLOAD_DIR}/m0workload"
    crate_spec = f"{MOTR_WORKLOAD_DIR}/m0crate_workload_batch_1_file1.yaml"

    required = (runner, workload_spec, crate_spec)
    if not all(os.path.isfile(path) for path in required):
        self.logger.error("workload files are missing\n")
        return

    out = execute_command(self, f"{runner} -t {workload_spec}", timeout_secs=1000)
    self.logger.info(f"{out[0]}\n")
1104 
1105  # Configure motr mini provisioner logger.
1106  # File to log motr mini prov logs: /var/log/seagate/motr/mini_provisioner.
1107  # Currently we log to both console and /var/log/seagate/motr/mini_provisioner.
1108  # Firstly check if /var/log/seagate/motr exists. If not, create it.
1109 
def config_logger(self):
    """Create the log directory/file and return the configured provisioner logger.

    The logger named LOGGER gets a DEBUG-level file handler on self.logfile
    and an ERROR-level console handler. Handlers are attached only once, so
    calling this repeatedly does not duplicate log records.

    Raises:
        MotrError: EINVAL when LOGDIR or self.logfile cannot be created.
    """
    logger = logging.getLogger(LOGGER)
    try:
        os.makedirs(LOGDIR, exist_ok=True)
        if not os.path.exists(self.logfile):
            with open(self.logfile, 'w'):
                pass
    except OSError as err:
        raise MotrError(errno.EINVAL, f"{self.logfile} creation failed\n") from err

    logger.setLevel(logging.DEBUG)
    if logger.handlers:
        # logging.getLogger() returns the same object for the same name;
        # adding the handlers again would emit every record twice.
        return logger

    # create file handler which logs debug message in log file
    fh = logging.FileHandler(self.logfile)
    fh.setLevel(logging.DEBUG)
    # create console handler to log messages ERROR and above
    ch = logging.StreamHandler()
    ch.setLevel(logging.ERROR)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)
    logger.addHandler(fh)
    logger.addHandler(ch)
    return logger
1137 
def remove_dirs(self, log_dir, patterns):
    """Remove log_dir entirely, or only the entries matching patterns.

    With an empty patterns sequence the whole log_dir is removed. Otherwise,
    for each pattern, every path matching "<log_dir>/<pattern>*" is removed
    with `rm -rf`. Returns early with a warning when the parent directory of
    log_dir does not exist.
    """
    parent_dir = os.path.dirname(log_dir)
    if not os.path.exists(parent_dir):
        self.logger.warning(f"{log_dir} does not exist")
        return

    if len(patterns) == 0:
        self.logger.info(f"Removing {log_dir}")
        execute_command(self, f"rm -rf {log_dir}")
        return

    for pattern in patterns:
        # e.g. removes addb* dirs from /var/motr via /var/motr/addb*
        wildcard = f"{log_dir}/{pattern}*"
        deleted = []
        for match in glob.glob(wildcard, recursive=True):
            deleted.append(match)
            execute_command(self, f"rm -rf {match}")
        if deleted:
            self.logger.info(f"Removed below directories of pattern {pattern} from {log_dir}.\n{deleted}")
1161 
def remove_logs(self, patterns):
    """Clean every directory in MOTR_LOG_DIRS via remove_dirs(), then drop IVT_DIR."""
    for directory in MOTR_LOG_DIRS:
        if not os.path.exists(directory):
            self.logger.warning(f"{directory} does not exist")
            continue
        remove_dirs(self, directory, patterns)

    if os.path.exists(IVT_DIR):
        self.logger.info(f"Removing {IVT_DIR}")
        execute_command(self, f"rm -rf {IVT_DIR}")
1171 
def check_services(self, services):
    """Return True when `systemctl status` yields status 0 for every service.

    Returns False at the first service whose status is non-zero. Only the
    non-raising command helper is used, so a stopped service is reported
    through the return value rather than through an exception.
    """
    for service in services:
        self.logger.info(f"Checking status of {service} service\n")
        cmd = f"systemctl status {service}"
        ret = execute_command_without_exception(self, cmd)
        if ret != 0:
            return False
    return True
1181 
def verify_lnet(self):
    """Ping the local nids, retrying once after 5 seconds on failure.

    Before the retry, lnet is restarted if check_services() reports it as
    not running. Returns the result of the last lnet_self_ping() call.
    """
    self.logger.info("Doing ping to nids.\n")
    ping_ok = lnet_self_ping(self)
    if ping_ok:
        return ping_ok

    # Check if lnet is up. If lnet is not up, restart lnet and try ping nid.
    # Else, ping nid after some delay since lnet is already up.
    lnet_running = check_services(self, ["lnet"])
    if lnet_running:
        self.logger.warning("lnet is up. Doing ping to nids after 5 seconds.\n")
    else:
        self.logger.info("lnet is not up. Restaring lnet.\n")
        restart_services(self, ["lnet"])
        self.logger.info("Doing ping to nids after 5 seconds.\n")
    execute_command_without_exception(self, "sleep 5")
    return lnet_self_ping(self)
1197 
def lnet_self_ping(self):
    """Run `lctl ping` on each nid reported by the local `lctl list_nids`.

    Each non-blank output line is treated as one nid, so several nids are
    pinged individually. Returns False when no nid is reported or when any
    ping yields a non-zero status, True otherwise.
    """
    op = execute_command(self, "lctl list_nids")
    nids = [line.strip() for line in op[0].splitlines() if line.strip()]
    self.logger.info(f"nids= {nids}\n")
    if not nids:
        # nothing to ping means lnet is not usable
        return False
    for nid in nids:
        cmd = f"lctl ping {nid}"
        self.logger.info(f"lctl ping on: {nid}\n")
        ret = execute_command_without_exception(self, cmd)
        if ret != 0:
            return False
    return True
1211 
def update_motr_hare_keys(self, nodes):
    """Store the md logical-volume path of every cvg of every node in the hare conf.

    For each entry under 'server_node' and each of its cvgs, the path of
    lv_raw_md<N> is taken from `lvs -o lv_path`: run locally for this node,
    or over ssh with retries (retry_delay seconds apart) for other nodes.
    The path is saved under server>NAME>cvg[i]>m0d[0]>md_seg1, and the hare
    conf file is finally copied to every other node with scp.

    Args:
        nodes: not used by this function.

    Raises:
        MotrError: when no matching logical volume is found for a cvg.
    """
    hostname = self.server_node["hostname"]
    nodes_info = Conf.get(self._index, 'server_node')
    retry_count = 60
    retry_delay = 2
    for value in nodes_info.values():
        host = value["hostname"]
        cvg_count = value[CVG_COUNT_KEY]
        name = value["name"]
        self.logger.info(f"update_motr_hare_keys for {host}\n")
        for i in range(int(cvg_count)):
            lv_path = None
            lv_md_name = f"lv_raw_md{i + 1}"

            if hostname == host:
                lv_path, res = _find_md_lv_path(self, "lvs -o lv_path", lv_md_name)
            else:
                cmd = (f"ssh {host}"
                       f" \"lvs -o lv_path\"")
                for retry in range(1, retry_count):
                    self.logger.info(f"Getting LVM data for {host}, attempt: {retry}\n")
                    lv_path, res = _find_md_lv_path(self, cmd, lv_md_name)
                    if lv_path:
                        self.logger.info(f"found lvm {lv_path} after {retry} count")
                        break
                    time.sleep(retry_delay)
            if not lv_path:
                raise MotrError(res[1], f"[ERR] {lv_md_name} not found on {host}\n")
            self.logger.info(f"setting key server>{name}>cvg[{i}]>m0d[0]>md_seg1"
                             f" with value {lv_path} in {self._motr_hare_conf}")
            Conf.set(self._index_motr_hare, f"server>{name}>cvg[{i}]>m0d[0]>md_seg1", f"{lv_path.strip()}")
            Conf.save(self._index_motr_hare)

    # distribute the updated hare conf to all other nodes
    for value in nodes_info.values():
        host = value["hostname"]
        if hostname == host:
            continue
        cmd = (f"scp {self._motr_hare_conf}"
               f" {host}:{self._motr_hare_conf}")
        execute_command(self, cmd)


def _find_md_lv_path(self, cmd, lv_md_name):
    """Run cmd and return (first output token matching lv_md_name or None, raw result)."""
    res = execute_command_verbose(self, cmd)
    matcher = re.compile(f".*{lv_md_name}")
    lv_path = None
    try:
        lv_path = list(filter(matcher.match, res[0].split()))[0].strip()
    except Exception as e:
        # best effort: a missing volume is reported by the caller
        self.logger.info(f"exception pass {e}\n")
    return lv_path, res
1267 
1268 # Get volume groups created on metadata devices mentioned in config file
def get_vol_grps(self):
    """Return the volume groups that `pvs` reports for the configured metadata devices.

    For each metadata device of each cvg, the second column of the matching
    `pvs` line is collected; devices without any output are skipped.

    Raises:
        MotrError: EINVAL when a cvg entry has no "metadata_devices" item.
    """
    cvg_cnt, cvg = get_cvg_cnt_and_cvg(self)

    vol_grps = []
    for i in range(int(cvg_cnt)):
        cvg_item = cvg[i]
        try:
            metadata_devices = cvg_item["metadata_devices"]
        except (LookupError, TypeError) as err:
            raise MotrError(errno.EINVAL, "metadata devices not found\n") from err
        check_type(metadata_devices, list, "metadata_devices")
        self.logger.info(f"lvm metadata_devices: {metadata_devices}")

        for device in metadata_devices:
            cmd = f"pvs | grep {device} " "| awk '{print $2}'"
            ret = execute_command(self, cmd)
            if ret[0]:
                vol_grps.append(ret[0].strip())
    return vol_grps
1288 
def lvm_clean(self):
    """Tear down the LVM setup found on the configured metadata devices.

    Turns swap off, removes the vg_srvnode lines from FSTAB, then for each
    volume group removes its logical volumes, the group itself and its
    physical volumes (followed by wipefs). Finally, leftover device-mapper
    entries are removed. Does nothing when no volume group is found.
    """
    def run_logged(command):
        # announce and execute one LVM command
        self.logger.info(f"Executing {command}")
        execute_command(self, command)

    self.logger.info("Removing cortx lvms")
    groups = get_vol_grps(self)
    if not groups:
        self.logger.info("No cortx volume groups (e.g. vg_srvnode-1_md1) are found \n")
        return
    self.logger.info(f"Volume groups found: {groups}")
    self.logger.info("Executing swapoff -a")
    swap_off(self)
    self.logger.info(f"Removing cortx LVM entries from {FSTAB}")
    execute_command(self, f"sed -i.bak '/vg_srvnode/d' {FSTAB}")

    for vg in groups:
        pvs_out = execute_command(self, f"pvs|grep {vg} |awk '{{print $1}}'")[0]
        pv_names = pvs_out.split('\n')[:-1]
        lvs_out = execute_command(self, f"lvs|grep {vg} |awk '{{print $1}}'")[0]
        lv_names = lvs_out.split('\n')[:-1]

        # Removing logical volumes
        for lv in lv_names:
            lv_path = f"/dev/{vg}/{lv}"
            run_logged(f"lvchange -an {lv_path}")
            run_logged(f"lvremove {lv_path}")
            if os.path.exists(lv_path):
                self.logger.info("Removing dmsetup entries using cmd "
                                 f"\'dmsetup remove {lv_path}\'")
                execute_command(self, f"dmsetup remove {lv_path}")

        # Removing volume groups
        run_logged(f"vgchange -an {vg}")
        run_logged(f"vgremove {vg}")

        # Removing physical volumes
        for pv in pv_names:
            run_logged(f"pvremove {pv}")
            run_logged(f"wipefs -a {pv}")

    # In some case, we still have dm entries even though all VG, LV, PV
    # are removed. This is observed in hw setups where lvms are not cleaned up
    remove_dm_entries(self)
1334 
def remove_dm_entries(self):
    """Run `dmsetup remove` on every existing path matching /dev/vg_srvnode*/*.

    The paths are listed with glob instead of parsing `ls -l` output.
    """
    for lv_path in glob.glob("/dev/vg_srvnode*/*"):
        # os.path.exists() follows symlinks, so dangling links are skipped
        if os.path.exists(lv_path):
            self.logger.info(f"dmsetup remove {lv_path}")
            execute_command(self, f"dmsetup remove {lv_path}")
1345 
def get_disk_size(self, device):
    """Return the fifth field of the "<device>:" line printed by `fdisk -l`, stripped."""
    size_cmd = f"fdisk -l {device} |grep {device}:| awk '{{print $5}}'"
    output = execute_command(self, size_cmd)[0]
    return output.strip()
1350 
def read_config(file):
    """Parse a KEY=VALUE file into a dict.

    Lines starting with '#' and blank lines are skipped, as are lines that
    contain no '='. Only the first '=' separates key from value; neither
    part is stripped.

    Args:
        file: path of the file to read.

    Returns:
        dict mapping each key to its value string.
    """
    config_dict = {}
    with open(file, "r") as fp:
        for line in fp.read().splitlines():
            if line.startswith('#') or not line.strip():
                continue
            key, sep, value = line.partition('=')
            if not sep:
                # not a KEY=VALUE line; ignore it instead of failing
                continue
            config_dict[key] = value
    return config_dict
1361 
def part_clean(self):
    """Delete the partitions of every configured metadata device.

    Returns the result of the last delete_parts() call, or 0 when there was
    no device to process. A failure on one device does not stop the
    remaining devices from being processed.

    Raises:
        MotrError: EINVAL when a cvg entry has no "metadata_devices" item.
    """
    cvg_cnt, cvg = get_cvg_cnt_and_cvg(self)
    dev_count = 1
    ret = 0
    for i in range(int(cvg_cnt)):
        cvg_item = cvg[i]
        try:
            metadata_devices = cvg_item["metadata_devices"]
        except (LookupError, TypeError) as err:
            raise MotrError(errno.EINVAL, "metadata devices not found\n") from err
        check_type(metadata_devices, list, "metadata_devices")
        self.logger.info(f"\nlvm metadata_devices: {metadata_devices}\n\n")
        for device in metadata_devices:
            ret = delete_parts(self, dev_count, device)
            dev_count += 1
    return ret
1380 
1381 # It will give num of partitions + 1.
1382 # To get partition numbers, subtract 1 from output
def get_part_count(self, device):
    """Return the number of `lsblk -o name` lines containing the device's base name, minus one.

    Args:
        device: device path; only its last path component is searched for.
    """
    # os.path.split() returns a (head, tail) tuple; only the tail is wanted
    fname = os.path.basename(device)
    cmd = f"lsblk -o name | grep -c {fname}"
    ret = int(execute_command(self, cmd, verbose=True)[0]) - 1
    return ret
1388 
def delete_parts(self, dev_count, device):
    """Delete partitions 1..N of device, N being get_part_count(device).

    Args:
        dev_count: not used by this function.
        device: device whose partitions are deleted.

    Returns:
        0 when there was nothing to delete or every deletion succeeded,
        otherwise the non-zero status of the first failing deletion.
    """
    # Delete 2 partitions be_log, raw_md
    total_parts = get_part_count(self, device)
    if total_parts == 0:
        self.logger.info(f"No partitions found on {device}")
        return 0
    self.logger.info(f"No. of partitions={total_parts} on {device}")
    for part_num in range(1, int(total_parts) + 1):
        ret = delete_part(self, device, part_num)
        if ret != 0:
            self.logger.error(f"Deletion of partition({part_num}) failed on {device}")
            return ret
        # pause between consecutive fdisk runs
        time.sleep(2)
    return 0
1403 
def delete_part(self, device, part_num):
    """Delete partition part_num of device with fdisk and return the command status.

    fdisk is driven through stdin: "d", the partition number, then "w".
    """
    fdisk_script = f"d\n{part_num}\nw\n"
    result = execute_command(self, f"fdisk {device}", stdin=fdisk_script, verbose=True)
    return result[1]
1409 
def get_fid(self, fids, service, idx):
    """Return the fid of the idx-th entry of fids whose "name" equals service.

    Args:
        fids: sequence of mappings, each read through its "name" and "fid" keys.
        service: service name to match.
        idx: 1-based position among the matching entries (anything int() accepts).

    Returns:
        The selected "fid" value, or -1 (after logging an error) when no
        entry matches or idx is outside the valid range.
    """
    # Prepare list of all fids of matching service
    fids_list = [entry["fid"] for entry in fids if entry["name"] == service]
    num_fids = len(fids_list)

    # --idx argument is started from index 1, to read fetch-fids from index 0
    idx = int(idx) - 1

    if num_fids == 0:
        self.logger.error(f"No fids for service({service}). Returning -1.")
        return -1

    # the lower bound matters: a negative index would silently pick an
    # entry from the end of the list
    if 0 <= idx < num_fids:
        return fids_list[idx]

    self.logger.error(f"Invalid index({idx}) of service({service}). "
                      f"Valid index should be in range [0-{num_fids-1}]. "
                      "Returning -1.")
    return -1
1435 
1436 # Fetch fid of service using command 'hctl fetch-fids'
1437 # First populate a yaml file with the output of command 'hctl fetch-fids'
1438 # Use this yaml file to get proper fid of required service.
def fetch_fid(self, service, idx):
    """Return the fid of the idx-th service instance listed by `hctl fetch-fids`.

    The command output is written to TEMP_FID_FILE, parsed back as YAML and
    handed to get_fid().

    Returns:
        The fid chosen by get_fid(), or -1 when the command output contains
        no fids.
    """
    hare_lib_path = f"{self.local_path}/hare/config/{self.machine_id}"
    cmd = f"hctl fetch-fids --conf-dir {hare_lib_path}"
    out = execute_command(self, cmd)
    self.logger.info(f"Available fids:\n{out[0]}\n")
    with open(TEMP_FID_FILE, "w") as fp:
        fp.write(out[0])
    with open(TEMP_FID_FILE, "r") as fp:
        fids = yaml.safe_load(fp)
    # safe_load() yields None for empty input, so test truthiness, not len()
    if not fids:
        self.logger.error("No fids returned by 'hctl fetch-fids'. Returning -1.\n")
        return -1
    return get_fid(self, fids, service, idx)
1454 
def getListOfm0dProcess():
    '''
    Get list of running m0d process

    Each element is a dict with the 'pid', 'name' and 'username' of one
    process whose name is "m0d". Processes that vanish or cannot be
    inspected while iterating are skipped.
    '''
    listOfProc = []
    # Iterate over the list
    for proc in psutil.process_iter():
        try:
            # Fetch process details as dict
            pinfo = proc.as_dict(attrs=['pid', 'name', 'username'])
            if pinfo.get('name') == "m0d":
                # Append dict to list
                listOfProc.append(pinfo)
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass
    return listOfProc
1471 
def receiveSigTerm(signalNumber, frame):
    """Send SIGTERM to every process returned by getListOfm0dProcess().

    The (signalNumber, frame) parameters match the signature Python passes
    to signal handlers; neither is used here. Processes that have already
    exited, or that may not be signalled, are skipped.
    """
    import signal  # local import: not part of the file's visible import block

    for proc in getListOfm0dProcess():
        pid = proc.get('pid')
        if pid is None:
            continue
        try:
            os.kill(pid, signal.SIGTERM)
        except (ProcessLookupError, PermissionError):
            # the process is gone or not ours; nothing more to do for it
            continue
    return
1477 
1478 # If service is one of [ios,confd,hax] then we expect fid to start the service
1479 # and start services using motr-mkfs and motr-server.
1480 # For other services like 'motr-free-space-mon' we do nothing.
def start_service(self, service, idx):
    """Start the requested motr service.

    For "fsm", "client" and "motr_client" only MOTR_FSM_SCRIPT_PATH is run.
    For any other service the logrotate script is copied to CROND_DIR, crond
    is started, confd.xc and the motr sysconfig file are copied into place,
    and the m0d instance for the fetched fid is started through
    MOTR_SERVER_SCRIPT_PATH.

    Returns:
        -1 when no fid could be fetched, otherwise None.
    """
    self.logger.info(f"service={service}\nidx={idx}\n")

    if service in ["fsm", "client", "motr_client"]:
        execute_command_verbose(self, f"{MOTR_FSM_SCRIPT_PATH}", set_timeout=False)
        return

    # Copy mini prov logrotate script
    execute_command(self, f"cp {MOTR_MINI_PROV_LOGROTATE_SCRIPT} {CROND_DIR}")
    # Start crond service
    execute_command(self, "/usr/sbin/crond start", timeout_secs=120)

    # Copy confd.xc from the per-machine sysconfig directory to /etc/motr
    sysconfig_dir = f"{self.local_path}/motr/sysconfig/{self.machine_id}"
    create_dirs(self, ["/etc/motr"])
    execute_command(self, f"cp -f {sysconfig_dir}/confd.xc /etc/motr/")

    # Copy the motr sysconfig file to /etc/sysconfig
    execute_command(self, f"cp -v {self.local_path}/motr/sysconfig/{self.machine_id}/motr /etc/sysconfig/")

    service_fid = fetch_fid(self, service, idx)
    if service_fid == -1:
        return -1

    # Start motr services
    execute_command_console(self, f"{MOTR_SERVER_SCRIPT_PATH} m0d-{service_fid}")
    return
def receiveSigTerm(signalNumber, frame)
def add_swap_fstab(self, dev_name)
def create_lvm(self, index, metadata_dev)
static void split(m0_bindex_t offset, int nr, bool commit)
Definition: extmap.c:230
static struct m0_list list
Definition: list.c:144
def update_config_file(self, fname, kv_list)
def config_lvm(self)
def motr_config(self)
def verify_libfabric(self)
def get_disk_size(self, device)
def motr_config_k8(self)
def check_type(var, vtype, msg)
def get_fid(self, fids, service, idx)
static int error
Definition: mdstore.c:64
def update_copy_motr_config_file(self)
def execute_command_console(self, command)
def update_bseg_size(self)
static void decode(struct m0_xcode_obj *obj)
Definition: xcode.c:487
def calc_lvm_min_size(self, lv_path, lvm_min_size)
def get_md_disks_lists(self, node_info)
def execute_command_without_exception(self, cmd, timeout_secs=TIMEOUT_SECS, retries=1)
def test_libfabric(self)
def configure_machine_id(self, phase)
def restart_services(self, services)
def calc_size(self, sz)
Definition: filter.py:1
def validate_storage_schema(storage)
def test_lnet(self)
def delete_parts(self, dev_count, device)
def check_pkgs(self, pkgs)
def execute_command_verbose(self, cmd, timeout_secs=TIMEOUT_SECS, verbose=False, set_timeout=True, retry_count=CMD_RETRY_COUNT)
def lnet_ping(self)
def delete_part(self, device, part_num)
def validate_motr_rpm(self)
def validate_file(file)
def verify_lnet(self)
def configure_net(self)
def create_dirs(self, dirs)
def get_cvg_cnt_and_cvg(self)
def __init__(self, rc, message, args)
def align_val(val, size)
def create_swap(self, swap_dev)
def remove_logs(self, patterns)
def config_logger(self)
def get_metadata_disks_count(self)
def set_setup_size(self, service)
def configure_lnet(self)
def validate_files(files)
def part_clean(self)
def lvm_exist(self)
def cluster_up(self)
def get_part_count(self, device)
def get_vol_grps(self)
def remove_dirs(self, log_dir, patterns)
def get_nids(self, nodes)
def fetch_fid(self, service, idx)
format
Definition: hist.py:128
def read_config(file)
def update_motr_hare_keys_for_all_nodes(self)
static long long min(long long a, long long b)
Definition: crate.c:191
def execute_command(self, cmd, timeout_secs=TIMEOUT_SECS, verbose=False, retries=1, stdin=None, logging=True)
def del_swap_fstab_by_vg_name(self, vg_name)
def lvm_clean(self)
def swap_off(self)
def execute_command_without_log(cmd, timeout_secs=TIMEOUT_SECS, verbose=False, retries=1, stdin=None, logging=False)
def pkg_installed(self, pkg)
def get_data_nodes(self)
def get_server_node(self)
def update_to_file(self, index, url, machine_id, md_disks)
def add_entry_to_logrotate_conf_file(self)
def lnet_self_ping(self)
def get_value(self, key, key_type)
def configure_libfabric(self)
def get_nodes(self)
def start_service(self, service, idx)
def get_mdisks_from_list(self, md_lists)
def remove_dm_entries(self)
def swap_on(self)
def check_services(self, services)
def is_hw_node(self)
def get_logical_node_class(self)
def update_motr_hare_keys(self, nodes)