Fixed Partitions

Partitions need to be mapped to cluster nodes.
The mapping also needs to be stored and made accessible to the clients.
It’s common to use a dedicated Consistent Core; this
handles both. The dedicated Consistent Core acts as a coordinator which
keeps track of all nodes in the cluster and maps partitions to nodes.
It also stores the mapping in a fault-tolerant way by using a
Replicated Log. The master cluster in YugabyteDB
and the controller implementation in Kafka are both
good examples of this.

Peer-to-peer systems like Akka or Hazelcast
also need a particular cluster node to act as a coordinator.
They use Emergent Leader as the coordinator.

Systems like Kubernetes use a generic
Consistent Core like etcd.
They need to elect one of the cluster nodes to play the role of
coordinator as discussed here.

Tracking Cluster Membership

Each cluster node will register itself with the Consistent Core.
It also periodically sends a HeartBeat to allow
the Consistent Core to detect node failures.

class KVStore…

  public void start() {
      socketListener.start();
      requestHandler.start();
      // Register this node with the coordinator on startup.
      network.sendAndReceive(coordLeader, new RegisterClusterNodeRequest(generateMessageId(), listenAddress));
      // Send a heartbeat periodically so that the coordinator can detect failures.
      scheduler.scheduleAtFixedRate(()->{
          network.send(coordLeader, new HeartbeatMessage(generateMessageId(), listenAddress));
      }, 200, 200, TimeUnit.MILLISECONDS);
  }

The coordinator handles the registration and then stores member information.

class ClusterCoordinator…

  ReplicatedLog replicatedLog;
  Membership membership = new Membership();
  TimeoutBasedFailureDetector failureDetector = new TimeoutBasedFailureDetector(Duration.ofMillis(TIMEOUT_MILLIS));

  private void handleRegisterClusterNodeRequest(Message message) {
      logger.info("Registering node " + message.from);
      CompletableFuture completableFuture = registerClusterNode(message.from);
      completableFuture.whenComplete((response, error) -> {
          logger.info("Sending register response to node " + message.from);
          network.send(message.from, new RegisterClusterNodeResponse(message.messageId, listenAddress));
      });
  }

  public CompletableFuture registerClusterNode(InetAddressAndPort address) {
      return replicatedLog.propose(new RegisterClusterNodeCommand(address));
  }

When a registration is committed in the Replicated Log,
the membership will be updated.

class ClusterCoordinator…

  private void applyRegisterClusterNodeEntry(RegisterClusterNodeCommand command) {
      updateMembership(command.memberAddress);
  }

class ClusterCoordinator…

  private void updateMembership(InetAddressAndPort address) {
      membership = membership.addNewMember(address);
      failureDetector.heartBeatReceived(address);
  }

The coordinator maintains a list of all nodes that are part of the cluster:

class Membership…

  public class Membership {
      List<Member> liveMembers = new ArrayList<>();
      List<Member> failedMembers = new ArrayList<>();
  
      public boolean isFailed(InetAddressAndPort address) {
          return failedMembers.stream().anyMatch(m -> m.address.equals(address));
      }

class Member…

  public class Member implements Comparable<Member> {
      InetAddressAndPort address;
      MemberStatus status;

The coordinator will detect cluster node failures using a
mechanism similar to
Lease.
If a cluster node stops sending the heartbeat, the node
will be marked as failed.

class ClusterCoordinator…

  @Override
  public void onBecomingLeader() {
      scheduledTask = executor.scheduleWithFixedDelay(this::checkMembership,
              1000,
              1000,
              TimeUnit.MILLISECONDS);
      failureDetector.start();
  }

  private void checkMembership() {
      List<Member> failedMembers = getFailedMembers();
      if (!failedMembers.isEmpty()) {
          replicatedLog.propose(new MemberFailedCommand(failedMembers));
      }
  }

  private List<Member> getFailedMembers() {
      List<Member> liveMembers = membership.getLiveMembers();
      return liveMembers.stream()
              .filter(m -> failureDetector.isMonitoring(m.getAddress()) && !failureDetector.isAlive(m.getAddress()))
              .collect(Collectors.toList());

  }
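
The TimeoutBasedFailureDetector used above is not shown in this excerpt. A minimal sketch of a timeout-based detector, assuming only the methods called here (the real implementation may well differ), could look like the following:

class TimeoutBasedFailureDetector…

  public class TimeoutBasedFailureDetector {
      private final Duration timeout;
      // Last heartbeat time per monitored node.
      private final Map<InetAddressAndPort, Long> lastHeartbeatAt = new ConcurrentHashMap<>();

      public TimeoutBasedFailureDetector(Duration timeout) {
          this.timeout = timeout;
      }

      public void start() {
          // Nothing to start in this sketch; liveness is evaluated
          // lazily whenever isAlive is called.
      }

      public void heartBeatReceived(InetAddressAndPort address) {
          lastHeartbeatAt.put(address, System.nanoTime());
      }

      public boolean isMonitoring(InetAddressAndPort address) {
          return lastHeartbeatAt.containsKey(address);
      }

      public boolean isAlive(InetAddressAndPort address) {
          Long lastSeen = lastHeartbeatAt.get(address);
          return lastSeen != null
                  && (System.nanoTime() - lastSeen) < timeout.toNanos();
      }
  }
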
An example scenario

Consider an example with three data servers: athens, byzantium and cyrene,
and nine partitions. The coordinator assigns the partitions across the three
nodes and records the result in the partition table.

The client can then use the partition table to map a given key
to a particular cluster node.
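
For example, a key whose hash works out to partition 7 (hash modulo the number of partitions, nine here) is looked up in the partition table; in this scenario partition 7 is hosted on athens, so the client sends its request to athens.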

Now a new cluster node – ‘ephesus’ – is added to the cluster.
The admin triggers a reassignment, and the coordinator
checks the partition table to find which nodes are underloaded.
It figures out that ephesus is the underloaded node,
and decides to allocate partition 7 to it, moving it from athens.
The coordinator stores the migrations and then sends the
request to athens to move partition 7 to ephesus.
Once the migration is complete, athens lets the coordinator know.
The coordinator then updates the partition table.

Assigning Partitions To Cluster Nodes

The coordinator assigns partitions to the cluster nodes that are known at
that point in time. If assignment is triggered every time a new cluster node
is added, partitions might be mapped too early, before the cluster reaches a
stable state. This is why the coordinator should be configured to wait until
the cluster reaches a minimum size.

The first partition assignment can simply be done in a
round-robin fashion.

class ClusterCoordinator…

  CompletableFuture assignPartitionsToClusterNodes() {
      if (!minimumClusterSizeReached()) {
          return CompletableFuture.failedFuture(new NotEnoughClusterNodesException(MINIMUM_CLUSTER_SIZE));
      }
      return initializePartitionAssignment();
  }

  private boolean minimumClusterSizeReached() {
      return membership.getLiveMembers().size() >= MINIMUM_CLUSTER_SIZE;
  }
  private CompletableFuture initializePartitionAssignment() {
      partitionAssignmentStatus = PartitionAssignmentStatus.IN_PROGRESS;
      PartitionTable partitionTable = arrangePartitions();
      return replicatedLog.propose(new PartitiontableCommand(partitionTable));
  }

  public PartitionTable arrangePartitions() {
      PartitionTable partitionTable = new PartitionTable();
      List<Member> liveMembers = membership.getLiveMembers();
      // Assign partitions to the live members in a round-robin fashion.
      for (int partitionId = 1; partitionId <= noOfPartitions; partitionId++) {
          int index = partitionId % liveMembers.size();
          Member member = liveMembers.get(index);
          partitionTable.addPartition(partitionId, new PartitionInfo(partitionId, member.getAddress(), PartitionStatus.ASSIGNED));
      }
      return partitionTable;
  }

The Replicated Log makes the partition table persistent.

class ClusterCoordinator…

  PartitionTable partitionTable;
  PartitionAssignmentStatus partitionAssignmentStatus = PartitionAssignmentStatus.UNASSIGNED;

  private void applyPartitionTableCommand(PartitiontableCommand command) {
      this.partitionTable = command.partitionTable;
      partitionAssignmentStatus = PartitionAssignmentStatus.ASSIGNED;
      if (isLeader()) {
          sendMessagesToMembers(partitionTable);
      }
  }

Once the partition assignment is persisted, the coordinator
sends messages to all cluster nodes to tell each node which partitions
it now owns.

class ClusterCoordinator…

  List<Integer> pendingPartitionAssignments = new ArrayList<>();

  private void sendMessagesToMembers(PartitionTable partitionTable) {
      Map<Integer, PartitionInfo> partitionsTobeHosted = partitionTable.getPartitionsTobeHosted();
      partitionsTobeHosted.forEach((partitionId, partitionInfo) -> {
          pendingPartitionAssignments.add(partitionId);
          HostPartitionMessage message = new HostPartitionMessage(requestNumber++, this.listenAddress, partitionId);
          logger.info("Sending host partition message to " + partitionInfo.hostedOn + " partitionId=" + partitionId);
          scheduler.execute(new RetryableTask(partitionInfo.hostedOn, network, this, partitionId, message));
      });
  }

The coordinator will keep retrying until its message is
successfully delivered to the node.

class RetryableTask…

  static class RetryableTask implements Runnable {
      Logger logger = LogManager.getLogger(RetryableTask.class);
      InetAddressAndPort address;
      Network network;
      ClusterCoordinator coordinator;
      Integer partitionId;
      int attempt;
      private Message message;

      public RetryableTask(InetAddressAndPort address, Network network, ClusterCoordinator coordinator, Integer partitionId, Message message) {
          this.address = address;
          this.network = network;
          this.coordinator = coordinator;
          this.partitionId = partitionId;
          this.message = message;
      }

      @Override
      public void run() {
          attempt++;
          try {
              // Stop retrying if the node has been marked as failed.
              if (coordinator.isSuspected(address)) {
                  return;
              }
              logger.info("Sending " + message + " to=" + address);
              network.send(address, message);
          } catch (Exception e) {
              logger.error("Error trying to send " + message + " to " + address, e);
              scheduleWithBackOff();
          }
      }

      private void scheduleWithBackOff() {
          scheduler.schedule(this, getBackOffDelay(attempt), TimeUnit.MILLISECONDS);
      }

      private long getBackOffDelay(int attempt) {
          // Exponential backoff with a small random jitter so that retries
          // from different tasks do not fire in lockstep.
          long baseDelay = (long) Math.pow(2, attempt);
          long jitter = randomJitter();
          return baseDelay + jitter;
      }

      private long randomJitter() {
          // Use a non-deterministic source; a fixed-seed Random would produce
          // the same jitter value on every call.
          return ThreadLocalRandom.current().nextLong(50);
      }

  }

When a cluster node receives the request to create the partition,
it creates one with the given partition id.
If we imagine this happening within a simple key-value store,
its implementation will look something like this:

class KVStore…

  Map<Integer, Partition> allPartitions = new ConcurrentHashMap<>();
  private void handleHostPartitionMessage(Message message) {
      Integer partitionId = ((HostPartitionMessage) message).getPartitionId();
      addPartitions(partitionId);
      logger.info("Adding partition " + partitionId + " to " + listenAddress);
      network.send(message.from, new HostPartitionAcks(message.messageId, this.listenAddress, partitionId));
  }

  public void addPartitions(Integer partitionId) {
      allPartitions.put(partitionId, new Partition(partitionId));

  }

class Partition…

  SortedMap<String, String> kv = new TreeMap<>();
  private Integer partitionId;

Once the coordinator receives the message that the partition
has been successfully created,
it records this in the replicated log and updates the partition status to online.

class ClusterCoordinator…

  private void handleHostPartitionAck(Message message) {
      int partitionId = ((HostPartitionAcks) message).getPartitionId();
      pendingPartitionAssignments.remove(Integer.valueOf(partitionId));
      logger.info("Received host partition ack from " + message.from + " partitionId=" + partitionId + " pending=" + pendingPartitionAssignments);
      CompletableFuture future = replicatedLog.propose(new UpdatePartitionStatusCommand(partitionId, PartitionStatus.ONLINE));
      future.join();
  }

Once the High-Water Mark is reached,
and the record is applied, the partition’s status will be updated.

class ClusterCoordinator…

  private void updateParitionStatus(UpdatePartitionStatusCommand command) {
      removePendingRequest(command.partitionId);
      logger.info("Changing status for " + command.partitionId + " to " + command.status);
      logger.info(partitionTable.toString());
      partitionTable.updateStatus(command.partitionId, command.status);
  }

Client Interface

Considering again the example of a simple key-value store,
if a client needs to store or get a value for a particular key,
it can do so by following these steps:

  • The client applies the hash function to the key and finds
    the relevant partition based on the total number of partitions
    (see the sketch after this list).
  • The client gets the partition table from the coordinator
    and finds the cluster node that is hosting the partition.
    The client also periodically refreshes the partition table.
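
The findPartition step is not shown in the client code below; a minimal sketch, assuming a simple hash-and-modulo scheme (production stores typically use a stable hash such as murmur hash so that all clients and servers agree on the mapping), might look like this:

class Client…

  private Integer findPartition(String key, int noOfPartitions) {
      // Mask the sign bit so that negative hashCode values do not
      // produce a negative partition id.
      int hash = key.hashCode() & Integer.MAX_VALUE;
      // Partition ids are 1-based in arrangePartitions above.
      return (hash % noOfPartitions) + 1;
  }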

Clients fetching a partition table from the coordinator can
quickly lead to bottlenecks,
especially if all requests are being served by a
single coordinator leader. That is why it is common practice to
keep metadata available on all cluster nodes.
The coordinator can either push metadata to cluster nodes,
or cluster nodes can pull it from the coordinator.
Clients can then connect with any cluster node to refresh
the metadata.

This is generally implemented inside the client library provided by the key-value store,
or by the client request handling that happens on the cluster nodes.
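
As an illustration of the pull-based option, a client might periodically refresh its cached partition table from any known cluster node. This is only a sketch: the GetPartitionTable request id, the GetPartitionTableRequest message and the wire format of the response are assumptions, not part of the code shown in this article.

class Client…

  // Hypothetical pull-based refresh of the locally cached partition table.
  private void refreshPartitionTable(InetAddressAndPort anyClusterNode) throws IOException {
      SocketClient socketClient = new SocketClient(anyClusterNode);
      RequestOrResponse response = socketClient.blockingSend(new RequestOrResponse(RequestId.GetPartitionTable.getId(),
                                                JsonSerDes.serialize(new GetPartitionTableRequest())));
      this.partitionTable = JsonSerDes.deserialize(response.getMessageBodyJson(), PartitionTable.class);
  }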

class Client…

  public void put(String key, String value) throws IOException {
      Integer partitionId = findPartition(key, noOfPartitions);
      InetAddressAndPort nodeAddress = getNodeAddressFor(partitionId);
      sendPutMessage(partitionId, nodeAddress, key, value);
  }

  private InetAddressAndPort getNodeAddressFor(Integer partitionId) {
      PartitionInfo partitionInfo = partitionTable.getPartition(partitionId);
      InetAddressAndPort nodeAddress = partitionInfo.getAddress();
      return nodeAddress;
  }

  private void sendPutMessage(Integer partitionId, InetAddressAndPort address, String key, String value) throws IOException {
      PartitionPutMessage partitionPutMessage = new PartitionPutMessage(partitionId, key, value);
      SocketClient socketClient = new SocketClient(address);
      socketClient.blockingSend(new RequestOrResponse(RequestId.PartitionPutKV.getId(),
                                                JsonSerDes.serialize(partitionPutMessage)));
  }
  public String get(String key) throws IOException {
      Integer partitionId = findPartition(key, noOfPartitions);
      InetAddressAndPort nodeAddress = getNodeAddressFor(partitionId);
      return sendGetMessage(partitionId, key, nodeAddress);
  }

  private String sendGetMessage(Integer partitionId, String key, InetAddressAndPort address) throws IOException {
      PartitionGetMessage partitionGetMessage = new PartitionGetMessage(partitionId, key);
      SocketClient socketClient = new SocketClient(address);
      RequestOrResponse response = socketClient.blockingSend(new RequestOrResponse(RequestId.PartitionGetKV.getId(), JsonSerDes.serialize(partitionGetMessage)));
      PartitionGetResponseMessage partitionGetResponseMessage = JsonSerDes.deserialize(response.getMessageBodyJson(), PartitionGetResponseMessage.class);
      return partitionGetResponseMessage.getValue();
  }

Moving partitions to newly added members

When new nodes are added to a cluster, some partitions can be moved to
other nodes. This can be done automatically once a new cluster node is added.
But it can involve a lot of data being moved across the cluster,
which is why an administrator will typically trigger the repartitioning.
One simple method to do this is to calculate the average number of partitions
each node should host and then move the additional partitions
to the new node.
For example, if the number of partitions is 30 and there are three existing nodes
in the cluster, each node should host 10 partitions.
If a new node is added, the average per node is about 7. The coordinator
will therefore try to move three partitions from each cluster node
to the new one.

class ClusterCoordinator…

  List<Migration> pendingMigrations = new ArrayList<>();

  boolean reassignPartitions() {
      if (partitionAssignmentInProgress()) {
          logger.info("Partition assignment in progress");
          return false;
      }
      List<Migration> migrations = repartition(this.partitionTable);
      CompletableFuture proposalFuture = replicatedLog.propose(new MigratePartitionsCommand(migrations));
      proposalFuture.join();
      return true;
  }
  public List<Migration> repartition(PartitionTable partitionTable) {
      int averagePartitionsPerNode = getAveragePartitionsPerNode();
      List<Member> liveMembers = membership.getLiveMembers();
      // Split live members into those hosting more than the average number
      // of partitions and those hosting fewer.
      var overloadedNodes = partitionTable.getOverloadedNodes(averagePartitionsPerNode, liveMembers);
      var underloadedNodes = partitionTable.getUnderloadedNodes(averagePartitionsPerNode, liveMembers);

      var migrations = tryMovingPartitionsToUnderLoadedMembers(averagePartitionsPerNode, overloadedNodes, underloadedNodes);
      return migrations;
  }

  private List<Migration> tryMovingPartitionsToUnderLoadedMembers(int averagePartitionsPerNode,
                                                                  Map<InetAddressAndPort, PartitionList> overloadedNodes,
                                                                  Map<InetAddressAndPort, PartitionList> underloadedNodes) {
      List<Migration> migrations = new ArrayList<>();
      for (InetAddressAndPort member : overloadedNodes.keySet()) {
          var partitions = overloadedNodes.get(member);
          // Partitions beyond the average are the candidates to move off this node.
          var toMove = partitions.subList(averagePartitionsPerNode, partitions.getSize());
          overloadedNodes.put(member, partitions.subList(0, averagePartitionsPerNode));
          ArrayDeque<Integer> moveQ = new ArrayDeque<Integer>(toMove.partitionList());
          while (!moveQ.isEmpty() && nodeWithLeastPartitions(underloadedNodes, averagePartitionsPerNode).isPresent()) {
              assignToNodesWithLeastPartitions(migrations, member, moveQ, underloadedNodes, averagePartitionsPerNode);
          }
          if (!moveQ.isEmpty()) {
              // No underloaded node could take these; leave them where they are.
              overloadedNodes.get(member).addAll(moveQ);
          }
      }
      return migrations;
  }

  int getAveragePartitionsPerNode() {
      return noOfPartitions / membership.getLiveMembers().size();
  }

The coordinator will persist the computed migrations in the replicated log
and then send requests to move partitions across the cluster nodes.

class ClusterCoordinator…

  private void applyMigratePartitionCommand(MigratePartitionsCommand command) {
      logger.info("Handling partition migrations " + command.migrations);
      for (Migration migration : command.migrations) {
          RequestPartitionMigrationMessage message = new RequestPartitionMigrationMessage(requestNumber++, this.listenAddress, migration);
          pendingMigrations.add(migration);
          if (isLeader()) {
              scheduler.execute(new RetryableTask(migration.fromMember, network, this, migration.getPartitionId(), message));
          }
      }
  }

When a cluster node receives a request to migrate, it will mark
the partition as migrating.
This stops any further modifications to the partition.
It will then send the entire partition data to the target node.

class KVStore…

  private void handleRequestPartitionMigrationMessage(RequestPartitionMigrationMessage message) {
      Migration migration = message.getMigration();
      Integer partitionId = migration.getPartitionId();
      InetAddressAndPort toServer = migration.getToMember();
      if (!allPartitions.containsKey(partitionId)) {
          return;// The partition is not available with this node.
      }
      Partition partition = allPartitions.get(partitionId);
      partition.setMigrating();
      network.send(toServer, new MovePartitionMessage(requestNumber++, this.listenAddress, toServer, partition));
  }
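
How further modifications are stopped is not shown above. One possible approach, purely as an illustration (the status flag and the put check below are assumptions, not necessarily how the reference implementation behaves), is for the partition itself to reject writes once it has been marked as migrating:

class Partition…

  // Hypothetical flag; setMigrating() above is assumed to set it.
  private volatile boolean migrating = false;

  public void setMigrating() {
      this.migrating = true;
  }

  public void put(String key, String value) {
      if (migrating) {
          // Writes are rejected while the partition is being moved; the client
          // is expected to refresh its partition table and retry on the new owner.
          throw new IllegalStateException("Partition " + partitionId + " is migrating");
      }
      kv.put(key, value);
  }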

The cluster node that receives the request will add
the new partition to itself and
return an acknowledgement.

class KVStore…

  private void handleMovePartition(Message message) {
      MovePartitionMessage movePartitionMessage = (MovePartitionMessage) message;
      Partition partition = movePartitionMessage.getPartition();
      allPartitions.put(partition.getId(), partition);
      network.send(message.from, new PartitionMovementComplete(message.messageId, listenAddress,
              new Migration(movePartitionMessage.getMigrateFrom(), movePartitionMessage.getMigrateTo(),  partition.getId())));
  }

The cluster node that previously owned the partition will then
send the migration complete message to the cluster coordinator.

class KVStore…

  private void handlePartitionMovementCompleteMessage(PartitionMovementComplete message) {
      allPartitions.remove(message.getMigration().getPartitionId());
      network.send(coordLeader, new MigrationCompleteMessage(requestNumber++, listenAddress,
              message.getMigration()));
  }

The cluster coordinator will then mark the migration as complete.
The change will be stored in the replicated log.

class ClusterCoordinator…

  private void handleMigrationCompleteMessage(MigrationCompleteMessage message) {
      CompletableFuture propose = replicatedLog.propose(new MigrationCompletedCommand(message.getMigration()));
      propose.join();
  }

class ClusterCoordinator…

  private void applyMigrationCompleted(MigrationCompletedCommand command) {
      pendingMigrations.remove(command.getMigration());
      logger.info("Completed migration " + command.getMigration());
      logger.info("pendingMigrations = " + pendingMigrations);
      partitionTable.migrationCompleted(command.getMigration());
  }

class PartitionTable…

  public void migrationCompleted(Migration migration) {
      this.addPartition(migration.partitionId, new PartitionInfo(migration.partitionId, migration.toMember, ClusterCoordinator.PartitionStatus.ONLINE));
  }


