Problem
Consider a key-value store where values are stored with a timestamp
to designate each version. Any cluster node that handles a client request
reads the latest version using the current timestamp
at the node processing the request.
In the following example, the value ‘Before Dawn’ is updated
to value “After Dawn” at time 2, as per Green’s clock.
Both Alice and Bob are trying to read the latest value for ‘title’.
While Alice’s request is processed by cluster node Amber, Bob’s request is
processed by cluster node Blue.
Amber’s clock is lagging at 1, which means that
when Alice reads the latest value, it returns the value ‘Before Dawn’.
Blue’s clock is at 2; when Bob reads the latest value,
it returns the value “After Dawn”.
This violates a consistency guarantee known as external consistency.
If Alice and Bob now make a phone call, Alice will be confused; Bob will
say that the latest value is “After Dawn”, while her cluster node is
showing ‘Before Dawn’.
The same is true if Green’s clock is fast and the writes happen in the ‘future’
compared to Amber’s clock.
This is a problem if the system’s timestamp is used as a version for storing values,
because wall clocks are not monotonic.
Clock values from two different servers cannot and should not be compared.
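To see why wall clocks are unsuitable as versions, note that a reading such as
System.currentTimeMillis() can jump backwards when NTP steps the clock.
A minimal sketch of the hazard (the NTP step is a hypothetical event, not code):

    long t1 = System.currentTimeMillis();
    // ... NTP steps the system clock back by 50ms here ...
    long t2 = System.currentTimeMillis();
    // t2 < t1 is now possible: using these readings as versions would
    // order a later write before an earlier one.
    // System.nanoTime() is monotonic within a single JVM, but its values
    // are meaningless across servers, so it does not help either.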
When Hybrid Clock is used as a version in
Versioned Value, it allows values to be ordered
on a single server as well as on different servers which
are causally related.
However, Hybrid Clocks (or any Lamport Clock-based clocks)
can only give a partial order.
This means that any values which are not causally related and stored by
two different clients across different nodes cannot be ordered.
This creates a problem when using a timestamp to read the
values across cluster nodes.
If the read request originates on a cluster node with a lagging clock,
it probably won’t be able to read the most up-to-date versions of
the given values.
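To make this concrete, here is a minimal sketch of a hybrid timestamp,
assuming it pairs a wall-clock time in milliseconds with a logical counter;
the field names and the add method are illustrative assumptions,
not a definitive implementation:

    // A sketch of a hybrid timestamp: wall-clock millis plus a logical
    // counter that breaks ties within the same millisecond.
    public class HybridTimestamp implements Comparable<HybridTimestamp> {
        private final long wallClockTime; // milliseconds from the system clock
        private final int ticks;          // logical counter for causally related events

        public HybridTimestamp(long wallClockTime, int ticks) {
            this.wallClockTime = wallClockTime;
            this.ticks = ticks;
        }

        public long getWallClockTime() {
            return wallClockTime;
        }

        // Returns a timestamp shifted by the given millis and ticks.
        public HybridTimestamp add(long millis, int ticks) {
            return new HybridTimestamp(wallClockTime + millis, this.ticks + ticks);
        }

        @Override
        public int compareTo(HybridTimestamp other) {
            // Order by wall-clock time first, then by the logical counter.
            // Only causally related events are guaranteed to be assigned
            // timestamps that reflect their real order.
            int result = Long.compare(wallClockTime, other.wallClockTime);
            return result != 0 ? result : Integer.compare(ticks, other.ticks);
        }

        public boolean before(HybridTimestamp other) {
            return compareTo(other) < 0;
        }

        public boolean after(HybridTimestamp other) {
            return compareTo(other) > 0;
        }
    }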
Solution
Cluster nodes wait until the clock values
on every node in the cluster are guaranteed to be above the timestamp
assigned to the value while reading or writing.
If the difference between clocks is very small,
write requests can wait without adding a great deal of overhead.
As an example, assume the maximum clock offset across cluster nodes is 10ms.
(This means that, at any given point in time t,
the slowest clock in the cluster reads no earlier than t − 10ms.)
To guarantee that every other cluster node has its clock set past t,
the cluster node that handles a write operation at time t
will have to wait until t + 10ms before storing the value.
Consider a key-value store with Versioned Value where
each update is added as a new value, with a timestamp used as a version.
In the Alice and Bob example mentioned above, the write operation
storing title@2 will wait until all the clocks in the cluster are at 2.
This makes sure that Alice will always see the latest value of the title
even if the clock at Alice’s cluster node is lagging behind.
Consider a slightly different scenario.
Philip is updating the title to ‘After Dawn’. Green’s clock has its
time at 2. But Green knows that there might be a server with a clock
lagging behind by up to 1 unit. It will therefore have to
wait in the write operation for a duration of 1 unit.
While Philip is updating the title, Bob’s read request is handled
by server Blue. Blue’s clock is at 2, so it tries to read the title at
timestamp 2. At this point Green has not yet made the value available.
This means Bob gets the value at the highest timestamp lower than 2,
which is ‘Before Dawn’.
Alice’s read request is handled
by server Amber. Amber’s clock is at 1, so it tries to read the title at
timestamp 1. Alice gets the value ‘Before Dawn’.
Once Philip’s write request completes (after the wait of max_diff is over),
if Bob now sends a new read request, server Blue will try to read the latest
value according to its clock (which has advanced to 3); this will return
the value “After Dawn”.
If Alice initiates a new read request, server Amber will try to read the
latest value as per its clock, which is now at 2. It will therefore
also return the value “After Dawn”.
The main problem when trying to implement this solution is that
getting the exact time difference across cluster nodes
is simply not possible with the date-time hardware and operating system APIs
that are currently available.
Such is the nature of the challenge that Google has its own specialized date-time API
called True Time.
Similarly, Amazon has
AWS Time Sync Service and a library called ClockBound.
However, these APIs are very specific to Google and Amazon,
so they can’t really be used beyond the confines of those organizations.
Typically, key-value stores use Hybrid Clock to
implement Versioned Value.
While it is not possible to get the exact difference between clocks,
a sensible default value can be chosen based
on historical observations.
Observed values for maximum clock drift on servers across
datacenters are generally 200 to 500ms.
The key-value store waits for the configured max-offset before storing the value.
class KVStore…
    int maxOffset = 200;
    NavigableMap<HybridClockKey, String> kv = new ConcurrentSkipListMap<>();

    public void put(String key, String value) {
        HybridTimestamp writeTimestamp = clock.now();
        // Commit-wait: hold the write until even the slowest clock in the
        // cluster is guaranteed to have passed writeTimestamp.
        waitTillSlowestClockCatchesUp(writeTimestamp);
        kv.put(new HybridClockKey(key, writeTimestamp), value);
    }

    private void waitTillSlowestClockCatchesUp(HybridTimestamp writeTimestamp) {
        var waitUntilTimestamp = writeTimestamp.add(maxOffset, 0);
        sleepUntil(waitUntilTimestamp);
    }

    private void sleepUntil(HybridTimestamp waitUntil) {
        HybridTimestamp now = clock.now();
        while (now.before(waitUntil)) {
            var waitTime = waitUntil.getWallClockTime() - now.getWallClockTime();
            Uninterruptibles.sleepUninterruptibly(waitTime, TimeUnit.MILLISECONDS);
            now = clock.now();
        }
    }

    public String get(String key, HybridTimestamp readTimestamp) {
        return kv.get(new HybridClockKey(key, readTimestamp));
    }
Read Restart
200ms is too high an interval to wait for every write request.
This is why databases like CockroachDB or YugabyteDB
implement a check in the read requests instead.
While serving a read request, cluster nodes check if there is a version
available in the interval between readTimestamp and readTimestamp + maximum clock drift.
If such a version is available (because the reader’s clock might be lagging),
the reader is asked to restart the read request with that version’s timestamp.
class KVStore…
    public void put(String key, String value) {
        HybridTimestamp writeTimestamp = clock.now();
        kv.put(new HybridClockKey(key, writeTimestamp), value);
    }

    public String get(String key, HybridTimestamp readTimestamp) {
        checksIfVersionInUncertaintyInterval(key, readTimestamp);
        return kv.floorEntry(new HybridClockKey(key, readTimestamp)).getValue();
    }

    private void checksIfVersionInUncertaintyInterval(String key, HybridTimestamp readTimestamp) {
        HybridTimestamp uncertaintyLimit = readTimestamp.add(maxOffset, 0);
        HybridClockKey versionedKey = kv.floorKey(new HybridClockKey(key, uncertaintyLimit));
        if (versionedKey == null) {
            return;
        }
        HybridTimestamp maxVersionBelowUncertainty = versionedKey.getVersion();
        // A version exists between readTimestamp and the uncertainty limit:
        // the reader's clock may be lagging, so ask it to restart the read.
        if (maxVersionBelowUncertainty.after(readTimestamp)) {
            throw new ReadRestartException(readTimestamp, maxOffset, maxVersionBelowUncertainty);
        }
    }
class Client…
    String read(String key) {
        int attemptNo = 1;
        int maxAttempts = 5;
        while (attemptNo < maxAttempts) {
            try {
                HybridTimestamp now = clock.now();
                return kvStore.get(key, now);
            } catch (ReadRestartException e) {
                logger.info("Got read restart error " + e + ". Attempt No. " + attemptNo);
                Uninterruptibles.sleepUninterruptibly(e.getMaxOffset(), TimeUnit.MILLISECONDS);
                attemptNo++;
            }
        }
        throw new ReadTimeoutException("Unable to read after " + attemptNo + " attempts.");
    }
In the Alice and Bob example above, if there is a version for “title”
available at timestamp 2, and Alice sends a read request with read timestamp 1,
a ReadRestartException will be thrown asking Alice to restart the read request
at readTimestamp 2.
Read restarts only happen if there is a version written in the
uncertainty interval. Write requests do not need to wait.
It’s important to remember that the configured value for maximum clock drift
is an assumption; it is not guaranteed. In some cases,
a bad server can have a clock drift greater than the assumed value. In such cases,
the problem will persist.
Using Clock Bound APIs
Cloud providers like Google and Amazon implement clock machinery with
atomic clocks and GPS to make sure that the clock drift across cluster nodes
is kept below a few milliseconds. As we’ve just discussed, Google has
True Time. AWS has
AWS Time Sync Service and ClockBound.
There are two key requirements for cluster nodes to make sure these waits
are implemented correctly.
- The clock drift across cluster nodes is kept to a minimum.
Google’s True-Time keeps it below 1ms in most cases (7ms in the worst cases).
- The possible clock drift is always available in the date-time API;
this ensures programmers don’t need to guess the value.
The clock machinery on cluster nodes computes error bounds for
date-time values. Considering there is a possible error in timestamps
returned by the local system clock, the API makes the error explicit.
It will give the lower as well as the upper bound on clock values.
The real time value is guaranteed to be within this interval.
public class ClockBound {
    public final long earliest;
    public final long latest;

    public ClockBound(long earliest, long latest) {
        this.earliest = earliest;
        this.latest = latest;
    }

    public boolean before(long timestamp) {
        return timestamp < earliest;
    }

    public boolean after(long timestamp) {
        return timestamp > latest;
    }
}
As explained in this AWS blog, the error is
calculated at each cluster node as ClockErrorBound.
The real time value will always be somewhere between
local clock time − ClockErrorBound and local clock time + ClockErrorBound.
The error bounds are returned whenever date-time
values are asked for.
    public ClockBound now() {
        return now;
    }
There are two properties guaranteed by the clock-bound API:
- Clock bounds should overlap across cluster nodes.
- For two time values t1 and t2, if t1 is less than t2,
then clock_bound(t1).earliest is less than clock_bound(t2).latest
across all cluster nodes.
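As an illustration, a hypothetical check of the second property might look like
the following sketch; clockBoundAt is an assumed helper that returns the bound
a given node would report at a given real time, and is not part of any real API:

    // Sketch: the ordering property must hold for every pair of nodes.
    boolean orderingPropertyHolds(List<Node> nodes, long t1, long t2) {
        assert t1 < t2;
        for (Node a : nodes) {
            for (Node b : nodes) {
                ClockBound boundAtT1 = clockBoundAt(a, t1);
                ClockBound boundAtT2 = clockBoundAt(b, t2);
                // clock_bound(t1).earliest must be less than
                // clock_bound(t2).latest, whichever nodes report them.
                if (boundAtT1.earliest >= boundAtT2.latest) {
                    return false;
                }
            }
        }
        return true;
    }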
Imagine we have three cluster nodes: Green, Blue and Orange.
Each node might have a different error bound.
Let’s say the error on Green is 1, Blue is 2 and Orange is 3. At time=4,
the clock bound across cluster nodes will look like this:
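With the local time at 4 on every node, Green reports the bound [3, 5],
Blue reports [2, 6] and Orange reports [1, 7]. All three intervals overlap,
and each contains the real time.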
In this scenario, two rules need to be followed to implement the commit-wait.
- For any write operation, the clock bound’s latest value
should be picked as the timestamp.
This will ensure that it is always higher than any timestamp assigned
to previous write operations (considering the second rule below).
- The system must wait until the write timestamp is less than
the clock bound’s earliest value before storing the value.
This is because the earliest value is guaranteed to be lower than
the clock bound’s latest value across all cluster nodes.
This write operation will then be accessible
to anyone reading with the clock bound’s latest value in the future. Also,
this value is guaranteed to be ordered before any other write operation
happening in the future.
class KVStore…
    public void put(String key, String value) {
        ClockBound now = boundedClock.now();
        // Rule 1: pick the latest value of the clock bound as the timestamp.
        long writeTimestamp = now.latest;
        addPending(writeTimestamp);
        // Rule 2: commit-wait until the timestamp is in the past everywhere.
        waitUntilTimeInPast(writeTimestamp);
        kv.put(new VersionedKey(key, writeTimestamp), value);
        removePending(writeTimestamp);
    }

    private void waitUntilTimeInPast(long writeTimestamp) {
        ClockBound now = boundedClock.now();
        while (now.earliest < writeTimestamp) {
            Uninterruptibles.sleepUninterruptibly(writeTimestamp - now.earliest, TimeUnit.MILLISECONDS);
            now = boundedClock.now();
        }
    }

    private void removePending(long writeTimestamp) {
        pendingWriteTimestamps.remove(writeTimestamp);
        try {
            lock.lock();
            // Wake up readers waiting for pending writes to complete.
            cond.signalAll();
        } finally {
            lock.unlock();
        }
    }

    private void addPending(long writeTimestamp) {
        pendingWriteTimestamps.add(writeTimestamp);
    }
If we return to the Alice and Bob example above, when the value for
“title” (“After Dawn”) is written by Philip on server Green,
the put operation on Green waits until the chosen write timestamp is
below the earliest value of the clock bound.
This ensures that every other cluster node’s clock bound
has a latest value higher than the write timestamp.
To illustrate, consider this scenario. Green has an error bound of
±1. So, for a put operation which starts at time 4,
when it stores the value, Green will pick the latest value of its clock
bound, which is 5. It then waits until the earliest value of the clock
bound is more than 5. Essentially, Green waits out the uncertainty
interval before actually storing the value in the key-value store.
By the time the value is made available in the key-value store,
the clock bound’s latest value is guaranteed to be higher than 5
on each and every cluster node.
This means that Bob’s request handled by Blue as well as Alice’s request
handled by Amber, are guaranteed to get the latest value of the title.
We will get the same result if Green has ‘wider’ time bounds.
The greater the error bound, the longer the wait. If Green’s error bound
is the largest, ±3, it will wait longer before making the value available in
the key-value store. Neither Amber nor Blue will be able to get
the value until their latest time value is past 7. When Alice gets the
most up-to-date value of the title at latest time 7,
every other cluster node is guaranteed to get it at its latest time value.
Read-Wait
When reading the value, the client will always pick the latest value
of the clock bound at its cluster node.
The cluster node receiving the request needs to make sure that once
a response is returned at the specific request timestamp, there are
no values written at that timestamp or a lower timestamp.
If the timestamp in the request is higher than the
timestamp at the server, the cluster node will wait until
its clock catches up
before returning the response.
It will then check if there are any pending write requests at a lower timestamp
which are not yet stored. If there are, the
read request will pause until those writes are complete.
The server will then read the values at the request timestamp and return the value.
This ensures that once a response is returned at a particular timestamp,
no values will ever be written at a lower timestamp.
This guarantee is called Snapshot Isolation.
class KVStore…
    final Lock lock = new ReentrantLock();
    Queue<Long> pendingWriteTimestamps = new ArrayDeque<>();
    final Condition cond = lock.newCondition();

    public Optional<String> read(long readTimestamp) {
        // Wait until readTimestamp is in the past on this server, then
        // wait for any pending writes at or below it to complete.
        waitUntilTimeInPast(readTimestamp);
        waitForPendingWrites(readTimestamp);
        Optional<VersionedKey> max = kv.keySet().stream().max(Comparator.naturalOrder());
        if (max.isPresent()) {
            return Optional.of(kv.get(max.get()));
        }
        return Optional.empty();
    }

    private void waitForPendingWrites(long readTimestamp) {
        try {
            lock.lock();
            while (pendingWriteTimestamps.stream().anyMatch(ts -> ts <= readTimestamp)) {
                cond.awaitUninterruptibly();
            }
        } finally {
            lock.unlock();
        }
    }
Consider this final scenario: Alice’s read request is handled by
server Amber, with an error bound of ±3. It picks the latest time, 7, to
read the title. Meanwhile, Philip’s write request is handled by Green
(with an error bound of ±1), which picks 5 to store the value.
Alice’s read request waits until the earliest time at Green is past 7
and for the pending write request to complete. It then returns the latest value with
a timestamp below 7.
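Putting the pieces together, a hypothetical client interaction might look like
this sketch; the kvStore and boundedClock instances are assumed from the code
above, and the scenario mirrors Philip’s write followed by Alice’s read:

    // Philip's write: put() picks the latest bound as the version and
    // performs the commit-wait before making the value visible.
    kvStore.put("title", "After Dawn");

    // Alice's read: pick the latest value of her node's clock bound as the
    // read timestamp. read() waits until this timestamp is in the past and
    // all pending writes at or below it have completed.
    long readTimestamp = boundedClock.now().latest;
    Optional<String> title = kvStore.read(readTimestamp);
    title.ifPresent(value -> System.out.println("title = " + value)); // "After Dawn"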