Splunk KV Store

Splunk also uses the KV Store for lookups; please read this page to understand Splunk lookups.

Also visit this page to learn what a Splunk KV Store collection is.

Here we will build a small Splunk app that exposes a REST API for every KV Store collection.

Before you start

  1. Make sure you have a good understanding of REST APIs and Splunk, and keep a bash shell handy to try out the API with curl.

  2. This app works only with simple KV Store collections.

  3. You will need to customize this app to fit your needs.

  4. You may find issues in the Python code; if so, you will need to fix them yourself.


Problem

I want to create or extend a REST API for all of my KV Store collections.


Solution

Here I assume you are familiar with Splunk development and basic Splunk deployment, and have a working knowledge of CRUD operations.

Splunk Configuration Files

These files must be located in the app's default directory.

Here are my two KV Store collections in collections.conf

# customer collection
[customer]
enforceTypes    = true
field.name      = string
field.age       = number
field.address   = string
field.city      = string
field.country   = string

# simple collection of student
[student]
enforceTypes= true
field.name  = string
field.age   = number
field.class = string
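
With enforceTypes = true, values written to a collection are expected to match the declared field types. Values sent as form data arrive as strings, so one option is to coerce them before inserting. The sketch below is illustrative only: `STUDENT_FIELDS` mirrors the student stanza above, and `coerce_record` is a hypothetical helper that is not part of the app.

```python
# Field types copied from the [student] stanza above.
STUDENT_FIELDS = {'name': 'string', 'age': 'number', 'class': 'string'}


def coerce_record(raw, field_types):
    """Convert string values to the types declared for the collection.

    Hypothetical helper: unknown fields pass through unchanged, and a
    'number' that cannot be parsed raises ValueError.
    """
    record = {}
    for key, value in raw.items():
        if field_types.get(key) == 'number':
            number = float(value)
            # Keep whole numbers as int so '12' becomes 12 rather than 12.0
            record[key] = int(number) if number.is_integer() else number
        else:
            record[key] = value
    return record


record = coerce_record({'name': 'Asha', 'age': '12', 'class': '7A'},
                       STUDENT_FIELDS)
print(record)
```

A helper like this could be called on the parsed form data before the record is handed to the KV Store.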

restmap.conf

# ----------------------------------------------------------------------
#
# ReST API Interface
#
[script:customer]
match                 = /customer
script                = collection_api_rest_handler.py
scripttype            = persist
handler               = collection_api_rest_handler.CollectionHandler
requireAuthentication = true
output_modes          = json
passPayload           = true
passHttpHeaders       = true
passHttpCookies       = true

[script:student]
match                 = /student
script                = collection_api_rest_handler.py
scripttype            = persist
handler               = collection_api_rest_handler.CollectionHandler
requireAuthentication = true
output_modes          = json
passPayload           = true
passHttpHeaders       = true
passHttpCookies       = true

web.conf

[expose:customer]
pattern=/customer
methods=GET, POST, PUT, DELETE

[expose:student]
pattern=/student
methods=GET, POST, PUT, DELETE

NOTE: To add more APIs, add a stanza with the endpoint name to restmap.conf and expose the same endpoint in web.conf.
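
Because every endpoint uses the same stanza layout, the text for a new one can be generated rather than copied by hand. The following is a minimal sketch that reuses the settings shown above; `endpoint_stanzas` and the `teacher` collection are hypothetical names chosen for illustration, and the new collection would still need its own stanza in the collections configuration.

```python
# Templates copied from the restmap.conf and web.conf stanzas above.
RESTMAP_TEMPLATE = """[script:{name}]
match                 = /{name}
script                = collection_api_rest_handler.py
scripttype            = persist
handler               = collection_api_rest_handler.CollectionHandler
requireAuthentication = true
output_modes          = json
passPayload           = true
passHttpHeaders       = true
passHttpCookies       = true
"""

WEB_TEMPLATE = """[expose:{name}]
pattern=/{name}
methods=GET, POST, PUT, DELETE
"""


def endpoint_stanzas(name):
    """Return (restmap.conf stanza, web.conf stanza) for one collection."""
    return RESTMAP_TEMPLATE.format(name=name), WEB_TEMPLATE.format(name=name)


# 'teacher' is a made-up collection name.
restmap_stanza, web_stanza = endpoint_stanzas('teacher')
print(restmap_stanza)
print(web_stanza)
```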

Python Code

These files must be located in the app's bin directory.

Now we are going to add a custom REST handler for these endpoints.

collection_api_rest_handler.py

This is the main handler class. It receives every request for the collection endpoints listed in restmap.conf.

# System import
import os
import sys
import json
import logging
import time

# Append PYTHONPATH so script will load corresponding library
splunk_home = os.getenv('SPLUNK_HOME')
sys.path.append(splunk_home + '/etc/apps/collection_api_example/bin/')
sys.path.append(splunk_home + '/etc/apps/collection_api_example/bin/utils/')
sys.path.append(
    splunk_home + '/etc/apps/collection_api_example/bin/splunklib/')


# Splunk Import
from splunk.persistconn.application import PersistentServerConnectionApplication
from splunklib.binding import ResponseReader
from utils import parse

# Local import
from query_parameters import QueryParameters
from handle_delete import HandleDelete
from handle_get import HandleGet
from handle_put import HandlePut
from handle_post import HandlePost
from handle_error import HandleError
from handle_service import ServiceHandle


__author__ = 'Manoj Jangid'
__version__ = '0.0.1'
__description__ = 'Collection ReST Resource'


if sys.platform == "win32":
    import msvcrt
    # Binary mode is required for persistent mode on Windows.
    msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
    msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
    msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)


class CollectionHandler(PersistentServerConnectionApplication):
    collection = None

    def __init__(self, command_line, command_arg):
        PersistentServerConnectionApplication.__init__(self)

    def handle(self, payload):
        logging.debug('begin=handle_method request=%s' % (payload))
        response = {}
        try:
            # Get the HTTP method
            json_data = json.loads(payload)
            method = json_data['method']

            # Use the first path segment as the collection name so that a
            # trailing segment (such as a record _key) is ignored
            rest_path = json_data['rest_path']
            collection_name = rest_path.strip('/').split('/')[0]

            service_handle = ServiceHandle(collection_name)
            collection = service_handle.get_collection_handle()
        except Exception:
            # Nothing below can run without a parsed request and a
            # collection handle, so log the traceback and stop here
            logging.exception(
                'Unable to get service handle, please contact support')
            return {'payload': 'Server Error - Contact Support',
                    'status': 500}

        if method == 'GET':
            get = HandleGet(collection, json_data)
            response = get.execute()
        elif method == 'PUT':
            put = HandlePut(collection, json_data)
            response = put.execute()
        elif method == 'POST':
            post = HandlePost(collection, json_data)
            response = post.execute()
        elif method == 'DELETE':
            delete = HandleDelete(collection, json_data)
            response = delete.execute()
        else:
            error = HandleError(collection, json_data)
            response = error.execute()

        logging.debug('end=handle_method')

        return response
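
The handler receives each request as a JSON string. The standalone sketch below, which is not the handler itself, shows how a method and a collection name can be read from such a string. The keys (`method`, `rest_path`, `query`, `form`, `payload`) are the ones the classes in this post read; the values are invented for illustration, a real request is assumed to carry more keys, and `route` is a hypothetical function.

```python
import json

# Illustrative request only; the values are made up.
sample_payload = json.dumps({
    'method': 'GET',
    'rest_path': '/customer',
    'query': [['query', 'city:London,country:UK']],
    'form': [],
    'payload': '',
})


def route(payload):
    """Return (HTTP method, collection name) for a request payload."""
    request = json.loads(payload)
    # The first path segment names the collection
    collection_name = request['rest_path'].strip('/').split('/')[0]
    return request['method'], collection_name


print(route(sample_payload))
```

Taking only the first path segment keeps the collection name stable even when a record key follows it in the path.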

Finally, we add a few more classes for the CRUD operations.

handle_service.py

Service handle. Modify this code according to your configuration.

# System Import
import sys
import logging

# Splunk import
from splunklib.client import connect
from utils import parse

__author__ = 'Manoj Jangid'
__version__ = '0.0.1'
__description__ = 'This class is responsible for service authorization'


class ServiceHandle(object):
    def __init__(self, collection_name):
        try:
            # There are multiple ways to authenticate. The hard-coded
            # credentials below are placeholders; replace them with your own.
            logging.debug('Service Handle Initialized')
            opts = parse(sys.argv[1:], {}, '.splunkrc')
            opts.kwargs["owner"] = 'nobody'
            opts.kwargs["app"] = 'collection_api_example'
            opts.kwargs["username"] = 'admin'
            opts.kwargs["password"] = 'admin'
            self.service = connect(**opts.kwargs)
            self.collection = self.service.kvstore[collection_name]
        except:
            raise Exception('Initialization Error: %s' % (sys.exc_info()[0]))

    def get_collection_handle(self):
        return self.collection

    def get_service_handle(self):
        return self.service

query_parameters.py

Handles the query parameters

import json
import logging

__author__ = 'Manoj Jangid'
__version__ = '0.0.1'
__description__ = 'Custom Python class to handle the query parameter and return json array'


class QueryParameters(object):

    def __init__(self, qp):
        logging.info('Query parameter class initialized with query_parameter=%s type=%s count=%s' % (
            qp, type(qp), len(qp)))
        self.json_qp = {}
        # qp is a list of [name, value] entries; only the first one is used
        query_parameters = qp[0]

        # The parameter name must be 'query'
        if str(query_parameters[0]) != 'query':
            logging.info('invalid query parameter %s=%s' %
                         (str(query_parameters[0]), 'query'))
            raise Exception(
                'Error in query parameters, expected query=key:value,key:value')
        logging.info('query_keyword=true filter=%s' % (query_parameters[0]))

        # Each remaining element holds comma-separated key:value filters
        for filters in query_parameters[1:]:
            for item in filters.split(','):
                # Split on the first colon only so values may contain ':'
                key, value = item.split(':', 1)
                self.json_qp[key] = value
            logging.info('Formatted Query String Parameters=%s' %
                         (json.dumps(self.json_qp)))

    def get(self):
        return json.dumps(self.json_qp)

    def getValue(self, key):
        return self.json_qp[key]
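
The filter format is easier to see in isolation. The sketch below reimplements the same idea as a plain function, assuming the query arrives as a list whose first entry is `['query', 'key:value,key:value']`; `parse_filters` is a hypothetical name and the filter values are invented.

```python
import json


def parse_filters(query_pairs):
    """Turn [['query', 'k1:v1,k2:v2']] into a KV Store query dict."""
    name, filters = query_pairs[0][0], query_pairs[0][1:]
    if name != 'query':
        raise ValueError("expected a 'query' parameter, got %r" % name)
    result = {}
    for group in filters:
        for item in group.split(','):
            # Split on the first colon only so values may contain ':'
            key, value = item.split(':', 1)
            result[key] = value
    return result


filters = parse_filters([['query', 'city:London,country:UK']])
print(json.dumps(filters, sort_keys=True))
```

Under that assumption, a GET request ending in `?query=city:London,country:UK` would be turned into the JSON query `{"city": "London", "country": "UK"}`.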

handle_get.py

Class for GET operation

# System Import
import sys
import json
import logging

# Splunk import
from splunklib.binding import ResponseReader

# Application import
from query_parameters import QueryParameters
from handle_error import HandleError


__author__ = 'Manoj Jangid'
__version__ = '0.0.1'
__description__ = 'This class is responsible for all Get operations'

class HandleGet(object):

    def __init__(self, collection, request):
        logging.info('Initialize HandleGet class with request=%s' % (request))
        self.request = request
        self.collection = collection
        self.query_params = request['query']

    def get_filters(self):
        data = {}
        params = QueryParameters(self.query_params)
        param_list = json.loads(params.get())

        for param in param_list:
            data[param] = param_list[param]

        logging.info('filters=%s' % (json.dumps(data)))

        return json.dumps(data)

    def get(self):
        try:
            if len(self.query_params) > 0:
                response = self.collection.data.query(query=self.get_filters())
            else:
                response = self.collection.data.query()
            return response
        except:
            raise Exception(sys.exc_info()[0])

    # Customize your response
    def execute(self):
        try:
            self.request['body'] = self.get()        
            return {'payload': self.request, 'status': 200}
        except:
            error = HandleError(self.collection, self.request)
            return error.execute()

handle_post.py

Class for POST operation

# System Import
import sys
import json
import logging

# Application import
from query_parameters import QueryParameters


from handle_error import HandleError


__author__ = 'Manoj Jangid'
__version__ = '0.0.1'
__description__ = 'This class is responsible for all Post operations'


class HandlePost(object):

    def __init__(self, collection, request):
        logging.debug(
            'Initialize HandlePost class with request=%s' % (request))
        self.request = request
        self.collection = collection

    def post(self):
        data = {}
        try:
            # The payload is form-encoded: key1=value1&key2=value2
            raw = self.request['payload']
            for key_val in raw.split('&'):
                # Split on the first '=' only so values may contain '='
                key, value = key_val.split('=', 1)
                data[key] = value

            response = self.collection.data.insert(json.dumps(data))
        except Exception:
            # A malformed payload or a failed insert both end up here
            error = HandleError(self.collection, self.request)
            return error.execute()

        data['_key'] = response['_key']
        self.request['body'] = data

        return {'payload': self.request, 'status': 200}

    def execute(self):
        return self.post()
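
Data sent with `curl -d` is form-encoded, so spaces and special characters arrive percent-encoded. A minimal sketch of decoding such a payload with the standard library, assuming Python 3, is shown below; `parse_form` is a hypothetical helper and the field values are invented.

```python
from urllib.parse import parse_qsl


def parse_form(raw):
    """Decode 'a=1&b=two+words' into a dict, URL-decoding keys and values."""
    # keep_blank_values retains fields that were sent with an empty value
    return dict(parse_qsl(raw, keep_blank_values=True))


form = parse_form('name=Asha+Rao&city=New%20Delhi&age=12')
print(form)
```

Unlike splitting on `&` and `=` by hand, this also handles values that contain `=` and fields with empty values.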

handle_put.py

Class for PUT operation

# System Import
import sys
import json
import logging

# Application import
from handle_error import HandleError

__author__ = 'Manoj Jangid'
__version__ = '0.0.1'
__description__ = 'This class is responsible for all Put operations'


class HandlePut(object):

    def __init__(self, collection, request):
        self.request = request
        self.collection = collection

    def put(self):
        req = self.request
        _value = None
        json_obj = {}
        try:
            # Separate the record _key from the fields to update
            form_data = req['form']
            for data in form_data:
                key = data[0]
                val = data[1]
                if key == '_key':
                    _value = val
                else:
                    json_obj[key] = val

            if _value is None:
                logging.debug('invalid input (_key is missing) data=%s' %
                              (json.dumps(json_obj)))
                error = HandleError(self.collection, self.request)
                return error.execute()

            # Fetch the current record for the debug log; the [0] lookup
            # raises if no record has this _key
            query = {'_key': _value}
            response = self.collection.data.query(query=json.dumps(query))[0]

            logging.debug('New Value=%s' % (json.dumps(json_obj)))
            logging.debug('Old Value=%s' % (json.dumps(response)))

            logging.debug('Updating KV Store _key=%s data=%s' %
                          (_value, json.dumps(json_obj)))
            response = self.collection.data.update(
                _value, json.dumps(json_obj))

            self.request['body'] = response

            logging.debug('end=put_operation')
            return {'payload': self.request, 'status': 200}

        except Exception:
            # Return an error response instead of silently returning None
            logging.exception('error=put_operation')
            error = HandleError(self.collection, self.request)
            return error.execute()

    def execute(self):
        return self.put()

handle_delete.py

This is the custom DELETE operation for a resource

# System Import
import json
import logging

# Splunk import
from splunklib.binding import ResponseReader as RR

# Application import
from handle_error import HandleError


__author__ = 'Manoj Jangid'
__version__ = '0.0.1'
__description__ = 'This class is responsible for all delete operations'


class HandleDelete(object):

    def __init__(self, collection, request):
        logging.debug(
            'Initialize HandleDelete class with request=%s' % (request))
        self.request = request
        self.collection = collection

    def delete(self):
        value = self.request.get('path_info')
        if not value:
            # Without a _key there is nothing to delete
            logging.debug('path_info key is missing')
            error = HandleError(self.collection, self.request)
            return error.execute()
        logging.debug('path_info=%s' % value)

        data = {'_key': value}
        try:
            response = self.collection.data.delete(json.dumps(data))
        except Exception:
            error = HandleError(self.collection, self.request)
            return error.execute()

        return {'payload': self.request, 'status': response['status']}

    def execute(self):
        return self.delete()

handle_error.py

You can implement your custom error handler in this class

# System Import
import logging
__author__ = 'Manoj Jangid'
__version__ = '0.0.1'
__description__ = 'Custom Error Handler'


class HandleError(object):

    def __init__(self, collection, request):
        logging.debug('Initialize HandleError class with request=%s' % (request))
        self.request = request
        self.collection = collection

    def error(self):
        return {'payload': 'Server Error - Contact to Support', 'status': 500}

    def execute(self):
        # Return the error response so callers can pass it back to Splunk
        return self.error()

How to use these APIs

The default resource URLs for these APIs are:

  1. https://localhost:9089/services/customer
  2. https://localhost:9089/services/student

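customer and student are the KV Store collection names. Replace 9089 with the management port of your own deployment; Splunk's default management port is 8089.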

curl example

GET Operation

curl -k -u username:password https://localhost:9089/services/customer


POST Operation

curl -k -u username:password -X POST https://localhost:9089/services/customer -d field_name1=field_value1 -d field_name2=field_value2


PUT Operation

curl -k -u username:password -X PUT https://localhost:9089/services/customer -d _key={_key} -d field_name1=field_value1 -d field_name2=field_value2


DELETE Operation

curl -k -u username:password -X DELETE https://localhost:9089/services/customer/{_key}
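
The same calls can be prepared from Python. The sketch below only builds the requests and does not send them; it assumes Python 3, the URL layout from the curl examples above, and HTTP basic authentication as with `curl -u`. `build_request`, the key `abc123`, and the field values are hypothetical.

```python
import base64
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = 'https://localhost:9089/services'  # taken from the examples above


def build_request(method, collection, username, password,
                  key=None, fields=None):
    """Build (but do not send) a request for one collection endpoint."""
    url = '%s/%s' % (BASE_URL, collection)
    if method == 'DELETE' and key:
        # DELETE carries the record key in the path
        url = '%s/%s' % (url, key)
    form = dict(fields or {})
    if method == 'PUT' and key:
        # PUT carries the record key as the _key form field
        form['_key'] = key
    data = urlencode(form).encode('utf-8') if form else None
    request = Request(url, data=data, method=method)
    token = base64.b64encode(('%s:%s' % (username, password)).encode('utf-8'))
    request.add_header('Authorization', 'Basic ' + token.decode('ascii'))
    return request


req = build_request('PUT', 'customer', 'username', 'password',
                    key='abc123', fields={'city': 'London'})
print(req.get_method(), req.full_url, req.data)
```

Sending the request would be a matter of passing it to `urllib.request.urlopen`, together with an SSL context that matches your certificate setup (the curl examples use `-k` to skip certificate verification).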

Download Source

Fork Me On Github

https://github.com/mjangid/collection_api_example#fork-destination-box

Questions or Feedback?

Note

If you want to ask anything related to this post, please tweet your question or feedback to @JangidUK. I will try to respond as soon as possible.
