Skip to content

Supporting protobuf message support #1329

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -38,16 +38,16 @@ services:
- PREDIS_DSN=redis+predis://redis
- PHPREDIS_DSN=redis+phpredis://redis
- GPS_DSN=gps:?projectId=mqdev&emulatorHost=http://google-pubsub:8085
- SQS_DSN=sqs:?key=key&secret=secret&region=us-east-1&endpoint=http://localstack:4576&version=latest
- SNS_DSN=sns:?key=key&secret=secret&region=us-east-1&endpoint=http://localstack:4575&version=latest
- SNSQS_DSN=snsqs:?key=key&secret=secret&region=us-east-1&sns_endpoint=http://localstack:4575&sqs_endpoint=http://localstack:4576&version=latest
- SQS_DSN=sqs:?key=key&secret=secret&region=us-east-1&endpoint=http://localstack:4566&version=latest
- SNS_DSN=sns:?key=key&secret=secret&region=us-east-1&endpoint=http://localstack:4566&version=latest
- SNSQS_DSN=snsqs:?key=key&secret=secret&region=us-east-1&sns_endpoint=http://localstack:4566&sqs_endpoint=http://localstack:4566&version=latest
- WAMP_DSN=wamp://thruway:9090
- REDIS_HOST=redis
- REDIS_PORT=6379
- AWS_SQS_KEY=key
- AWS_SQS_SECRET=secret
- AWS_SQS_REGION=us-east-1
- AWS_SQS_ENDPOINT=http://localstack:4576
- AWS_SQS_ENDPOINT=http://localstack:4566
- AWS_SQS_VERSION=latest
- BEANSTALKD_DSN=beanstalk://beanstalkd:11300
- GEARMAN_DSN=gearman://gearmand:4730
@@ -127,10 +127,9 @@ services:
- '9090:9090'

localstack:
image: 'localstack/localstack:0.8.10'
image: 'localstack/localstack:3.0.2'
ports:
- '4576:4576'
- '4575:4575'
- '4566:4566'
environment:
HOSTNAME_EXTERNAL: 'localstack'
SERVICES: 'sqs,sns'
2 changes: 1 addition & 1 deletion docker/bin/test.sh
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ waitForService gearmand 4730 50
waitForService kafka 9092 50
waitForService mongo 27017 50
waitForService thruway 9090 50
waitForService localstack 4576 50
waitForService localstack 4566 50

php docker/bin/refresh-mysql-database.php || exit 1
php docker/bin/refresh-postgres-database.php || exit 1
6 changes: 4 additions & 2 deletions pkg/amqp-ext/Tests/Functional/AmqpCommonUseCasesTest.php
Original file line number Diff line number Diff line change
@@ -158,10 +158,11 @@ public function testConsumerReceiveMessageFromTopicDirectly()
$this->amqpContext->declareTopic($topic);

$consumer = $this->amqpContext->createConsumer($topic);
//guard
// guard
$this->assertNull($consumer->receive(1000));

$message = $this->amqpContext->createMessage(__METHOD__);
$message->setDeliveryTag(0);

$producer = $this->amqpContext->createProducer();
$producer->send($topic, $message);
@@ -181,10 +182,11 @@ public function testConsumerReceiveMessageWithZeroTimeout()
$this->amqpContext->declareTopic($topic);

$consumer = $this->amqpContext->createConsumer($topic);
//guard
// guard
$this->assertNull($consumer->receive(1000));

$message = $this->amqpContext->createMessage(__METHOD__);
$message->setDeliveryTag(0);

$producer = $this->amqpContext->createProducer();
$producer->send($topic, $message);
15 changes: 9 additions & 6 deletions pkg/gps/GpsConnectionFactory.php
Original file line number Diff line number Diff line change
@@ -77,21 +77,22 @@ public function createContext(): Context
if ($this->config['lazy']) {
return new GpsContext(function () {
return $this->establishConnection();
});
}, [
'serilalizeToJson' => $this->config['serilalizeToJson'],
]);
}

return new GpsContext($this->establishConnection());
return new GpsContext($this->establishConnection(), [
'serilalizeToJson' => $this->config['serilalizeToJson'],
]);
}

private function parseDsn(string $dsn): array
{
$dsn = Dsn::parseFirst($dsn);

if ('gps' !== $dsn->getSchemeProtocol()) {
throw new \LogicException(sprintf(
'The given scheme protocol "%s" is not supported. It must be "gps"',
$dsn->getSchemeProtocol()
));
throw new \LogicException(sprintf('The given scheme protocol "%s" is not supported. It must be "gps"', $dsn->getSchemeProtocol()));
}

$emulatorHost = $dsn->getString('emulatorHost');
@@ -105,6 +106,7 @@ private function parseDsn(string $dsn): array
'emulatorHost' => $emulatorHost,
'hasEmulator' => $hasEmulator,
'lazy' => $dsn->getBool('lazy'),
'serilalizeToJson' => $dsn->getBool('serilalizeToJson'),
]), function ($value) { return null !== $value; });
}

@@ -121,6 +123,7 @@ private function defaultConfig(): array
{
return [
'lazy' => true,
'serilalizeToJson' => true,
];
}
}
9 changes: 8 additions & 1 deletion pkg/gps/GpsConsumer.php
Original file line number Diff line number Diff line change
@@ -110,7 +110,14 @@ private function getSubscription(): Subscription

private function convertMessage(GoogleMessage $message): GpsMessage
{
$gpsMessage = GpsMessage::jsonUnserialize($message->data());
$options = $this->context->getOptions();

if ($options['serilalizeToJson']) {
$gpsMessage = GpsMessage::jsonUnserialize($message->data());
} else {
$gpsMessage = new GpsMessage($message->data(), $message->attributes());
}

$gpsMessage->setNativeMessage($message);

return $gpsMessage;
17 changes: 7 additions & 10 deletions pkg/gps/GpsContext.php
Original file line number Diff line number Diff line change
@@ -52,11 +52,7 @@ public function __construct($client, array $options = [])
} elseif (is_callable($client)) {
$this->clientFactory = $client;
} else {
throw new \InvalidArgumentException(sprintf(
'The $client argument must be either %s or callable that returns %s once called.',
PubSubClient::class,
PubSubClient::class
));
throw new \InvalidArgumentException(sprintf('The $client argument must be either %s or callable that returns %s once called.', PubSubClient::class, PubSubClient::class));
}
}

@@ -148,16 +144,17 @@ public function getClient(): PubSubClient
if (false == $this->client) {
$client = call_user_func($this->clientFactory);
if (false == $client instanceof PubSubClient) {
throw new \LogicException(sprintf(
'The factory must return instance of %s. It returned %s',
PubSubClient::class,
is_object($client) ? get_class($client) : gettype($client)
));
throw new \LogicException(sprintf('The factory must return instance of %s. It returned %s', PubSubClient::class, is_object($client) ? get_class($client) : gettype($client)));
}

$this->client = $client;
}

return $this->client;
}

public function getOptions(): array
{
return $this->options;
}
}
21 changes: 18 additions & 3 deletions pkg/gps/Tests/GpsConnectionFactoryConfigTest.php
Original file line number Diff line number Diff line change
@@ -41,9 +41,6 @@ public function testThrowIfDsnCouldNotBeParsed()

/**
* @dataProvider provideConfigs
*
* @param mixed $config
* @param mixed $expectedConfig
*/
public function testShouldParseConfigurationAsExpected($config, $expectedConfig)
{
@@ -58,20 +55,23 @@ public static function provideConfigs()
null,
[
'lazy' => true,
'serilalizeToJson' => true,
],
];

yield [
'gps:',
[
'lazy' => true,
'serilalizeToJson' => true,
],
];

yield [
[],
[
'lazy' => true,
'serilalizeToJson' => true,
],
];

@@ -83,6 +83,7 @@ public static function provideConfigs()
'emulatorHost' => 'http://google-pubsub:8085',
'hasEmulator' => true,
'lazy' => true,
'serilalizeToJson' => true,
],
];

@@ -94,6 +95,7 @@ public static function provideConfigs()
'emulatorHost' => 'http://google-pubsub:8085',
'hasEmulator' => true,
'lazy' => true,
'serilalizeToJson' => true,
],
];

@@ -104,6 +106,19 @@ public static function provideConfigs()
'projectId' => 'mqdev',
'emulatorHost' => 'http://Fgoogle-pubsub:8085',
'lazy' => false,
'serilalizeToJson' => true,
],
];

yield [
['dsn' => 'gps:?foo=fooVal&projectId=mqdev&emulatorHost=http%3A%2F%2Fgoogle-pubsub%3A8085&serilalizeToJson=false'],
[
'foo' => 'fooVal',
'projectId' => 'mqdev',
'emulatorHost' => 'http://google-pubsub:8085',
'hasEmulator' => true,
'lazy' => true,
'serilalizeToJson' => false,
],
];
}
98 changes: 98 additions & 0 deletions pkg/gps/Tests/GpsConsumerTest.php
Original file line number Diff line number Diff line change
@@ -131,6 +131,11 @@ public function testShouldReceiveMessageNoWait()
->willReturn($client)
;

$context
->expects($this->once())
->method('getOptions')
->willReturn(['serilalizeToJson' => true]);

$consumer = new GpsConsumer($context, new GpsQueue('queue-name'));

$message = $consumer->receiveNoWait();
@@ -171,6 +176,11 @@ public function testShouldReceiveMessage()
->willReturn($client)
;

$context
->expects($this->once())
->method('getOptions')
->willReturn(['serilalizeToJson' => true]);

$consumer = new GpsConsumer($context, new GpsQueue('queue-name'));

$message = $consumer->receive(12345);
@@ -179,6 +189,94 @@ public function testShouldReceiveMessage()
$this->assertSame('the body', $message->getBody());
}

public function testShouldReceiveMessageUnSerialize()
{
$message = new GpsMessage('the body');
$nativeMessage = new Message([
'data' => json_encode($message),
], []);

$subscription = $this->createSubscriptionMock();
$subscription
->expects($this->once())
->method('pull')
->with($this->identicalTo([
'maxMessages' => 1,
'requestTimeout' => 12.345,
]))
->willReturn([$nativeMessage]);

$client = $this->createPubSubClientMock();
$client
->expects($this->once())
->method('subscription')
->willReturn($subscription);

$context = $this->createContextMock();
$context
->expects($this->once())
->method('getClient')
->willReturn($client);
$context
->expects($this->once())
->method('getOptions')
->willReturn(['serilalizeToJson' => false]);

$consumer = new GpsConsumer($context, new GpsQueue('queue-name'));

$message = $consumer->receive(12345);

$this->assertInstanceOf(GpsMessage::class, $message);
$this->assertSame($nativeMessage->data(), $message->getBody());
$this->assertSame($nativeMessage->attributes(), $message->getProperties());
}

public function testShouldReceiveMessageUnSerializeWithAttributes()
{
$message = new GpsMessage('the body');
$nativeMessage = new Message([
'data' => json_encode($message),
'attributes' => [
'foo' => 'fooVal',
'bar' => 'barVal',
],
], []);

$subscription = $this->createSubscriptionMock();
$subscription
->expects($this->once())
->method('pull')
->with($this->identicalTo([
'maxMessages' => 1,
'requestTimeout' => 12.345,
]))
->willReturn([$nativeMessage]);

$client = $this->createPubSubClientMock();
$client
->expects($this->once())
->method('subscription')
->willReturn($subscription);

$context = $this->createContextMock();
$context
->expects($this->once())
->method('getClient')
->willReturn($client);
$context
->expects($this->once())
->method('getOptions')
->willReturn(['serilalizeToJson' => false]);

$consumer = new GpsConsumer($context, new GpsQueue('queue-name'));

$message = $consumer->receive(12345);

$this->assertInstanceOf(GpsMessage::class, $message);
$this->assertSame($nativeMessage->data(), $message->getBody());
$this->assertSame($nativeMessage->attributes(), $message->getProperties());
}

/**
* @return \PHPUnit\Framework\MockObject\MockObject|GpsContext
*/
7 changes: 7 additions & 0 deletions pkg/gps/Tests/GpsContextTest.php
Original file line number Diff line number Diff line change
@@ -86,6 +86,13 @@ public function testCreateConsumerShouldThrowExceptionIfInvalidDestination()
$context->createConsumer(new GpsTopic(''));
}

public function testShouldReturnOptions()
{
$context = new GpsContext($this->createPubSubClientMock(), ['foo' => 'fooVal']);

$this->assertSame(['ackDeadlineSeconds' => 10, 'foo' => 'fooVal'], $context->getOptions());
}

/**
* @return PubSubClient|\PHPUnit\Framework\MockObject\MockObject|PubSubClient
*/
2 changes: 1 addition & 1 deletion pkg/gps/Tests/GpsMessageTest.php
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ public function testCouldBeUnserializedFromJson()

$json = json_encode($message);

//guard
// guard
$this->assertNotEmpty($json);

$unserializedMessage = GpsMessage::jsonUnserialize($json);
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@ protected function createContext()

/**
* @param GpsContext $context
* @param mixed $queueName
*/
protected function createQueue(Context $context, $queueName)
{
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@ protected function createContext()

/**
* @param GpsContext $context
* @param mixed $queueName
*/
protected function createQueue(Context $context, $queueName)
{