diff --git a/.github/workflows/appinspect.yaml b/.github/workflows/appinspect.yaml new file mode 100644 index 0000000..e88a79d --- /dev/null +++ b/.github/workflows/appinspect.yaml @@ -0,0 +1,57 @@ +name: App inspect tests +on: + push: + pull_request: + +jobs: + deploy: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v2 + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: '3.9.16' + + - name: Install Splunk Packaging Toolkit + run: | + curl https://download.splunk.com/misc/packaging-toolkit/splunk-packaging-toolkit-1.0.1.tar.gz -o /tmp/spl.tar.gz + pip install /tmp/spl.tar.gz + + - name: Create Splunk App Package + run: | + rm -rf .git .github .gitignore + slim package . + cp crowdsec-splunk-app-*.tar.gz /tmp/crowdsec-splunk-app.tar.gz + + - name: Retrieve App Inspect Report + run: | + TOKEN=$(curl -u '${{ secrets.SPLUNKBASE_USERNAME }}:${{ secrets.SPLUNKBASE_PASSWORD }}' --url 'https://api.splunk.com/2.0/rest/login/splunk' | jq -r .data.token) + echo "::add-mask::$TOKEN" + REPORT_HREF=$(curl -X POST \ + -H "Authorization: bearer $TOKEN" \ + -H "Cache-Control: no-cache" \ + -F "app_package=@/tmp/crowdsec-splunk-app.tar.gz" \ + --url "https://appinspect.splunk.com/v1/app/validate"| jq -r .links[1].href) + REPORT_URL="https://appinspect.splunk.com$REPORT_HREF" + sleep 10 + curl -X GET \ + -H "Authorization: bearer $TOKEN" \ + --url $REPORT_URL > /tmp/report + + - name: Upload App Inspect Report + uses: actions/upload-artifact@v2 + with: + name: report + path: /tmp/report + + - name: Check App Inspect Report Results + run: | + if grep -q '"result": "failure"' /tmp/report; then + echo "::error::App inspect check failed" + exit 1 + else + exit 0 + fi diff --git a/bin/splunklib/__init__.py b/bin/splunklib/__init__.py old mode 100755 new mode 100644 index 929a631..31787bd --- a/bin/splunklib/__init__.py +++ b/bin/splunklib/__init__.py @@ -16,5 +16,20 @@ from __future__ import absolute_import from splunklib.six.moves import map -__version_info__ = (1, 6, 12) +import logging + +DEFAULT_LOG_FORMAT = '%(asctime)s, Level=%(levelname)s, Pid=%(process)s, Logger=%(name)s, File=%(filename)s, ' \ + 'Line=%(lineno)s, %(message)s' +DEFAULT_DATE_FORMAT = '%Y-%m-%d %H:%M:%S %Z' + + +# To set the logging level of splunklib +# ex. 
To enable debug logs, call this method with parameter 'logging.DEBUG' +# default logging level is set to 'WARNING' +def setup_logging(level, log_format=DEFAULT_LOG_FORMAT, date_format=DEFAULT_DATE_FORMAT): + logging.basicConfig(level=level, + format=log_format, + datefmt=date_format) + +__version_info__ = (1, 7, 3) __version__ = ".".join(map(str, __version_info__)) diff --git a/bin/splunklib/binding.py b/bin/splunklib/binding.py old mode 100755 new mode 100644 index b0ed20e..85cb8d1 --- a/bin/splunklib/binding.py +++ b/bin/splunklib/binding.py @@ -31,6 +31,7 @@ import socket import ssl import sys +import time from base64 import b64encode from contextlib import contextmanager from datetime import datetime @@ -38,8 +39,8 @@ from io import BytesIO from xml.etree.ElementTree import XML +from splunklib import __version__ from splunklib import six -from splunklib.six import StringIO from splunklib.six.moves import urllib from .data import record @@ -49,6 +50,7 @@ except ImportError as e: from xml.parsers.expat import ExpatError as ParseError +logger = logging.getLogger(__name__) __all__ = [ "AuthenticationError", @@ -70,7 +72,7 @@ def new_f(*args, **kwargs): start_time = datetime.now() val = f(*args, **kwargs) end_time = datetime.now() - logging.debug("Operation took %s", end_time-start_time) + logger.debug("Operation took %s", end_time-start_time) return val return new_f @@ -296,8 +298,7 @@ def wrapper(self, *args, **kwargs): with _handle_auth_error("Autologin failed."): self.login() with _handle_auth_error( - "Autologin succeeded, but there was an auth error on " - "next request. Something is very wrong."): + "Authentication failed! If a session token was used, it may have expired."): return request_fun(self, *args, **kwargs) elif he.status == 401 and not self.autologin: raise AuthenticationError( @@ -346,7 +347,8 @@ def _authority(scheme=DEFAULT_SCHEME, host=DEFAULT_HOST, port=DEFAULT_PORT): "http://splunk.utopia.net:471" """ - if ':' in host: + # check if host is an IPv6 address and not enclosed in [ ] + if ':' in host and not (host.startswith('[') and host.endswith(']')): # IPv6 addresses must be enclosed in [ ] in order to be well # formed. host = '[' + host + ']' @@ -454,6 +456,12 @@ class Context(object): :type splunkToken: ``string`` :param headers: List of extra HTTP headers to send (optional). :type headers: ``list`` of 2-tuples. + :param retries: Number of retries for each HTTP connection (optional, the default is 0). + NOTE THAT THIS MAY INCREASE THE NUMBER OF ROUND TRIP CONNECTIONS TO THE SPLUNK SERVER AND BLOCK THE + CURRENT THREAD WHILE RETRYING. + :type retries: ``int`` + :param retryDelay: How long to wait between connection attempts if `retries` > 0 (optional, defaults to 10s). + :type retryDelay: ``int`` (in seconds) :param handler: The HTTP request handler (optional). :returns: A ``Context`` instance. 
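For illustration only (not part of the upstream diff): a minimal sketch of the new retry options on a `Context`, assuming placeholder host and credentials.

```python
import splunklib.binding as binding

# Hedged sketch: retry each failed HTTP request up to 3 times, sleeping
# 5 seconds between attempts (host and credentials are placeholders).
context = binding.connect(
    host="localhost", port=8089,
    username="admin", password="changeme",
    retries=3, retryDelay=5)

response = context.get("/services/server/info")
print(response.status)  # 200 on success
```

The retry loop lives in `HttpLib.request`, so every `get`, `post`, and `delete` issued through the context inherits the same policy.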
@@ -471,7 +479,8 @@ class Context(object): """ def __init__(self, handler=None, **kwargs): self.http = HttpLib(handler, kwargs.get("verify", False), key_file=kwargs.get("key_file"), - cert_file=kwargs.get("cert_file")) # Default to False for backward compat + cert_file=kwargs.get("cert_file"), context=kwargs.get("context"), # Default to False for backward compat + retries=kwargs.get("retries", 0), retryDelay=kwargs.get("retryDelay", 10)) self.token = kwargs.get("token", _NoAuthenticationToken) if self.token is None: # In case someone explicitly passes token=None self.token = _NoAuthenticationToken @@ -500,13 +509,13 @@ def get_cookies(self): return self.http._cookies def has_cookies(self): - """Returns true if the ``HttpLib`` member of this instance has at least - one cookie stored. + """Returns true if the ``HttpLib`` member of this instance has an auth token stored. - :return: ``True`` if there is at least one cookie, else ``False`` + :return: ``True`` if an auth token is present, else ``False`` :rtype: ``bool`` """ - return len(self.get_cookies()) > 0 + auth_token_key = "splunkd_" + return any(auth_token_key in key for key in self.get_cookies().keys()) # Shared per-context request headers @property @@ -519,23 +528,27 @@ def _auth_headers(self): :returns: A list of 2-tuples containing key and value """ + header = [] if self.has_cookies(): return [("Cookie", _make_cookie_header(list(self.get_cookies().items())))] elif self.basic and (self.username and self.password): token = 'Basic %s' % b64encode(("%s:%s" % (self.username, self.password)).encode('utf-8')).decode('ascii') - return [("Authorization", token)] elif self.bearerToken: token = 'Bearer %s' % self.bearerToken - return [("Authorization", token)] elif self.token is _NoAuthenticationToken: - return [] + token = [] else: # Ensure the token is properly formatted if self.token.startswith('Splunk '): token = self.token else: token = 'Splunk %s' % self.token + if token: + header.append(("Authorization", token)) + if self.get_cookies(): + header.append(("Cookie", _make_cookie_header(list(self.get_cookies().items())))) + + return header def connect(self): """Returns an open connection (socket) to the Splunk instance. @@ -618,7 +631,7 @@ def delete(self, path_segment, owner=None, app=None, sharing=None, **query): """ path = self.authority + self._abspath(path_segment, owner=owner, app=app, sharing=sharing) - logging.debug("DELETE request to %s (body: %s)", path, repr(query)) + logger.debug("DELETE request to %s (body: %s)", path, repr(query)) response = self.http.delete(path, self._auth_headers, **query) return response @@ -681,7 +694,7 @@ def get(self, path_segment, owner=None, app=None, headers=None, sharing=None, ** path = self.authority + self._abspath(path_segment, owner=owner, app=app, sharing=sharing) - logging.debug("GET request to %s (body: %s)", path, repr(query)) + logger.debug("GET request to %s (body: %s)", path, repr(query)) all_headers = headers + self.additional_headers + self._auth_headers response = self.http.get(path, all_headers, **query) return response @@ -724,7 +737,12 @@ def post(self, path_segment, owner=None, app=None, sharing=None, headers=None, * :type headers: ``list`` of 2-tuples. :param query: All other keyword arguments, which are used as query parameters. - :type query: ``string`` + :param body: Parameters to be used in the post body. If specified, + any parameters in the query will be applied to the URL instead of + the body. 
If a dict is supplied, the key-value pairs will be form + encoded. If a string is supplied, the body will be passed through + in the request unchanged. + :type body: ``dict`` or ``str`` :return: The response from the server. :rtype: ``dict`` with keys ``body``, ``headers``, ``reason``, and ``status`` @@ -754,14 +772,20 @@ def post(self, path_segment, owner=None, app=None, sharing=None, headers=None, * headers = [] path = self.authority + self._abspath(path_segment, owner=owner, app=app, sharing=sharing) - logging.debug("POST request to %s (body: %s)", path, repr(query)) + + # To avoid writing sensitive data in debug logs + endpoint_having_sensitive_data = ["/storage/passwords"] + if any(endpoint in path for endpoint in endpoint_having_sensitive_data): + logger.debug("POST request to %s ", path) + else: + logger.debug("POST request to %s (body: %s)", path, repr(query)) all_headers = headers + self.additional_headers + self._auth_headers response = self.http.post(path, all_headers, **query) return response @_authentication @_log_duration - def request(self, path_segment, method="GET", headers=None, body="", + def request(self, path_segment, method="GET", headers=None, body={}, owner=None, app=None, sharing=None): """Issues an arbitrary HTTP request to the REST path segment. @@ -790,9 +814,6 @@ def request(self, path_segment, method="GET", headers=None, body="", :type app: ``string`` :param sharing: The sharing mode of the namespace (optional). :type sharing: ``string`` - :param query: All other keyword arguments, which are used as query - parameters. - :type query: ``string`` :return: The response from the server. :rtype: ``dict`` with keys ``body``, ``headers``, ``reason``, and ``status`` @@ -821,13 +842,28 @@ def request(self, path_segment, method="GET", headers=None, body="", path = self.authority \ + self._abspath(path_segment, owner=owner, app=app, sharing=sharing) + all_headers = headers + self.additional_headers + self._auth_headers - logging.debug("%s request to %s (headers: %s, body: %s)", + logger.debug("%s request to %s (headers: %s, body: %s)", method, path, str(all_headers), repr(body)) - response = self.http.request(path, - {'method': method, - 'headers': all_headers, - 'body': body}) + + if body: + body = _encode(**body) + + if method == "GET": + path = path + UrlEncoded('?' + body, skip_encode=True) + message = {'method': method, + 'headers': all_headers} + else: + message = {'method': method, + 'headers': all_headers, + 'body': body} + else: + message = {'method': method, + 'headers': all_headers} + + response = self.http.request(path, message) + return response def login(self): @@ -1065,7 +1101,7 @@ def __init__(self, message, cause): # # Encode the given kwargs as a query string. This wrapper will also _encode -# a list value as a sequence of assignemnts to the corresponding arg name, +# a list value as a sequence of assignments to the corresponding arg name, # for example an argument such as 'foo=[1,2,3]' will be encoded as # 'foo=1&foo=2&foo=3'. def _encode(**kwargs): @@ -1132,12 +1168,14 @@ class HttpLib(object): If using the default handler, SSL verification can be disabled by passing verify=False. 
""" - def __init__(self, custom_handler=None, verify=False, key_file=None, cert_file=None): + def __init__(self, custom_handler=None, verify=False, key_file=None, cert_file=None, context=None, retries=0, retryDelay=10): if custom_handler is None: - self.handler = handler(verify=verify, key_file=key_file, cert_file=cert_file) + self.handler = handler(verify=verify, key_file=key_file, cert_file=cert_file, context=context) else: self.handler = custom_handler self._cookies = {} + self.retries = retries + self.retryDelay = retryDelay def delete(self, url, headers=None, **kwargs): """Sends a DELETE request to a URL. @@ -1223,6 +1261,8 @@ def post(self, url, headers=None, **kwargs): headers.append(("Content-Type", "application/x-www-form-urlencoded")) body = kwargs.pop('body') + if isinstance(body, dict): + body = _encode(**body).encode('utf-8') if len(kwargs) > 0: url = url + UrlEncoded('?' + _encode(**kwargs), skip_encode=True) else: @@ -1249,7 +1289,16 @@ def request(self, url, message, **kwargs): its structure). :rtype: ``dict`` """ - response = self.handler(url, message, **kwargs) + while True: + try: + response = self.handler(url, message, **kwargs) + break + except Exception: + if self.retries <= 0: + raise + else: + time.sleep(self.retryDelay) + self.retries -= 1 response = record(response) if 400 <= response.status: raise HTTPError(response) @@ -1285,7 +1334,10 @@ def __init__(self, response, connection=None): self._buffer = b'' def __str__(self): - return self.read() + if six.PY2: + return self.read() + else: + return str(self.read(), 'UTF-8') @property def empty(self): @@ -1344,7 +1396,7 @@ def readinto(self, byte_array): return bytes_read -def handler(key_file=None, cert_file=None, timeout=None, verify=False): +def handler(key_file=None, cert_file=None, timeout=None, verify=False, context=None): """This class returns an instance of the default HTTP request handler using the values you provide. @@ -1356,6 +1408,8 @@ def handler(key_file=None, cert_file=None, timeout=None, verify=False): :type timeout: ``integer`` or "None" :param `verify`: Set to False to disable SSL verification on https connections. 
:type verify: ``Boolean`` + :param `context`: The SSLContext that is used with the HTTPSConnection when verify=True and a context is specified + :type context: ``SSLContext`` """ def connect(scheme, host, port): @@ -1369,6 +1423,10 @@ def connect(scheme, host, port): if not verify: kwargs['context'] = ssl._create_unverified_context() + elif context: + # verify is True in elif branch and context is not None + kwargs['context'] = context + return six.moves.http_client.HTTPSConnection(host, port, **kwargs) raise ValueError("unsupported scheme: %s" % scheme) @@ -1378,7 +1436,7 @@ def request(url, message, **kwargs): head = { "Content-Length": str(len(body)), "Host": host, - "User-Agent": "splunk-sdk-python/1.6.12", + "User-Agent": "splunk-sdk-python/%s" % __version__, "Accept": "*/*", "Connection": "Close", } # defaults diff --git a/bin/splunklib/client.py b/bin/splunklib/client.py old mode 100755 new mode 100644 index de2f53a..33156bb --- a/bin/splunklib/client.py +++ b/bin/splunklib/client.py @@ -62,6 +62,7 @@ import datetime import json import logging +import re import socket from datetime import datetime, timedelta from time import sleep @@ -75,6 +76,8 @@ namespace) from .data import record +logger = logging.getLogger(__name__) + __all__ = [ "connect", "NotSupportedError", @@ -97,6 +100,7 @@ PATH_INDEXES = "data/indexes/" PATH_INPUTS = "data/inputs/" PATH_JOBS = "search/jobs/" +PATH_JOBS_V2 = "search/v2/jobs/" PATH_LOGGER = "/services/server/logger/" PATH_MESSAGES = "messages/" PATH_MODULAR_INPUTS = "data/modular-inputs" @@ -224,7 +228,10 @@ def _load_atom_entries(response): # Load the sid from the body of the given response -def _load_sid(response): +def _load_sid(response, output_mode): + if output_mode == "json": + json_obj = json.loads(response.body.read()) + return json_obj.get('sid') return _load_atom(response).response.sid @@ -295,7 +302,7 @@ def connect(**kwargs): :type port: ``integer`` :param scheme: The scheme for accessing the service (the default is "https"). :type scheme: "https" or "http" - :param verify: Enable (True) or disable (False) SSL verrification for + :param verify: Enable (True) or disable (False) SSL verification for https connections. (optional, the default is True) :type verify: ``Boolean`` :param `owner`: The owner context of the namespace (optional). :type username: ``string`` :param `password`: The password for the Splunk account. :type password: ``string`` + :param retries: Number of retries for each HTTP connection (optional, the default is 0). + NOTE THAT THIS MAY INCREASE THE NUMBER OF ROUND TRIP CONNECTIONS TO THE SPLUNK SERVER. + :type retries: ``int`` + :param retryDelay: How long to wait between connection attempts if `retries` > 0 (optional, defaults to 10s). + :type retryDelay: ``int`` (in seconds) + :param `context`: The SSLContext that can be used when setting verify=True (optional) + :type context: ``SSLContext`` :return: An initialized :class:`Service` connection. **Example**:: import splunklib.client as client s = client.connect(...) a = s.apps["my_app"] ... @@ -365,7 +379,7 @@ class Service(_BaseService): :type port: ``integer`` :param scheme: The scheme for accessing the service (the default is "https"). :type scheme: "https" or "http" - :param verify: Enable (True) or disable (False) SSL verrification for + :param verify: Enable (True) or disable (False) SSL verification for https connections. (optional, the default is True) :type verify: ``Boolean`` :param `owner`: The owner context of the namespace (optional; use "-" for wildcard). 
@@ -384,6 +398,11 @@ class Service(_BaseService): :param `password`: The password, which is used to authenticate the Splunk instance. :type password: ``string`` + :param retries: Number of retries for each HTTP connection (optional, the default is 0). + NOTE THAT THIS MAY INCREASE THE NUMBER OF ROUND TRIP CONNECTIONS TO THE SPLUNK SERVER. + :type retries: ``int`` + :param retryDelay: How long to wait between connection attempts if `retries` > 0 (optional, defaults to 10s). + :type retryDelay: ``int`` (in seconds) :return: A :class:`Service` instance. **Example**:: @@ -401,6 +420,8 @@ def __init__(self, **kwargs): super(Service, self).__init__(**kwargs) self._splunk_version = None + self._kvstore_owner = None + self._instance_type = None @property def apps(self): @@ -552,6 +573,8 @@ def parse(self, query, **kwargs): :type kwargs: ``dict`` :return: A semantic map of the parsed search query. """ + if not self.disable_v2_api: + return self.post("search/v2/parser", q=query, **kwargs) return self.get("search/parser", q=query, **kwargs) def restart(self, timeout=None): @@ -673,12 +696,50 @@ def splunk_version(self): self._splunk_version = tuple([int(p) for p in self.info['version'].split('.')]) return self._splunk_version + @property + def splunk_instance(self): + if self._instance_type is None: + splunk_info = self.info + if hasattr(splunk_info, 'instance_type'): + self._instance_type = splunk_info['instance_type'] + else: + self._instance_type = '' + return self._instance_type + + @property + def disable_v2_api(self): + if self.splunk_instance.lower() == 'cloud': + return self.splunk_version < (9,0,2209) + return self.splunk_version < (9,0,2) + + @property + def kvstore_owner(self): + """Returns the KVStore owner for this instance of Splunk. + + By default, if the kvstore owner is not set, it will return "nobody" + :return: A string with the KVStore owner. + """ + if self._kvstore_owner is None: + self._kvstore_owner = "nobody" + return self._kvstore_owner + + @kvstore_owner.setter + def kvstore_owner(self, value): + """ + kvstore is refreshed when the owner value is changed + """ + self._kvstore_owner = value + self.kvstore @property def kvstore(self): """Returns the collection of KV Store collections. + Sets the owner for the namespace before retrieving the KVStore collection. + :return: A :class:`KVStoreCollections` collection of :class:`KVStoreCollection` entities. """ + self.namespace['owner'] = self.kvstore_owner return KVStoreCollections(self) @property @@ -699,7 +760,26 @@ class Endpoint(object): """ def __init__(self, service, path): self.service = service - self.path = path if path.endswith('/') else path + '/' + self.path = path + + def get_api_version(self, path): + """Return the API version of the service used in the provided path. + + Args: + path (str): A fully-qualified endpoint path (for example, "/services/search/jobs"). + + Returns: + int: Version of the API (for example, 1) + """ + # Default to v1 if undefined in the path + # For example, "/services/search/jobs" is using API v1 + api_version = 1 + + versionSearch = re.search('(?:servicesNS\/[^/]+\/[^/]+|services)\/[^/]+\/v(\d+)\/', path) + if versionSearch: + api_version = int(versionSearch.group(1)) + + return api_version def get(self, path_segment="", owner=None, app=None, sharing=None, **query): """Performs a GET operation on the path segment relative to this endpoint. 
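To make the version-detection rule in `get_api_version` concrete, here is a standalone sketch of the same regex; the paths are illustrative only.

```python
import re

# Mirrors Endpoint.get_api_version above: default to API v1 unless the
# path carries an explicit /v<N>/ segment after the service prefix.
PATTERN = r'(?:servicesNS\/[^/]+\/[^/]+|services)\/[^/]+\/v(\d+)\/'

for path in ("/services/search/jobs",                      # no version -> 1
             "/services/search/v2/jobs/",                   # explicit v2 -> 2
             "/servicesNS/nobody/search/search/v2/jobs/"):  # namespaced -> 2
    match = re.search(PATTERN, path)
    print(path, "->", int(match.group(1)) if match else 1)
```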
@@ -757,10 +837,28 @@ def get(self, path_segment="", owner=None, app=None, sharing=None, **query): if path_segment.startswith('/'): path = path_segment else: + if not self.path.endswith('/') and path_segment != "": + self.path = self.path + '/' path = self.service._abspath(self.path + path_segment, owner=owner, app=app, sharing=sharing) # ^-- This was "%s%s" % (self.path, path_segment). # That doesn't work, because self.path may be UrlEncoded. + + # Get the API version from the path + api_version = self.get_api_version(path) + + # Search API v2+ fallback to v1: + # - In v2+, /results_preview, /events and /results do not support search params. + # - Fallback from v2+ to v1 if Splunk Version is < 9. + # if api_version >= 2 and ('search' in query and path.endswith(tuple(["results_preview", "events", "results"])) or self.service.splunk_version < (9,)): + # path = path.replace(PATH_JOBS_V2, PATH_JOBS) + + if api_version == 1: + if isinstance(path, UrlEncoded): + path = UrlEncoded(path.replace(PATH_JOBS_V2, PATH_JOBS), skip_encode=True) + else: + path = path.replace(PATH_JOBS_V2, PATH_JOBS) + return self.service.get(path, owner=owner, app=app, sharing=sharing, **query) @@ -813,11 +911,29 @@ def post(self, path_segment="", owner=None, app=None, sharing=None, **query): apps.get('nonexistant/path') # raises HTTPError s.logout() apps.get() # raises AuthenticationError - """ + """ if path_segment.startswith('/'): path = path_segment else: + if not self.path.endswith('/') and path_segment != "": + self.path = self.path + '/' path = self.service._abspath(self.path + path_segment, owner=owner, app=app, sharing=sharing) + + # Get the API version from the path + api_version = self.get_api_version(path) + + # Search API v2+ fallback to v1: + # - In v2+, /results_preview, /events and /results do not support search params. + # - Fallback from v2+ to v1 if Splunk Version is < 9. + # if api_version >= 2 and ('search' in query and path.endswith(tuple(["results_preview", "events", "results"])) or self.service.splunk_version < (9,)): + # path = path.replace(PATH_JOBS_V2, PATH_JOBS) + + if api_version == 1: + if isinstance(path, UrlEncoded): + path = UrlEncoded(path.replace(PATH_JOBS_V2, PATH_JOBS), skip_encode=True) + else: + path = path.replace(PATH_JOBS_V2, PATH_JOBS) + return self.service.post(path, owner=owner, app=app, sharing=sharing, **query) @@ -828,35 +944,24 @@ class Entity(Endpoint): ``Entity`` provides the majority of functionality required by entities. Subclasses only implement the special cases for individual entities. - For example for deployment serverclasses, the subclass makes whitelists and - blacklists into Python lists. + For example for saved searches, the subclass makes fields like ``action.email``, + ``alert_type``, and ``search`` available. An ``Entity`` is addressed like a dictionary, with a few extensions, - so the following all work:: - - ent['email.action'] - ent['disabled'] - ent['whitelist'] - - Many endpoints have values that share a prefix, such as - ``email.to``, ``email.action``, and ``email.subject``. You can extract - the whole fields, or use the key ``email`` to get a dictionary of - all the subelements. That is, ``ent['email']`` returns a - dictionary with the keys ``to``, ``action``, ``subject``, and so on. If - there are multiple levels of dots, each level is made into a - subdictionary, so ``email.body.salutation`` can be accessed at - ``ent['email']['body']['salutation']`` or - ``ent['email.body.salutation']``. 
+ so the following all work, for example in saved searches:: + + ent['action.email'] + ent['alert_type'] + ent['search'] You can also access the fields as though they were the fields of a Python object, as in:: - ent.email.action - ent.disabled - ent.whitelist + ent.alert_type + ent.search However, because some of the field names are not valid Python identifiers, - the dictionary-like syntax is preferrable. + the dictionary-like syntax is preferable. The state of an :class:`Entity` object is cached, so accessing a field does not contact the server. If you think the values on the @@ -953,7 +1058,10 @@ def __getitem__(self, key): def _load_atom_entry(self, response): elem = _load_atom(response, XNAME_ENTRY) if isinstance(elem, list): - raise AmbiguousReferenceException("Fetch from server returned multiple entries for name %s." % self.name) + apps = [ele.entry.content.get('eai:appName') for ele in elem] + + raise AmbiguousReferenceException( + "Fetch from server returned multiple entries for name '%s' in apps %s." % (elem[0].entry.title, apps)) else: return elem.entry @@ -1059,8 +1167,6 @@ def content(self): def disable(self): """Disables the entity at this endpoint.""" self.post("disable") - if self.service.restart_required: - self.service.restart(120) return self def enable(self): @@ -1110,6 +1216,36 @@ def reload(self): self.post("_reload") return self + def acl_update(self, **kwargs): + """To update Access Control List (ACL) properties for an endpoint. + + :param kwargs: Additional entity-specific arguments (required). + + - "owner" (``string``): The Splunk username, such as "admin". A value of "nobody" means no specific user (required). + + - "sharing" (``string``): A mode that indicates how the resource is shared. The sharing mode can be "user", "app", "global", or "system" (required). + + :type kwargs: ``dict`` + + **Example**:: + + import splunklib.client as client + service = client.connect(...) + saved_search = service.saved_searches["name"] + saved_search.acl_update(sharing="app", owner="nobody", app="search", **{"perms.read": "admin, nobody"}) + """ + if "body" not in kwargs: + kwargs = {"body": kwargs} + + if "sharing" not in kwargs["body"]: + raise ValueError("Required argument 'sharing' is missing.") + if "owner" not in kwargs["body"]: + raise ValueError("Required argument 'owner' is missing.") + + self.post("acl", **kwargs) + self.refresh() + return self + @property def state(self): """Returns the entity's state record. @@ -1444,7 +1580,7 @@ def iter(self, offset=0, count=None, pagesize=None, **kwargs): if pagesize is None or N < pagesize: break offset += N - logging.debug("pagesize=%d, fetched=%d, offset=%d, N=%d, kwargs=%s", pagesize, fetched, offset, N, kwargs) + logger.debug("pagesize=%d, fetched=%d, offset=%d, N=%d, kwargs=%s", pagesize, fetched, offset, N, kwargs) # kwargs: count, offset, search, sort_dir, sort_key, sort_mode def list(self, count=None, **kwargs): @@ -1812,8 +1948,6 @@ class StoragePasswords(Collection): instance. Retrieve this collection using :meth:`Service.storage_passwords`. """ def __init__(self, service): - if service.namespace.owner == '-' or service.namespace.app == '-': - raise ValueError("StoragePasswords cannot have wildcards in namespace.") super(StoragePasswords, self).__init__(service, PATH_STORAGE_PASSWORDS, item=StoragePassword) def create(self, password, username, realm=None): @@ -1831,6 +1965,9 @@ def create(self, password, username, realm=None): :return: The :class:`StoragePassword` object created. 
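A hedged usage sketch for the `create` method documented above (connection details and the secret are placeholders); the namespace must name a concrete owner and app, since wildcards are rejected when creating a password.

```python
import splunklib.client as client

# Placeholder connection details; owner/app are concrete, not "-".
service = client.connect(host="localhost", port=8089,
                         username="admin", password="changeme",
                         owner="nobody", app="search")

sp = service.storage_passwords.create("s3cr3t", "api_user", realm="myrealm")
print(sp.name)  # "myrealm:api_user:"
```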
""" + if self.service.namespace.owner == '-' or self.service.namespace.app == '-': + raise ValueError("While creating StoragePasswords, namespace cannot have wildcards.") + if not isinstance(username, six.string_types): raise ValueError("Invalid name: %s" % repr(username)) @@ -1862,6 +1999,9 @@ def delete(self, username, realm=None): :return: The `StoragePassword` collection. :rtype: ``self`` """ + if self.service.namespace.owner == '-' or self.service.namespace.app == '-': + raise ValueError("app context must be specified when removing a password.") + if realm is None: # This case makes the username optional, so # the full name can be passed in as realm. @@ -1872,7 +2012,7 @@ def delete(self, username, realm=None): name = UrlEncoded(realm, encode_slash=True) + ":" + UrlEncoded(username, encode_slash=True) # Append the : expected at the end of the name - if name[-1] is not ":": + if name[-1] != ":": name = name + ":" return Collection.delete(self, name) @@ -2086,10 +2226,6 @@ def submit(self, event, host=None, source=None, sourcetype=None): if source is not None: args['source'] = source if sourcetype is not None: args['sourcetype'] = sourcetype - # The reason we use service.request directly rather than POST - # is that we are not sending a POST request encoded using - # x-www-form-urlencoded (as we do not have a key=value body), - # because we aren't really sending a "form". self.service.post(PATH_RECEIVERS_SIMPLE, body=event, **args) return self @@ -2517,9 +2653,9 @@ def list(self, *kinds, **kwargs): kinds = self.kinds if len(kinds) == 1: kind = kinds[0] - logging.debug("Inputs.list taking short circuit branch for single kind.") + logger.debug("Inputs.list taking short circuit branch for single kind.") path = self.kindpath(kind) - logging.debug("Path for inputs: %s", path) + logger.debug("Path for inputs: %s", path) try: path = UrlEncoded(path, skip_encode=True) response = self.get(path, **kwargs) @@ -2630,7 +2766,14 @@ def oneshot(self, path, **kwargs): class Job(Entity): """This class represents a search job.""" def __init__(self, service, sid, **kwargs): - path = PATH_JOBS + sid + # Default to v2 in Splunk Version 9+ + path = "{path}{sid}" + # Formatting path based on the Splunk Version + if service.disable_v2_api: + path = path.format(path=PATH_JOBS, sid=sid) + else: + path = path.format(path=PATH_JOBS_V2, sid=sid) + Entity.__init__(self, service, path, skip_refresh=True, **kwargs) self.sid = sid @@ -2684,7 +2827,11 @@ def events(self, **kwargs): :return: The ``InputStream`` IO handle to this job's events. """ kwargs['segmentation'] = kwargs.get('segmentation', 'none') - return self.get("events", **kwargs).body + + # Search API v1(GET) and v2(POST) + if self.service.disable_v2_api: + return self.get("events", **kwargs).body + return self.post("events", **kwargs).body def finalize(self): """Stops the job and provides intermediate results for retrieval. @@ -2737,9 +2884,8 @@ def pause(self): return self def results(self, **query_params): - """Returns a streaming handle to this job's search results. To get a - nice, Pythonic iterator, pass the handle to :class:`splunklib.results.ResultsReader`, - as in:: + """Returns a streaming handle to this job's search results. 
To get a nice, Pythonic iterator, pass the handle + to :class:`splunklib.results.JSONResultsReader` along with the query param "output_mode='json'", as in:: import splunklib.client as client import splunklib.results as results service = client.connect(...) job = service.jobs.create("search * | head 5") while not job.is_done(): sleep(.2) - rr = results.ResultsReader(job.results()) + rr = results.JSONResultsReader(job.results(output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -2773,24 +2919,26 @@ def results(self, **query_params): :return: The ``InputStream`` IO handle to this job's results. """ query_params['segmentation'] = query_params.get('segmentation', 'none') - return self.get("results", **query_params).body + + # Search API v1(GET) and v2(POST) + if self.service.disable_v2_api: + return self.get("results", **query_params).body + return self.post("results", **query_params).body def preview(self, **query_params): """Returns a streaming handle to this job's preview search results. - Unlike :class:`splunklib.results.ResultsReader`, which requires a job to - be finished to - return any results, the ``preview`` method returns any results that have - been generated so far, whether the job is running or not. The - returned search results are the raw data from the server. Pass - the handle returned to :class:`splunklib.results.ResultsReader` to get a - nice, Pythonic iterator over objects, as in:: + Unlike :class:`splunklib.results.JSONResultsReader` along with the query param "output_mode='json'", + which requires a job to be finished to return any results, the ``preview`` method returns any results that + have been generated so far, whether the job is running or not. The returned search results are the raw data + from the server. Pass the handle returned to :class:`splunklib.results.JSONResultsReader` to get a nice, + Pythonic iterator over objects, as in:: import splunklib.client as client import splunklib.results as results service = client.connect(...) job = service.jobs.create("search * | head 5") - rr = results.ResultsReader(job.preview()) + rr = results.JSONResultsReader(job.preview(output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -2816,7 +2964,11 @@ def preview(self, **query_params): :return: The ``InputStream`` IO handle to this job's preview results. """ query_params['segmentation'] = query_params.get('segmentation', 'none') - return self.get("results_preview", **query_params).body + + # Search API v1(GET) and v2(POST) + if self.service.disable_v2_api: + return self.get("results_preview", **query_params).body + return self.post("results_preview", **query_params).body def searchlog(self, **kwargs): """Returns a streaming handle to this job's search log. @@ -2905,7 +3057,12 @@ class Jobs(Collection): """This class represents a collection of search jobs. Retrieve this collection using :meth:`Service.jobs`.""" def __init__(self, service): - Collection.__init__(self, service, PATH_JOBS, item=Job) + # Splunk 9 introduces the v2 endpoint + if not service.disable_v2_api: + path = PATH_JOBS_V2 + else: + path = PATH_JOBS + Collection.__init__(self, service, path, item=Job) # The count value to say list all the contents of this # Collection is 0, not -1 as it is on most. 
self.null_count = 0 @@ -2941,19 +3098,19 @@ def create(self, query, **kwargs): if kwargs.get("exec_mode", None) == "oneshot": raise TypeError("Cannot specify exec_mode=oneshot; use the oneshot method instead.") response = self.post(search=query, **kwargs) - sid = _load_sid(response) + sid = _load_sid(response, kwargs.get("output_mode", None)) return Job(self.service, sid) def export(self, query, **params): - """Runs a search and immediately starts streaming preview events. - This method returns a streaming handle to this job's events as an XML - document from the server. To parse this stream into usable Python objects, - pass the handle to :class:`splunklib.results.ResultsReader`:: + """Runs a search and immediately starts streaming preview events. This method returns a streaming handle to + this job's events as an XML document from the server. To parse this stream into usable Python objects, + pass the handle to :class:`splunklib.results.JSONResultsReader` along with the query param + "output_mode='json'":: import splunklib.client as client import splunklib.results as results service = client.connect(...) - rr = results.ResultsReader(service.jobs.export("search * | head 5")) + rr = results.JSONResultsReader(service.jobs.export("search * | head 5", output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -3002,14 +3159,14 @@ def itemmeta(self): def oneshot(self, query, **params): """Runs a oneshot search and returns a streaming handle to the results. - The ``InputStream`` object streams XML fragments from the server. To - parse this stream into usable Python objects, - pass the handle to :class:`splunklib.results.ResultsReader`:: + The ``InputStream`` object streams fragments from the server. To parse this stream into usable Python + objects, pass the handle to :class:`splunklib.results.JSONResultsReader` along with the query param + "output_mode='json'" :: import splunklib.client as client import splunklib.results as results service = client.connect(...) - rr = results.ResultsReader(service.jobs.oneshot("search * | head 5")) + rr = results.JSONResultsReader(service.jobs.oneshot("search * | head 5", output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -3157,7 +3314,7 @@ def dispatch(self, **kwargs): :return: The :class:`Job`. """ response = self.post("dispatch", **kwargs) - sid = _load_sid(response) + sid = _load_sid(response, kwargs.get("output_mode", None)) return Job(self.service, sid) @property @@ -3181,12 +3338,15 @@ def fired_alerts(self): item=AlertGroup) return c - def history(self): + def history(self, **kwargs): """Returns a list of search jobs corresponding to this saved search. + :param `kwargs`: Additional arguments (optional). + :type kwargs: ``dict`` + :return: A list of :class:`Job` objects. 
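The docstrings above replace the `ResultsReader` examples with `JSONResultsReader`; for reference, the same pattern as a standalone sketch (connection details are placeholders).

```python
import splunklib.client as client
import splunklib.results as results

# Placeholder connection details.
service = client.connect(host="localhost", port=8089,
                         username="admin", password="changeme")

stream = service.jobs.oneshot("search index=_internal | head 5",
                              output_mode="json")
for result in results.JSONResultsReader(stream):
    if isinstance(result, results.Message):
        print("Message:", result)   # diagnostic message from Splunk
    elif isinstance(result, dict):
        print("Result:", result)    # an actual search result
```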
""" - response = self.get("history") + response = self.get("history", **kwargs) entries = _load_atom_entries(response) if entries is None: return [] jobs = [] @@ -3549,13 +3709,20 @@ class KVStoreCollections(Collection): def __init__(self, service): Collection.__init__(self, service, 'storage/collections/config', item=KVStoreCollection) - def create(self, name, indexes = {}, fields = {}, **kwargs): + def __getitem__(self, item): + res = Collection.__getitem__(self, item) + for k, v in res.content.items(): + if "accelerated_fields" in k: + res.content[k] = json.loads(v) + return res + + def create(self, name, accelerated_fields={}, fields={}, **kwargs): """Creates a KV Store Collection. :param name: name of collection to create :type name: ``string`` - :param indexes: dictionary of index definitions - :type indexes: ``dict`` + :param accelerated_fields: dictionary of accelerated_fields definitions + :type accelerated_fields: ``dict`` :param fields: dictionary of field definitions :type fields: ``dict`` :param kwargs: a dictionary of additional parameters specifying indexes and field definitions @@ -3563,10 +3730,10 @@ def create(self, name, indexes = {}, fields = {}, **kwargs): :return: Result of POST request """ - for k, v in six.iteritems(indexes): + for k, v in six.iteritems(accelerated_fields): if isinstance(v, dict): v = json.dumps(v) - kwargs['index.' + k] = v + kwargs['accelerated_fields.' + k] = v for k, v in six.iteritems(fields): kwargs['field.' + k] = v return self.post(name=name, **kwargs) @@ -3580,18 +3747,20 @@ def data(self): """ return KVStoreCollectionData(self) - def update_index(self, name, value): - """Changes the definition of a KV Store index. + def update_accelerated_field(self, name, value): + """Changes the definition of a KV Store accelerated_field. - :param name: name of index to change + :param name: name of accelerated_fields to change :type name: ``string`` - :param value: new index definition - :type value: ``dict`` or ``string`` + :param value: new accelerated_fields definition + :type value: ``dict`` :return: Result of POST request """ kwargs = {} - kwargs['index.' + name] = value if isinstance(value, basestring) else json.dumps(value) + if isinstance(value, dict): + value = json.dumps(value) + kwargs['accelerated_fields.' + name] = value return self.post(**kwargs) def update_field(self, name, value): @@ -3619,7 +3788,7 @@ def __init__(self, collection): self.service = collection.service self.collection = collection self.owner, self.app, self.sharing = collection._proper_namespace() - self.path = 'storage/collections/data/' + UrlEncoded(self.collection.name) + '/' + self.path = 'storage/collections/data/' + UrlEncoded(self.collection.name, encode_slash=True) + '/' def _get(self, url, **kwargs): return self.service.get(self.path + url, owner=self.owner, app=self.app, sharing=self.sharing, **kwargs) @@ -3640,6 +3809,11 @@ def query(self, **query): :return: Array of documents retrieved by query. 
:rtype: ``array`` """ + + for key, value in query.items(): + if isinstance(query[key], dict): + query[key] = json.dumps(value) + return json.loads(self._get('', **query).body.read().decode('utf-8')) def query_by_id(self, id): @@ -3652,7 +3826,7 @@ def query_by_id(self, id): :return: Document with id :rtype: ``dict`` """ - return json.loads(self._get(UrlEncoded(str(id))).body.read().decode('utf-8')) + return json.loads(self._get(UrlEncoded(str(id), encode_slash=True)).body.read().decode('utf-8')) def insert(self, data): """ @@ -3664,6 +3838,8 @@ def insert(self, data): :return: _id of inserted object :rtype: ``dict`` """ + if isinstance(data, dict): + data = json.dumps(data) return json.loads(self._post('', headers=KVStoreCollectionData.JSON_HEADER, body=data).body.read().decode('utf-8')) def delete(self, query=None): @@ -3686,7 +3862,7 @@ def delete_by_id(self, id): :return: Result of DELETE request """ - return self._delete(UrlEncoded(str(id))) + return self._delete(UrlEncoded(str(id), encode_slash=True)) def update(self, id, data): """ @@ -3700,7 +3876,9 @@ def update(self, id, data): :return: id of replaced document :rtype: ``dict`` """ - return json.loads(self._post(UrlEncoded(str(id)), headers=KVStoreCollectionData.JSON_HEADER, body=data).body.read().decode('utf-8')) + if isinstance(data, dict): + data = json.dumps(data) + return json.loads(self._post(UrlEncoded(str(id), encode_slash=True), headers=KVStoreCollectionData.JSON_HEADER, body=data).body.read().decode('utf-8')) def batch_find(self, *dbqueries): """ @@ -3734,4 +3912,4 @@ def batch_save(self, *documents): data = json.dumps(documents) - return json.loads(self._post('batch_save', headers=KVStoreCollectionData.JSON_HEADER, body=data).body.read().decode('utf-8')) + return json.loads(self._post('batch_save', headers=KVStoreCollectionData.JSON_HEADER, body=data).body.read().decode('utf-8')) \ No newline at end of file diff --git a/bin/splunklib/data.py b/bin/splunklib/data.py old mode 100755 new mode 100644 index dedbb33..f9ffb86 --- a/bin/splunklib/data.py +++ b/bin/splunklib/data.py @@ -161,8 +161,8 @@ def load_value(element, nametable=None): text = element.text if text is None: return None - text = text.strip() - if len(text) == 0: + + if len(text.strip()) == 0: return None return text diff --git a/bin/splunklib/modularinput/__init__.py b/bin/splunklib/modularinput/__init__.py old mode 100755 new mode 100644 diff --git a/bin/splunklib/modularinput/argument.py b/bin/splunklib/modularinput/argument.py old mode 100755 new mode 100644 diff --git a/bin/splunklib/modularinput/event.py b/bin/splunklib/modularinput/event.py old mode 100755 new mode 100644 diff --git a/bin/splunklib/modularinput/event_writer.py b/bin/splunklib/modularinput/event_writer.py old mode 100755 new mode 100644 index 4d0b21f..5f8c5aa --- a/bin/splunklib/modularinput/event_writer.py +++ b/bin/splunklib/modularinput/event_writer.py @@ -15,8 +15,7 @@ from __future__ import absolute_import import sys -from io import TextIOWrapper, TextIOBase -from splunklib.six import ensure_text +from splunklib.six import ensure_str from .event import ET try: @@ -43,15 +42,8 @@ def __init__(self, output = sys.stdout, error = sys.stderr): :param output: Where to write the output; defaults to sys.stdout. :param error: Where to write any errors; defaults to sys.stderr. 
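With the `TextIOWrapper` handling removed below, the writer uses the streams exactly as given; a minimal sketch of the (otherwise unchanged) public API, assuming text streams such as `sys.stdout`.

```python
import sys
from splunklib.modularinput import Event, EventWriter

# Sketch: streams are now used as-is, so plain text streams work directly.
writer = EventWriter(output=sys.stdout, error=sys.stderr)

event = Event(data="hello from a modular input", sourcetype="my:sourcetype")
writer.write_event(event)                        # emits the <stream> header first
writer.log(EventWriter.INFO, "wrote one event")  # severity + message to stderr
writer.close()                                   # writes the closing </stream> tag
```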
""" - if isinstance(output, TextIOBase): - self._out = output - else: - self._out = TextIOWrapper(output) - - if isinstance(error, TextIOBase): - self._err = error - else: - self._err = TextIOWrapper(error) + self._out = output + self._err = error # has the opening tag been written yet? self.header_written = False @@ -63,7 +55,7 @@ def write_event(self, event): """ if not self.header_written: - self._out.write(ensure_text("")) + self._out.write("") self.header_written = True event.write_to(self._out) @@ -71,10 +63,12 @@ def write_event(self, event): def log(self, severity, message): """Logs messages about the state of this modular input to Splunk. These messages will show up in Splunk's internal logs. + :param severity: ``string``, severity of message, see severities defined as class constants. :param message: ``string``, message to log. """ - self._err.write(ensure_text("%s %s\n" % (severity, message))) + + self._err.write("%s %s\n" % (severity, message)) self._err.flush() def write_xml_document(self, document): @@ -83,11 +77,11 @@ def write_xml_document(self, document): :param document: An ``ElementTree`` object. """ - data = ET.tostring(document) - self._out.write(ensure_text(data)) + self._out.write(ensure_str(ET.tostring(document))) self._out.flush() def close(self): """Write the closing tag to make this XML well formed.""" - self._out.write(ensure_text("")) + if self.header_written: + self._out.write("") self._out.flush() diff --git a/bin/splunklib/modularinput/input_definition.py b/bin/splunklib/modularinput/input_definition.py old mode 100755 new mode 100644 diff --git a/bin/splunklib/modularinput/scheme.py b/bin/splunklib/modularinput/scheme.py old mode 100755 new mode 100644 diff --git a/bin/splunklib/modularinput/script.py b/bin/splunklib/modularinput/script.py old mode 100755 new mode 100644 index a254dfa..8595dc4 --- a/bin/splunklib/modularinput/script.py +++ b/bin/splunklib/modularinput/script.py @@ -105,8 +105,7 @@ def run_script(self, args, event_writer, input_stream): return 1 except Exception as e: - err_string = EventWriter.ERROR + str(e) - event_writer._err.write(err_string) + event_writer.log(EventWriter.ERROR, str(e)) return 1 @property diff --git a/bin/splunklib/modularinput/utils.py b/bin/splunklib/modularinput/utils.py old mode 100755 new mode 100644 index 853694a..3d42b63 --- a/bin/splunklib/modularinput/utils.py +++ b/bin/splunklib/modularinput/utils.py @@ -64,11 +64,14 @@ def parse_parameters(param_node): def parse_xml_data(parent_node, child_node_tag): data = {} for child in parent_node: + child_name = child.get("name") if child.tag == child_node_tag: if child_node_tag == "stanza": - data[child.get("name")] = {} + data[child_name] = { + "__app": child.get("app", None) + } for param in child: - data[child.get("name")][param.get("name")] = parse_parameters(param) + data[child_name][param.get("name")] = parse_parameters(param) elif "item" == parent_node.tag: - data[child.get("name")] = parse_parameters(child) + data[child_name] = parse_parameters(child) return data diff --git a/bin/splunklib/modularinput/validation_definition.py b/bin/splunklib/modularinput/validation_definition.py old mode 100755 new mode 100644 diff --git a/bin/splunklib/ordereddict.py b/bin/splunklib/ordereddict.py deleted file mode 100755 index 9495566..0000000 --- a/bin/splunklib/ordereddict.py +++ /dev/null @@ -1,128 +0,0 @@ -# Copyright (c) 2009 Raymond Hettinger -# -# Permission is hereby granted, free of charge, to any person -# obtaining a copy of this software and associated 
documentation files -# (the "Software"), to deal in the Software without restriction, -# including without limitation the rights to use, copy, modify, merge, -# publish, distribute, sublicense, and/or sell copies of the Software, -# and to permit persons to whom the Software is furnished to do so, -# subject to the following conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES -# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT -# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR -# OTHER DEALINGS IN THE SOFTWARE. - -from UserDict import DictMixin - - -class OrderedDict(dict, DictMixin): - - def __init__(self, *args, **kwds): - if len(args) > 1: - raise TypeError('expected at most 1 arguments, got %d' % len(args)) - try: - self.__end - except AttributeError: - self.clear() - self.update(*args, **kwds) - - def clear(self): - self.__end = end = [] - end += [None, end, end] # sentinel node for doubly linked list - self.__map = {} # key --> [key, prev, next] - dict.clear(self) - - def __setitem__(self, key, value): - if key not in self: - end = self.__end - curr = end[1] - curr[2] = end[1] = self.__map[key] = [key, curr, end] - dict.__setitem__(self, key, value) - - def __delitem__(self, key): - dict.__delitem__(self, key) - key, prev, next = self.__map.pop(key) - prev[2] = next - next[1] = prev - - def __iter__(self): - end = self.__end - curr = end[2] - while curr is not end: - yield curr[0] - curr = curr[2] - - def __reversed__(self): - end = self.__end - curr = end[1] - while curr is not end: - yield curr[0] - curr = curr[1] - - def popitem(self, last=True): - if not self: - raise KeyError('dictionary is empty') - if last: - key = reversed(self).next() - else: - key = iter(self).next() - value = self.pop(key) - return key, value - - def __reduce__(self): - items = [[k, self[k]] for k in self] - tmp = self.__map, self.__end - del self.__map, self.__end - inst_dict = vars(self).copy() - self.__map, self.__end = tmp - if inst_dict: - return (self.__class__, (items,), inst_dict) - return self.__class__, (items,) - - def keys(self): - return list(self) - - setdefault = DictMixin.setdefault - update = DictMixin.update - pop = DictMixin.pop - values = DictMixin.values - items = DictMixin.items - iterkeys = DictMixin.iterkeys - itervalues = DictMixin.itervalues - iteritems = DictMixin.iteritems - - def __repr__(self): - if not self: - return '%s()' % (self.__class__.__name__,) - return '%s(%r)' % (self.__class__.__name__, self.items()) - - def copy(self): - return self.__class__(self) - - @classmethod - def fromkeys(cls, iterable, value=None): - d = cls() - for key in iterable: - d[key] = value - return d - - def __eq__(self, other): - if isinstance(other, OrderedDict): - if len(self) != len(other): - return False - for p, q in zip(self.items(), other.items()): - if p != q: - return False - return True - return dict.__eq__(self, other) - - def __ne__(self, other): - return not self == other diff --git a/bin/splunklib/results.py b/bin/splunklib/results.py old mode 100755 new mode 100644 index 20501c5..8543ab0 --- a/bin/splunklib/results.py +++ 
b/bin/splunklib/results.py @@ -23,7 +23,7 @@ accessing search results while avoiding buffering the result set, which can be very large. -To use the reader, instantiate :class:`ResultsReader` on a search result stream +To use the reader, instantiate :class:`JSONResultsReader` on a search result stream as follows::: reader = ResultsReader(result_stream) @@ -34,18 +34,19 @@ from __future__ import absolute_import -from io import BytesIO +from io import BufferedReader, BytesIO from splunklib import six + +from splunklib.six import deprecated + try: import xml.etree.cElementTree as et except: import xml.etree.ElementTree as et -try: - from collections import OrderedDict # must be python 2.7 -except ImportError: - from .ordereddict import OrderedDict +from collections import OrderedDict +from json import loads as json_loads try: from splunklib.six.moves import cStringIO as StringIO @@ -54,9 +55,11 @@ __all__ = [ "ResultsReader", - "Message" + "Message", + "JSONResultsReader" ] + class Message(object): """This class represents informational messages that Splunk interleaves in the results stream. ``Message`` takes two arguments: a string giving the message type (e.g., "DEBUG"), and a string giving the message's text. **Example**:: m = Message("DEBUG", "There's something in that variable...") """ + def __init__(self, type_, message): self.type = type_ self.message = message @@ -80,6 +84,7 @@ def __eq__(self, other): def __hash__(self): return hash((self.type, self.message)) + class _ConcatenatedStream(object): """Lazily concatenate zero or more streams into a stream. As you read from the concatenated stream, you get characters from each stream until it is exhausted, and then from the next stream. **Example**:: s = _ConcatenatedStream(StringIO("abc"), StringIO("def")) assert s.read() == "abcdef" """ + def __init__(self, *streams): self.streams = list(streams) @@ -110,6 +116,7 @@ def read(self, n=None): del self.streams[0] return response + class _XMLDTDFilter(object): """Lazily remove all XML DTDs from a stream. All substrings matching the regular expression <?[^>]*> are removed in their entirety from the stream. No regular expressions are used, however, so everything still streams properly. **Example**:: s = _XMLDTDFilter("") assert s.read() == "" """ + def __init__(self, stream): self.stream = stream @@ -123,6 +130,7 @@ def read(self, n=None): n -= 1 return response + +@deprecated("Use the JSONResultsReader function instead in conjunction with the 'output_mode' query param set to 'json'") class ResultsReader(object): """This class returns dictionaries and Splunk messages from an XML results stream. @@ -180,6 +190,7 @@ class ResultsReader(object): print "Message: %s" % result print "is_preview = %s " % reader.is_preview """ + # Be sure to update the docstrings of client.Jobs.oneshot, # client.Job.results_preview and client.Job.results to match any # changes made to ResultsReader. @@ -260,16 +271,16 @@ def _parse_results(self, stream): # So we'll define it here def __itertext(self): - tag = self.tag - if not isinstance(tag, six.string_types) and tag is not None: - return - if self.text: - yield self.text - for e in self: - for s in __itertext(e): - yield s - if e.tail: - yield e.tail + tag = self.tag + if not isinstance(tag, six.string_types) and tag is not None: + return + if self.text: + yield self.text + for e in self: + for s in __itertext(e): + yield s + if e.tail: + yield e.tail text = "".join(__itertext(elem)) values.append(text) @@ -291,5 +302,72 @@ def __itertext(self): raise +class JSONResultsReader(object): + """This class returns dictionaries and Splunk messages from a JSON results + stream. + ``JSONResultsReader`` is iterable, and returns a ``dict`` for results, or a + :class:`Message` object for Splunk messages. 
This class has one field, + ``is_preview``, which is ``True`` when the results are a preview from a + running search, or ``False`` when the results are from a completed search. + + This function has no network activity other than what is implicit in the + stream it operates on. + + :param `stream`: The stream to read from (any object that supports ``.read()``). + + **Example**:: + + import results + response = ... # the body of an HTTP response + reader = results.JSONResultsReader(response) + for result in reader: + if isinstance(result, dict): + print "Result: %s" % result + elif isinstance(result, results.Message): + print "Message: %s" % result + print "is_preview = %s " % reader.is_preview + """ + + # Be sure to update the docstrings of client.Jobs.oneshot, + # client.Job.results_preview and client.Job.results to match any + # changes made to JSONResultsReader. + # + # This wouldn't be a class, just the _parse_results function below, + # except that you cannot get the current generator inside the + # function creating that generator. Thus it's all wrapped up for + # the sake of one field. + def __init__(self, stream): + # The search/jobs/exports endpoint, when run with + # earliest_time=rt and latest_time=rt, output_mode=json, streams a sequence of + # JSON documents, each containing a result, as opposed to one + # results element containing lots of results. + stream = BufferedReader(stream) + self.is_preview = None + self._gen = self._parse_results(stream) + + def __iter__(self): + return self + + def next(self): + return next(self._gen) + __next__ = next + def _parse_results(self, stream): + """Parse results and messages out of *stream*.""" + for line in stream.readlines(): + strip_line = line.strip() + if strip_line.__len__() == 0: continue + parsed_line = json_loads(strip_line) + if "preview" in parsed_line: + self.is_preview = parsed_line["preview"] + if "messages" in parsed_line and parsed_line["messages"].__len__() > 0: + for message in parsed_line["messages"]: + msg_type = message.get("type", "Unknown Message Type") + text = message.get("text") + yield Message(msg_type, text) + if "result" in parsed_line: + yield parsed_line["result"] + if "results" in parsed_line: + for result in parsed_line["results"]: + yield result diff --git a/bin/splunklib/searchcommands/__init__.py b/bin/splunklib/searchcommands/__init__.py old mode 100755 new mode 100644 index c56c510..8a92903 --- a/bin/splunklib/searchcommands/__init__.py +++ b/bin/splunklib/searchcommands/__init__.py @@ -134,9 +134,13 @@ .. topic:: References - 1. `Search command style guide `__ + 1. `Custom Search Command manual: `__ - 2. `Commands.conf.spec `_ + 2. `Create Custom Search Commands with commands.conf.spec `_ + + 3. `Configure search assistant with searchbnf.conf `_ + + 4. 
`Control search distribution with distsearch.conf `_ """ diff --git a/bin/splunklib/searchcommands/decorators.py b/bin/splunklib/searchcommands/decorators.py old mode 100755 new mode 100644 index 36590a7..d8b3f48 --- a/bin/splunklib/searchcommands/decorators.py +++ b/bin/splunklib/searchcommands/decorators.py @@ -17,10 +17,7 @@ from __future__ import absolute_import, division, print_function, unicode_literals from splunklib import six -try: - from collections import OrderedDict # must be python 2.7 -except ImportError: - from ..ordereddict import OrderedDict +from collections import OrderedDict # must be python 2.7 from inspect import getmembers, isclass, isfunction from splunklib.six.moves import map as imap diff --git a/bin/splunklib/searchcommands/environment.py b/bin/splunklib/searchcommands/environment.py old mode 100755 new mode 100644 diff --git a/bin/splunklib/searchcommands/eventing_command.py b/bin/splunklib/searchcommands/eventing_command.py old mode 100755 new mode 100644 index 1481cee..27dc13a --- a/bin/splunklib/searchcommands/eventing_command.py +++ b/bin/splunklib/searchcommands/eventing_command.py @@ -16,6 +16,7 @@ from __future__ import absolute_import, division, print_function, unicode_literals +from splunklib import six from splunklib.six.moves import map as imap from .decorators import ConfigurationSetting @@ -135,8 +136,14 @@ def fix_up(cls, command): raise AttributeError('No EventingCommand.transform override') SearchCommand.ConfigurationSettings.fix_up(command) + # TODO: Stop looking like a dictionary because we don't obey the semantics + # N.B.: Does not use Python 2 dict copy semantics def iteritems(self): iteritems = SearchCommand.ConfigurationSettings.iteritems(self) return imap(lambda name_value: (name_value[0], 'events' if name_value[0] == 'type' else name_value[1]), iteritems) + # N.B.: Does not use Python 3 dict view semantics + if not six.PY2: + items = iteritems + # endregion diff --git a/bin/splunklib/searchcommands/external_search_command.py b/bin/splunklib/searchcommands/external_search_command.py old mode 100755 new mode 100644 diff --git a/bin/splunklib/searchcommands/generating_command.py b/bin/splunklib/searchcommands/generating_command.py old mode 100755 new mode 100644 index 46858cb..6a75d2c --- a/bin/splunklib/searchcommands/generating_command.py +++ b/bin/splunklib/searchcommands/generating_command.py @@ -15,10 +15,12 @@ # under the License. 
from __future__ import absolute_import, division, print_function, unicode_literals +import sys from .decorators import ConfigurationSetting from .search_command import SearchCommand +from splunklib import six from splunklib.six.moves import map as imap, filter as ifilter # P1 [O] TODO: Discuss generates_timeorder in the class-level documentation for GeneratingCommand @@ -203,19 +205,57 @@ def _execute(self, ifile, process): """ if self._protocol_version == 2: - result = self._read_chunk(ifile) + self._execute_v2(ifile, self.generate()) + else: + assert self._protocol_version == 1 + self._record_writer.write_records(self.generate()) + self.finish() - if not result: - return + def _execute_chunk_v2(self, process, chunk): + count = 0 + records = [] + for row in process: + records.append(row) + count += 1 + if count == self._record_writer._maxresultrows: + break - metadata, body = result - action = getattr(metadata, 'action', None) + for row in records: + self._record_writer.write_record(row) - if action != 'execute': - raise RuntimeError('Expected execute action, not {}'.format(action)) + if count == self._record_writer._maxresultrows: + self._finished = False + else: + self._finished = True - self._record_writer.write_records(self.generate()) - self.finish() + def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout, allow_empty_input=True): + """ Process data. + + :param argv: Command line arguments. + :type argv: list or tuple + + :param ifile: Input data file. + :type ifile: file + + :param ofile: Output data file. + :type ofile: file + + :param allow_empty_input: For generating commands, it must be true. Doing otherwise will cause an error. + :type allow_empty_input: bool + + :return: :const:`None` + :rtype: NoneType + + """ + + # Generating commands are expected to run on an empty set of inputs as the first command being run in a search, + # also this class implements its own separate _execute_chunk_v2 method which does not respect allow_empty_input + # so ensure that allow_empty_input is always True + + if not allow_empty_input: + raise ValueError("allow_empty_input cannot be False for Generating Commands") + else: + return super(GeneratingCommand, self).process(argv=argv, ifile=ifile, ofile=ofile, allow_empty_input=True) # endregion @@ -324,6 +364,8 @@ def fix_up(cls, command): if command.generate == GeneratingCommand.generate: raise AttributeError('No GeneratingCommand.generate override') + # TODO: Stop looking like a dictionary because we don't obey the semantics + # N.B.: Does not use Python 2 dict copy semantics def iteritems(self): iteritems = SearchCommand.ConfigurationSettings.iteritems(self) version = self.command.protocol_version @@ -334,6 +376,10 @@ def iteritems(self): lambda name_value: (name_value[0], 'stateful') if name_value[0] == 'type' else (name_value[0], name_value[1]), iteritems) return iteritems + # N.B.: Does not use Python 3 dict view semantics + if not six.PY2: + items = iteritems + pass # endregion diff --git a/bin/splunklib/searchcommands/internals.py b/bin/splunklib/searchcommands/internals.py old mode 100755 new mode 100644 index 9a2a4e9..1ea2833 --- a/bin/splunklib/searchcommands/internals.py +++ b/bin/splunklib/searchcommands/internals.py @@ -19,10 +19,7 @@ from io import TextIOWrapper from collections import deque, namedtuple from splunklib import six -try: - from collections import OrderedDict # must be python 2.7 -except ImportError: - from ..ordereddict import OrderedDict +from collections import OrderedDict from splunklib.six.moves import 
StringIO from itertools import chain from splunklib.six.moves import map as imap @@ -35,6 +32,7 @@ import os import re import sys +import warnings from . import environment @@ -74,7 +72,7 @@ def set_binary_mode(fh): class CommandLineParser(object): - """ Parses the arguments to a search command. + r""" Parses the arguments to a search command. A search command line is described by the following syntax. @@ -232,7 +230,7 @@ def replace(match): _escaped_character_re = re.compile(r'(\\.|""|[\\"])') - _fieldnames_re = re.compile(r"""("(?:\\.|""|[^"])+"|(?:\\.|[^\s"])+)""") + _fieldnames_re = re.compile(r"""("(?:\\.|""|[^"\\])+"|(?:\\.|[^\s"])+)""") _options_re = re.compile(r""" # Captures a set of name/value pairs when used with re.finditer @@ -505,8 +503,9 @@ def __init__(self, ofile, maxresultrows=None): self._inspector = OrderedDict() self._chunk_count = 0 - self._record_count = 0 - self._total_record_count = 0 + self._pending_record_count = 0 + self._committed_record_count = 0 + self.custom_fields = set() @property def is_flushed(self): @@ -524,6 +523,30 @@ def ofile(self): def ofile(self, value): self._ofile = set_binary_mode(value) + @property + def pending_record_count(self): + return self._pending_record_count + + @property + def _record_count(self): + warnings.warn( + "_record_count will be deprecated soon. Use pending_record_count instead.", + PendingDeprecationWarning + ) + return self.pending_record_count + + @property + def committed_record_count(self): + return self._committed_record_count + + @property + def _total_record_count(self): + warnings.warn( + "_total_record_count will be deprecated soon. Use committed_record_count instead.", + PendingDeprecationWarning + ) + return self.committed_record_count + def write(self, data): bytes_type = bytes if sys.version_info >= (3, 0) else str if not isinstance(data, bytes_type): @@ -547,6 +570,7 @@ def write_record(self, record): def write_records(self, records): self._ensure_validity() + records = list(records) write_record = self._write_record for record in records: write_record(record) @@ -555,8 +579,7 @@ def _clear(self): self._buffer.seek(0) self._buffer.truncate() self._inspector.clear() - self._record_count = 0 - self._flushed = False + self._pending_record_count = 0 def _ensure_validity(self): if self._finished is True: @@ -569,6 +592,7 @@ def _write_record(self, record): if fieldnames is None: self._fieldnames = fieldnames = list(record.keys()) + self._fieldnames.extend([i for i in self.custom_fields if i not in self._fieldnames]) value_list = imap(lambda fn: (str(fn), str('__mv_') + str(fn)), fieldnames) self._writerow(list(chain.from_iterable(value_list))) @@ -651,9 +675,9 @@ def _write_record(self, record): values += (repr(value), None) self._writerow(values) - self._record_count += 1 + self._pending_record_count += 1 - if self._record_count >= self._maxresultrows: + if self.pending_record_count >= self._maxresultrows: self.flush(partial=True) try: @@ -690,7 +714,7 @@ def flush(self, finished=None, partial=None): RecordWriter.flush(self, finished, partial) # validates arguments and the state of this instance - if self._record_count > 0 or (self._chunk_count == 0 and 'messages' in self._inspector): + if self.pending_record_count > 0 or (self._chunk_count == 0 and 'messages' in self._inspector): messages = self._inspector.get('messages') @@ -728,9 +752,9 @@ def flush(self, finished=None, partial=None): print(level, text, file=stderr) self.write(self._buffer.getvalue()) - self._clear() self._chunk_count += 1 - 
self._total_record_count += self._record_count + self._committed_record_count += self.pending_record_count + self._clear() self._finished = finished is True @@ -748,37 +772,36 @@ class RecordWriterV2(RecordWriter): def flush(self, finished=None, partial=None): RecordWriter.flush(self, finished, partial) # validates arguments and the state of this instance - inspector = self._inspector - - if self._flushed is False: - - self._total_record_count += self._record_count - self._chunk_count += 1 - - # TODO: DVPL-6448: splunklib.searchcommands | Add support for partial: true when it is implemented in - # ChunkedExternProcessor (See SPL-103525) - # - # We will need to replace the following block of code with this block: - # - # metadata = [ - # ('inspector', self._inspector if len(self._inspector) else None), - # ('finished', finished), - # ('partial', partial)] - - if len(inspector) == 0: - inspector = None - - if partial is True: - finished = False - metadata = [item for item in (('inspector', inspector), ('finished', finished))] - self._write_chunk(metadata, self._buffer.getvalue()) - self._clear() + if partial or not finished: + # Don't flush partial chunks, since the SCP v2 protocol does not + # provide a way to send partial chunks yet. + return - elif finished is True: - self._write_chunk((('finished', True),), '') + if not self.is_flushed: + self.write_chunk(finished=True) - self._finished = finished is True + def write_chunk(self, finished=None): + inspector = self._inspector + self._committed_record_count += self.pending_record_count + self._chunk_count += 1 + + # TODO: DVPL-6448: splunklib.searchcommands | Add support for partial: true when it is implemented in + # ChunkedExternProcessor (See SPL-103525) + # + # We will need to replace the following block of code with this block: + # + # metadata = [item for item in (('inspector', inspector), ('finished', finished), ('partial', partial))] + # + # if partial is True: + # finished = False + + if len(inspector) == 0: + inspector = None + + metadata = [item for item in (('inspector', inspector), ('finished', finished))] + self._write_chunk(metadata, self._buffer.getvalue()) + self._clear() def write_metadata(self, configuration): self._ensure_validity() @@ -793,7 +816,7 @@ def write_metric(self, name, value): self._inspector['metric.' + name] = value def _clear(self): - RecordWriter._clear(self) + super(RecordWriterV2, self)._clear() self._fieldnames = None def _write_chunk(self, metadata, body): @@ -818,4 +841,4 @@ def _write_chunk(self, metadata, body): self.write(metadata) self.write(body) self._ofile.flush() - self._flushed = False + self._flushed = True diff --git a/bin/splunklib/searchcommands/reporting_command.py b/bin/splunklib/searchcommands/reporting_command.py old mode 100755 new mode 100644 index 3d6b357..9470861 --- a/bin/splunklib/searchcommands/reporting_command.py +++ b/bin/splunklib/searchcommands/reporting_command.py @@ -253,7 +253,7 @@ def fix_up(cls, command): cls._requires_preop = False return - f = vars(command)[b'map'] # Function backing the map method + f = vars(command)['map'] # Function backing the map method # EXPLANATION OF PREVIOUS STATEMENT: There is no way to add custom attributes to methods. See [Why does # setattr fail on a method](http://stackoverflow.com/questions/7891277/why-does-setattr-fail-on-a-bound-method) for a discussion of this issue. @@ -266,7 +266,7 @@ def fix_up(cls, command): # Create new StreamingCommand.ConfigurationSettings class - module = command.__module__ + b'.' 
+ command.__name__ + b'.map' + module = command.__module__ + '.' + command.__name__ + '.map' name = b'ConfigurationSettings' bases = (StreamingCommand.ConfigurationSettings,) diff --git a/bin/splunklib/searchcommands/search_command.py b/bin/splunklib/searchcommands/search_command.py old mode 100755 new mode 100644 index b2835ee..dd11391 --- a/bin/splunklib/searchcommands/search_command.py +++ b/bin/splunklib/searchcommands/search_command.py @@ -22,10 +22,7 @@ import io -try: - from collections import OrderedDict # must be python 2.7 -except ImportError: - from ..ordereddict import OrderedDict +from collections import OrderedDict from copy import deepcopy from splunklib.six.moves import StringIO from itertools import chain, islice @@ -124,6 +121,7 @@ def __init__(self): self._default_logging_level = self._logger.level self._record_writer = None self._records = None + self._allow_empty_input = True def __str__(self): text = ' '.join(chain((type(self).name, str(self.options)), [] if self.fieldnames is None else self.fieldnames)) @@ -172,6 +170,14 @@ def logging_level(self, value): raise ValueError('Unrecognized logging level: {}'.format(value)) self._logger.setLevel(level) + def add_field(self, current_record, field_name, field_value): + self._record_writer.custom_fields.add(field_name) + current_record[field_name] = field_value + + def gen_record(self, **record): + self._record_writer.custom_fields |= set(record.keys()) + return record + record = Option(doc=''' **Syntax: record=<bool> @@ -398,7 +404,7 @@ def flush(self): :return: :const:`None` """ - self._record_writer.flush(partial=True) + self._record_writer.flush(finished=False) def prepare(self): """ Prepare for execution. @@ -413,7 +419,7 @@ """ pass - def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout): + def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout, allow_empty_input=True): """ Process data. :param argv: Command line arguments. @@ -425,10 +431,16 @@ :param ofile: Output data file. :type ofile: file + :param allow_empty_input: Allow empty input records for the command, if False an Error will be returned if empty chunk body is encountered when read + :type allow_empty_input: bool + :return: :const:`None` :rtype: NoneType """ + + self._allow_empty_input = allow_empty_input + if len(argv) > 1: self._process_protocol_v1(argv, ifile, ofile) else: @@ -634,6 +646,19 @@ debug('%s.process finished under protocol_version=1', class_name) + def _protocol_v2_option_parser(self, arg): + """ Determines if an argument is an Option/Value pair, or just a Positional Argument. + Method so different search commands can handle parsing of arguments differently. + + :param arg: A single argument provided to the command from SPL + :type arg: str + + :return: [OptionName, OptionValue] OR [PositionalArgument] + :rtype: List[str] + + """ + return arg.split('=', 1) + def _process_protocol_v2(self, argv, ifile, ofile): """ Processes records on the input stream optionally writing records to the output stream.
@@ -656,7 +681,7 @@ # noinspection PyBroadException try: debug('Reading metadata') - metadata, body = self._read_chunk(ifile) + metadata, body = self._read_chunk(self._as_binary_stream(ifile)) action = getattr(metadata, 'action', None) @@ -704,7 +729,7 @@ if args and type(args) == list: for arg in args: - result = arg.split('=', 1) + result = self._protocol_v2_option_parser(arg) if len(result) == 1: self.fieldnames.append(str(result[0])) else: @@ -776,7 +801,6 @@ # noinspection PyBroadException try: debug('Executing under protocol_version=2') - self._records = self._records_protocol_v2 self._metadata.action = 'execute' self._execute(ifile, None) except SystemExit: @@ -833,6 +857,8 @@ def _decode_list(mv): _encoded_value = re.compile(r'\$(?P<item>(?:\$\$|[^$])*)\$(?:;|$)') # matches a single value in an encoded list + # Note: Subclasses must override this method so that it can be + # called as self._execute(ifile, None) def _execute(self, ifile, process): """ Default processing loop @@ -846,21 +872,38 @@ :rtype: NoneType """ - self._record_writer.write_records(process(self._records(ifile))) - self.finish() + if self.protocol_version == 1: + self._record_writer.write_records(process(self._records(ifile))) + self.finish() + else: + assert self._protocol_version == 2 + self._execute_v2(ifile, process) @staticmethod - def _read_chunk(ifile): + def _as_binary_stream(ifile): + naught = ifile.read(0) + if isinstance(naught, bytes): + return ifile + + try: + return ifile.buffer + except AttributeError as error: + raise RuntimeError('Failed to get underlying buffer: {}'.format(error)) + + @staticmethod + def _read_chunk(istream): # noinspection PyBroadException + assert isinstance(istream.read(0), six.binary_type), 'Stream must be binary' + try: - header = ifile.readline() + header = istream.readline() except Exception as error: raise RuntimeError('Failed to read transport header: {}'.format(error)) if not header: return None - match = SearchCommand._header.match(header) + match = SearchCommand._header.match(six.ensure_str(header)) if match is None: raise RuntimeError('Failed to parse transport header: {}'.format(header)) @@ -870,14 +913,14 @@ body_length = int(body_length) try: - metadata = ifile.read(metadata_length) + metadata = istream.read(metadata_length) except Exception as error: raise RuntimeError('Failed to read metadata of length {}: {}'.format(metadata_length, error)) decoder = MetadataDecoder() try: - metadata = decoder.decode(metadata) + metadata = decoder.decode(six.ensure_str(metadata)) except Exception as error: raise RuntimeError('Failed to parse metadata of length {}: {}'.format(metadata_length, error)) @@ -887,16 +930,18 @@ body = "" try: if body_length > 0: - body = ifile.read(body_length) + body = istream.read(body_length) except Exception as error: raise RuntimeError('Failed to read body of length {}: {}'.format(body_length, error)) - return metadata, body + return metadata, six.ensure_str(body) _header = re.compile(r'chunked\s+1.0\s*,\s*(\d+)\s*,\s*(\d+)\s*\n') def _records_protocol_v1(self, ifile): + return self._read_csv_records(ifile) + def _read_csv_records(self, ifile): reader = csv.reader(ifile, dialect=CsvDialect) try: @@ -921,51 +966,37 @@ record[fieldname] = value yield record - def
_records_protocol_v2(self, ifile): + def _execute_v2(self, ifile, process): + istream = self._as_binary_stream(ifile) while True: - result = self._read_chunk(ifile) + result = self._read_chunk(istream) if not result: return metadata, body = result action = getattr(metadata, 'action', None) - if action != 'execute': raise RuntimeError('Expected execute action, not {}'.format(action)) - finished = getattr(metadata, 'finished', False) + self._finished = getattr(metadata, 'finished', False) self._record_writer.is_flushed = False - if len(body) > 0: - reader = csv.reader(StringIO(body), dialect=CsvDialect) + self._execute_chunk_v2(process, result) - try: - fieldnames = next(reader) - except StopIteration: - return + self._record_writer.write_chunk(finished=self._finished) - mv_fieldnames = dict([(name, name[len('__mv_'):]) for name in fieldnames if name.startswith('__mv_')]) + def _execute_chunk_v2(self, process, chunk): + metadata, body = chunk - if len(mv_fieldnames) == 0: - for values in reader: - yield OrderedDict(izip(fieldnames, values)) - else: - for values in reader: - record = OrderedDict() - for fieldname, value in izip(fieldnames, values): - if fieldname.startswith('__mv_'): - if len(value) > 0: - record[mv_fieldnames[fieldname]] = self._decode_list(value) - elif fieldname not in record: - record[fieldname] = value - yield record - - if finished: - return + if len(body) <= 0 and not self._allow_empty_input: + raise ValueError( + "No records found to process. Set allow_empty_input=True in dispatch function to move forward " + "with empty records.") - self.flush() + records = self._read_csv_records(StringIO(body)) + self._record_writer.write_records(process(records)) def _report_unexpected_error(self): @@ -1036,6 +1067,8 @@ def fix_up(cls, command_class): """ return + # TODO: Stop looking like a dictionary because we don't obey the semantics + # N.B.: Does not use Python 2 dict copy semantics def iteritems(self): definitions = type(self).configuration_setting_definitions version = self.command.protocol_version @@ -1044,7 +1077,9 @@ def iteritems(self): lambda setting: (setting.name, setting.__get__(self)), ifilter( lambda setting: setting.is_supported_by_protocol(version), definitions))) - items = iteritems + # N.B.: Does not use Python 3 dict view semantics + if not six.PY2: + items = iteritems pass # endregion @@ -1054,8 +1089,7 @@ def iteritems(self): SearchMetric = namedtuple('SearchMetric', ('elapsed_seconds', 'invocation_count', 'input_count', 'output_count')) - -def dispatch(command_class, argv=sys.argv, input_file=sys.stdin, output_file=sys.stdout, module_name=None): +def dispatch(command_class, argv=sys.argv, input_file=sys.stdin, output_file=sys.stdout, module_name=None, allow_empty_input=True): """ Instantiates and executes a search command class This function implements a `conditional script stanza `_ based on the value of @@ -1078,6 +1112,8 @@ def dispatch(command_class, argv=sys.argv, input_file=sys.stdin, output_file=sys :type output_file: :code:`file` :param module_name: Name of the module calling :code:`dispatch` or :const:`None`. 
:type module_name: :code:`basestring` + :param allow_empty_input: Allow empty input records for the command, if False an Error will be returned if empty chunk body is encountered when read + :type allow_empty_input: bool :returns: :const:`None` **Example** @@ -1115,4 +1151,4 @@ def stream(records): assert issubclass(command_class, SearchCommand) if module_name is None or module_name == '__main__': - command_class().process(argv, input_file, output_file) + command_class().process(argv, input_file, output_file, allow_empty_input) diff --git a/bin/splunklib/searchcommands/streaming_command.py b/bin/splunklib/searchcommands/streaming_command.py old mode 100755 new mode 100644 index 9d900c3..fa075ed --- a/bin/splunklib/searchcommands/streaming_command.py +++ b/bin/splunklib/searchcommands/streaming_command.py @@ -16,6 +16,7 @@ from __future__ import absolute_import, division, print_function, unicode_literals +from splunklib import six from splunklib.six.moves import map as imap, filter as ifilter from .decorators import ConfigurationSetting @@ -172,6 +173,8 @@ def fix_up(cls, command): raise AttributeError('No StreamingCommand.stream override') return + # TODO: Stop looking like a dictionary because we don't obey the semantics + # N.B.: Does not use Python 2 dict copy semantics def iteritems(self): iteritems = SearchCommand.ConfigurationSettings.iteritems(self) version = self.command.protocol_version @@ -185,4 +188,8 @@ def iteritems(self): lambda name_value1: (name_value1[0], 'stateful') if name_value1[0] == 'type' else (name_value1[0], name_value1[1]), iteritems) return iteritems + # N.B.: Does not use Python 3 dict view semantics + if not six.PY2: + items = iteritems + # endregion diff --git a/bin/splunklib/searchcommands/validators.py b/bin/splunklib/searchcommands/validators.py old mode 100755 new mode 100644 index f3e2e52..22f0e16 --- a/bin/splunklib/searchcommands/validators.py +++ b/bin/splunklib/searchcommands/validators.py @@ -95,7 +95,9 @@ def __call__(self, value): try: return Code.object(compile(value, 'string', self._mode), six.text_type(value)) except (SyntaxError, TypeError) as error: - raise ValueError(error.message) + message = str(error) + + six.raise_from(ValueError(message), error) def format(self, value): return None if value is None else value.source @@ -199,6 +201,48 @@ def format(self, value): return None if value is None else six.text_type(int(value)) +class Float(Validator): + """ Validates float option values. 
+ + """ + def __init__(self, minimum=None, maximum=None): + if minimum is not None and maximum is not None: + def check_range(value): + if not (minimum <= value <= maximum): + raise ValueError('Expected float in the range [{0},{1}], not {2}'.format(minimum, maximum, value)) + return + elif minimum is not None: + def check_range(value): + if value < minimum: + raise ValueError('Expected float in the range [{0},+∞], not {1}'.format(minimum, value)) + return + elif maximum is not None: + def check_range(value): + if value > maximum: + raise ValueError('Expected float in the range [-∞,{0}], not {1}'.format(maximum, value)) + return + else: + def check_range(value): + return + + self.check_range = check_range + return + + def __call__(self, value): + if value is None: + return None + try: + value = float(value) + except ValueError: + raise ValueError('Expected float value, not {}'.format(json_encode_string(value))) + + self.check_range(value) + return value + + def format(self, value): + return None if value is None else six.text_type(float(value)) + + class Duration(Validator): """ Validates duration option values. @@ -386,4 +430,4 @@ def format(self, value): return self.__call__(value) -__all__ = ['Boolean', 'Code', 'Duration', 'File', 'Integer', 'List', 'Map', 'RegularExpression', 'Set'] +__all__ = ['Boolean', 'Code', 'Duration', 'File', 'Integer', 'Float', 'List', 'Map', 'RegularExpression', 'Set'] diff --git a/bin/splunklib/six.py b/bin/splunklib/six.py old mode 100755 new mode 100644 index 5fe9f8e..d13e50c --- a/bin/splunklib/six.py +++ b/bin/splunklib/six.py @@ -978,3 +978,16 @@ def python_2_unicode_compatible(klass): del i, importer # Finally, add the importer to the meta path import hook. sys.meta_path.append(_importer) + +import warnings + +def deprecated(message): + def deprecated_decorator(func): + def deprecated_func(*args, **kwargs): + warnings.warn("{} is a deprecated function. {}".format(func.__name__, message), + category=DeprecationWarning, + stacklevel=2) + warnings.simplefilter('default', DeprecationWarning) + return func(*args, **kwargs) + return deprecated_func + return deprecated_decorator \ No newline at end of file diff --git a/default/app.conf b/default/app.conf old mode 100755 new mode 100644 diff --git a/default/commands.conf b/default/commands.conf old mode 100755 new mode 100644 index d3b8fb9..c2e2fdc --- a/default/commands.conf +++ b/default/commands.conf @@ -6,4 +6,5 @@ requires_srinfo = true stderr_dest = message supports_rawargs = true supports_getinfo = true -supports_multivalues = true \ No newline at end of file +supports_multivalues = true +python.version = python3 \ No newline at end of file diff --git a/default/searchbnf.conf b/default/searchbnf.conf old mode 100755 new mode 100644 diff --git a/metadata/default.meta b/metadata/default.meta old mode 100755 new mode 100644 diff --git a/static/appIcon.png b/static/appIcon.png index 71a273e..6b1b681 100644 Binary files a/static/appIcon.png and b/static/appIcon.png differ diff --git a/static/appIcon_2x.png b/static/appIcon_2x.png index a8f9445..89849a3 100644 Binary files a/static/appIcon_2x.png and b/static/appIcon_2x.png differ
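
Usage notes on the main API changes in the splunklib upgrade above; each snippet is an illustrative sketch, not part of the patch. The results.py hunks deprecate ResultsReader in favor of JSONResultsReader, which parses one JSON document per line and therefore needs the search to be run with output_mode=json. A minimal consumer, assuming a reachable Splunk instance (host and credentials below are placeholders):

    import splunklib.client as client
    import splunklib.results as results

    # Placeholder connection details.
    service = client.connect(host="localhost", port=8089,
                             username="admin", password="changeme")

    # JSONResultsReader only understands JSON, so request it explicitly.
    stream = service.jobs.oneshot("search index=_internal | head 5",
                                  output_mode="json")

    for item in results.JSONResultsReader(stream):
        if isinstance(item, results.Message):
            print("Message: %s" % item)  # diagnostic messages from splunkd
        elif isinstance(item, dict):
            print("Result: %s" % item)   # one result row as a dict

Instantiating the old ResultsReader still works, but it now emits a DeprecationWarning through the deprecated() decorator this diff appends to six.py.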
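That deprecated() helper in six.py is a plain warning-emitting decorator and can be applied to any callable. A small self-contained sketch of its behavior (old_helper and new_helper are hypothetical names):

    import warnings
    from splunklib.six import deprecated

    @deprecated("Use new_helper instead.")
    def old_helper():
        return 42

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        old_helper()
    # Prints: old_helper is a deprecated function. Use new_helper instead.
    print(caught[0].message)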
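The search_command.py and generating_command.py hunks thread a new allow_empty_input flag through process() and dispatch(), and add gen_record()/add_field(), which register custom field names with the record writer so fields introduced by later records still appear in the output header. A sketch of a generating command exercising both (the command itself is hypothetical):

    import sys
    from splunklib.searchcommands import (
        dispatch, GeneratingCommand, Configuration, Option, validators)

    @Configuration()
    class HelloCommand(GeneratingCommand):
        count = Option(require=True, validate=validators.Integer(minimum=1))

        def generate(self):
            for i in range(1, self.count + 1):
                # gen_record() adds these field names to
                # RecordWriter.custom_fields before yielding the row.
                yield self.gen_record(event_no=i, greeting="hello world")

    # Generating commands must keep allow_empty_input=True; as the new
    # GeneratingCommand.process() shows, passing False raises a ValueError.
    dispatch(HelloCommand, sys.argv, sys.stdin, sys.stdout, __name__,
             allow_empty_input=True)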
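Finally, validators.py gains a Float validator that mirrors the existing Integer one, including optional bounds. How it behaves on a few sample values:

    from splunklib.searchcommands import validators

    ratio = validators.Float(minimum=0.0, maximum=1.0)

    print(ratio("0.25"))  # 0.25 -- parsed and range-checked
    print(ratio(None))    # None -- unset options pass straight through
    try:
        ratio("2.5")
    except ValueError as error:
        print(error)      # Expected float in the range [0.0,1.0], not 2.5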