Query Consolidation to Reduce DB Load

Trading Response Time for CPU Cycles

Suppose we have a database bottleneck caused by overly frequent lookups of a specific record type. The following utility reduces database load by deliberately sacrificing a small amount of application server response time; in certain use cases, this significantly decreases the number of queries sent to the database:


package com.radovation.util;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

public class ConsolidatedLookup<K, V> {
  private final Lock lock = new ReentrantLock();
  private final Duration consolidateDuring;
  private final Function<Set<K>, Map<K, V>> lookup;
  // the batch currently collecting keys, or null when no consolidation is in progress
  private Batch current;

  /** State of one consolidation round, shared by its leader and followers (guarded by lock). */
  private class Batch {
    final Set<K> keys = new HashSet<>();
    final Condition done = lock.newCondition();
    Map<K, V> result;
    RuntimeException failure;
    boolean completed;
  }

  public ConsolidatedLookup(Duration consolidateDuring, Function<Set<K>, Map<K, V>> lookup) {
    this.consolidateDuring = consolidateDuring;
    this.lookup = lookup;
  }

  public V execute(K key) {
    lock.lock();
    try {
      Batch batch = current;
      boolean firstThread = batch == null;
      if (firstThread) {
        // first thread is the leader: it opens a new batch and performs the lookup in lead()
        batch = current = new Batch();
      }
      batch.keys.add(key);
      if (firstThread) {
        lead(batch);
      } else {
        // always wait on a condition in a loop to guard for spurious wake-ups (see Condition javadoc);
        // checking this batch's own flag means a newer batch cannot keep this thread waiting
        while (!batch.completed) {
          batch.done.await();
        }
      }
      if (batch.result == null) {
        throw new IllegalStateException("Consolidated lookup failed", batch.failure);
      }
      return batch.result.get(key);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } finally {
      lock.unlock();
    }
  }

  // Runs in the leader thread with the lock held, and returns with the lock held.
  private void lead(Batch batch) {
    boolean interrupted = false;
    long remainingNanos = consolidateDuring.toNanos();
    while (remainingNanos > 0) {
      try {
        // awaitNanos releases the lock while waiting, so other threads can add their keys;
        // looping guards for spurious wake-ups
        remainingNanos = batch.done.awaitNanos(remainingNanos);
      } catch (InterruptedException e) {
        interrupted = true; // stop collecting early, but still serve the followers
        break;
      }
    }
    current = null;  // close the batch: threads arriving from now on start a new one
    lock.unlock();   // do not block other callers while the database query runs
    try {
      batch.result = lookup.apply(batch.keys);
    } catch (RuntimeException e) {
      batch.failure = e;
    } finally {
      lock.lock();
      // always complete the batch, even on failure, so followers never wait forever
      batch.completed = true;
      batch.done.signalAll();
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}

How it Works: Query Consolidation

In essence, we replace the lookup query for a single record (identified by a unique key) with a lookup query for a set of records (identified by a set of unique keys). The first thread to request a record becomes the leader: it starts a new batch of keys and then suspends itself for a few milliseconds (the consolidateDuring window). During this brief pause, other threads add their own keys to the same shared batch and wait. Once the window has elapsed, the leader executes one consolidated query containing the keys of all participating threads. The waiting threads are then resumed, and each one picks its own record out of the shared query result.
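A rough bound helps when choosing the window. Assuming a single ConsolidatedLookup instance, a new batch can only be opened after the previous window has closed, so at most one database query is issued per window $W$ (consolidateDuring), however many threads call execute. The price is paid in response time: the leader of each batch waits for the full window plus the lookup time $L$. This is a back-of-the-envelope sketch, not a measurement:

```latex
$$
\text{queries per second} \;\le\; \frac{1}{W},
\qquad W = 10\ \text{ms} \;\Rightarrow\; \text{at most } 100 \text{ queries per second}
$$
$$
\text{leader response time} \;\approx\; W + L
\qquad (\text{versus } L \text{ without consolidation})
$$
```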

Extending to Composite Unique Keys

So far, the strategy assumes lookups by simple keys, but the pattern is not limited to them. It extends naturally to composite unique keys, enabling batch lookups on a combination of attributes while still returning at most one record per key.

Below are the key considerations when adapting this utility to work with composite keys:

  1. Unique Constraint: Ensure that the combination of attributes used as the composite key is constrained to be unique within the dataset. This ensures that each set of attributes in the composite key maps to a single, unique record in the database.

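  2. Equality and Hashing: The object representing the composite key must implement equals and hashCode consistently. The utility collects keys in a HashSet and hands each thread its record via Map.get(key), so without value-based equality the same logical key could appear in a batch more than once, and the result map may fail to find the caller's record (for example, if the lookup builds new key objects from the returned rows).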

  3. Query Adaptation: The request to the database needs to be adapted to search based on multiple attributes. Rather than asking the database to filter records based on a single attribute, the request should specify a combination of attributes that, together, uniquely identify the desired records.
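A minimal sketch of these three points, assuming a hypothetical orders table in which the (customer_id, order_number) pair is unique. A Java record (Java 16+) supplies value-based equals and hashCode, and the hypothetical buildQuery helper expands a batch of keys into a row-value IN clause, a syntax accepted by e.g. PostgreSQL and MySQL but not by every database:

```java
import java.util.*;

public class CompositeKeyExample {
  // Hypothetical composite key: (customerId, orderNumber) is assumed to be unique in the table.
  // A record gets value-based equals and hashCode for free, which HashSet and HashMap rely on.
  public record OrderKey(String customerId, int orderNumber) {}

  // Builds e.g. "... WHERE (customer_id, order_number) IN ((?, ?), (?, ?))".
  // Table and column names are hypothetical; row-value IN is not supported by every database.
  public static String buildQuery(int keyCount) {
    String placeholders = String.join(", ", Collections.nCopies(keyCount, "(?, ?)"));
    return "SELECT * FROM orders WHERE (customer_id, order_number) IN (" + placeholders + ")";
  }

  // Flattens the keys into bind parameters, in the same order as the placeholders.
  public static List<Object> bindParameters(List<OrderKey> keys) {
    List<Object> params = new ArrayList<>();
    for (OrderKey key : keys) {
      params.add(key.customerId());
      params.add(key.orderNumber());
    }
    return params;
  }

  public static void main(String[] args) {
    Set<OrderKey> batch = new HashSet<>();
    batch.add(new OrderKey("c1", 7));
    batch.add(new OrderKey("c1", 7)); // equal to the first key, so the set keeps only one
    batch.add(new OrderKey("c2", 3));
    System.out.println(batch.size()); // 2

    // iterate the set once so placeholders and parameters stay in the same order
    List<OrderKey> ordered = new ArrayList<>(batch);
    System.out.println(buildQuery(ordered.size()));
    System.out.println(bindParameters(ordered));
  }
}
```

The bind parameters must be supplied in the same order as the keys used to build the placeholders; copying the Set into a List once, as in main, keeps the two in sync.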

Despite these additional considerations, the benefits remain the same.

Why Not Just Use Caching?

Caching query results in the application is a more standard and straightforward approach to limiting the query request rate. It’s a strategy most developers are familiar with, and in a majority of use cases, it’s my go-to solution.

But let’s consider specific scenarios.

When Caching Shines:

If your application tends to request the same data repeatedly within a short timeframe, caching is a lifesaver. It stores the results of previous queries so that when a request for the same data comes in, it can be served directly from the cache, avoiding the need to query the underlying data source repeatedly. This not only reduces the query load but can also significantly speed up response times.
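For comparison, a minimal read-through cache can be sketched with ConcurrentHashMap.computeIfAbsent, which invokes the loader only when a key is absent. The class name and loader are illustrative, and the sketch deliberately omits eviction and expiry, which a production cache would need:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Minimal read-through cache sketch: no eviction, no expiry (simplifying assumptions).
public class ReadThroughCache<K, V> {
  private final Map<K, V> cache = new ConcurrentHashMap<>();
  private final Function<K, V> loader;

  public ReadThroughCache(Function<K, V> loader) {
    this.loader = loader;
  }

  public V get(K key) {
    // computeIfAbsent calls the loader only on a miss; hits are served from memory
    return cache.computeIfAbsent(key, loader);
  }

  public static void main(String[] args) {
    AtomicInteger queries = new AtomicInteger();
    ReadThroughCache<String, String> cache = new ReadThroughCache<>(key -> {
      queries.incrementAndGet(); // stands in for a database query
      return "entity-" + key;
    });
    for (int i = 0; i < 100; i++) {
      cache.get("42"); // the same key over and over
    }
    System.out.println(queries.get()); // 1
  }
}
```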

When Query Consolidation Makes Sense:

However, there are certain use cases where the query consolidation strategy has its merits—particularly when you anticipate querying for a diverse set of entities in quick succession, rather than repeatedly querying for the same entity.

Here’s why: in these scenarios, traditional caching may not provide much benefit, since each query is for a different entity and the cache hit rate is likely to be low. By contrast, query consolidation batches these diverse queries together and sends a single consolidated query to the underlying data source. This reduces the overall query rate without requiring you to hold a large and potentially ineffective cache.

So, in essence, the decision between caching and query consolidation boils down to the nature of your query patterns. Are you frequently re-querying for the same entities, where caching can effectively reduce load and improve performance? Or are you dealing with a burst of varied queries, where consolidation could be the more efficient and elegant solution?
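The trade-off can be made concrete by counting queries for two hypothetical request streams of 100 lookups each. This is a simplified model, not a benchmark: the cache never evicts, and consolidation is assumed to group exactly 10 consecutive requests per window:

```java
import java.util.*;

// Illustrative query count comparing a cache with consolidation for the same request stream.
public class CacheVsConsolidation {
  // Queries issued by a cache that never evicts: one per distinct key (its first miss).
  public static int cacheQueries(List<String> requests) {
    Set<String> cached = new HashSet<>();
    int queries = 0;
    for (String key : requests) {
      if (cached.add(key)) { // add returns true only if the key was not cached yet
        queries++;
      }
    }
    return queries;
  }

  // Queries issued when every batchSize consecutive requests share one consolidated query.
  // Assumption: a fixed number of requests falls into each consolidation window.
  public static int consolidatedQueries(List<String> requests, int batchSize) {
    return (requests.size() + batchSize - 1) / batchSize; // ceiling division
  }

  public static void main(String[] args) {
    List<String> diverse = new ArrayList<>();
    List<String> repeated = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
      diverse.add("entity-" + i);        // 100 different entities, each requested once
      repeated.add("entity-" + (i % 5)); // 5 entities requested over and over
    }
    System.out.println(cacheQueries(diverse));             // 100
    System.out.println(consolidatedQueries(diverse, 10));  // 10
    System.out.println(cacheQueries(repeated));            // 5
    System.out.println(consolidatedQueries(repeated, 10)); // 10
  }
}
```

With 100 different entities, every cache lookup is a miss while consolidation issues only 10 queries; with 5 entities requested repeatedly, the cache comes out ahead with 5 queries against 10.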

How to Use: Sample Code

Use the implementation of ConsolidatedLookup as an exercise in understanding Java concurrency constructs, or feel free to integrate it into your own project. Integrating the utility into existing code is minimally intrusive: the existing single-key lookup delegates to ConsolidatedLookup, which in turn calls a set-based lookup. Below is a runnable example to play around with.

Enjoy!


package com.radovation.util;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Example application to showcase how the query consolidation works.
 */
public class ConsolidatedLookupExample {
  /**
   * Dummy entity for the example.
   */
  class Entity {
    private final String key;

    Entity(String key) {
      this.key = key;
    }

    public String getKey() {
      return key;
    }
  }

  /**
   * A dummy representation of the data layer component (database).
   */
  class Database {
    private final AtomicInteger queryCounter = new AtomicInteger();
    private final AtomicInteger totalEntitiesQueriedForCounter = new AtomicInteger();
    private final AtomicInteger lookupByPrimaryKeyCounter = new AtomicInteger();
    private final ConsolidatedLookup<String, Entity> consolidatedLookup;
    private final boolean useConsolidation;

    Database(boolean useConsolidation, Duration consolidateDuring) {
      this.useConsolidation = useConsolidation;
      this.consolidatedLookup = new ConsolidatedLookup<>(consolidateDuring, this::findByPrimaryKeys);
    }

    public AtomicInteger getQueryCounter() {
      return queryCounter;
    }

    public AtomicInteger getLookupByPrimaryKeyCounter() {
      return lookupByPrimaryKeyCounter;
    }

    public AtomicInteger getTotalEntitiesQueriedForCounter() {
      return totalEntitiesQueriedForCounter;
    }

    public Entity findByPrimaryKey(String key) {
      lookupByPrimaryKeyCounter.incrementAndGet();
      if (useConsolidation) {
        // this will eventually lead to the invocation of findByPrimaryKeys below instead,
        // so we should not count this as a query execution
        return consolidatedLookup.execute(key);
      } else {
        queryCounter.incrementAndGet();
        totalEntitiesQueriedForCounter.incrementAndGet();
        return new Entity(key);
      }
    }

    public Map<String, Entity> findByPrimaryKeys(Set<String> keys) {
      queryCounter.incrementAndGet();
      totalEntitiesQueriedForCounter.addAndGet(keys.size());
      return keys.stream()
          .map(Entity::new)
          .collect(Collectors.toMap(Entity::getKey, Function.identity()));
    }
  }

  /**
   * An example representation of application logic hitting the database.
   */
  static class Worker implements Runnable {
    private final Random random;
    private final Database database;

    Worker(Random random, Database database) {
      this.random = random;
      this.database = database;
    }

    @Override
    public void run() {
      for (int i = 0; i < 100; i++) {
        Entity entity = database.findByPrimaryKey(String.valueOf(i));
        if (!entity.key.equals(String.valueOf(i))) {
          throw new IllegalStateException("There is a bug somewhere");
        }
        try {
          Thread.sleep(random.nextInt(20));
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
      }
    }
  }

  private void run() {
    final Random random = new Random();
    final boolean USE_CONSOLIDATION = true; // true or false to use query consolidation or not
    Duration consolidateDuring = Duration.ofMillis(10); // max thread suspension time
    Database database = new Database(USE_CONSOLIDATION, consolidateDuring);
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < 50; i++) {
      Thread thread = new Thread(new Worker(random, database));
      threads.add(thread);
      thread.start();
    }
    threads.forEach(thread -> {
      try {
        thread.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    });
    System.out.println("Number of lookups by primary key from application: " + database.getLookupByPrimaryKeyCounter());
    System.out.println("Number of actual database queries: " + database.getQueryCounter());
    System.out.println("Total number of entities being queried for in the database: " + database.getTotalEntitiesQueriedForCounter());
  }

  public static void main(String[] args) {
    new ConsolidatedLookupExample().run();
  }
}