Skip to content

Commit 1ec1b2a

Browse files
committed
Add code
0 parents  commit 1ec1b2a

File tree

12 files changed

+320
-0
lines changed

12 files changed

+320
-0
lines changed

.gitignore

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
HELP.md
2+
target/
3+
!.mvn/wrapper/maven-wrapper.jar
4+
!**/src/main/**
5+
!**/src/test/**
6+
7+
### STS ###
8+
.apt_generated
9+
.classpath
10+
.factorypath
11+
.project
12+
.settings
13+
.springBeans
14+
.sts4-cache
15+
16+
### IntelliJ IDEA ###
17+
.idea
18+
*.iws
19+
*.iml
20+
*.ipr
21+
22+
### NetBeans ###
23+
/nbproject/private/
24+
/nbbuild/
25+
/dist/
26+
/nbdist/
27+
/.nb-gradle/
28+
build/
29+
30+
### VS Code ###
31+
.vscode/
32+
.DS_Store

pom.xml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>org.example</groupId>
8+
<artifactId>message-queue-pub-sub</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
<dependencies>
11+
<dependency>
12+
<groupId>org.projectlombok</groupId>
13+
<artifactId>lombok</artifactId>
14+
<version>RELEASE</version>
15+
<scope>compile</scope>
16+
</dependency>
17+
</dependencies>
18+
19+
<properties>
20+
<maven.compiler.source>15</maven.compiler.source>
21+
<maven.compiler.target>15</maven.compiler.target>
22+
</properties>
23+
24+
</project>

problem-statement.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
## Message Queue
2+
We have to design a message queue supporting publisher-subscriber model. It should support following operations:
3+
4+
* It should support multiple topics where messages can be published.
5+
* Publisher should be able to publish a message to a particular topic.
6+
* Subscribers should be able to subscribe to a topic.
7+
* Whenever a message is published to a topic, all the subscribers, who are subscribed to that topic, should receive the message.
8+
* Subscribers should be able to run in parallel
9+
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.uditagarwal;
2+
3+
import com.uditagarwal.pub_sub_queue.public_interface.Queue;
4+
import com.uditagarwal.pub_sub_queue.model.Message;
5+
import com.uditagarwal.pub_sub_queue.model.Topic;
6+
7+
public class Main {
8+
public static void main(String[] args) throws InterruptedException {
9+
final Queue queue = new Queue();
10+
final Topic topic1 = queue.createTopic("t1");
11+
final Topic topic2 = queue.createTopic("t2");
12+
final SleepingSubscriber sub1 = new SleepingSubscriber("sub1", 10000);
13+
final SleepingSubscriber sub2 = new SleepingSubscriber("sub2", 10000);
14+
queue.subscribe(sub1, topic1);
15+
queue.subscribe(sub2, topic1);
16+
17+
final SleepingSubscriber sub3 = new SleepingSubscriber("sub3", 5000);
18+
queue.subscribe(sub3, topic2);
19+
20+
queue.publish(topic1, new Message("m1"));
21+
queue.publish(topic1, new Message("m2"));
22+
23+
queue.publish(topic2, new Message("m3"));
24+
25+
Thread.sleep(15000);
26+
queue.publish(topic2, new Message("m4"));
27+
queue.publish(topic1, new Message("m5"));
28+
29+
queue.resetOffset(topic1, sub1, 0);
30+
}
31+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.uditagarwal;
2+
3+
import com.uditagarwal.pub_sub_queue.public_interface.ISubscriber;
4+
import com.uditagarwal.pub_sub_queue.model.Message;
5+
6+
public class SleepingSubscriber implements ISubscriber {
7+
private final String id;
8+
private final int sleepTimeInMillis;
9+
10+
public SleepingSubscriber(String id, int sleepTimeInMillis) {
11+
this.id = id;
12+
this.sleepTimeInMillis = sleepTimeInMillis;
13+
}
14+
15+
@Override
16+
public String getId() {
17+
return id;
18+
}
19+
20+
@Override
21+
public void consume(Message message) throws InterruptedException {
22+
System.out.println("Subscriber: " + id + " started consuming: " + message.getMsg());
23+
Thread.sleep(sleepTimeInMillis);
24+
System.out.println("Subscriber: " + id + " done consuming: " + message.getMsg());
25+
}
26+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.uditagarwal.pub_sub_queue.handler;
2+
3+
import com.uditagarwal.pub_sub_queue.model.Message;
4+
import com.uditagarwal.pub_sub_queue.model.Topic;
5+
import com.uditagarwal.pub_sub_queue.model.TopicSubscriber;
6+
import lombok.Getter;
7+
import lombok.NonNull;
8+
import lombok.SneakyThrows;
9+
10+
@Getter
11+
public class SubscriberWorker implements Runnable {
12+
13+
private final Topic topic;
14+
private final TopicSubscriber topicSubscriber;
15+
16+
public SubscriberWorker(@NonNull final Topic topic, @NonNull final TopicSubscriber topicSubscriber) {
17+
this.topic = topic;
18+
this.topicSubscriber = topicSubscriber;
19+
}
20+
21+
@SneakyThrows
22+
@Override
23+
public void run() {
24+
synchronized (topicSubscriber) {
25+
do {
26+
int curOffset = topicSubscriber.getOffset().get();
27+
while (curOffset >= topic.getMessages().size()) {
28+
topicSubscriber.wait();
29+
}
30+
Message message = topic.getMessages().get(curOffset);
31+
topicSubscriber.getSubscriber().consume(message);
32+
33+
// We cannot just increment here since subscriber offset can be reset while it is consuming. So, after
34+
// consuming we need to increase only if it was previous one.
35+
topicSubscriber.getOffset().compareAndSet(curOffset, curOffset + 1);
36+
} while (true);
37+
}
38+
}
39+
40+
synchronized public void wakeUpIfNeeded() {
41+
synchronized (topicSubscriber) {
42+
topicSubscriber.notify();
43+
}
44+
}
45+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.uditagarwal.pub_sub_queue.handler;
2+
3+
import com.uditagarwal.pub_sub_queue.model.Topic;
4+
import com.uditagarwal.pub_sub_queue.model.TopicSubscriber;
5+
import lombok.NonNull;
6+
7+
import java.util.HashMap;
8+
import java.util.Map;
9+
10+
public class TopicHandler {
11+
private final Topic topic;
12+
private final Map<String, SubscriberWorker> subscriberWorkers;
13+
14+
public TopicHandler(@NonNull final Topic topic) {
15+
this.topic = topic;
16+
subscriberWorkers = new HashMap<>();
17+
}
18+
19+
public void publish() {
20+
for (TopicSubscriber topicSubscriber:topic.getSubscribers()) {
21+
startSubsriberWorker(topicSubscriber);
22+
}
23+
}
24+
25+
public void startSubsriberWorker(@NonNull final TopicSubscriber topicSubscriber) {
26+
final String subscriberId = topicSubscriber.getSubscriber().getId();
27+
if (!subscriberWorkers.containsKey(subscriberId)) {
28+
final SubscriberWorker subscriberWorker = new SubscriberWorker(topic, topicSubscriber);
29+
subscriberWorkers.put(subscriberId, subscriberWorker);
30+
new Thread(subscriberWorker).start();
31+
}
32+
final SubscriberWorker subscriberWorker = subscriberWorkers.get(subscriberId);
33+
subscriberWorker.wakeUpIfNeeded();
34+
}
35+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.uditagarwal.pub_sub_queue.model;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Getter;
5+
6+
@AllArgsConstructor
7+
@Getter
8+
public class Message {
9+
private final String msg;
10+
11+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.uditagarwal.pub_sub_queue.model;
2+
3+
import lombok.Getter;
4+
import lombok.NonNull;
5+
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
9+
@Getter
10+
public class Topic {
11+
private final String topicName;
12+
private final String topicId;
13+
private final List<Message> messages; // TODO: Change getter this to send only immutable list outside.
14+
private final List<TopicSubscriber> subscribers; // TODO: Change getter this to send only immutable list outside.
15+
16+
public Topic(@NonNull final String topicName, @NonNull final String topicId) {
17+
this.topicName = topicName;
18+
this.topicId = topicId;
19+
this.messages = new ArrayList<>();
20+
this.subscribers = new ArrayList<>();
21+
}
22+
23+
public synchronized void addMessage(@NonNull final Message message) {
24+
messages.add(message);
25+
}
26+
27+
public void addSubscriber(@NonNull final TopicSubscriber subscriber) {
28+
subscribers.add(subscriber);
29+
}
30+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.uditagarwal.pub_sub_queue.model;
2+
3+
import com.uditagarwal.pub_sub_queue.public_interface.ISubscriber;
4+
import lombok.AllArgsConstructor;
5+
import lombok.Getter;
6+
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
@Getter
10+
@AllArgsConstructor
11+
public class TopicSubscriber {
12+
AtomicInteger offset;
13+
ISubscriber subscriber;
14+
15+
public TopicSubscriber(ISubscriber subscriber) {
16+
this.subscriber = subscriber;
17+
this.offset = new AtomicInteger(0);
18+
}
19+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.uditagarwal.pub_sub_queue.public_interface;
2+
3+
import com.uditagarwal.pub_sub_queue.model.Message;
4+
5+
public interface ISubscriber {
6+
7+
String getId();
8+
void consume(Message message) throws InterruptedException;
9+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.uditagarwal.pub_sub_queue.public_interface;
2+
3+
import com.uditagarwal.pub_sub_queue.handler.TopicHandler;
4+
import com.uditagarwal.pub_sub_queue.model.Message;
5+
import com.uditagarwal.pub_sub_queue.model.Topic;
6+
import com.uditagarwal.pub_sub_queue.model.TopicSubscriber;
7+
import lombok.NonNull;
8+
9+
import java.util.HashMap;
10+
import java.util.Map;
11+
import java.util.UUID;
12+
13+
public class Queue {
14+
Map<String, TopicHandler> topicProcessors;
15+
16+
public Queue() {
17+
this.topicProcessors = new HashMap<>();
18+
}
19+
20+
public Topic createTopic(@NonNull final String topicName) {
21+
final Topic topic = new Topic(topicName, UUID.randomUUID().toString());
22+
TopicHandler topicHandler = new TopicHandler(topic);
23+
topicProcessors.put(topic.getTopicId(), topicHandler);
24+
System.out.println("Created topic: " + topic.getTopicName());
25+
return topic;
26+
}
27+
28+
public void subscribe(@NonNull final ISubscriber subscriber, @NonNull final Topic topic) {
29+
topic.addSubscriber(new TopicSubscriber(subscriber));
30+
System.out.println(subscriber.getId() + " subscribed to topic: " + topic.getTopicName());
31+
}
32+
33+
public void publish(@NonNull final Topic topic, @NonNull final Message message) {
34+
topic.addMessage(message);
35+
System.out.println(message.getMsg() + " published to topic: " + topic.getTopicName());
36+
new Thread(() -> topicProcessors.get(topic.getTopicId()).publish()).start();
37+
}
38+
39+
public void resetOffset(@NonNull final Topic topic, @NonNull final ISubscriber subscriber, @NonNull final Integer newOffset) {
40+
for (TopicSubscriber topicSubscriber : topic.getSubscribers()) {
41+
if (topicSubscriber.getSubscriber().equals(subscriber)) {
42+
topicSubscriber.getOffset().set(newOffset);
43+
System.out.println(topicSubscriber.getSubscriber().getId() + " offset reset to: " + newOffset);
44+
new Thread(() -> topicProcessors.get(topic.getTopicId()).startSubsriberWorker(topicSubscriber)).start();
45+
break;
46+
}
47+
}
48+
}
49+
}

0 commit comments

Comments
 (0)