Skip to content

Commit

Permalink
IGNITE-23494 Do not use ByteUtils#toBytes to serialize assignments (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
rpuch authored Oct 22, 2024
1 parent a332245 commit fb97554
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 51 deletions.
2 changes: 2 additions & 0 deletions modules/partition-distribution/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ dependencies {
implementation project(':ignite-api')
implementation project(':ignite-core')
implementation libs.jetbrains.annotations

testImplementation libs.hamcrest.core
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.ignite.internal.partitiondistribution;

import java.io.Serializable;
import org.apache.ignite.internal.tostring.S;

/**
Expand All @@ -27,9 +26,7 @@
* the asynchronous members (a.k.a. "learners") of the same group. Peers get synchronously updated during write operations, while learners
* are eventually consistent and received updates some time in the future.
*/
public class Assignment implements Serializable {
private static final long serialVersionUID = -8892379245627437834L;

public class Assignment {
private final String consistentId;

private final boolean isPeer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static java.util.Collections.unmodifiableSet;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -29,17 +28,14 @@
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.Nullable;

/**
* Class that encapsulates a set of nodes and its metadata.
*/
public class Assignments implements Serializable {
/** Serial version UID. */
private static final long serialVersionUID = -59553172012153869L;

public class Assignments {
/** Empty assignments. */
public static final Assignments EMPTY =
new Assignments(Collections.emptySet(), false, HybridTimestamp.NULL_HYBRID_TIMESTAMP);
Expand Down Expand Up @@ -140,7 +136,7 @@ public boolean isEmpty() {
* Serializes the instance into an array of bytes.
*/
public byte[] toBytes() {
return ByteUtils.toBytes(this);
return VersionedSerialization.toBytes(this, AssignmentsSerializer.INSTANCE);
}

/**
Expand All @@ -158,7 +154,7 @@ public static byte[] toBytes(Set<Assignment> assignments, long timestamp) {
@Nullable
@Contract("null -> null; !null -> !null")
public static Assignments fromBytes(byte @Nullable [] bytes) {
return bytes == null ? null : ByteUtils.fromBytes(bytes);
return bytes == null ? null : VersionedSerialization.fromBytes(bytes, AssignmentsSerializer.INSTANCE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.partitiondistribution;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.ignite.internal.util.io.IgniteDataInput;
import org.apache.ignite.internal.util.io.IgniteDataOutput;
import org.apache.ignite.internal.versioned.VersionedSerializer;

/**
* {@link VersionedSerializer} for {@link Assignments} instances.
*/
public class AssignmentsSerializer extends VersionedSerializer<Assignments> {
/** Serializer instance. */
public static final AssignmentsSerializer INSTANCE = new AssignmentsSerializer();

@Override
protected void writeExternalData(Assignments assignments, IgniteDataOutput out) throws IOException {
out.writeVarInt(assignments.nodes().size());
for (Assignment assignment : assignments.nodes()) {
writeAssignment(assignment, out);
}

out.writeBoolean(assignments.force());
// Writing long and not varlong as the latter will take 9 bytes for timestamps.
out.writeLong(assignments.timestamp());
}

private static void writeAssignment(Assignment assignment, IgniteDataOutput out) throws IOException {
out.writeUTF(assignment.consistentId());
out.writeBoolean(assignment.isPeer());
}

@Override
protected Assignments readExternalData(byte protoVer, IgniteDataInput in) throws IOException {
Set<Assignment> nodes = readNodes(in);
boolean force = in.readBoolean();
long timestamp = in.readLong();

return force ? Assignments.forced(nodes, timestamp) : Assignments.of(nodes, timestamp);
}

private static Set<Assignment> readNodes(IgniteDataInput in) throws IOException {
int length = in.readVarIntAsInt();

Set<Assignment> nodes = new HashSet<>();
for (int i = 0; i < length; i++) {
nodes.add(readAssignment(in));
}

return nodes;
}

private static Assignment readAssignment(IgniteDataInput in) throws IOException {
String consistentId = in.readUTF();
boolean isPeer = in.readBoolean();

return isPeer ? Assignment.forPeer(consistentId) : Assignment.forLearner(consistentId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.partitiondistribution;

import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

import java.util.Base64;
import java.util.List;
import java.util.Set;
import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class AssignmentsSerializerTest {
private static final String NOT_FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1 = "Ae++QwMCYQECYgAA6AMAAAAAAAA=";
private static final String FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1 = "Ae++QwMCYQECYgAB6AMAAAAAAAA=";

private final AssignmentsSerializer serializer = new AssignmentsSerializer();

@ParameterizedTest
@ValueSource(booleans = {true, false})
void serializationAndDeserialization(boolean force) {
Set<Assignment> nodes = Set.of(Assignment.forPeer("abc"), Assignment.forLearner("def"));
Assignments originalAssignments = force ? Assignments.forced(nodes, 1000L) : Assignments.of(nodes, 1000L);

byte[] bytes = VersionedSerialization.toBytes(originalAssignments, serializer);
Assignments restoredAssignments = VersionedSerialization.fromBytes(bytes, serializer);

assertThat(restoredAssignments, equalTo(originalAssignments));
}

@Test
void v1NotForcedCanBeDeserialized() {
byte[] bytes = Base64.getDecoder().decode(NOT_FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1);
Assignments restoredAssignments = VersionedSerialization.fromBytes(bytes, serializer);

assertNodesFromV1(restoredAssignments);

assertThat(restoredAssignments.force(), is(false));
assertThat(restoredAssignments.timestamp(), is(1000L));
}

@Test
void v1ForcedCanBeDeserialized() {
byte[] bytes = Base64.getDecoder().decode(FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1);
Assignments restoredAssignments = VersionedSerialization.fromBytes(bytes, serializer);

assertNodesFromV1(restoredAssignments);

assertThat(restoredAssignments.force(), is(true));
assertThat(restoredAssignments.timestamp(), is(1000L));
}

private static void assertNodesFromV1(Assignments restoredAssignments) {
assertThat(restoredAssignments.nodes(), hasSize(2));
List<Assignment> orderedNodes = restoredAssignments.nodes().stream()
.sorted(comparing(Assignment::consistentId))
.collect(toList());

Assignment assignment1 = orderedNodes.get(0);
assertThat(assignment1.consistentId(), is("a"));
assertThat(assignment1.isPeer(), is(true));

Assignment assignment2 = orderedNodes.get(1);
assertThat(assignment2.consistentId(), is("b"));
assertThat(assignment2.isPeer(), is(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.ignite.internal.partitiondistribution;

import static java.util.Objects.nonNull;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -30,19 +29,12 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.network.ClusterNode;
import org.junit.jupiter.api.Test;

/**
* Test for Rendezvous distribution function.
*/
public class RendezvousDistributionFunctionTest {
/** The logger. */
private static final IgniteLogger LOG = Loggers.forClass(RendezvousDistributionFunctionTest.class);

/** Distribution deviation ratio. */
public static final double DISTRIBUTION_DEVIATION_RATIO = 0.2;

Expand Down Expand Up @@ -106,35 +98,6 @@ private List<String> prepareNetworkTopology(int nodes) {
.collect(Collectors.toUnmodifiableList());
}

@Test
public void serializeAssignment() {
int nodeCount = 50;

int parts = 10_000;

int replicas = 4;

List<String> nodes = prepareNetworkTopology(nodeCount);

assertTrue(parts > nodeCount, "Partitions should be more than nodes");

List<List<String>> assignment = RendezvousDistributionFunction.assignPartitions(
nodes,
parts,
replicas,
false,
null
);

byte[] assignmentBytes = ByteUtils.toBytes(assignment);

LOG.info("Assignment is serialized successfully [bytes={}]", assignmentBytes.length);

List<List<ClusterNode>> deserializedAssignment = (List<List<ClusterNode>>) ByteUtils.fromBytes(assignmentBytes);

assertEquals(assignment, deserializedAssignment);
}

/**
* Returns sorted and compacted string representation of given {@code col}. Two nearby numbers with difference at most 1 are compacted
* to one continuous segment. E.g. collection of [1, 2, 3, 5, 6, 7, 10] will be compacted to [1-3, 5-7, 10].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@
import org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
import org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum;
import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateByNode;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.Status;
Expand Down Expand Up @@ -455,7 +454,7 @@ private boolean stableKeySwitchMessage(NetworkMessage msg, int partId, Assignmen
ByteArray opKey = new ByteArray(toByteArray(operation.key()));

if (operation.type() == OperationType.PUT && opKey.equals(stablePartAssignmentsKey)) {
return blockedAssignments.equals(ByteUtils.fromBytes(toByteArray(operation.value())));
return blockedAssignments.equals(Assignments.fromBytes(toByteArray(operation.value())));
}
}
}
Expand Down

0 comments on commit fb97554

Please sign in to comment.