Skip to content

Commit

Permalink
fix: retrieval and frame queue
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkAfCod authored and GrapeBaBa committed Oct 21, 2024
1 parent 66651a2 commit 760271b
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.optimism.v2.derive.datasource;

import io.optimism.types.BlockInfo;
import io.optimism.v2.derive.stages.DataIter;
import io.optimism.v2.derive.types.BlockInfo;

/**
* the data availability provider interface.
Expand All @@ -11,5 +11,5 @@
*/
public interface DataAvailabilityProvider {

DataIter openData(BlockInfo l1Ref, String batcherAddr);
DataIter openData(BlockInfo l1Ref);
}
Original file line number Diff line number Diff line change
@@ -1,33 +1,69 @@
package io.optimism.v2.derive.datasource.impl;

import io.optimism.v2.derive.datasource.DataAvailabilityProvider;
import io.optimism.v2.derive.exception.PipelineEofException;
import io.optimism.v2.derive.stages.DataIter;
import io.optimism.v2.derive.stages.FrameQueueProvider;
import io.optimism.v2.derive.stages.L1RetrievalProvider;
import io.optimism.v2.derive.stages.OriginAdvancer;
import io.optimism.v2.derive.stages.OriginProvider;
import io.optimism.v2.derive.stages.ResettableStage;
import io.optimism.v2.derive.types.BlockInfo;
import io.optimism.v2.derive.types.SystemConfig;
import java.util.Optional;

/**
* the l1 chain data retrieval.
*
* @author thinkAfCod
* @since 0.4.6
*/
public class L1Retrieval implements FrameQueueProvider, OriginProvider, OriginAdvancer, ResettableStage {
public class L1Retrieval implements FrameQueueProvider {

private final L1RetrievalProvider prev;

private final DataAvailabilityProvider provider;

private Optional<DataIter> data;

/**
* L1Retrieval constructor.
*
* @param prev the previous stage
* @param provider the data availability provider
*/
public L1Retrieval(L1RetrievalProvider prev, DataAvailabilityProvider provider) {
this.prev = prev;
this.provider = provider;
this.data = Optional.empty();
}

@Override
public void advanceOrigin() {}
public byte[] next() {
if (data.isEmpty()) {
var next = this.prev.nextL1Block();
this.data = Optional.ofNullable(this.provider.openData(next));
}
if (this.data.isEmpty()) {
throw new PipelineEofException("");
}

return this.data.get().next();
}

@Override
public BlockInfo origin() {
return null;
public void advanceOrigin() {
((OriginAdvancer) this.prev).advanceOrigin();
}

@Override
public void reset(BlockInfo base, SystemConfig config) {}
public BlockInfo origin() {
return ((OriginProvider) this.prev).origin();
}

@Override
public byte[] next() {
return new byte[0];
public void reset(BlockInfo base, SystemConfig config) {
((ResettableStage) this.prev).reset(base, config);
this.data = Optional.ofNullable(this.provider.openData(base));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
import io.optimism.v2.derive.exception.PipelineEofException;
import io.optimism.v2.derive.exception.PipelineProviderException;
import io.optimism.v2.derive.stages.L1RetrievalProvider;
import io.optimism.v2.derive.stages.OriginAdvancer;
import io.optimism.v2.derive.stages.OriginProvider;
import io.optimism.v2.derive.stages.ResettableStage;
import io.optimism.v2.derive.types.BlockInfo;
import io.optimism.v2.derive.types.SystemConfig;
import java.math.BigInteger;
Expand All @@ -21,7 +18,7 @@
* @author thinkAfCod
* @since 0.4.6
*/
public class L1Traversal implements L1RetrievalProvider, OriginProvider, OriginAdvancer, ResettableStage {
public class L1Traversal implements L1RetrievalProvider {

private final Config.ChainConfig rollupConfig;

Expand All @@ -40,7 +37,12 @@ public L1Traversal(Config.ChainConfig rollupConfig, ChainProvider provider) {

@Override
public BlockInfo nextL1Block() {
return this.block;
if (!this.done) {
this.done = true;
return this.block;
} else {
throw new PipelineEofException();
}
}

@Override
Expand All @@ -51,7 +53,7 @@ public String batcherAddr() {
@Override
public void advanceOrigin() {
if (this.block == null) {
throw new PipelineEofException();
throw new PipelineEofException("Missing current block, can't advance origin with no reference.");
}

var block = this.block;
Expand Down Expand Up @@ -84,6 +86,5 @@ public BlockInfo origin() {
public void reset(BlockInfo base, SystemConfig config) {
this.block = base;
this.curSysConfig = config;
// metrics record stage reset for l1 traversal
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.optimism.v2.derive.exception;

/**
* The frame parse exception.
*
* @author thinkAfCod
* @since 0.4.6
*/
public class FrameParseException extends RuntimeException {
/** Constructs a PipelineEofException. */
public FrameParseException() {
super("parses frame failed");
}

/**
* Constructs a PipelineEofException with a custom message.
*
* @param message the custom error message
*/
public FrameParseException(String message) {
super(message);
}

/**
* Constructs a new PipelineEofException with the specified detail message and cause.
*
* @param message the detail message
* @param cause the cause
*/
public FrameParseException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* @author thinkAfCod
* @since 0.4.6
*/
public interface ChannelBankProvider {
public interface ChannelBankProvider extends OriginProvider, OriginAdvancer, ResettableStage {
/**
* gets the next frame in the current channel
*
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/io/optimism/v2/derive/stages/DataIter.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
package io.optimism.v2.derive.stages;

/**
* the data iterator interface.
*
* @author thinkAfCod
* @since 0.4.6
*/
public interface DataIter {

byte[] Next();
/**
* get the next data.
* @return the bytes of next data.
*/
byte[] next();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* @author thinkAfCod
* @since 0.4.6
*/
public interface FrameQueueProvider {
public interface FrameQueueProvider extends OriginProvider, OriginAdvancer, ResettableStage {
/**
* gets the bytes of the next raw frame.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* @author thinkAfCod
* @since 0.4.6
*/
public interface L1RetrievalProvider {
public interface L1RetrievalProvider extends OriginProvider, OriginAdvancer, ResettableStage {
/**
* get the next L1 block info.
*
Expand Down
98 changes: 94 additions & 4 deletions src/main/java/io/optimism/v2/derive/stages/impl/FrameQueue.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,118 @@
package io.optimism.v2.derive.stages.impl;

import io.optimism.config.Config;
import io.optimism.v2.derive.exception.PipelineProviderException;
import io.optimism.v2.derive.stages.ChannelBankProvider;
import io.optimism.v2.derive.stages.FrameQueueProvider;
import io.optimism.v2.derive.stages.OriginAdvancer;
import io.optimism.v2.derive.stages.OriginProvider;
import io.optimism.v2.derive.stages.ResettableStage;
import io.optimism.v2.derive.types.BlockInfo;
import io.optimism.v2.derive.types.Frame;
import io.optimism.v2.derive.types.SystemConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class FrameQueue implements ChannelBankProvider, OriginProvider, OriginAdvancer, ResettableStage {

private static final int QUEUE_SIZE = 1024;

private final FrameQueueProvider prev;

private final Config.ChainConfig rollupConfig;

private List<Frame> queue;

/**
* The frame queue constructor.
*
* @param prev The previous stage in the pipeline
* @param rollupConfig The rollup configuration
*/
public FrameQueue(FrameQueueProvider prev, Config.ChainConfig rollupConfig) {
this.prev = prev;
this.rollupConfig = rollupConfig;
this.queue = new ArrayList<>(QUEUE_SIZE);
}

/**
* loads more frames into the queue
*/
public void loadFrames() {
if (!this.queue.isEmpty()) {
return;
}
var data = this.prev.next();
List<Frame> frames = Frame.parseFrames(data);
this.queue.addAll(frames);
var origin = this.origin();
if (origin == null) {
throw new PipelineProviderException("Missing origin");
}
this.prune(origin);
}

/**
* prunes frames if Holocene is active
*
* @param origin the l1 origin block
*/
public void prune(BlockInfo origin) {
if (!rollupConfig.isHolocene(origin.timestamp())) {
return;
}
int i = 0;
while (i < this.queue.size()) {
final var prevFrame = this.queue.get(i);
var nextFrame = this.queue.get(i + 1);
var extendsChannel = prevFrame.channelId().equals(nextFrame.channelId());
if (extendsChannel && prevFrame.frameNumber() + 1 != nextFrame.frameNumber()) {
this.queue.remove(i + 1);
continue;
}
if (extendsChannel && prevFrame.isLastFrame()) {
this.queue.remove(i + 1);
continue;
}
if (!extendsChannel && !nextFrame.frameNumber().equals(0)) {
this.queue.remove(i + 1);
continue;
}
if (!extendsChannel
&& !prevFrame.isLastFrame()
&& nextFrame.frameNumber().equals(0)) {
this.queue = this.queue.stream()
.filter(f -> f.channelId().equals(prevFrame.channelId()))
.collect(Collectors.toList());
continue;
}
i += 1;
}
}

@Override
public Frame nextFrame() {
return null;
this.loadFrames();
if (this.queue.isEmpty()) {
throw new PipelineProviderException("Not enough data");
}
return this.queue.removeFirst();
}

@Override
public void advanceOrigin() {}
public void advanceOrigin() {
this.prev.advanceOrigin();
}

@Override
public BlockInfo origin() {
return null;
return this.prev.origin();
}

@Override
public void reset(BlockInfo base, SystemConfig config) {}
public void reset(BlockInfo base, SystemConfig config) {
this.prev.reset(base, config);
this.queue = new ArrayList<>(QUEUE_SIZE);
}
}
Loading

0 comments on commit 760271b

Please sign in to comment.