【问题标题】:Return Number of Errors From Splunk Search in Python从 Python 中的 Splunk 搜索返回错误数
【发布时间】:2023-09-26 06:13:01
【问题描述】:

是否有任何方法可以获取在使用 splunklib.results 模块或任何 splunklib 模块进行 Splunk 搜索期间发生的错误数?

下面是我目前的代码:

#purpose of script: To connect to Splunk, execute a query, and write the query results out to an excel file.
#query results = multiple dynamic # of rows. 7 columns. 

#!/usr/bin/env python
import splunklib.client as client #splunklib.client class is used to connect to splunk, authenticate, and maintain session
import splunklib.results as results #module for returning results and printing/writing them out

listOfAppIDs = []
#open file to read each line and add each line in file to an array. These are our appID's to search
with open('filelocation.txt', 'r') as fi:
    for line in fi:
        listOfAppIDs.append(line.rstrip('\n'))
print listOfAppIDs

#identify variables used to log in
HOST = "8.8.8.8"
PORT = 8089
USERNAME = "uName"
PASSWORD = "pWord"

startPoint = "appID1" #initial start point in array

outputCsv = open('filelocation.csv', 'wb')
fieldnames = ['Application ID', 'transport', 'dst_port', 'Average Throughput per Month','Total Sessions Allowed', 'Unique Source IPs', 'Unique Destination IPs']
writer = csv.DictWriter(outputCsv, fieldnames=fieldnames)
writer.writeheader();

def connect():
    global startPoint , item
    print "startPoint: " + startPoint

    #Create a service instance by using the connect function and log in
    service = client.connect(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD,
        autologin=True
    )   
    jobs = service.jobs# Get the collection of jobs/searches
    kwargs_blockingsearch = {"exec_mode": "normal"}

    try:
        for item in listOfAppIDs:
            errorCount=0
            print "item: " + item
            if (item >= startPoint):    
                searchquery_blocking = "search splunkQery"
                print item + ':'
                job = jobs.create(searchquery_blocking, **kwargs_blockingsearch) # A blocking search returns query result. Search executes here
                print "Splunk query for appID " , item , " completed! \n"
                resultCount = job["resultCount"] #number of results this job (splunk query) returned
                print "result count " , resultCount
                rr = results.ResultsReader(job.results())
                for result in rr:
                    if isinstance(result, results.Message):
                        # Diagnostic messages may be returned in the results
                        # Check the type and do something.
                        if result.type == log_type:
                            print '%s: %s' % (result.type, result.message)
                            errorCount+=1
                    elif isinstance(result, dict):
                        # Normal events are returned as dicts
                        # Do something with them if required.
                        print result
                        writer.writerow([result + errorCount])
                        pass
                assert rr.is_preview == False
    except:
        print "\nexcept\n"
        startPoint = item #returh to connect function but start where startPoint is at in array
        connect()

   print "done!"    

connect()

上面的代码出现以下错误:

'OrderedDict' object has no attribute 'messages'

【问题讨论】:

  • 哈哈哈 HOST = "8.8.8.8"。这是谷歌的 DNS 服务器 ;-)

标签: python splunk


【解决方案1】:
from splunklib import results
my_feed=results.ResultsReader(open("results.xml"))

log_type='ERROR'

n_errors=0
for result in my_feed.results:
    if isinstance(result, results.Message):
       if result.type==log_type:
          print result.message
          n_errors+=1

您可能对 data.load() 有疑问,因为它需要一个带有单个根节点的 xml。如果您在一个提要中有多个结果节点,则可以解决此包装提要的问题,即:"<root>+open("feed.xml").read()</root>"

如果您可以访问原始提要而不是数据对象,则可以使用 lxml insted of splunk lib

len( lxml.etree.parse("results.xml").findall("//messages/msg[@type='ERROR']") )

以下是基于 splunklib 文档的完整示例。 ResultsReader 解析原子提要并为您调用每个结果上的data.load()

      import splunklib.client as client
      import splunklib.results as results
      from time import sleep

      log_type='ERROR'

      service = client.connect(...)
      job = service.jobs.create("search * | head 5")
      while not job.is_done():
          sleep(.2)
      rr = results.ResultsReader(job.results())
      for result in rr:
          if isinstance(result, results.Message):
              # Diagnostic messages may be returned in the results
              # Check the type and do something.
              if result.type == log_type:
                 print '%s: %s' % (result.type, result.message)
          elif isinstance(result, dict):
              # Normal events are returned as dicts
              # Do something with them if required.
              pass
      assert rr.is_preview == False

【讨论】:

  • 在哪里可以找到我的feed.xml 文档?
  • 添加了一个完整的示例...检查客户端文档以获取不同的 client.connect() 参数。您的 xml 是通过作业中的 rest api 检索的...创建一个新作业(如示例),或者通过 sid 使用 client.job() 方法检索计划的作业。
  • 否则,计划作业可能会将其结果存储在 splunk 服务器上的 xml 文件中。然后就可以直接访问了。
  • 您在哪种模式下运行此脚本的搜索?正常、阻塞、单发、导出?
  • 默认正常,查看dev.splunk.com/view/python-sdk/SP-CAAAEE5各模式的示例代码