Commit 73249481 authored by Robert Lyon's avatar Robert Lyon Committed by Gerrit Code Review
Browse files

Merge "More robust handling of Elasticsearch bulk operations"

parents cbe94a4c f7194ce1
......@@ -497,6 +497,26 @@ $cfg->cleanurlusereditable = true;
// $cfg->plugin_search_elasticsearch_bypassindexname = null;
// $cfg->plugin_search_elasticsearch_analyzer = 'mahara_analyzer';
// $cfg->plugin_search_elasticsearch_types = 'usr,interaction_instance,interaction_forum_post,group,view,artefact';
/**
* @global int $cfg->plugin_search_elasticsearch_requestlimit How many items to send per elasticsearch bulk request
* The main side effect of raising this, is that it increases the size of the POST request you send to your
* elasticsearch server. If you're using a proxy in front of elasticsearch, a very large request is likely to exceed
* its default POST request size limit.
*/
$cfg->plugin_search_elasticsearch_requestlimit = '100';
/**
* @global int $cfg->plugin_search_elasticsearch_redolimit If there are records in the queue that have failed on
* being sent in a previous bulk attempt, how many of them should we retry sending individually during each cron
* run. The idea here is that if there is one "bad" record in a bulk request which causes the whole request to fail,
* you want to retry the records in that batch individually so that the rest of them can be sent.
*
* We count retried records against the (optional) cron limit. So if cronlimit is 1000 and redolimit is 200, then
* we'll do 800 new records and at the end retry 200 individual records.
*
* A reasonable starting value for this is your cronlimit divided by your requestlimit. (i.e. 50000/100 = 500).
*/
$cfg->plugin_search_elasticsearch_redolimit = '500';
/**
* Additional HTML: Use these settings to put snippets of HTML at the top of every page on the site.
......
......@@ -10,6 +10,8 @@
<FIELD NAME="itemid" TYPE="int" LENGTH="10" NOTNULL="true" />
<FIELD NAME="type" TYPE="char" LENGTH="255" NOTNULL="true" DEFAULT="" />
<FIELD NAME="artefacttype" TYPE="char" LENGTH="255" NOTNULL="false" />
<FIELD NAME="status" TYPE="int" LENGTH="10" NOTNULL="true" DEFAULT="0" />
<FIELD NAME="lastprocessed" TYPE="datetime" NOTNULL="false" />
</FIELDS>
<KEYS>
......@@ -17,6 +19,7 @@
</KEYS>
<INDEXES>
<INDEX NAME="itemidix" UNIQUE="false" FIELDS="itemid"/>
<INDEX NAME="statusix" UNIQUE="false" FIELDS="status"/>
</INDEXES>
</TABLE>
</TABLES>
......
......@@ -20,5 +20,23 @@ function xmldb_search_elasticsearch_upgrade($oldversion=0) {
add_index($table, $index);
}
// 2015060900: add bulk-retry bookkeeping to the elasticsearch queue table.
// "status" tracks whether a queue record is new / sent-in-bulk / sent-individually
// (see the queue_status_* constants on PluginSearchElasticsearch); "lastprocessed"
// records when we last attempted to send it.
if ($oldversion < 2015060900) {
    log_debug('Add "status" and "lastprocessed" columns to search_elasticsearch_queue table');
    $table = new XMLDBTable('search_elasticsearch_queue');
    $field = new XMLDBField('status');
    // int(10), NOT NULL, default 0 (= queue_status_new)
    $field->setAttributes(XMLDB_TYPE_INTEGER, 10, null, XMLDB_NOTNULL, null, null, null, 0);
    add_field($table, $field);
    $table = new XMLDBTable('search_elasticsearch_queue');
    $field = new XMLDBField('lastprocessed');
    // Nullable datetime: null means "never attempted"
    $field->setAttributes(XMLDB_TYPE_DATETIME);
    add_field($table, $field);
    $table = new XMLDBTable('search_elasticsearch_queue');
    // Non-unique index on status, since the cron selects queue records by status.
    $index = new XMLDBIndex('statusix');
    $index->setAttributes(XMLDB_INDEX_NOTUNIQUE, array('status'));
    add_index($table, $index);
}
return true;
}
......@@ -37,6 +37,23 @@ defined('INTERNAL') || die();
*/
class PluginSearchElasticsearch extends PluginSearch {
/**
* Records in search_elasticsearch_queue that haven't been sent to Elasticsearch yet.
*/
const queue_status_new = 0;
/**
* Records in search_elasticsearch_queue that have been sent in bulk to Elasticsearch.
* These are deleted after being successfully sent, so they'll only be seen in the table
* if the request to send them failed.
*/
const queue_status_sent_in_bulk = 1;
/**
* Records in search_elasticsearch_queue that have been sent individually to Elasticsearch.
* These are deleted after being successfully sent, so they'll only be seen in the table
* if the individual request to send them failed.
*/
const queue_status_sent_individually = 2;
/**
* This function indicates whether the plugin should take the raw $query string
* when its group_search_user function is called, or whether it should get the
......@@ -573,67 +590,290 @@ class PluginSearchElasticsearch extends PluginSearch {
*/
public static function index_queued_items() {
    // Overall cap on how many queue records to handle in one cron run.
    // 0 or less means "no limit" (empty limit args to get_records_array).
    $cronlimit = intval(get_config_plugin('search', 'elasticsearch', 'cronlimit'));
    if ($cronlimit <= 0) {
        $limitfrom = $limitto = '';
    }
    else {
        $limitfrom = 0;
        $limitto = $cronlimit;
    }

    // How many records to put in each bulk request to Elasticsearch.
    $requestlimit = intval(get_config_plugin('search', 'elasticsearch', 'requestlimit'));
    if ($requestlimit <= 0) {
        // If they specified no request limit, just use a really big number. This is easier
        // than writing special code just to handle the case where there's no limit.
        $requestlimit = 1000;
    }

    $redolimit = intval(get_config_plugin('search', 'elasticsearch', 'redolimit'));
    if ($redolimit <= 0) {
        // If they've set redolimit to 0, they don't want to retry failed records at all
        $redolimit = 0;
        $redoablecount = 0;
    }
    else {
        // Find out how many failed records there are
        // (Since any sent in bulk will be deleted if the request processed successfully, any remaining ones
        // are failed records.)
        $redoablecount = count_records('search_elasticsearch_queue', 'status', self::queue_status_sent_in_bulk);
        $redolimit = min($redolimit, $redoablecount);
        if ($limitto) {
            // Retried records count against the overall cron limit, so reserve room for them.
            $redolimit = min($redolimit, $limitto);
            $limitto -= $redolimit;
        }
    }

    // First pass: brand-new queue records, sent in bulk.
    $records = get_records_array('search_elasticsearch_queue', 'status', self::queue_status_new, 'id', '*', $limitfrom, $limitto);
    if (!$records && !$redolimit) {
        return;
    }

    $elasticaClient = self::make_client();
    $indexname = self::get_write_indexname();
    $elasticaIndex = $elasticaClient->getIndex($indexname);
    $artefacttypesmap_array = self::elasticsearchartefacttypesmap_to_array();

    if ($records) {
        list($documents, $deletions) = self::preprocess_queued_items($records, $artefacttypesmap_array);

        // Delete in bulk
        if ($deletions) {
            $delcount = 0;
            foreach ($deletions as $docs) {
                $delcount += count($docs);
            }
            log_info(" {$delcount} deletions to index in bulk...");
            self::send_queued_items_in_bulk(
                $deletions,
                function($records, $type) use ($elasticaClient, $indexname) {
                    return $elasticaClient->deleteIds($records, $indexname, $type);
                },
                $requestlimit,
                $elasticaIndex
            );
        }

        // Send in bulk
        if ($documents) {
            $doccount = 0;
            foreach ($documents as $docs) {
                $doccount += count($docs);
            }
            log_info(" {$doccount} documents to index in bulk...");
            self::send_queued_items_in_bulk(
                $documents,
                function($records, $type) {
                    return $type->addDocuments($records);
                },
                $requestlimit,
                $elasticaIndex
            );
        }
    }

    // Second pass: records that failed in an earlier bulk request are retried
    // one at a time, so one bad record can't sink a whole batch.
    $records = get_records_array('search_elasticsearch_queue', 'status', self::queue_status_sent_in_bulk, 'id', '*', 0, $redolimit);
    if ($records) {
        list($documents, $deletions) = self::preprocess_queued_items($records, $artefacttypesmap_array);

        // Delete individually
        if ($deletions) {
            $delcount = 0;
            foreach ($deletions as $docs) {
                $delcount += count($docs);
            }
            log_info(" {$delcount} deletions to index individually...");
            self::send_queued_items_individually(
                $deletions,
                function($record, $type) use ($elasticaClient, $indexname) {
                    return $elasticaClient->deleteIds(array($record), $indexname, $type);
                },
                $requestlimit,
                $elasticaIndex
            );
        }

        // Send individually
        if ($documents) {
            $doccount = 0;
            foreach ($documents as $docs) {
                $doccount += count($docs);
            }
            log_info(" {$doccount} documents to index individually...");
            self::send_queued_items_individually(
                $documents,
                function($record, $type) {
                    return $type->addDocuments(array($record));
                },
                $requestlimit,
                $elasticaIndex
            );
        }
    }

    // Refresh Index
    $elasticaIndex->refresh();
}
/**
 * Process a set of records from search_elasticsearch_queue and sort them into
 * items to insert into, and items to delete from, the Elasticsearch index.
 *
 * @param array $records Rows from the search_elasticsearch_queue table
 * @param array $artefacttypesmap_array Artefact-type mapping, passed through to
 *     ElasticsearchType_artefact::getRecordById() for artefact records
 * @return array Two-element array: array($documents, $deletions). Each is keyed
 *     first by elasticsearch document type, then by queue-record id, so callers
 *     can update/delete the matching queue rows after sending.
 */
private static function preprocess_queued_items($records, $artefacttypesmap_array) {
    $documents = array();
    $deletions = array();
    foreach ($records as $record) {
        $deleteitem = false;
        $ES_class = 'ElasticsearchType_' . $record->type;
        // Artefact lookups additionally need the artefacttype map to decide
        // whether this artefacttype is indexed at all.
        if ($record->type == 'artefact') {
            $dbrecord = $ES_class::getRecordById($record->type, $record->itemid, $artefacttypesmap_array);
        }
        else {
            $dbrecord = $ES_class::getRecordById($record->type, $record->itemid);
        }
        // If the record has been physically deleted from the DB or if its artefacttype is not selected
        if ($dbrecord == false) {
            $deleteitem = true;
        }
        else {
            $item = new $ES_class($dbrecord);
            $deleteitem = $item->getisDeleted();
        }
        // Mark item for bulk deletion from index
        if ($deleteitem == true) {
            $deletions[$record->type][$record->id] = $record->itemid;
        }
        // Add item for bulk index
        else {
            $documents[$record->type][$record->id] = new \Elastica\Document($record->itemid, $item->getMapping());
        }
        // NOTE: queue rows are deliberately NOT deleted here; the senders delete
        // them only after Elasticsearch confirms a successful request.
    }
    return array(
        $documents,
        $deletions
    );
}
/**
 * Upload a set of items to Elasticsearch in bulk, in batches of at most
 * $requestlimit records per request.
 *
 * Each batch's queue rows are flagged queue_status_sent_in_bulk BEFORE the
 * request goes out, and deleted only on success; rows left behind therefore
 * mark failed batches, to be retried individually on a later cron run.
 *
 * @param array $documents A multi-dimensional array. The top level has keys representing elasticsearch document types.
 * Each of these has a value which is an array of actual Elasticsearch documents or deletion requests, with their
 * key being the matching record in the search_elasticsearch_queue table.
 * @param callback $processfunction A callback function to bulk-request each slice of documents;
 *     receives (array $records, $elasticaType) and returns an Elastica response object.
 * @param int $requestlimit Maximum number of records per bulk request
 * @param object $elasticaIndex The Elastica index the document types live in
 */
private static function send_queued_items_in_bulk($documents, $processfunction, $requestlimit, $elasticaIndex) {
    $uploadcount = 0;
    $batchcount = 0;
    $errorcount = 0;
    // Bulk insert into index
    foreach ($documents as $type => $docs) {
        $elasticaType = $elasticaIndex->getType($type);
        for ($i = 0; $i < count($docs); $i += $requestlimit) {
            // preserve_keys=true: the keys are queue-record ids and we need them below.
            $requestdocs = array_slice($docs, $i, $requestlimit, true);
            $ids = array_keys($requestdocs);
            $questionmarks = implode(',', array_fill(0, count($ids), '?'));
            $time = db_format_timestamp(time());

            // Mark them before sending, in case the request fails.
            $sql = 'UPDATE {search_elasticsearch_queue} SET status = ?, lastprocessed = ? WHERE id IN (' . $questionmarks . ')';
            execute_sql(
                $sql,
                array_merge(
                    array(
                        self::queue_status_sent_in_bulk,
                        $time
                    ),
                    $ids
                )
            );

            // Send them
            try {
                $batchcount++;
                $uploadcount += count($requestdocs);
                if ($batchcount % 10 == 0) {
                    log_info(" batches: {$batchcount}; records: {$uploadcount}; errors: {$errorcount}");
                }
                $response = $processfunction($requestdocs, $elasticaType);
                if ($response->hasError()) {
                    log_warn("Error from Elasticsearch trying to send bulk request at time {$time}: " . $response->getError());
                    $errorcount++;
                }
                else {
                    // Delete them (since they've been sent successfully)
                    delete_records_select('search_elasticsearch_queue', 'id IN (' . $questionmarks. ')', $ids);
                }
            }
            catch (Exception $e) {
                $errorcount++;
                log_warn('Exception sending elasticsearch request at time ' . $time . ': ' . $e->getMessage() );
            }
        }
    }
    log_info(" batches: {$batchcount}; records: {$uploadcount}; errors: {$errorcount}");
    if ($errorcount) {
        log_info(" The records in the {$errorcount} errored batches will be queued for individual indexing");
    }
}
/**
 * Upload a set of items to Elasticsearch individually (one request per record).
 *
 * Each queue row is flagged queue_status_sent_individually BEFORE its request
 * goes out, and deleted from the queue only when Elasticsearch reports no error,
 * so rows remaining in the table mark records that failed even an individual send.
 *
 * @param array $documents A multi-dimensional array. The top level has keys representing elasticsearch document types.
 * Each of these has a value which is an array of actual Elasticsearch documents or deletion requests, with their
 * key being the matching record in the search_elasticsearch_queue table.
 * @param callback $processfunction A callback function to send one document; receives
 * ($doc, $elasticaType) and returns an Elastica response object.
 * @param int $requestlimit Not used in this function; presumably kept for signature
 * parity with send_queued_items_in_bulk — TODO confirm
 * @param object $elasticaIndex The Elastica index the document types live in
 */
private static function send_queued_items_individually($documents, $processfunction, $requestlimit, $elasticaIndex) {
    $uploadcount = 0;
    $errorcount = 0;
    // Insert into index, one record per request
    foreach ($documents as $type => $docs) {
        $elasticaType = $elasticaIndex->getType($type);
        foreach ($docs as $queueid => $doc) {
            // Mark it before sending, in case the request fails.
            update_record(
                'search_elasticsearch_queue',
                (object) array(
                    'id' => $queueid,
                    'status' => self::queue_status_sent_individually,
                    'lastprocessed' => db_format_timestamp(time())
                )
            );
            // Send it
            try {
                $uploadcount++;
                if ($uploadcount % 20 == 0) {
                    // Periodic progress logging for long cron runs
                    log_info(" uploads: {$uploadcount}; errors: {$errorcount}");
                }
                $response = $processfunction($doc, $elasticaType);
                if ($response->hasError()) {
                    $errorcount++;
                    log_warn("Error from Elasticsearch trying to send individual record {$queueid}: " . $response->getError());
                }
                else {
                    // No errors! Go ahead and delete it from the queue
                    delete_records('search_elasticsearch_queue', 'id', $queueid);
                }
            }
            catch (Exception $e) {
                $errorcount++;
                log_warn('Exception sending elasticsearch record ' . $queueid . ': ' . $e->getMessage() );
            }
        }
    }
    log_info(" uploads: {$uploadcount}; errors: {$errorcount}");
}
public static function search_all ($query_string, $limit, $offset = 0, $options=array(), $mainfacetterm = null, $subfacet = null) {
global $USER;
return ElasticsearchPseudotype_all::search($query_string, $limit, $offset, $options, $mainfacetterm, $USER);
......
......@@ -13,5 +13,5 @@ defined('INTERNAL') || die();
$config = new stdClass();
$config->name = 'elasticsearch';
$config->version = 2015012800;
$config->release = '1.0.1';
$config->version = 2015060900;
$config->release = '1.0.2';
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment