Skip to content

Commit a762113

Browse files
committed
More checkpointing
1 parent b07108e commit a762113

File tree

10 files changed

+141
-127
lines changed

10 files changed

+141
-127
lines changed

src/Elasticsearch/Client.php

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,17 @@
77

88
namespace Elasticsearch;
99

10-
use Elasticsearch\Common\DICBuilder;
11-
use Elasticsearch\Common\EmptyLogger;
10+
1211
use Elasticsearch\Common\Exceptions;
1312
use Elasticsearch\Common\Exceptions\Missing404Exception;
14-
use Elasticsearch\Common\Exceptions\RoutingMissingException;
15-
use Elasticsearch\Common\Exceptions\UnexpectedValueException;
1613
use Elasticsearch\Endpoints;
1714
use Elasticsearch\Namespaces\CatNamespace;
1815
use Elasticsearch\Namespaces\ClusterNamespace;
1916
use Elasticsearch\Namespaces\IndicesNamespace;
2017
use Elasticsearch\Namespaces\NamespaceFutureUtil;
2118
use Elasticsearch\Namespaces\NodesNamespace;
2219
use Elasticsearch\Namespaces\SnapshotNamespace;
20+
use Elasticsearch\Namespaces\BooleanRequestWrapper;
2321

2422

2523
/**
@@ -561,7 +559,8 @@ public function exists($params)
561559

562560
$type = $this->extractArgument($params, 'type');
563561

564-
562+
//manually make this verbose so we can check status code
563+
$params['verbose'] = true;
565564

566565
/** @var callback $endpointBuilder */
567566
$endpointBuilder = $this->endpoints;
@@ -573,20 +572,8 @@ public function exists($params)
573572
->setType($type);
574573
$endpoint->setParams($params);
575574

576-
try {
577-
$response = $endpoint->performRequest();
578-
} catch (Missing404Exception $exception) {
579-
return false;
580-
} catch (RoutingMissingException $exception) {
581-
return false;
582-
}
583-
575+
return BooleanRequestWrapper::performRequest($endpoint);
584576

585-
if ($response['status'] === 200) {
586-
return true;
587-
} else {
588-
return false;
589-
}
590577
}
591578

592579

src/Elasticsearch/Connections/Connection.php

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public function __construct($handler, $hostDetails, $connectionParams,
131131
*
132132
* @return mixed
133133
*/
134-
public function performRequest($method, $uri, $params = null, $body = null, $options = array())
134+
public function performRequest($method, $uri, $params = null, $body = null, $options = array(), $ignore = [])
135135
{
136136
if (isset($body) === true) {
137137
$body = $this->serializer->serialize($body);
@@ -145,10 +145,16 @@ public function performRequest($method, $uri, $params = null, $body = null, $opt
145145
'client' => ['timeout' => 1.0], //TODO fix this!
146146
'headers' => [
147147
'host' => [$this->host]
148-
]
148+
],
149+
'ignore' => $ignore
150+
149151
];
150152
$request = array_merge_recursive($request, $this->connectionParams, $options);
151153

154+
// make sure we start at the beginning of the stream, so it stays a moderate size
155+
// better strategy to let grow and rewind/reset periodically? need to bench
156+
rewind($this->connectionParams['client']['save_to']);
157+
152158
$handler = $this->handler;
153159
$future = $handler($request, $this);
154160

@@ -173,19 +179,16 @@ private function wrapHandler (callable $handler, LoggerInterface $logger, Logger
173179
{
174180
return function (array $request, Connection $connection) use ($handler, $logger, $tracer) {
175181
// Send the request using the wrapped handler.
176-
return Core::proxy($handler($request), function ($response) use ($connection, $logger, $tracer) {
177-
// Add the headers to the response when it is available.
178-
//foreach ($headers as $key => $value) {
179-
// $response['headers'][$key] = (array) $value;
180-
//}
181-
// Note that you can return a regular response array when using
182-
// the proxy method.
182+
return Core::proxy($handler($request), function ($response) use ($connection, $logger, $tracer, $request) {
183+
183184
if (isset($response['error']) === true) {
184185

185186
if ($response['error'] instanceof ConnectException) {
186187
$this->throwCurlException($response['curl']['errno'], $response['error']->getMessage());
187188
} else if ($response['error'] instanceof RingException) {
188189
throw new TransportException($response['error']->getMessage());
190+
} else {
191+
throw new TransportException($response['error']->getMessage());
189192
}
190193
} else {
191194
$connection->markAlive();
@@ -194,9 +197,9 @@ private function wrapHandler (callable $handler, LoggerInterface $logger, Logger
194197
$response['body'] = stream_get_contents($response['body'], $response['headers']['Content-Length'][0]);
195198

196199
if ($response['status'] >= 400 && $response['status'] < 500) {
197-
$this->process4xxError($response['status'], $response['body']);
200+
$this->process4xxError($response['status'], $response['body'], $request['ignore']);
198201
} else if ($response['status'] >= 500) {
199-
$this->process5xxError($response['status'], $response['body']);
202+
$this->process5xxError($response['status'], $response['body'], $request['ignore']);
200203
}
201204

202205
// No error, deserialize
@@ -477,11 +480,15 @@ private function buildCurlCommand($method, $uri, $body)
477480
* @throws \Elasticsearch\Common\Exceptions\Missing404Exception
478481
* @throws \Elasticsearch\Common\Exceptions\AlreadyExpiredException
479482
*/
480-
private function process4xxError($statusCode, $responseBody)
483+
private function process4xxError($statusCode, $responseBody, $ignore)
481484
{
482485

483486
//$exceptionText = "$statusCode Server Exception: $exceptionText\n$responseBody";
484487

488+
if (array_search($statusCode, $ignore) !== false) {
489+
return;
490+
}
491+
485492
if ($statusCode === 400 && strpos($responseBody, "AlreadyExpiredException") !== false) {
486493
throw new AlreadyExpiredException($responseBody, $statusCode);
487494
} elseif ($statusCode === 403) {
@@ -508,7 +515,7 @@ private function process4xxError($statusCode, $responseBody)
508515
* @throws \Elasticsearch\Common\Exceptions\NoDocumentsToGetException
509516
* @throws \Elasticsearch\Common\Exceptions\ServerErrorResponseException
510517
*/
511-
private function process5xxError($response)
518+
private function process5xxError($response, $ignore)
512519
{
513520
$statusCode = $response['requestInfo']['http_code'];
514521
$exceptionText = $response['error'];
@@ -517,6 +524,10 @@ private function process5xxError($response)
517524
$exceptionText = "$statusCode Server Exception: $exceptionText\n$responseBody";
518525
$this->log->error($exceptionText);
519526

527+
if (array_search($statusCode, $ignore) !== false) {
528+
return;
529+
}
530+
520531
if ($statusCode === 500 && strpos($responseBody, "RoutingMissingException") !== false) {
521532
throw new RoutingMissingException($responseBody, $statusCode);
522533
} elseif ($statusCode === 500 && preg_match('/ActionRequestValidationException.+ no documents to get/',$responseBody) === 1) {

src/Elasticsearch/Endpoints/AbstractEndpoint.php

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
use Elasticsearch\Common\Exceptions\UnexpectedValueException;
1212
use Elasticsearch\Transport;
13+
use Exception;
1314

1415
/**
1516
* Class AbstractEndpoint
@@ -39,7 +40,10 @@ abstract class AbstractEndpoint
3940
private $transport = null;
4041

4142
/** @var array */
42-
private $ignore = null;
43+
private $ignore = [];
44+
45+
/** @var bool */
46+
private $verbose = false;
4347

4448
/** @var array */
4549
private $clientParams = [];
@@ -77,29 +81,17 @@ public function __construct($transport)
7781
*/
7882
public function performRequest()
7983
{
80-
$result = array();
8184

82-
try {
83-
$result = $this->transport->performRequest(
84-
$this->getMethod(),
85-
$this->getURI(),
86-
$this->params,
87-
$this->getBody(),
88-
$this->clientParams
89-
);
90-
} catch (\Exception $exception) {
91-
$code = $exception->getCode();
92-
if ($this->ignore === null) {
93-
throw $exception;
94-
} else if (array_search($code, $this->ignore) === false) {
95-
throw $exception;
96-
} else {
97-
//TODO return null or dedicated object here instead?
98-
return array('data' => $exception->getMessage());
99-
}
100-
}
85+
$promise = $this->transport->performRequest(
86+
$this->getMethod(),
87+
$this->getURI(),
88+
$this->params,
89+
$this->getBody(),
90+
$this->clientParams,
91+
$this->ignore
92+
);
10193

102-
return $result;
94+
return $promise;
10395

10496
}
10597

@@ -119,7 +111,7 @@ public function setParams($params)
119111
$params = $this->convertCustom($params);
120112
$this->extractRingOptions($params);
121113
$this->params = $this->convertArraysToStrings($params);
122-
$this->extractIgnore();
114+
$this->extractIgnoreVerbose();
123115
return $this;
124116
}
125117

@@ -188,12 +180,19 @@ public function setID($docID)
188180
public function resultOrFuture($result)
189181
{
190182

183+
$response = null;
191184
$async = isset($this->clientParams['client']['future']) ? $this->clientParams['client']['future'] : null;
192185
if (is_null($async) || $async === false) {
193-
return $result->wait();
186+
$response = $result->wait();
194187
} elseif ($async === true || $async === 'lazy') {
195188
return $result;
196189
}
190+
191+
if (isset($this->verbose) && $this->verbose == true) {
192+
return $response;
193+
} else {
194+
return $response['body'];
195+
}
197196
}
198197

199198

@@ -278,7 +277,7 @@ private function ifParamsInvalidThrowException($params)
278277
return; //no params, just return.
279278
}
280279

281-
$whitelist = array_merge($this->getParamWhitelist(), array('ignore', 'custom', 'curlOpts', 'client'));
280+
$whitelist = array_merge($this->getParamWhitelist(), array('ignore', 'verbose', 'custom', 'curlOpts', 'client'));
282281

283282
foreach ($params as $key => $value) {
284283
if (array_search($key, $whitelist) === false) {
@@ -293,12 +292,17 @@ private function ifParamsInvalidThrowException($params)
293292
}
294293

295294

296-
private function extractIgnore()
295+
private function extractIgnoreVerbose()
297296
{
298297
if (isset($this->params['ignore']) === true) {
299298
$this->ignore = explode(",", $this->params['ignore']);
300299
unset($this->params['ignore']);
301300
}
301+
302+
if (isset($this->params['verbose']) === true) {
303+
$this->verbose = $this->params['verbose'];
304+
unset($this->params['verbose']);
305+
}
302306
}
303307

304308
private function extractRingOptions(&$params)

src/Elasticsearch/Namespaces/AbstractNamespace.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
namespace Elasticsearch\Namespaces;
99

10-
use Elasticsearch\Common\Exceptions\UnexpectedValueException;
10+
1111
use Elasticsearch\Transport;
1212

1313
/**
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
/**
3+
* Created by JetBrains PhpStorm.
4+
* User: tongz
5+
* Date: 2/6/15
6+
* Time: 6:36 PM
7+
* To change this template use File | Settings | File Templates.
8+
*/
9+
10+
namespace Elasticsearch\Namespaces;
11+
12+
13+
use Elasticsearch\Common\Exceptions\Missing404Exception;
14+
use Elasticsearch\Common\Exceptions\RoutingMissingException;
15+
use Elasticsearch\Endpoints\AbstractEndpoint;
16+
use GuzzleHttp\Ring\Future\FutureArrayInterface;
17+
18+
trait BooleanRequestWrapper {
19+
public static function performRequest(AbstractEndpoint $endpoint)
20+
{
21+
try {
22+
$response = $endpoint->performRequest();
23+
$response = $endpoint->resultOrFuture($response);
24+
if (!($response instanceof FutureArrayInterface)) {
25+
if ($response['status'] === 200) {
26+
return true;
27+
} else {
28+
return false;
29+
}
30+
} else {
31+
// async mode, can't easily resolve this...punt to user
32+
return $response;
33+
}
34+
35+
} catch (Missing404Exception $exception) {
36+
return false;
37+
} catch (RoutingMissingException $exception) {
38+
return false;
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)