Python 第三方模块之 splunklib - 连接 splunk

Splunk SDK for Python

通过 Splunk SDK 我们可以和 Splunk 引擎进行交互。Splunk SDK 是基于 REST API 的,因此通过简短的代码实现我们想要的功能。

官方文档:https://dev.splunk.com/enterprise/docs/devtools/python/sdk-python/

安装

1
pip install splunk-sdk

模块介绍

主要有4个模块:

  • binding: 基于HTTP的抽象层
  • client: 基于REST API的抽象层,其中Service类是其最重要的类,并且client模块比binding模块有更多的好处。
  • results: 对splunk返回的数据进行处理
  • data: 将 Atom Feed data 转换为 python 格式

常用的模块是 client 和 results 模块。

搜索模式

搜索以不同的模式运行,确定何时以及如何检索结果:

  • 正常:正常搜索异步运行。它会立即返回一个搜索作业。轮询作业以确定其状态。您可以在搜索完成后检索结果。如果启用“预览”,您还可以预览结果。普通模式适用于实时搜索。
  • 阻塞:阻塞搜索同步运行。在搜索完成之前它不会返回搜索作业,因此无需轮询状态。阻止模式不适用于实时搜索。
  • 一次性:一次性搜索是计划立即运行的阻塞搜索。不返回搜索作业,而是在完成后返回搜索结果。因为这是一个阻塞搜索,所以在搜索完成之前结果不可用。
  • 导出:导出搜索是另一种立即运行的搜索操作,不返回搜索作业,并立即开始流式传输结果。

对于那些产生搜索作业(正常和阻塞)的搜索,搜索结果会在服务器上保存一段时间,并且可以根据请求进行检索。
对于流式传输结果(一次性和导出)的那些搜索,搜索结果不会保留在服务器上。如果流因任何原因被中断,如果不再次运行搜索,则无法恢复结果。

代码示例

与 Splunk 建立连接

1
2
3
4
5
6
from splunklib import client
service = client.connect(host=HOST, port=PORT, username=USERNAME, password=PASSWORD)

# Print installed apps to the console to verify login
for app in service.apps:
print(app.name)

处理 job 的类

  • 用于收集搜索作业的splunklib.client.Jobs类。
  • 单个搜索作业的splunklib.client.Job类。

列出当前用户的搜索作业

此示例显示如何使用splunklib.client.Jobs类来检索当前用户可用的作业集合:

1
2
3
4
5
# Get the collection of jobs and find out how many there are
jobs = service.jobs

print "There are", len(jobs), "jobs available to the current user"
print "\nSearch IDs:\n " + "\n ".join([job.sid for job in jobs])

创建 block job 和显示属性

运行阻塞搜索会创建一个搜索作业并以“阻塞”模式同步运行搜索。搜索完成后返回作业,所有结果都在里面。

创建搜索作业时,需要将作业的参数设置为键值对字典。有关所有可能参数的列表,请参阅Splunk Enterprise REST API 参考手册中的搜索/作业端点的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import sys
import splunklib.results as results
import splunklib.client as client
service = client.connect(username="admin", password="yourpassword")
jobs = service.jobs

kwargs_blockingsearch = {
"exec_mode": "blocking",
"earliest_time": "-24h@h",
"latest_time": "now",
}
searchquery_blocking = "search * | head 100"
print "Wait for the search to finish..."

job = jobs.create(searchquery_blocking, **kwargs_blockingsearch)
print "...done!\n"

print "Search job properties"
print "Search job ID: ", job["sid"]
print "The number of events: ", job["eventCount"]
print "The number of results:", job["resultCount"]
print "Search duration: ", job["runDuration"], "seconds"
print "This job expires in: ", job["ttl"], "seconds"

# 默认情况下搜索最多会返回 100 个事件,即使搜索结果中有超过 100 个事件也是如此,如果想要不限制,可以将 count 设置 为 0
for result in results.JSONResultsReader(job.results(output_mode='json', count=0)):
print(result)

创建普通搜索、轮询完成并显示结果

运行普通搜索会创建一个搜索作业并立即返回搜索 ID,因此您需要轮询该 job 以了解搜索何时完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import sys
from time import sleep
import splunklib.results as results
import splunklib.client as client
service = client.connect(username="admin", password="yourpassword")

searchquery_normal = "search * | head 10"
kwargs_normalsearch = {"exec_mode": "normal"}
job = service.jobs.create(searchquery_normal, **kwargs_normalsearch)

while True:
while not job.is_ready():
pass
stats = {
"isDone": job["isDone"],
"doneProgress": float(job["doneProgress"])*100,
"scanCount": int(job["scanCount"]),
"eventCount": int(job["eventCount"]),
"resultCount": int(job["resultCount"])
}

status = ("\r%(doneProgress)03.1f%% %(scanCount)d scanned "
"%(eventCount)d matched %(resultCount)d results") % stats

sys.stdout.write(status)
sys.stdout.flush()
if stats["isDone"] == "1":
sys.stdout.write("\n\nDone!\n\n")
break
sleep(2)

# 默认情况下搜索最多会返回 100 个事件,即使搜索结果中有超过 100 个事件也是如此,如果想要不限制,可以将 count 设置 为 0
for result in results.JSONResultsReader(job.results(output_mode='json', count=0)):
print result

job.cancel()
sys.stdout.write('\n')

创建基本的一次性搜索并显示结果

从 Splunk Enterprise 中获取数据的最简单方法是使用 一次性 搜索,它会创建同步搜索。与正常搜索或阻塞搜索不同,一次性搜索不会创建并返回搜索作业,而是会阻塞直到搜索完成,然后返回包含事件的流。要设置搜索属性(例如,指定要搜索的时间范围),请使用属性键值对创建字典。一些常见的属性是:

  • output_mode:指定结果的输出格式(XML、JSON、JSON_COLS、JSON_ROWS、CSV、ATOM 或 RAW)。
  • early_time:指定要搜索的时间范围内的最早时间。时间字符串可以是 UTC 时间(带小数秒)、相对时间说明符(到现在)或格式化的时间字符串。
  • latest_time:指定要搜索的时间范围内的最晚时间。时间字符串可以是 UTC 时间(带小数秒)、相对时间说明符(到现在)或格式化的时间字符串。
  • rf:指定一个或多个要添加到搜索的字段。
  • count: 默认情况下,一次性搜索最多会返回 100 个事件,即使搜索结果中有超过 100 个事件也是如此。要返回 100 多个事件,把 count 参数设置为零时,表示对要返回的事件的数量没有限制。

如果没有返回任何搜索结果,则表示指定时间范围内没有任何搜索结果。只需根据数据集的需要修改日期和时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import splunklib.results as results


kwargs_oneshot = {
"earliest_time": "2014-06-19T12:00:00.000-07:00",
"latest_time": "2014-06-20T12:00:00.000-07:00",
"output_mode": 'json'
}
searchquery_oneshot = "search * | head 10"

oneshotsearch_results = service.jobs.oneshot(searchquery_oneshot, **kwargs_oneshot)

reader = results.JSONResultsReader(oneshotsearch_results)
for item in reader:
print(item)

创建基本的导出搜索并显示结果

导出搜索是返回大量结果的最可靠方法,因为导出以流的形式返回结果,而不是作为保存在服务器上的搜索作业。因此,任何服务器端对可以返回的结果数量的限制都不适用于导出搜索。

您可以在正常和实时模式下运行导出搜索。导出搜索立即运行,并立即开始流式传输数据。运行导出搜索比运行预览搜索更有效,因为它直接将结果流式传输给您,而不必将它们写入磁盘以供以后使用。一旦结果准备好,您就会收到它们。

与一次性搜索一样,导出搜索会立即运行,不为搜索创建作业,并立即开始流式传输结果。要设置搜索属性(例如,指定要搜索的时间范围),请使用属性键值对创建字典。一些常见的属性是:

  • search_mode:指定搜索模式(_正常_或_实时_)。
  • early_time:指定要搜索的时间范围内的最早时间。时间字符串可以是 UTC 时间(带小数秒)、相对时间说明符(到现在)或格式化的时间字符串。
  • latest_time:指定要搜索的时间范围内的最晚时间。时间字符串可以是 UTC 时间(带小数秒)、相对时间说明符(到现在)或格式化的时间字符串。
  • rf:指定一个或多个要添加到搜索的字段。
  • count: 默认情况下,一次性搜索最多会返回 100 个事件,即使搜索结果中有超过 100 个事件也是如此。要返回 100 多个事件,把 count 参数设置为零时,表示对要返回的事件的数量没有限制。

如果您没有看到任何搜索结果,则表示指定时间范围内没有任何搜索结果。只需根据数据集的需要修改日期和时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 此示例通过调用export方法在过去一小时内运行您的内部索引的导出搜索。

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "lib"))
import splunklib.results as results

kwargs_export = {
"earliest_time": "-1h", # - Search everything in the last hour
"latest_time": "now",
"search_mode": "normal", # - Run a normal-mode search
"output_mode": "json"
}
searchquery_export = "search index=_internal"

exportsearch_results = service.jobs.export(searchquery_export, **kwargs_export)

reader = results.JSONResultsReader(exportsearch_results)
for result in reader:
if isinstance(result, dict):
print "Result: %s" % result
elif isinstance(result, results.Message):
# Diagnostic messages may be returned in the results
print "Message: %s" % result

print "is_preview = %s " % reader.is_preview

参数

以下参数可用于搜索作业。

采集参数

默认情况下,检索集合时会返回所有条目。但是通过使用下面的参数,您还可以指定要返回的实体数量以及如何对它们进行排序。每当您检索集合时,这些参数都可用。

范围 描述
count 一个数字,指示要返回的最大实体数。“0”值表示没有最大值。
offset 一个数字,指定要返回的第一个实体的索引。
search 一个字符串,它指定用于过滤响应的搜索表达式,将字段值与搜索表达式匹配。例如,“search=foo”匹配字段中包含“foo”作为子字符串的任何对象,而“search=field_name%3Dfield_value”将匹配限制为单个字段。
sort_dir 一个枚举值,指定如何对实体进行排序。有效值为“asc”(升序)和“desc”(降序)。
sort_key 指定要排序的字段的字符串。
sort_mode 一个枚举值,指定如何对实体进行排序。有效值为“auto”、“alpha”(按字母顺序)、“alpha_case”(按字母顺序,区分大小写)或“num”(按数字顺序)。

搜索工作参数

有关完整列表,请参阅Splunk Enterprise REST API 参考手册中的搜索/作业端点的参数。


Python 第三方模块之 splunklib - 连接 splunk
https://flepeng.github.io/021-Python-31-Python-第三方模块-Python-第三方模块之-splunklib-连接-splunk/
作者
Lepeng
发布于
2021年4月27日
许可协议