# mirror of https://github.com/elastic/eland.git
# synced 2025-07-11 00:02:14 +08:00
# commit 331409314c
import eland
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
import pandas as pd
class DataFrame:
    """A pandas.DataFrame-like view over an Elasticsearch index.

    Data remains in Elasticsearch; each method issues a query and converts
    the response into an in-memory ``pandas.DataFrame`` for the caller.
    """

    def __init__(self, client, index_pattern):
        """Create a frame reading from ``index_pattern`` via ``client``.

        Parameters
        ----------
        client : object
            Anything ``eland.Client`` accepts (e.g. an ``Elasticsearch``
            instance or connection parameters).
        index_pattern : str
            Index name or pattern this frame reads from.

        Raises
        ------
        ValueError
            If the index pattern does not exist.
        """
        self.client = eland.Client(client)
        self.index_pattern = index_pattern

        # Fail fast on a bad index pattern. The original code called
        # exists() but discarded the result, so the check did nothing.
        if not self.client.indices().exists(index_pattern):
            raise ValueError(
                "index pattern '{}' does not exist".format(index_pattern))

    @staticmethod
    def _es_results_to_pandas(results):
        """Convert a raw ES search response into a ``pandas.DataFrame``.

        Each hit contributes one row built from its ``_source`` document
        (an empty row if the hit carries no ``_source``).
        """
        # TODO - resolve nested fields
        rows = [hit.get('_source', {}) for hit in results['hits']['hits']]
        return pd.DataFrame(data=rows)

    @staticmethod
    def _flatten_mapping(prefix, properties, result):
        """Flatten a nested ES mapping into ``(field_name, datatype)`` tuples.

        Nested objects produce dotted field names rooted at ``prefix``;
        tuples are appended to ``result`` in mapping order.
        """
        for name, spec in properties.items():
            # Build this field's dotted name WITHOUT reassigning `prefix`:
            # the original mutated `prefix` when it met a nested object,
            # which corrupted the names of sibling fields that followed it.
            key = name if prefix == '' else prefix + '.' + name
            if 'properties' in spec:
                DataFrame._flatten_mapping(key, spec['properties'], result)
            else:
                result.append((key, spec['type']))

    @staticmethod
    def _es_mappings_to_pandas(mappings):
        """Convert an ES get-mapping response into a ``DataFrame`` with
        ``field``/``datatype`` columns, one row per leaf field, covering
        every index in the response."""
        fields = []
        for index in mappings:
            # Indices with an empty mapping have no 'properties' key; skip them.
            if 'properties' in mappings[index]['mappings']:
                properties = mappings[index]['mappings']['properties']
                DataFrame._flatten_mapping('', properties, fields)
        return pd.DataFrame(data=fields, columns=['field', 'datatype'])

    def head(self, n=5):
        """Return the first ``n`` documents of the index as a ``DataFrame``."""
        results = self.client.search(index=self.index_pattern, size=n)
        return DataFrame._es_results_to_pandas(results)

    def describe(self):
        """Generate descriptive statistics for the index's numeric fields.

        Mirrors ``pandas.DataFrame.describe``: one column per numeric field
        with rows count/mean/std/min/25%/50%/75%/max, computed server-side
        via ``extended_stats`` and ``percentiles`` aggregations.
        """
        # Discover field datatypes from the index mapping.
        mapping = self.client.indices().get_mapping(index=self.index_pattern)
        fields = DataFrame._es_mappings_to_pandas(mapping)

        # Numeric ES datatypes
        # (https://www.elastic.co/guide/en/elasticsearch/reference/current/number.html)
        # TODO refactor this list out of method
        numeric_fields = fields.query(
            'datatype == ["long", "integer", "short", "byte", "double", '
            '"float", "half_float", "scaled_float"]')

        # Single request with size=0: aggregations only, no documents.
        # For each field we compute:
        # count, mean, std, min, 25%, 50%, 75%, max
        search = Search(using=self.client, index=self.index_pattern).extra(size=0)
        for field in numeric_fields.field:
            search.aggs.metric('extended_stats_' + field, 'extended_stats', field=field)
            search.aggs.metric('percentiles_' + field, 'percentiles', field=field)
        response = search.execute()

        results = pd.DataFrame(
            index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max'])
        for field in numeric_fields.field:
            stats = response.aggregations['extended_stats_' + field]
            pct = response.aggregations['percentiles_' + field]['values']
            values = [
                stats['count'],
                stats['avg'],
                stats['std_deviation'],
                stats['min'],
                pct['25.0'],
                pct['50.0'],
                pct['75.0'],
                stats['max'],
            ]
            # Skip fields where every aggregation returned None (no data).
            if values.count(None) < len(values):
                results = results.assign(**{field: values})
        return results