Emergent Leader

Solution

One of the common techniques used in peer-to-peer systems is to
order cluster nodes according to their ‘age’. The oldest member of
the cluster plays the role of the coordinator for the cluster.
The coordinator is responsible for deciding on membership changes
as well as making decisions such as where
Fixed Partitions should be placed
across cluster nodes.
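Here is a minimal sketch of this ordering rule. It assumes each member record carries only an address and the age assigned when it joined. `AgedMember` and `CoordinatorSelection` are hypothetical names, not classes from this pattern's code. Under these assumptions, the coordinator is simply the live member with the smallest age.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical member record: a lower age means the node joined the cluster earlier.
record AgedMember(String address, int age) {}

class CoordinatorSelection {
    // The oldest live member (the one with the smallest age) acts as the coordinator.
    // Every node holding the same membership list computes the same answer.
    static Optional<AgedMember> coordinator(List<AgedMember> liveMembers) {
        return liveMembers.stream().min(Comparator.comparingInt(AgedMember::age));
    }

    public static void main(String[] args) {
        List<AgedMember> members = List.of(
                new AgedMember("byzantium", 2),
                new AgedMember("athens", 1),
                new AgedMember("cyrene", 3));
        System.out.println(coordinator(members).map(AgedMember::address).orElse("none")); // athens
    }
}
```

Every node derives the coordinator from its own copy of the membership list, so no election messages are needed. Nodes agree on the coordinator exactly as far as they agree on the membership list.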

To form the cluster,
one of the cluster nodes acts as a seed node or an introducer node.
All the cluster nodes join the cluster by contacting the seed node.

Every cluster node is configured with the seed node address.
When a cluster node is started, it tries to contact the seed node
to join the cluster.

class ClusterNode…

  MembershipService membershipService;
  public void start(Config config) {
      this.membershipService = new MembershipService(config.getListenAddress());
      membershipService.join(config.getSeedAddress());
  }

The seed node could be any of the cluster nodes. It’s configured with its own
address as the seed node address and is the first node that is started.
It immediately begins accepting requests. The age of the seed node is 1.

class MembershipService…

  Membership membership;
  public void join(InetAddressAndPort seedAddress) {
      int maxJoinAttempts = 5;
      for(int i = 0; i < maxJoinAttempts; i++){
          try {
              joinAttempt(seedAddress);
              return;
          } catch (Exception e) {
              logger.info("Join attempt " + (i + 1) + " from " + selfAddress + " to " + seedAddress + " failed. Retrying");
          }
      }
      throw new JoinFailedException("Unable to join the cluster after " + maxJoinAttempts + " attempts");
  }

  private void joinAttempt(InetAddressAndPort seedAddress) throws ExecutionException, TimeoutException {
      if (selfAddress.equals(seedAddress)) {
          int membershipVersion = 1;
          int age = 1;
          updateMembership(new Membership(membershipVersion, Arrays.asList(new Member(selfAddress, age, MemberStatus.JOINED))));
          start();
          return;
      }
      long id = this.messageId++;
      CompletableFuture<JoinResponse> future = new CompletableFuture<>();
      JoinRequest message = new JoinRequest(id, selfAddress);
      pendingRequests.put(id, future);
      try {
          network.send(seedAddress, message);
          //wait up to 5 seconds for the seed node to reply with the current membership
          JoinResponse joinResponse = Uninterruptibles.getUninterruptibly(future, 5, TimeUnit.SECONDS);
          updateMembership(joinResponse.getMembership());
      } finally {
          pendingRequests.remove(id); //don't leave a stale entry behind if this attempt fails
      }
      start();
  }

  private void start() {
      heartBeatScheduler.start();
      failureDetector.start();
      startSplitBrainChecker();
      logger.info(selfAddress + " joined the cluster. Membership=" + membership);
  }


  private void updateMembership(Membership membership) {
      this.membership  = membership;
  }

There can be more than one seed node, but seed nodes start accepting
requests only after they themselves have joined the cluster. Also, the
cluster remains functional if the seed node is down, but no new nodes
will be able to join it.
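With more than one seed node, a joining node can try each configured seed in turn until one of them answers. The sketch below keeps only that fallback logic. It assumes a hypothetical `JoinTransport` callback that stands in for sending the join request and waiting for the response. `MultiSeedJoin` is also an illustrative name.

```java
import java.util.List;

class MultiSeedJoin {
    // Hypothetical stand-in for sending a join request and waiting for the response.
    interface JoinTransport {
        boolean tryJoin(String seedAddress); // true if the seed accepted the join
    }

    // Try every seed, in configured order, for up to maxRounds rounds.
    // Returns the seed through which the node joined.
    static String join(List<String> seeds, JoinTransport transport, int maxRounds) {
        for (int round = 0; round < maxRounds; round++) {
            for (String seed : seeds) {
                if (transport.tryJoin(seed)) {
                    return seed;
                }
            }
        }
        throw new IllegalStateException("Unable to join via any of " + seeds);
    }

    public static void main(String[] args) {
        // athens is down, so the join falls through to byzantium.
        String joinedVia = join(List.of("athens", "byzantium"), seed -> !seed.equals("athens"), 3);
        System.out.println("Joined via " + joinedVia);
    }
}
```

The order of the seed list decides which seed is preferred. A real implementation would also wait between rounds instead of retrying immediately.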

Non-seed nodes then send the join request to the seed node.
The seed node handles the join request by creating a new member record
and assigning its age.
It then updates its own membership list and sends messages to all the
existing members with the new membership list.
It then waits for every existing node to acknowledge the update,
but eventually returns the join response even if some
acknowledgements are delayed.

class MembershipService…

  public void handleJoinRequest(JoinRequest joinRequest) {
      handlePossibleRejoin(joinRequest);
      handleNewJoin(joinRequest);
  }

  private void handleNewJoin(JoinRequest joinRequest) {
      List<Member> existingMembers = membership.getLiveMembers();
      updateMembership(membership.addNewMember(joinRequest.from));
      ResultsCollector resultsCollector = broadcastMembershipUpdate(existingMembers);
      JoinResponse joinResponse = new JoinResponse(joinRequest.messageId, selfAddress, membership);
      resultsCollector.whenComplete((response, exception) -> {
          logger.info("Sending join response from " + selfAddress + " to " + joinRequest.from);
          network.send(joinRequest.from, joinResponse);
      });
  }

class Membership…

  public Membership addNewMember(InetAddressAndPort address) {
      var newMembership = new ArrayList<>(liveMembers);
      //the new member is younger than every existing live member
      int age = youngestMemberAge() + 1;
      newMembership.add(new Member(address, age, MemberStatus.JOINED));
      return new Membership(version + 1, newMembership, failedMembers);
  }

  private int youngestMemberAge() {
      return liveMembers.stream().map(m -> m.age).max(Integer::compare).orElse(0);
  }

If a node that was already part of the cluster tries to rejoin
after a crash, it is first removed from the list of failed
members.

class MembershipService…

  private void handlePossibleRejoin(JoinRequest joinRequest) {
      if (membership.isFailed(joinRequest.from)) {
          //member rejoining
          logger.info(joinRequest.from  + " rejoining the cluster. Removing it from failed list");
          membership.removeFromFailedList(joinRequest.from);
      }
  }

It’s then added as a new member. Each member needs to be identified
uniquely. It can be assigned a unique identifier at startup.
This then provides a point of reference that makes it possible to
check if it is an existing cluster node that is rejoining.
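Here is one way to sketch this. It assumes each node generates a fresh UUID every time it starts and sends that UUID with its join request. It also assumes nodes keep their address across restarts. The coordinator remembers the last identifier seen for each address. A join from a known address with a new identifier is then a restarted node rejoining. `NodeRegistry` and `JoinKind` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class NodeRegistry {
    enum JoinKind { NEW_NODE, REJOIN_AFTER_RESTART, DUPLICATE_REQUEST }

    // Last startup identifier seen for each node address.
    private final Map<String, UUID> idsByAddress = new HashMap<>();

    JoinKind register(String address, UUID nodeId) {
        UUID previous = idsByAddress.put(address, nodeId);
        if (previous == null) {
            return JoinKind.NEW_NODE;
        }
        // Same id: the node is retrying a join it already sent.
        // Different id: the process restarted and is rejoining.
        return previous.equals(nodeId) ? JoinKind.DUPLICATE_REQUEST : JoinKind.REJOIN_AFTER_RESTART;
    }

    public static void main(String[] args) {
        NodeRegistry registry = new NodeRegistry();
        UUID firstStart = UUID.randomUUID();
        System.out.println(registry.register("byzantium:8000", firstStart));        // NEW_NODE
        System.out.println(registry.register("byzantium:8000", firstStart));        // DUPLICATE_REQUEST
        System.out.println(registry.register("byzantium:8000", UUID.randomUUID())); // REJOIN_AFTER_RESTART
    }
}
```

A `REJOIN_AFTER_RESTART` result is the case where the coordinator would clear the member's failed state before adding it again.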

The Membership class maintains the list of live members as well as
failed members. Members are moved from the live list to the failed list
if they stop sending HeartBeat messages, as explained in the
failure detection section.

class Membership…

  public class Membership {
      List<Member> liveMembers = new ArrayList<>();
      List<Member> failedMembers = new ArrayList<>();
  
      public boolean isFailed(InetAddressAndPort address) {
          return failedMembers.stream().anyMatch(m -> m.address.equals(address));
      }
      //... other fields and methods
  }
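Further on, `removeFailedMembers` and `claimLeadershipIfNeeded` call `membership.failed(...)`, which is not shown. Here is a minimal immutable sketch of it. It assumes members are identified by address and that every change bumps the version number. `ViewMember` and `MembershipView` are hypothetical stand-ins for `Member` and `Membership`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

record ViewMember(String address, int age) {}

// Immutable membership snapshot: every change produces a new, higher version.
record MembershipView(int version, List<ViewMember> liveMembers, List<ViewMember> failedMembers) {

    // Move the given members from the live list to the failed list.
    MembershipView failed(List<ViewMember> newlyFailed) {
        Set<String> failedAddresses = newlyFailed.stream()
                .map(ViewMember::address)
                .collect(Collectors.toSet());
        List<ViewMember> live = liveMembers.stream()
                .filter(m -> !failedAddresses.contains(m.address()))
                .collect(Collectors.toList());
        List<ViewMember> failed = new ArrayList<>(failedMembers);
        failed.addAll(newlyFailed);
        return new MembershipView(version + 1, live, failed);
    }

    boolean isFailed(String address) {
        return failedMembers.stream().anyMatch(m -> m.address().equals(address));
    }

    public static void main(String[] args) {
        ViewMember athens = new ViewMember("athens", 1);
        ViewMember byzantium = new ViewMember("byzantium", 2);
        MembershipView v1 = new MembershipView(1, List.of(athens, byzantium), List.of());
        MembershipView v2 = v1.failed(List.of(byzantium));
        System.out.println(v2.version() + " " + v2.isFailed("byzantium")); // 2 true
    }
}
```

Returning a new snapshot instead of mutating the lists keeps the version number consistent with the member lists. This matters because the version is what nodes compare to detect missed updates.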

Sending membership updates to all the existing members

Membership updates are sent to all the other nodes concurrently.
The coordinator also needs to track whether all the members
successfully received the updates.

A common technique is to send a one-way request to all nodes
and expect an acknowledgement message in return.
The cluster nodes send acknowledgement messages to the coordinator
to confirm receipt of the membership update.
A ResultsCollector object tracks receipt of all these
messages asynchronously. It is notified every time
an acknowledgement for a membership update arrives, and it
completes its future once all the expected
acknowledgement messages have been received.

class MembershipService…

  private ResultsCollector broadcastMembershipUpdate(List<Member> existingMembers) {
      ResultsCollector resultsCollector = sendMembershipUpdateTo(existingMembers);
      resultsCollector.orTimeout(2, TimeUnit.SECONDS);
      return resultsCollector;
  }

  Map<Long, CompletableFuture> pendingRequests = new ConcurrentHashMap<>(); //responses are completed from network threads
  private ResultsCollector sendMembershipUpdateTo(List<Member> existingMembers) {
      var otherMembers = otherMembers(existingMembers);
      ResultsCollector collector = new ResultsCollector(otherMembers.size());
      if (otherMembers.size() == 0) {
          collector.complete();
          return collector;
      }
      for (Member m : otherMembers) {
          long id = this.messageId++;
          CompletableFuture<Message> future = new CompletableFuture<>();
          future.whenComplete((result, exception)->{
              if (exception == null){
                  collector.ackReceived();
              }
          });
          pendingRequests.put(id, future);
          network.send(m.address, new UpdateMembershipRequest(id, selfAddress, membership));
      }
      return collector;
  }

class MembershipService…

  private void handleResponse(Message message) {
      completePendingRequests(message);
  }

  private void completePendingRequests(Message message) {
      //remove the entry so completed requests don't accumulate in the map
      CompletableFuture requestFuture = pendingRequests.remove(message.messageId);
      if (requestFuture != null) {
          requestFuture.complete(message);
      }
  }

class ResultsCollector…

  class ResultsCollector {
      int totalAcks;
      //acks can arrive on different network threads, so count them atomically
      AtomicInteger receivedAcks = new AtomicInteger();
      CompletableFuture<Object> future = new CompletableFuture<>();
  
      public ResultsCollector(int totalAcks) {
          this.totalAcks = totalAcks;
      }
  
      public void ackReceived() {
          if (receivedAcks.incrementAndGet() == totalAcks) {
              future.complete(true);
          }
      }
  
      public void orTimeout(int time, TimeUnit unit) {
          //completes the future with a TimeoutException if acks are still missing
          future.orTimeout(time, unit);
      }
  
      public void whenComplete(BiConsumer<? super Object, ? super Throwable> func) {
          future.whenComplete(func);
      }
  
      public void complete() {
          future.complete(true);
      }
  }

To see how ResultsCollector works, consider a cluster
with a set of nodes: let’s call them athens, byzantium and cyrene.
athens is acting as the coordinator. When a new node, delphi,
sends a join request to athens, athens updates the membership and sends
an UpdateMembershipRequest to byzantium and cyrene. It also creates a
ResultsCollector object to track acknowledgements and records each
acknowledgement it receives with that object. When it has received
acknowledgements from both byzantium and cyrene, it responds to delphi.
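The athens example can be reproduced with a small stand-alone collector. This is a sketch, not the `ResultsCollector` shown above. It uses an `AtomicInteger` because acknowledgements may arrive on different network threads. The thread pool only simulates byzantium and cyrene acknowledging, and `AckCollector` is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AckCollector {
    private final int expectedAcks;
    private final AtomicInteger received = new AtomicInteger();
    final CompletableFuture<Boolean> allAcked = new CompletableFuture<>();

    AckCollector(int expectedAcks) {
        this.expectedAcks = expectedAcks;
        if (expectedAcks == 0) {
            allAcked.complete(true); // no other members, nothing to wait for
        }
    }

    // Safe to call from several threads; only the last expected ack completes the future.
    void ackReceived() {
        if (received.incrementAndGet() == expectedAcks) {
            allAcked.complete(true);
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> existingMembers = List.of("byzantium", "cyrene");
        AckCollector collector = new AckCollector(existingMembers.size());
        // Stop waiting after 2 seconds, as broadcastMembershipUpdate does.
        collector.allAcked.orTimeout(2, TimeUnit.SECONDS);

        ExecutorService network = Executors.newFixedThreadPool(2);
        for (String member : existingMembers) {
            network.submit(collector::ackReceived); // simulated ack from each member
        }
        // athens replies to delphi once all acks arrive, or after the timeout.
        String reply = collector.allAcked
                .handle((ok, error) -> error == null ? "all acks received" : "timed out")
                .get();
        System.out.println("athens responds to delphi: " + reply);
        network.shutdown();
    }
}
```

The future completes only when the counter reaches the expected total. So the reply to delphi is sent once, whichever acknowledgement happens to arrive last.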

Frameworks like Akka
use Gossip Dissemination and Gossip Convergence
to track whether updates have reached all cluster nodes.

An example scenario

Consider another three nodes.
Again, we’ll call them athens, byzantium and cyrene.
athens acts as the seed node, and the other two nodes are configured
with athens as their seed node.

When athens starts, it detects that it is itself the seed node.
It immediately initializes the membership list and starts
accepting requests.

When byzantium starts, it sends a join request to athens.
Note that even if byzantium starts before athens, it will keep
trying to send join requests until it can connect to athens.
Athens finally adds byzantium to the membership list and sends the
updated membership list to byzantium. Once byzantium receives
the response from athens, it can start accepting requests.

With all-to-all heartbeating, byzantium starts sending heartbeats
to athens, and athens sends heartbeats to byzantium.

cyrene starts next and sends a join request to athens.
Athens updates the membership list and sends the updated membership
list to byzantium. It then sends the join response, with
the membership list, to cyrene.

With all-to-all heartbeating, cyrene, athens and byzantium
all send heartbeats to each other.

Handling missing membership updates

It’s possible that some cluster nodes miss membership updates.
There are two solutions to handle this problem.

If all members send heartbeats to all other members,
the membership version number can be included in each heartbeat.
The cluster node that handles the heartbeat can then compare it
with its own version. In the code below, the coordinator sends the
latest membership to any node whose heartbeat carries an older version.
Frameworks like Akka, which use Gossip Dissemination,
track convergence of the gossiped state.

class MembershipService…

  private void handleHeartbeatMessage(HeartbeatMessage message) {
      failureDetector.heartBeatReceived(message.from);
      if (isCoordinator() && message.getMembershipVersion() < this.membership.getVersion()) {
          membership.getMember(message.from).ifPresent(member -> {
              logger.info("Membership version in " + selfAddress + "=" + this.membership.version + " and in " + message.from + "=" + message.getMembershipVersion());

              logger.info("Sending membership update from " + selfAddress + " to " + message.from);
              sendMembershipUpdateTo(Arrays.asList(member));
          });
      }
  }

In the above example, if byzantium misses the membership update
from athens, this will be detected when byzantium sends its next heartbeat
to athens. athens can then send the latest membership to byzantium.

Alternatively, each cluster node can periodically (say, every second)
check its membership list against those of other cluster nodes.
If a node finds that its membership list is outdated,
it can ask for the latest list and update its own.
To make membership lists comparable, a version number
is generally maintained and incremented every time
there is a change.
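Here is a sketch of the periodic check. It assumes each node can learn the membership version held by each of its peers, represented by the hypothetical `peerVersions` map. The node picks the peer with the highest version and fetches from it only when that version is newer than its own. `MembershipSync` is an illustrative name, and the scheduler simply runs the check once a second.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class MembershipSync {
    // Returns the peer to fetch the membership list from, if any peer is ahead of us.
    static Optional<String> peerWithNewerMembership(int localVersion, Map<String, Integer> peerVersions) {
        return peerVersions.entrySet().stream()
                .filter(e -> e.getValue() > localVersion)
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Compare versions once a second; the peer versions here are made-up sample values.
        scheduler.scheduleWithFixedDelay(() -> {
            Map<String, Integer> peerVersions = Map.of("athens", 3, "cyrene", 2);
            peerWithNewerMembership(2, peerVersions)
                    .ifPresent(peer -> System.out.println("fetch membership from " + peer));
        }, 0, 1, TimeUnit.SECONDS);
        Thread.sleep(2500);
        scheduler.shutdown();
    }
}
```

Pulling from the most up-to-date peer avoids fetching an intermediate version that is already stale.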

Failure Detection

Each cluster node also runs a failure detector to check whether
heartbeats are missing from any of the other cluster nodes.
In the simple case, all cluster nodes send heartbeats to all the other
nodes, but only the coordinator marks nodes as failed and
communicates the updated membership list to all the other nodes.
This ensures that nodes don't each decide unilaterally that
some other node has failed. Hazelcast is an example
of this implementation.

class MembershipService…

  private boolean isCoordinator() {
      Member coordinator = membership.getCoordinator();
      return coordinator.address.equals(selfAddress);
  }

  TimeoutBasedFailureDetector<InetAddressAndPort> failureDetector
          = new TimeoutBasedFailureDetector<InetAddressAndPort>(Duration.ofSeconds(2));

  private void checkFailedMembers(List<Member> members) {
      if (isCoordinator()) {
          removeFailedMembers();

      } else {
          //if failed member consists of coordinator, then check if this node is the next coordinator.
          claimLeadershipIfNeeded(members);
      }
  }

  void removeFailedMembers() {
      List<Member> failedMembers = checkAndGetFailedMembers(membership.getLiveMembers());
      if (failedMembers.isEmpty()) {
          return;
      }
      updateMembership(membership.failed(failedMembers));
      sendMembershipUpdateTo(membership.getLiveMembers());
  }
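`TimeoutBasedFailureDetector` is used above but not shown. Below is a minimal sketch of the methods the code calls: `heartBeatReceived`, `isMonitoring`, `isAlive` and `remove`. It assumes a node counts as alive if its last heartbeat arrived within the configured timeout. The injectable clock is an addition made here only so the behaviour can be tested; it is not part of the original code.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// A sketch of the detector used above, not its actual implementation.
class TimeoutBasedFailureDetector<T> {
    private final long timeoutNanos;
    private final LongSupplier nanoClock; // monotonic time source, injectable for tests
    // Monotonic timestamp of the most recent heartbeat from each monitored node.
    private final Map<T, Long> lastHeartbeat = new ConcurrentHashMap<>();

    TimeoutBasedFailureDetector(Duration timeout) {
        this(timeout, System::nanoTime);
    }

    TimeoutBasedFailureDetector(Duration timeout, LongSupplier nanoClock) {
        this.timeoutNanos = timeout.toNanos();
        this.nanoClock = nanoClock;
    }

    // Start (or keep) monitoring a node by recording when its heartbeat arrived.
    void heartBeatReceived(T node) {
        lastHeartbeat.put(node, nanoClock.getAsLong());
    }

    boolean isMonitoring(T node) {
        return lastHeartbeat.containsKey(node);
    }

    // A node is alive if its last heartbeat is no older than the timeout.
    boolean isAlive(T node) {
        Long last = lastHeartbeat.get(node);
        return last != null && nanoClock.getAsLong() - last <= timeoutNanos;
    }

    // Stop monitoring a node, e.g. once it has been marked as failed.
    void remove(T node) {
        lastHeartbeat.remove(node);
    }

    public static void main(String[] args) {
        var detector = new TimeoutBasedFailureDetector<String>(Duration.ofSeconds(2));
        detector.heartBeatReceived("byzantium");
        System.out.println(detector.isAlive("byzantium")); // true, a heartbeat was just received
    }
}
```

Using `System.nanoTime` rather than wall-clock time keeps the detector unaffected by system clock adjustments.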

Avoiding all-to-all heartbeating

All-to-all heartbeating is not feasible in large clusters.
Typically, each node receives heartbeats from
only a few other nodes. If a failure is detected,
it’s broadcast to all the other nodes,
including the coordinator.

For example, in Akka a node ring is formed
by sorting network addresses, and each cluster node sends
heartbeats to only a few cluster nodes.
Ignite arranges all the nodes in the cluster
in a ring, and each node sends heartbeats only to the node next
to it.
Hazelcast uses all-to-all heartbeating.
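Here is a sketch of the ring arrangement. It assumes nodes are ordered by sorting their addresses as strings and that each node sends heartbeats to the next `k` nodes in that order. Both choices are illustrative and are not Akka's or Ignite's exact algorithm. With `k = 1`, it matches the next-node-only ring described for Ignite. `HeartbeatRing` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class HeartbeatRing {
    // Returns the k nodes that follow `self` in the sorted ring, wrapping around.
    static List<String> heartbeatTargets(List<String> addresses, String self, int k) {
        List<String> ring = new ArrayList<>(addresses);
        Collections.sort(ring); // every node sorts the same way, so all agree on the ring
        int index = ring.indexOf(self);
        if (index < 0) {
            throw new IllegalArgumentException(self + " is not a cluster member");
        }
        List<String> targets = new ArrayList<>();
        int count = Math.min(k, ring.size() - 1); // never send heartbeats to ourselves
        for (int i = 1; i <= count; i++) {
            targets.add(ring.get((index + i) % ring.size()));
        }
        return targets;
    }

    public static void main(String[] args) {
        List<String> cluster = List.of("cyrene:8000", "athens:8000", "ephesus:8000", "byzantium:8000", "delphi:8000");
        System.out.println(heartbeatTargets(cluster, "delphi:8000", 2)); // [ephesus:8000, athens:8000]
    }
}
```

When the cluster has more than `k` nodes, each node is monitored by exactly `k` predecessors. Heartbeat traffic then grows linearly with cluster size instead of quadratically.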

Any membership change, whether caused by a node being added or
by a node failing, needs to be broadcast to all the other
cluster nodes. A node can connect to every other node to
send the required information, or
Gossip Dissemination can be used
to broadcast it.

Split Brain Situation

Even though a single coordinator node decides when to
mark other nodes as down, there’s no explicit leader election
to select which node acts as the coordinator.
Every cluster node expects a heartbeat from the existing
coordinator node; if it doesn’t get a heartbeat in time,
it can claim to be the coordinator and remove the existing
coordinator from the membership list.

class MembershipService…

  private void claimLeadershipIfNeeded(List<Member> members) {
      List<Member> failedMembers = checkAndGetFailedMembers(members);
      if (!failedMembers.isEmpty() && isOlderThanAll(failedMembers)) {
          var newMembership = membership.failed(failedMembers);
          updateMembership(newMembership);
          sendMembershipUpdateTo(newMembership.getLiveMembers());
      }
  }

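  private boolean isOlderThanAll(List<Member> failedMembers) {
      //ages increase in join order, so this is true when every failed member joined before (is older than) this node
      return failedMembers.stream().allMatch(m -> m.age < thisMember().age);
  }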

  private List<Member> checkAndGetFailedMembers(List<Member> members) {
      List<Member> failedMembers = members
              .stream()
              .filter(member -> !member.address.equals(selfAddress) && failureDetector.isMonitoring(member.address) && !failureDetector.isAlive(member.address))
              .map(member -> new Member(member.address, member.age, member.status)).collect(Collectors.toList());

      failedMembers.forEach(member->{
          failureDetector.remove(member.address);
          logger.info(selfAddress + " marking " + member.address + " as DOWN");
      });
      return failedMembers;
  }

This can create a situation where two or more subgroups
form within an existing cluster, each considering the others
to have failed. This is called the split-brain problem.

Consider a five-node cluster: athens, byzantium, cyrene, delphi and ephesus.
If athens receives heartbeats from delphi and ephesus, but
stops getting heartbeats from byzantium and cyrene, it marks
both byzantium and cyrene as failed.

byzantium and cyrene could still send heartbeats to each other,
but stop receiving heartbeats from athens, delphi and ephesus.
byzantium, being the second oldest member of the cluster,
then becomes the coordinator.
So two separate clusters are formed: one with athens as
the coordinator and the other with byzantium as the coordinator.
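The scenario can be replayed with the oldest-member rule. In this sketch, each node's view is just the set of members it still receives heartbeats from, plus itself. The ages 1 to 5 follow the join order assumed here. `PartitionViews` is purely illustrative.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;

class PartitionViews {
    record Node(String name, int age) {}

    // The coordinator in any view is its oldest (lowest-age) member.
    static String coordinatorOf(List<Node> view) {
        return view.stream().min(Comparator.comparingInt(Node::age)).orElseThrow().name();
    }

    public static void main(String[] args) {
        Node athens = new Node("athens", 1), byzantium = new Node("byzantium", 2),
             cyrene = new Node("cyrene", 3), delphi = new Node("delphi", 4),
             ephesus = new Node("ephesus", 5);
        // After the partition, athens still hears delphi and ephesus,
        // while byzantium and cyrene only hear each other.
        Map<String, List<Node>> views = Map.of(
                "athens", List.of(athens, delphi, ephesus),
                "byzantium", List.of(byzantium, cyrene));
        views.forEach((node, view) ->
                System.out.println(node + " sees coordinator " + coordinatorOf(view)));
    }
}
```

Each view is internally consistent, and that is exactly the problem. Each side sees a valid coordinator and keeps serving requests.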

Handling split brain

One common way to handle the split-brain issue is to
check whether there are enough members to handle any
client request, and to reject the request if there
are not enough live members. For example,
Hazelcast allows you to configure a
minimum cluster size to execute any client request.

private static final int MINIMUM_CLUSTER_SIZE = 3;

public void handleClientRequest(Request request) {
    if (!hasMinimumRequiredSize()) {
        throw new NotEnoughMembersException("Requires minimum " + MINIMUM_CLUSTER_SIZE + " members to serve the request");
    }
    //... process the request
}

private boolean hasMinimumRequiredSize() {
    return membership.getLiveMembers().size() >= MINIMUM_CLUSTER_SIZE;
}

The part that has the majority of the nodes
continues to operate, but, as explained in the Hazelcast
documentation, there will always be a
time window
in which this protection has yet to come into effect.
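A fixed minimum size has to be chosen for each cluster. A related check compares the number of live members against a strict majority of the configured cluster size, so that at most one side of a partition can pass it. It is sketched here as an alternative, not as Hazelcast's setting. The configured size must be fixed, not the current membership count, and `QuorumCheck` is a hypothetical name.

```java
class QuorumCheck {
    // A strict majority of the configured cluster size.
    static int quorum(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    static boolean hasQuorum(int liveMembers, int clusterSize) {
        return liveMembers >= quorum(clusterSize);
    }

    public static void main(String[] args) {
        // The five-node example: athens' side has 3 members, byzantium's side has 2.
        System.out.println(hasQuorum(3, 5)); // true
        System.out.println(hasQuorum(2, 5)); // false
    }
}
```

Two disjoint groups cannot both hold a strict majority. The time window mentioned above still applies, because a node learns that it has lost the majority only after failure detection has updated its membership list.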

The problem can be avoided if cluster nodes are
not marked as down unless it’s guaranteed that they
won’t cause split brain.
For example, Akka recommends
that you don’t have nodes
marked as down
through the failure detector; you can instead use its
split brain resolver component.

Recovering from split brain

The coordinator runs a periodic job to check if it
can connect to the failed nodes.
If a connection can be established, it sends a special
message indicating that it wants to trigger a
split brain merge.

If the receiving node is the coordinator of its own subcluster,
it checks whether the cluster initiating the request
is the minority group. If it is,
it sends a merge request back. The coordinator of the minority group,
on receiving the merge request, then executes
the merge on all the nodes in the minority subgroup.

class MembershipService…

  splitbrainCheckTask = taskScheduler.scheduleWithFixedDelay(
          this::searchOtherClusterGroups,
          1, 1, TimeUnit.SECONDS);

class MembershipService…

  private void searchOtherClusterGroups() {
      if (membership.getFailedMembers().isEmpty()) {
          return;
      }
      if (isCoordinator()) {
          //probe each failed member; a reachable one may be running a separate subcluster
          for (Member member : membership.getFailedMembers()) {
              logger.info("Sending SplitBrainJoinRequest to " + member.address);
              network.send(member.address, new SplitBrainJoinRequest(messageId++, this.selfAddress, membership.version, membership.getLiveMembers().size()));
          }
      }
  }

If the receiving node is the coordinator of the majority subgroup, it asks the
sending coordinator node to merge with itself.

class MembershipService…

  private void handleSplitBrainJoinMessage(SplitBrainJoinRequest splitBrainJoinRequest) {
      logger.info(selfAddress + " Handling SplitBrainJoinRequest from " + splitBrainJoinRequest.from);
      if (!membership.isFailed(splitBrainJoinRequest.from)) {
          return;
      }

      if (!isCoordinator()) {
          return;
      }

      if(splitBrainJoinRequest.getMemberCount() < membership.getLiveMembers().size()) {
          //requesting node should join this cluster.
          logger.info(selfAddress + " Requesting " + splitBrainJoinRequest.from + " to rejoin the cluster");
          network.send(splitBrainJoinRequest.from, new SplitBrainMergeMessage(splitBrainJoinRequest.messageId, selfAddress));

      } else {
          //we need to join the other cluster
          mergeWithOtherCluster(splitBrainJoinRequest.from);
      }

  }

  private void mergeWithOtherCluster(InetAddressAndPort otherClusterCoordinator) {
      askAllLiveMembersToMergeWith(otherClusterCoordinator);
      handleMerge(new MergeMessage(messageId++, selfAddress, otherClusterCoordinator)); //initiate merge on this node.
  }

  private void askAllLiveMembersToMergeWith(InetAddressAndPort mergeToAddress) {
      List<Member> liveMembers = membership.getLiveMembers();
      for (Member m : liveMembers) {
          network.send(m.address, new MergeMessage(messageId++, selfAddress, mergeToAddress));
      }
  }
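`handleSplitBrainJoinMessage` lets the smaller group merge into the larger one. However, when both subgroups have the same number of live members, each coordinator that receives the other's request takes the `else` branch. Both can then end up trying to merge into each other. The sketch below shows one way to make the choice deterministic. It assumes a tie-break on coordinator address, which is not part of the original code. `MergeDecision` and `Group` are hypothetical names.

```java
class MergeDecision {
    record Group(String coordinatorAddress, int liveMembers) {}

    // True if `local` should shut down and rejoin through `remote`'s coordinator.
    static boolean shouldMergeInto(Group local, Group remote) {
        if (local.liveMembers() != remote.liveMembers()) {
            return local.liveMembers() < remote.liveMembers(); // smaller group merges
        }
        // Equal sizes: the group whose coordinator has the smaller address survives.
        return local.coordinatorAddress().compareTo(remote.coordinatorAddress()) > 0;
    }

    public static void main(String[] args) {
        Group athensSide = new Group("athens:8000", 3);
        Group byzantiumSide = new Group("byzantium:8000", 2);
        System.out.println(shouldMergeInto(byzantiumSide, athensSide)); // true
        System.out.println(shouldMergeInto(athensSide, byzantiumSide)); // false
    }
}
```

Both coordinators evaluate the same rule on the same pair of values. So exactly one of them decides to merge, even when the groups are the same size.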

In the example discussed in the above section, when athens
can communicate with byzantium, it will ask byzantium to merge
with itself.

The coordinator of the smaller subgroup
then asks all the cluster nodes
inside its group to trigger a merge.
The merge operation shuts down each of these nodes and
rejoins it to the cluster through the coordinator of the larger group.

class MembershipService…

  private void handleMerge(MergeMessage mergeMessage) {
      logger.info(selfAddress + " Merging with " + mergeMessage.getMergeToAddress());
      shutdown();
      //join the cluster again through the other cluster's coordinator
      taskScheduler.execute(()-> {
          join(mergeMessage.getMergeToAddress());
      });
  }

In the example above, byzantium and cyrene shut down and
rejoin athens to form a full cluster again.

Comparison with Leader and Followers

It’s useful to compare this pattern with
Leader and Followers. The leader-follower
setup, as used by patterns like Consistent Core,
does not function until a leader is selected
by running an election. This guarantees that a
Quorum of cluster nodes agrees
on who the leader is. In the worst case,
if an agreement isn’t reached, the system will
be unavailable to process any requests.
In other words, it prefers consistency over availability.

The emergent leader, on the other hand, will always
have some cluster node acting as a leader for processing
client requests. In this case, availability is preferred
over consistency.


