Commit 2d5e46e3 authored by Robert Lyon's avatar Robert Lyon Committed by Gerrit Code Review
Browse files

Merge "Remove the index name from elasticsearch JSON data (Bug 1457709)"

parents 3670c3f1 a02be21a
......@@ -649,8 +649,7 @@ class PluginSearchElasticsearch extends PluginSearch {
function($records, $type) use ($elasticaClient, $indexname) {
return $elasticaClient->deleteIds($records, $indexname, $type);
},
$requestlimit,
$elasticaIndex
$requestlimit
);
}
// Send in bulk
......@@ -662,11 +661,10 @@ class PluginSearchElasticsearch extends PluginSearch {
log_info(" {$doccount} documents to index in bulk...");
self::send_queued_items_in_bulk(
$documents,
function($records, $type) {
return $type->addDocuments($records);
function($records, $type) use ($elasticaClient, $elasticaIndex) {
return $elasticaClient->addDocuments($records, $elasticaIndex);
},
$requestlimit,
$elasticaIndex
$requestlimit
);
}
}
......@@ -688,8 +686,7 @@ class PluginSearchElasticsearch extends PluginSearch {
function($record, $type) use ($elasticaClient, $indexname) {
return $elasticaClient->deleteIds(array($record), $indexname, $type);
},
$requestlimit,
$elasticaIndex
$requestlimit
);
}
......@@ -702,11 +699,10 @@ class PluginSearchElasticsearch extends PluginSearch {
log_info(" {$doccount} documents to index individually...");
self::send_queued_items_individually(
$documents,
function($record, $type) {
return $type->addDocuments(array($record));
function($record, $type) use ($elasticaClient, $elasticaIndex) {
return $elasticaClient->addDocuments(array($record), $elasticaIndex);
},
$requestlimit,
$elasticaIndex
$requestlimit
);
}
}
......@@ -750,7 +746,7 @@ class PluginSearchElasticsearch extends PluginSearch {
}
// Add item for bulk index
else {
$documents[$record->type][$record->id] = new \Elastica\Document($record->itemid, $item->getMapping());
$documents[$record->type][$record->id] = new \Elastica\Document($record->itemid, $item->getMapping(), $record->type);
}
}
return array(
......@@ -766,14 +762,13 @@ class PluginSearchElasticsearch extends PluginSearch {
* key being the matching record in the search_elasticsearch_queue table.
* @param callback $processfunction A callback function to bulk-request each slice of documetns
*/
private static function send_queued_items_in_bulk($documents, $processfunction, $requestlimit, $elasticaIndex) {
private static function send_queued_items_in_bulk($documents, $processfunction, $requestlimit) {
$uploadcount = 0;
$batchcount = 0;
$errorcount = 0;
// Bulk insert into index
foreach ($documents as $type => $docs) {
$elasticaType = $elasticaIndex->getType($type);
for ($i = 0; $i < count($docs); $i += $requestlimit) {
$requestdocs = array_slice($docs, $i, $requestlimit, true);
$ids = array_keys($requestdocs);
......@@ -800,7 +795,7 @@ class PluginSearchElasticsearch extends PluginSearch {
if ($batchcount % 10 == 0) {
log_info(" batches: {$batchcount}; records: {$uploadcount}; errors: {$errorcount}");
}
$response = $processfunction($requestdocs, $elasticaType);
$response = $processfunction($requestdocs, $type);
if ($response->hasError()) {
log_warn("Error from Elasticsearch trying to send bulk request at time {$time}: " . $response->getError());
......@@ -831,13 +826,12 @@ class PluginSearchElasticsearch extends PluginSearch {
* key being the matching record in the search_elasticsearch_queue table.
* @param callback $processfunction A callback function to bulk-request each slice of documetns
*/
private static function send_queued_items_individually($documents, $processfunction, $requestlimit, $elasticaIndex) {
private static function send_queued_items_individually($documents, $processfunction, $requestlimit) {
$uploadcount = 0;
$errorcount = 0;
// Bulk insert into index
foreach ($documents as $type => $docs) {
$elasticaType = $elasticaIndex->getType($type);
foreach ($docs as $queueid => $doc) {
update_record(
'search_elasticsearch_queue',
......@@ -853,7 +847,7 @@ class PluginSearchElasticsearch extends PluginSearch {
if ($uploadcount % 20 == 0) {
log_info(" uploads: {$uploadcount}; errors: {$errorcount}");
}
$response = $processfunction($doc, $elasticaType);
$response = $processfunction($doc, $type);
if ($response->hasError()) {
$errorcount++;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment