Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions src/main/java/com/timgroup/statsd/StatsDAggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class StatsDAggregator {
public static int DEFAULT_FLUSH_INTERVAL = 2000; // 2s
public static int DEFAULT_SHARDS = 4; // 4 partitions to reduce contention.

protected final String AGGREGATOR_THREAD_NAME = "statsd-aggregator-thread";

protected final ArrayList<Map<Message, Message>> aggregateMetrics;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to keep this PR about the lock only?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

private final Lock[] locks;

protected final int shardGranularity;
protected final long flushInterval;
Expand Down Expand Up @@ -44,13 +48,15 @@ public StatsDAggregator(
this.flushInterval = flushInterval;
this.shardGranularity = shards;
this.aggregateMetrics = new ArrayList<>(shards);
this.locks = new ReentrantLock[shards];

if (flushInterval > 0) {
this.scheduler = new Timer(AGGREGATOR_THREAD_NAME, true);
}

for (int i = 0; i < this.shardGranularity; i++) {
this.aggregateMetrics.add(i, new HashMap<Message, Message>());
this.locks[i] = new ReentrantLock();
}
}

Expand Down Expand Up @@ -86,7 +92,8 @@ public boolean aggregateMessage(Message message) {
int bucket = Math.abs(hash % this.shardGranularity);
Map<Message, Message> map = aggregateMetrics.get(bucket);

synchronized (map) {
locks[bucket].lock();
try {
// For now let's just put the message in the map
Message msg = MapUtils.putIfAbsent(map, message);
if (msg != null) {
Expand All @@ -110,6 +117,8 @@ public boolean aggregateMessage(Message message) {
}
}
}
} finally {
locks[bucket].unlock();
}

return true;
Expand All @@ -127,7 +136,8 @@ protected void flush() {
for (int i = 0; i < shardGranularity; i++) {
Map<Message, Message> map = aggregateMetrics.get(i);

synchronized (map) {
locks[i].lock();
try {
Iterator<Map.Entry<Message, Message>> iter = map.entrySet().iterator();
while (iter.hasNext()) {
Message msg = iter.next().getValue();
Expand All @@ -139,6 +149,8 @@ protected void flush() {

iter.remove();
}
} finally {
locks[i].unlock();
}
}
}
Expand Down
18 changes: 8 additions & 10 deletions src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,17 +254,15 @@ public int hashCode() {

for (int i = 0; i < StatsDAggregator.DEFAULT_SHARDS; i++) {
Map<Message, Message> map = fakeProcessor.aggregator.aggregateMetrics.get(i);
synchronized (map) {
Iterator<Map.Entry<Message, Message>> iter = map.entrySet().iterator();
int count = 0;
while (iter.hasNext()) {
count++;
iter.next();
}

// sharding should be balanced
assertEquals(iterations, count);
Iterator<Map.Entry<Message, Message>> iter = map.entrySet().iterator();
int count = 0;
while (iter.hasNext()) {
count++;
iter.next();
}

// sharding should be balanced
assertEquals(iterations, count);
}
}

Expand Down