Using Elasticsearch from Python

Sep 4, 2019

Introduction

Elasticsearch exposes a JSON-based REST API, which makes it easy to interact with from almost any environment. Using curl from the command line, you can perform almost any task. Here is an example listing the indices:

curl -X GET "localhost:9200/_cat/indices?format=json&pretty"

This assumes that Elasticsearch is running on the current machine on its default port of 9200.

Using Python with Elasticsearch

That said, it is sometimes convenient to use a programming language to work with Elasticsearch, whether to construct the request or to process the response. As we will see below, this also allows us to paginate the results from Elasticsearch using the scroll API.

In this article, we will use the Python programming language to interact with Elasticsearch. We will stick mostly to standard-library modules to handle the request and the response. In most cases, interacting with Elasticsearch from Python is simple enough that special modules are not needed.

We will show code that uses Python 3, since Python 2 has reached end-of-life.

Exception class

We will use the following exception class to indicate an error. When Elasticsearch responds with a status code other than 200, it signals an error, and the response includes a description of the error in JSON format.

import json

class ESError(Exception):
    def __init__(self, resp):
        self._statusCode = resp.status
        self._reason = resp.reason
        self._message = json.dumps(json.loads(resp.read().decode('utf-8')), indent=2)

    def __str__(self):
        return '\ncode: {}, reason: {}\nmessage: {}'.format(self._statusCode, self._reason, self._message)

The following function is used for returning the response if the status code is 200, or signalling an error otherwise.

def raiseIfError(response):
    if response.status == 200:
        return response
    raise ESError(response)
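
To see how these two fit together without a live server, here is a small self-contained sketch that feeds raiseIfError a mock response object. The FakeResponse class and the error body are invented for illustration; the ESError and raiseIfError definitions are the ones shown above.

```python
import json

class ESError(Exception):
    def __init__(self, resp):
        self._statusCode = resp.status
        self._reason = resp.reason
        self._message = json.dumps(json.loads(resp.read().decode('utf-8')), indent=2)

    def __str__(self):
        return '\ncode: {}, reason: {}\nmessage: {}'.format(self._statusCode, self._reason, self._message)

def raiseIfError(response):
    if response.status == 200:
        return response
    raise ESError(response)

# A stand-in for http.client.HTTPResponse, just for this demo.
class FakeResponse:
    def __init__(self, status, reason, body):
        self.status = status
        self.reason = reason
        self._body = body

    def read(self):
        return self._body

# A 404 with a JSON error body, similar in shape to what Elasticsearch returns.
bad = FakeResponse(404, 'Not Found', b'{"error": "index_not_found_exception", "status": 404}')
try:
    raiseIfError(bad)
except ESError as e:
    print(e)
```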

Listing indices using cat

For example, the above request for listing the available indices could be written as follows:

import json, http.client
con = http.client.HTTPConnection('localhost', port=9200)
path = '/_cat/indices?format=json&pretty'
con.request('GET', path)
r = raiseIfError(con.getresponse())
print(json.dumps(json.loads(r.read().decode('utf-8')), indent=2))
con.close()

which results in the following output:

[
  {
    "health" : "green",
    "status" : "open",
    "index" : "customers.csv",
    "uuid" : "_LYv5xHgSvW7igcuNIlUOA",
    "pri" : "1",
    "rep" : "0",
    "docs.count" : "29",
    "docs.deleted" : "0",
    "store.size" : "30.7kb",
    "pri.store.size" : "30.7kb"
  }
]

Checking the health of the cluster

To check the health of the cluster, we use the same code snippet as above with the following change:

...
path = '/_cat/health?format=json&pretty'
...

The response shows the health of the cluster.

[
  {
    "epoch": "1567505710",
    "node.total": "1",
    "init": "0",
    "status": "green",
    "cluster": "elasticsearch",
    "pending_tasks": "0",
    "relo": "0",
    "node.data": "1",
    "max_task_wait_time": "-",
    "active_shards_percent": "100.0%",
    "shards": "1",
    "pri": "1",
    "unassign": "0",
    "timestamp": "10:15:10"
  }
]
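
If you only need the overall status programmatically, you can pick it out of the parsed response. The sketch below works on a hand-written dict shaped like the output above, so it runs without a server:

```python
# A hand-written response shaped like the _cat/health output above.
health = [{"status": "green", "cluster": "elasticsearch", "node.total": "1"}]

# _cat/health returns a JSON array; the cluster summary is its first entry.
status = health[0]['status']
print('cluster %s is %s' % (health[0]['cluster'], status))
```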

Running a query

Running a query requires a small change to the code shown above: you need to include a body with the GET request. The following runs an empty query (q = "{}") to list the first 10 hits from the index named diabetes-prevalence.csv.

import json, http.client
con = http.client.HTTPConnection('localhost', port=9200)
q = """{}"""
path = '/diabetes-prevalence.csv/_search'
con.request('GET', path, body=q.encode('utf-8'), headers={'Content-Type':'application/json'})
res = json.loads(raiseIfError(con.getresponse()).read().decode('utf-8'))
print(json.dumps(res, indent=2))
print('Found %d hits.' % len(res['hits']['hits']))
con.close()

The response is as follows:

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 264,
    "max_score": 1.0,
    "hits": [
      {
        "_index": "diabetes-prevalence.csv",
        "_type": "doc",
        "_id": "1",
        "_score": 1.0,
        "_source": {
          "Country Name": "Aruba",
          "Country Code": "ABW",
          "Indicator Name": "Diabetes prevalence (% of population ages 20 to 79)",
          "Indicator Code": "SH.STA.DIAB.ZS",
          "1960": "",
          "2017": 11.62
      }
...
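
Once the response has been parsed into a Python dict, pulling fields out of the hits is straightforward. The snippet below runs against a hand-written res dict shaped like the response above (trimmed to one hit), so it works without a live server:

```python
# A trimmed-down response dict, shaped like the output shown above.
res = {
    "hits": {
        "total": 264,
        "hits": [
            {
                "_index": "diabetes-prevalence.csv",
                "_id": "1",
                "_score": 1.0,
                "_source": {
                    "Country Name": "Aruba",
                    "Country Code": "ABW",
                    "2017": 11.62
                }
            }
        ]
    }
}

# Each hit's document lives under '_source'.
for hit in res['hits']['hits']:
    src = hit['_source']
    print('%s (%s): 2017 = %s' % (src['Country Name'], src['Country Code'], src['2017']))
```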

Paginating results using size and from

When running a query on Elasticsearch, you can specify how many hits (or results) you want by including a value for the size parameter. If you don’t specify this parameter, Elasticsearch returns 10 hits. You can of course request a higher number of hits as follows:

import json, http.client
con = http.client.HTTPConnection('localhost', port=9200)
q = """{"size":100}"""
path = '/diabetes-prevalence.csv/_search'
con.request('GET', path, body=q.encode('utf-8'), headers={'Content-Type':'application/json'})
r = raiseIfError(con.getresponse())
print(json.dumps(json.loads(r.read().decode('utf-8')), indent=2))
con.close()

To fetch the next batch of results, you can include a from parameter specifying how many hits to skip before returning size results. The following is a complete program that can be used to paginate through up to 10,000 documents in an index. (10,000 is the limit Elasticsearch enforces for deep paging; use the scroll API to fetch more, or even all, of the hits.)

import json, http.client
indexName = 'diabetes-prevalence.csv'
con = http.client.HTTPConnection('localhost', port=9200)
start = 0
while True:
    q = '{"size":10,"from":%d}' % (start)
    path = '/%s/_search' % (indexName)
    con.request('GET', path, body=q.encode('utf-8'), headers={'Content-Type':'application/json'})
    res = json.loads(raiseIfError(con.getresponse()).read().decode('utf-8'))
    if not res['hits']['hits']:
        break
    start += len(res['hits']['hits'])
    print(json.dumps(res, indent=2))
print('Found %d hits.' % (start))
con.close()

Using the scroll API to fetch all hits

While the above sample code can fetch all results using from and size, it is inefficient and not recommended for a large number of results. A better way is to use the scroll API. This mechanism creates a scroll cursor (kept alive for a specified duration) from which all results can be fetched in order.

import json, http.client
indexName = 'diabetes-prevalence.csv'
con = http.client.HTTPConnection('localhost', port=9200)
q = '{"size":10}'
scroll = '1m'
path = '/%s/_search?scroll=%s' % (indexName, scroll)
start = 0
while True:
    con.request('POST', path, body=q.encode('utf-8'), headers={'Content-Type':'application/json'})
    res = json.loads(raiseIfError(con.getresponse()).read().decode('utf-8'))
    if not res['hits']['hits']:
        break
    start += len(res['hits']['hits'])
    print(json.dumps(res, indent=2))
    scrollId = res['_scroll_id']
    q = '{"scroll":"%s","scroll_id":"%s"}' % (scroll, scrollId)
    path = '/_search/scroll'
print('Found %d hits.' % (start))
con.close()
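
When you are done, it is good practice to clear the scroll cursor explicitly rather than letting it time out; Elasticsearch supports this via a DELETE request to /_search/scroll with the scroll_id in the body. The sketch below only constructs that request (scrollId here is a placeholder for the _scroll_id from the last response); it would be sent with con.request like the calls above:

```python
import json

# Placeholder: use the _scroll_id value returned in the last search response.
scrollId = 'example-scroll-id'

path = '/_search/scroll'
body = json.dumps({'scroll_id': scrollId})
# Sent the same way as the other requests, e.g.:
# con.request('DELETE', path, body=body.encode('utf-8'),
#             headers={'Content-Type': 'application/json'})
print(path, body)
```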

Some Sample Queries

To further illustrate Elasticsearch queries from Python, let us try some sample queries. For the code below, we use the following Python function (search()), which runs a search. It accepts as parameters the host and port where Elasticsearch is running, the index name, and the JSON of the query.

def search(host, port, indexName, jsonStr):
    con = http.client.HTTPConnection(host, port)
    path = '/%s/_search' % (indexName)
    con.request('GET', path, body=jsonStr.encode('utf-8'), headers={'Content-Type':'application/json'})
    res = json.loads(raiseIfError(con.getresponse()).read().decode('utf-8'))
    print(json.dumps(res, indent=2))
    print('Found %d hits.' % len(res['hits']['hits']))
    con.close()

And here is a basic match-all invocation which returns 10 documents.

search("localhost", 9200, "customers.csv", json.dumps({}))

One way to perform an exact text search is by wrapping a match_phrase query inside a bool query.

search("localhost", 9200, "customers.csv", json.dumps({
    'query': {
        'bool': {
            'must': [{
                'match_phrase': {
                    'First Name': 'John'
                }
            }]
        }
    }
}))

And we get the 2 hits that match.

...
    "hits": [
      {
        "_index": "customers.csv",
        "_type": "doc",
        "_id": "12",
        "_score": 2.5840096,
        "_source": {
          "ID": 12,
          "Company": "Company L",
          "Last Name": "Edwards",
          "First Name": "John",
          "E-mail Address": "",
          "Job Title": "Purchasing Manager",
          "Business Phone": "(123)555-0100",
          "Home Phone": "",
          "Mobile Phone": "",
          "Fax Number": "(123)555-0101",
          "Address": "123 12th Street",
          "City": "Las Vegas",
          "State/Province": "NV",
          "ZIP/Postal Code": 99999,
          "Country/Region": "USA",
          "Web Page": "",
          "Notes": "",
          "Attachments": null
        }
      },
      {
        "_index": "customers.csv",
        "_type": "doc",
        "_id": "25",
        "_score": 2.5840096,
        "_source": {
          "ID": 25,
          "Company": "Company Y",
          "Last Name": "Rodman",
          "First Name": "John",
          "E-mail Address": "",
...

SQL-type WHERE/IN query

In an SQL database, the WHERE ... IN clause is used to select records whose field value is one of a given set. It looks like this:

SELECT * FROM "diabetes-prevalence.csv"
WHERE  "Country Name" IN ('Argentina', 'Armenia', 'American Samoa')

A similar query in Elasticsearch can be performed using:

search("localhost", 9200, "diabetes-prevalence.csv", json.dumps({
    'query': {
        'bool': {
            'filter': [{
                'terms': {
                    'Country Name.keyword': ['Argentina', 'Armenia', 'American Samoa']
                }
            }]
        }
    }
}))

Conclusion

This article introduced the use of Python to interact with Elasticsearch. While curl can be used from the command line for quick request/response scenarios, Python allows you to do more, including composing the request and processing the response.

Argon uses both pagination and scrolling

  1. Pagination is used in the main Explorer View to browse the query results.

  2. Scrolling is used when exporting the query results to CSV or JSON.

  3. Pagination is also used in various other places like selecting multiple values, etc.