【Question title】: Graphite / Carbon / Ceres node overlap
【Posted】: 2012-11-12 18:53:58
【Question】:

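I am using Graphite for monitoring, with Carbon and Ceres as the storage backend. I am having trouble correcting bad data. It seems that (due to various problems) I have ended up with overlapping files. That is, since Carbon / Ceres stores data as timestamp@interval.slice, I can have two or more files whose time ranges overlap.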
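To make the naming scheme concrete, here is a minimal sketch of how the time range covered by a slice file could be derived from its name and size. The helper `slice_range` is hypothetical (it is not part of Ceres), and the 8 bytes per datapoint is an assumption taken from the constant used in the answer's script.

```python
import os

DATAPOINT_SIZE = 8  # assumed bytes per datapoint, matching the 8 used in the answer's script


def slice_range(path, size_bytes):
    """Return (start, end, interval) for a slice file name and its size in bytes.

    'end' is exclusive: it is the first timestamp the file does not cover.
    """
    base = os.path.basename(path)            # e.g. '1346805360@60.slice'
    start_text, rest = base.split('@', 1)
    interval = int(rest.split('.', 1)[0])    # seconds between datapoints
    start = int(start_text)                  # timestamp of the first datapoint
    points = size_bytes // DATAPOINT_SIZE    # whole datapoints stored in the file
    return start, start + points * interval, interval


# 800 bytes = 100 datapoints at 60 s = 6000 s of data.
print(slice_range('./updateOperations/1346805360@60.slice', 800))
# (1346805360, 1346811360, 60)
```

Two slices in the same directory overlap when the start of one falls before the exclusive end of another.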

There are two kinds of overlap:

File A:  +------------+        orig file
File B:      +-----+           subset
File C:          +---------+   overlap
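The two cases in the diagram can be told apart by comparing interval end points. The following is only a sketch: `classify` is a hypothetical helper, and the numbers are made up to mirror files A, B and C.

```python
def classify(a, b):
    """Classify slice b against slice a; each is a (start, end) pair, end exclusive.

    Assumes a starts no later than b, as is the case after sorting by start time.
    """
    a_start, a_end = a
    b_start, b_end = b
    if b_start >= a_end:
        return 'disjoint'   # b begins at or after the first free timestamp of a
    if b_end <= a_end:
        return 'subset'     # b lies entirely inside a
    return 'overlap'        # b starts inside a and runs past its end


file_a = (100, 220)   # orig file
file_b = (140, 190)   # subset
file_c = (180, 280)   # overlap

print(classify(file_a, file_b))  # subset
print(classify(file_a, file_c))  # overlap
```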

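This causes problems because the existing tools (ceres-maintenance defrag and rollup) cannot cope with these overlaps. Instead, they skip the affected directory and move on, so the bad data is never cleaned up.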

【Comments】:

    Tags: slice overlap graphite graphite-carbon


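    【Solution 1】:

    I wrote a script to fix this. It works as follows:

    1. For subsets, simply delete the subset file.

    2. For overlaps, use a filesystem truncate on the original file at the point where the next file starts. It would also be possible to cut off the beginning of the overlapping file and rename it accordingly, but I think that is fraught with peril.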
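    The truncation arithmetic in step 2 can be sketched as follows. This is an illustration, not the script itself: `truncated_size` and `truncate_to` are hypothetical helpers, and 8 bytes per datapoint is assumed as in the script below.

```python
import os
import tempfile

DATAPOINT_SIZE = 8  # assumed bytes per datapoint, as in the script


def truncated_size(first_start, first_interval, next_start):
    """Bytes to keep in the earlier file so that it ends where the next one starts."""
    seconds_kept = next_start - first_start
    points_kept = seconds_kept // first_interval   # whole datapoints only
    return points_kept * DATAPOINT_SIZE


def truncate_to(path, new_size):
    """Cut an existing file down to new_size bytes without creating it if missing."""
    with open(path, 'r+b') as handle:
        handle.truncate(new_size)


# Demonstration on a throwaway file holding 20 zero-filled datapoints.
fd, demo_path = tempfile.mkstemp(suffix='.slice')
os.write(fd, b'\0' * (20 * DATAPOINT_SIZE))
os.close(fd)

# Earlier file starts at 1000 with a 60 s interval; the next file starts at 1600.
new_size = truncated_size(1000, 60, 1600)   # 600 s / 60 s = 10 datapoints
truncate_to(demo_path, new_size)
final_size = os.path.getsize(demo_path)
print(new_size, final_size)  # 80 80
os.remove(demo_path)
```

    Truncating the tail of the earlier file leaves both file names valid, which is presumably why it is safer than trimming the head of the later file: that would also require renaming the file to its new start timestamp.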

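    I found two ways to go about this:

    1. Walk the directories and iterate over the files, fixing as you go: find the subset files and delete them;

    2. Walk the directories and fix every problem in a directory before moving on. This is by far the faster approach, because the directory walk is very time-consuming.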
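    The second approach amounts to a single pass over the tree that hands each node directory's slices to the fixer in one go. A minimal sketch, assuming node directories are recognised by a `.ceres-node` file as in the script below; `slice_dirs` is a hypothetical helper name.

```python
import os


def slice_dirs(basedir):
    """Yield (directory, slice file names sorted by start time) per node directory."""
    for path, _dirs, files in os.walk(basedir):
        if '.ceres-node' not in files:
            continue  # not a node directory, nothing to fix here
        slices = [f for f in files if f.endswith('.slice') and '@' in f]
        # Sort numerically on the timestamp before '@'; a plain string sort
        # would put '1000@60.slice' ahead of '200@60.slice'.
        slices.sort(key=lambda name: int(name.split('@', 1)[0]))
        yield path, slices
```

    Each directory is listed once by `os.walk`, so all subset and overlap checks for a node can be done on the in-memory list before the walk continues.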

    Code:

    #!/usr/bin/env python2.6
    ################################################################################
    
    # Only the modules the script actually uses; the unused third-party imports
    # (zmq, pymongo, simplejson) made it fail on hosts without those packages.
    import os
    import sys
    import time
    import traceback
    from optparse import OptionParser
    from pprint import pformat
    
    ################################################################################
    
    class SliceFile(object):
        def __init__(self, fname):
            self.name       = fname
            basename        = fname.split('/')[-1]
            fnArray         = basename.split('@')
            self.timeStart  = int(fnArray[0])
            self.freq       = int(fnArray[1].split('.')[0])
            self.size       = None
            self.numPoints  = None
            self.timeEnd    = None
            self.deleted    = False
    
        def __repr__(self):
            out = "Name: %s, tstart=%s tEnd=%s, freq=%s, size=%s, npoints=%s." % (
                self.name, self.timeStart, self.timeEnd, self.freq, self.size, self.numPoints)
            return out
    
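        def setVars(self):
            # Each datapoint is stored as 8 bytes, so the file size gives the point count.
            self.size       = os.path.getsize(self.name)
            self.numPoints  = self.size // 8
            # timeEnd is exclusive: the first timestamp after the last stored point.
            self.timeEnd    = self.timeStart + (self.numPoints * self.freq)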
    
    ################################################################################
    
    class CeresOverlapFixup(object):
    
        def __del__(self):
            import datetime
            self.writeLog("Ending at %s" % (str(datetime.datetime.today())))
            self.LOGFILE.flush()
            self.LOGFILE.close()
    
        def __init__(self):
            self.verbose            = False
            self.debug              = False
            self.LOGFILE            = open("ceresOverlapFixup.log", "a")
            self.badFilesList       = set()
            self.truncated          = 0
            self.subsets            = 0
            self.dirsExamined       = 0            
            self.lastStatusTime     = 0
    
        def getOptionParser(self):
            return OptionParser()
    
        def getOptions(self):
            parser = self.getOptionParser()
            parser.add_option("-d", "--debug", action="store_true", dest="debug", default=False,
                              help="debug mode for this program, writes debug messages to logfile.")
            parser.add_option("-v", "--verbose", action="store_true", dest="verbose", default=False,
                              help="verbose mode for this program, prints a lot to stdout.")
            parser.add_option("-b", "--basedir", action="store", type="string", dest="basedir", default=None,
                              help="base directory location to start converting.")
            (options, args)     = parser.parse_args()
            self.debug          = options.debug
            self.verbose        = options.verbose
            self.basedir        = options.basedir
            if not self.basedir:
                # parser.error() prints usage and exits; an assert would vanish under python -O.
                parser.error("must provide base directory (-b/--basedir).")
    
        # Examples:
        # ./updateOperations/1346805360@60.slice
        # ./updateOperations/1349556660@60.slice
        # ./updateOperations/1346798040@60.slice
    
        def getFileData(self, inFilename):
            ret = SliceFile(inFilename)
            ret.setVars()
            return ret
    
        def removeFile(self, inFilename):
            os.remove(inFilename)
            #self.writeLog("removing file: %s" % (inFilename))
            self.subsets += 1
    
        def truncateFile(self, fname, newSize):
            if self.verbose:
                self.writeLog("Truncating file, name=%s, newsize=%s" % (pformat(fname), pformat(newSize)))
            fd = None
            try:
                # No O_CREAT: a slice that has gone missing must not be recreated.
                fd = os.open(fname, os.O_RDWR)
                os.ftruncate(fd, newSize)
                self.truncated += 1
            except Exception:
                self.writeLog("Exception during truncate: %s" % (traceback.format_exc()))
            finally:
                # Close the descriptor exactly once, whether or not the truncate worked.
                if fd is not None:
                    os.close(fd)
    
        def printStatus(self):
            now = self.getNowTime()
            if ((now - self.lastStatusTime) > 10):
                self.writeLog("Status: time=%d, Walked %s dirs, subsetFilesRemoved=%s, truncated %s files." % (now, self.dirsExamined, self.subsets, self.truncated))
                self.lastStatusTime = now
    
        def fixupThisDir(self, inPath, inFiles):

            # Only directories containing a '.ceres-node' marker hold slice files.
            if '.ceres-node' not in inFiles:
                return

            self.dirsExamined += 1

            fileObjList = []
            for thisFile in [x for x in inFiles if '@' in x]:
                wholeFilename = os.path.join(inPath, thisFile)
                try:
                    fileObjList.append(self.getFileData(wholeFilename))
                except Exception:
                    self.badFilesList.add(wholeFilename)
                    self.writeLog("ERROR: file %s, %s" % (wholeFilename, traceback.format_exc()))

            # Sort numerically by start time; sorting by name would compare strings.
            fileObjList.sort(key=lambda thisObj: thisObj.timeStart)

            while fileObjList:

                self.printStatus()

                firstFile = fileObjList[0]
                removedFiles = set()
                for curFile in fileObjList[1:]:
                    if curFile.timeEnd <= firstFile.timeEnd:
                        # Subset file: it lies entirely inside firstFile, so delete it.
                        # removeFile() already updates self.subsets.
                        self.removeFile(curFile.name)
                        removedFiles.add(curFile.name)
                        if self.verbose:
                            self.writeLog("Subset file situation.  First=%s, overlap=%s" % (firstFile, curFile))
                fileObjList = [x for x in fileObjList if x.name not in removedFiles]
                if len(fileObjList) < 2:
                    break
                secondFile = fileObjList[1]

                # Strict less-than is right: timeEnd is the first free timestamp after firstFile.
                # E.g. first starts at 100 with 2 points at freq 1: it uses 100 and 101,
                # timeEnd is 102, so a second file starting at 102 does not overlap.
                if secondFile.timeStart < firstFile.timeEnd:
                    # file_A (first):   +---------+
                    # file_B (second):       +----------+
                    # Solve by truncating file_A at the start point of file_B.
                    newLenFile_A_seconds = secondFile.timeStart - firstFile.timeStart
                    newFile_A_datapoints = newLenFile_A_seconds // firstFile.freq
                    newFile_A_bytes      = newFile_A_datapoints * 8
                    # If less than one whole datapoint would remain, file_A is left untouched.
                    if newFile_A_bytes:
                        # truncateFile() already updates self.truncated.
                        self.truncateFile(firstFile.name, newFile_A_bytes)
                        if self.verbose:
                            self.writeLog("Truncate situation.  First=%s, overlap=%s" % (firstFile, secondFile))

                # firstFile is resolved; continue with the next file as the reference.
                fileObjList = fileObjList[1:]
    
    
        def getNowTime(self):
            return time.time()
    
    
        def walkDirStructure(self):

            startTime           = self.getNowTime()
            self.lastStatusTime = startTime

            for (thisPath, theseDirs, theseFiles) in os.walk(self.basedir):
                self.printStatus()
                # fixupThisDir() counts the slice directories it examines in self.dirsExamined.
                self.fixupThisDir(thisPath, theseFiles)

            endTime = self.getNowTime()
            # Reset the timer so the final status line is always written.
            self.lastStatusTime = 0
            self.printStatus()
            self.writeLog("started at %s, ended at %s, elapsed time = %s seconds." % (startTime, endTime, endTime - startTime))
            self.writeLog("Done.")
    
    
        def writeLog(self, instring):
            # write() behaves the same on Python 2 and 3, unlike the print statement.
            sys.stdout.write(instring + "\n")
            self.LOGFILE.write(instring + "\n")
            self.LOGFILE.flush()
    
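        def main(self):
            self.getOptions()
            self.walkDirStructure()


    if __name__ == "__main__":
        # Entry point: without it the script only defines the classes and never runs.
        CeresOverlapFixup().main()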
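
    Because the script deletes and truncates files in place, it may be worth trying it on a scratch tree first. The sketch below builds one that mirrors files A, B and C from the question. `make_scratch_node` is a hypothetical helper, the directory name is borrowed from the example paths in the script's comments, and the empty `.ceres-node` file is an assumption that only satisfies the script's presence check.

```python
import os
import tempfile


def make_scratch_node(slices, root=None):
    """Create a throwaway node directory holding zero-filled slice files.

    'slices' is a list of (start, interval, points) tuples; 8 bytes per point assumed.
    """
    root = root or tempfile.mkdtemp(prefix='ceres-scratch-')
    node = os.path.join(root, 'updateOperations')
    os.makedirs(node)
    # Empty marker file: enough for the script, which only checks that it exists.
    open(os.path.join(node, '.ceres-node'), 'w').close()
    for start, interval, points in slices:
        name = '%d@%d.slice' % (start, interval)
        with open(os.path.join(node, name), 'wb') as handle:
            handle.write(b'\0' * (points * 8))
    return root


# A covers 1000..2200, B (1300..1600) is a subset of A, C (1900..3100) overlaps A.
root = make_scratch_node([(1000, 60, 20), (1300, 60, 5), (1900, 60, 20)])
print(root)
```

    Running the script with `-b` pointed at the printed directory should remove `1300@60.slice` and truncate `1000@60.slice` to 15 datapoints (120 bytes), leaving `1900@60.slice` untouched.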
    

    【Comments】:

    • Yes. It could be cleaned up, agreed. But whatchagonnado? I could spend ages tightening it up. If you have a better version, please post it.