Often it is difficult to know the suitable split points upfront.
In these instances, we can implement auto-splitting.
Here, the coordinator creates only one partition with a
key range that covers the entire key space.
Each partition can be configured with a fixed maximum size.
A background task then runs on each cluster node
to track the size of the partitions.
When a partition reaches its maximum size, it’s split into two partitions,
each one being approximately half the size of the original.
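As a rough sketch, the coordinator's initial setup could look like the following, reusing the PartitionInfo and Range types that appear later in this section; chooseNode() is a hypothetical placeholder for whatever node-assignment logic the coordinator uses.

//A sketch of the coordinator creating the single initial partition
//covering the entire key space. chooseNode() is hypothetical.
PartitionInfo initialPartition = new PartitionInfo(newPartitionId(),
        chooseNode(),
        PartitionStatus.ASSIGNED,
        new Range(Range.MIN_KEY, Range.MAX_KEY));
partitionTable.addPartition(initialPartition.getPartitionId(), initialPartition);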
Calculating partition size and finding the middle key
Getting the size of the partition and finding the middle key depends
on the storage engine being used. A simple way of doing this
is to scan through the entire partition to calculate its size.
TiKV initially used this approach.
To be able to split the partition, the key that sits
at the midpoint needs to be found as well. To avoid scanning through
the partition twice, a simple implementation can return the middle
key if the size has crossed the configured maximum.
class Partition…
public String getMiddleKeyIfSizeCrossed(int partitionMaxSize) {
    int kvSize = 0;
    for (String key : kv.keySet()) {
        kvSize += key.length() + kv.get(key).length();
        if (kvSize >= partitionMaxSize / 2) {
            return key;
        }
    }
    return "";
}
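The background check on the cluster node could then be a periodically scheduled task along the following lines. This is only a sketch: the maxPartitionSize field, the coordLeader address, and the SplitTriggerMessage constructor arguments are assumptions based on how the coordinator handles the message below.

//A sketch of the periodic split check running on each cluster node.
//Only getMiddleKeyIfSizeCrossed comes from the code above; the other
//names are assumptions.
class SplitCheck implements Runnable {
    public void run() {
        for (Partition partition : allPartitions.values()) {
            String middleKey = partition.getMiddleKeyIfSizeCrossed(maxPartitionSize);
            if (!middleKey.isEmpty()) {
                logger.info("Requesting split of partition " + partition.getId()
                        + " at key " + middleKey);
                network.send(coordLeader,
                        new SplitTriggerMessage(partition.getId(), middleKey, listenAddress));
            }
        }
    }
}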
The coordinator, on handling the split trigger message, updates the
key range metadata for the original partition
and creates new partition metadata for the split range.
class ClusterCoordinator…
private void handleSplitTriggerMessage(SplitTriggerMessage message) {
    logger.info("Handling SplitTriggerMessage " + message.getPartitionId() + " split key " + message.getSplitKey());
    splitPartition(message.getPartitionId(), message.getSplitKey());
}

public CompletableFuture splitPartition(int partitionId, String splitKey) {
    logger.info("Splitting partition " + partitionId + " at key " + splitKey);
    PartitionInfo parentPartition = partitionTable.getPartition(partitionId);
    Range originalRange = parentPartition.getRange();
    List<Range> splits = originalRange.split(splitKey);
    Range shrunkOriginalRange = splits.get(0);
    Range newRange = splits.get(1);
    return replicatedLog.propose(new SplitPartitionCommand(partitionId, splitKey, shrunkOriginalRange, newRange));
}
After the partition metadata is stored successfully, the coordinator
sends a message to the cluster node hosting the parent partition,
asking it to split the parent partition’s data.
class ClusterCoordinator…
private void applySplitPartitionCommand(SplitPartitionCommand command) {
    PartitionInfo originalPartition = partitionTable.getPartition(command.getOriginalPartitionId());
    Range originalRange = originalPartition.getRange();
    if (!originalRange.coveredBy(command.getUpdatedRange().getStartKey(), command.getNewRange().getEndKey())) {
        logger.error("The original range start and end keys " + originalRange + " do not match split ranges");
        return;
    }

    originalPartition.setRange(command.getUpdatedRange());
    PartitionInfo newPartitionInfo = new PartitionInfo(newPartitionId(),
            originalPartition.getAddress(),
            PartitionStatus.ASSIGNED,
            command.getNewRange());
    partitionTable.addPartition(newPartitionInfo.getPartitionId(), newPartitionInfo);

    //send requests to cluster nodes if this is the leader node.
    if (isLeader()) {
        var message = new SplitPartitionMessage(command.getOriginalPartitionId(),
                command.getSplitKey(), newPartitionInfo, requestNumber++, listenAddress);
        scheduler.execute(new RetryableTask(originalPartition.getAddress(),
                network, this, originalPartition.getPartitionId(), message));
    }
}
class Range…
public boolean coveredBy(String startKey, String endKey) {
    return getStartKey().equals(startKey)
            && getEndKey().equals(endKey);
}
The cluster node splits the original partition and creates a new partition.
The data from the original partition is then copied to the new partition.
It then responds to the coordinator, informing it that the split is complete.
class KVStore…
private void handleSplitPartitionMessage(SplitPartitionMessage splitPartitionMessage) {
    splitPartition(splitPartitionMessage.getPartitionId(),
            splitPartitionMessage.getSplitKey(),
            splitPartitionMessage.getSplitPartitionId());
    network.send(coordLeader,
            new SplitPartitionResponseMessage(splitPartitionMessage.getPartitionId(),
                    splitPartitionMessage.getPartitionId(),
                    splitPartitionMessage.getSplitPartitionId(),
                    splitPartitionMessage.messageId, listenAddress));
}

private void splitPartition(int parentPartitionId, String splitKey, int newPartitionId) {
    Partition partition = allPartitions.get(parentPartitionId);
    Partition splitPartition = partition.splitAt(splitKey, newPartitionId);
    logger.info("Adding new partition " + splitPartition.getId() + " for range " + splitPartition.getRange());
    allPartitions.put(splitPartition.getId(), splitPartition);
}
class Partition…
public Partition splitAt(String splitKey, int newPartitionId) {
    List<Range> splits = this.range.split(splitKey);
    Range shrunkOriginalRange = splits.get(0);
    Range splitRange = splits.get(1);

    SortedMap<String, String> partition1Kv =
            (range.getStartKey().equals(Range.MIN_KEY))
                    ? kv.headMap(splitKey)
                    : kv.subMap(range.getStartKey(), splitKey);

    SortedMap<String, String> partition2Kv =
            (range.getEndKey().equals(Range.MAX_KEY))
                    ? kv.tailMap(splitKey)
                    : kv.subMap(splitKey, range.getEndKey());

    this.kv = partition1Kv;
    this.range = shrunkOriginalRange;

    return new Partition(newPartitionId, partition2Kv, splitRange);
}
class Range…
public List<Range> split(String splitKey) {
    return Arrays.asList(new Range(startKey, splitKey), new Range(splitKey, endKey));
}
Once the coordinator receives the message, it marks the partitions as online.
class ClusterCoordinator…
private void handleSplitPartitionResponse(SplitPartitionResponseMessage message) {
    replicatedLog.propose(new UpdatePartitionStatusCommand(message.getPartitionId(), PartitionStatus.ONLINE));
}
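When that command is committed, applying it might be as simple as the following sketch; the setStatus and getStatus accessors are assumptions, not shown in the code above.

//A sketch of applying the committed command on the coordinator.
//setStatus and getStatus are assumed accessors.
private void applyUpdatePartitionStatusCommand(UpdatePartitionStatusCommand command) {
    PartitionInfo partitionInfo = partitionTable.getPartition(command.getPartitionId());
    partitionInfo.setStatus(command.getStatus());
}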
One possible issue with modifying an existing partition is that
clients would no longer be able to cache partition metadata; they would
always need to fetch the latest metadata before sending any requests
to a cluster node.
To avoid this, data stores use a Generation Clock for partitions;
the generation is updated every time a partition is split.
Any client request carrying an older generation number is rejected.
Clients can then reload the
partition table from the coordinator and retry the request.
This ensures that clients holding older metadata don’t get
wrong results.
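A minimal sketch of such a check on the cluster node could look like the following, assuming each partition tracks a generation that is incremented on every split and each client request carries the generation from the client's cached metadata; none of these names appear in the code above.

//A sketch of rejecting requests that carry a stale partition generation.
//getGeneration on the request and the partition, and StaleGenerationResponse,
//are assumed for illustration.
private void handleClientRequest(ClientRequest request) {
    Partition partition = allPartitions.get(request.getPartitionId());
    if (partition == null || request.getGeneration() < partition.getGeneration()) {
        //The client's cached metadata predates a split; tell it to reload
        //the partition table from the coordinator and retry.
        network.send(request.getSender(), new StaleGenerationResponse(request.getPartitionId()));
        return;
    }
    //...serve the request from the partition...
}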
YugabyteDB
chooses to create two separate new partitions and marks the original
as split, as explained in its
Automatic table splitting design.
Example Scenario
Consider an example where the cluster node athens holds partition P1
covering the entire key range. The maximum partition size is configured
to be 10 bytes. The SplitCheck detects that the size has grown beyond 10 bytes,
and finds the approximate middle key to be bob. It then sends a
message to the cluster coordinator,
asking it to create metadata for the split partition.
Once the coordinator has successfully created this metadata,
it asks athens to split partition P1
and passes it the partitionId
from the metadata. athens can then shrink P1 and create a new partition,
copying the data from P1 to the new partition. After the new partition
has been successfully created,
it sends a confirmation to the coordinator. The coordinator then marks the new
partition as online.
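In terms of the Range.split method shown earlier, the split of P1 at bob looks roughly like this; the range boundaries are illustrative.

//P1 initially covers the whole key space.
Range p1Range = new Range(Range.MIN_KEY, Range.MAX_KEY);
List<Range> splits = p1Range.split("bob");
Range shrunkP1Range = splits.get(0);     //[MIN_KEY, "bob")
Range newPartitionRange = splits.get(1); //["bob", MAX_KEY)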
Load based splitting
With auto-splitting, we always begin with a single range. This means
all client requests go to a single server, even if there are other nodes
in the cluster. Requests will continue to go to the single server
hosting that range until the range is split and moved to other
servers. This is why parameters such as the
total number of requests, CPU usage, or memory usage are also sometimes used to
trigger a partition split.
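A load-based check could look something like the following sketch, which counts requests per partition over a time window and asks the coordinator for a split when a threshold is crossed; the threshold, the counters, and the findMostAccessedKey helper are illustrative assumptions rather than how any particular database implements it.

//A sketch of a load-based split check, scheduled periodically like SplitCheck.
//maxRequestsPerWindow and findMostAccessedKey are hypothetical.
class LoadBasedSplitCheck implements Runnable {
    private final Map<Integer, AtomicLong> requestCounts = new ConcurrentHashMap<>();
    private final long maxRequestsPerWindow = 1000;

    //Called for every client request handled by this node.
    public void recordRequest(int partitionId) {
        requestCounts.computeIfAbsent(partitionId, id -> new AtomicLong()).incrementAndGet();
    }

    public void run() {
        for (Map.Entry<Integer, AtomicLong> entry : requestCounts.entrySet()) {
            long count = entry.getValue().getAndSet(0); //reset for the next window
            if (count > maxRequestsPerWindow) {
                //Pick a split key that roughly halves the observed load.
                String splitKey = findMostAccessedKey(entry.getKey());
                network.send(coordLeader,
                        new SplitTriggerMessage(entry.getKey(), splitKey, listenAddress));
            }
        }
    }
}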
Modern databases like CockroachDB and YugabyteDB
support load based splitting. More details can be found in their
documentation at [cockroach-load-splitting]
and [yb-load-splitting].