Skip to content

Commit

Permalink
Merge pull request #41986 from lochana-chathura/worker_new3
Browse files Browse the repository at this point in the history
Add `stream-receive-action` parsing support
  • Loading branch information
lochana-chathura authored Jan 16, 2024
2 parents 4c11167 + 3a75d12 commit 1dfe02c
Show file tree
Hide file tree
Showing 37 changed files with 2,888 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public enum DiagnosticErrorCode implements DiagnosticCode {
WORKER_SEND_RECEIVE_PARAMETER_COUNT_MISMATCH("BCE2082", "worker.send.receive.parameter.count.mismatch"),
INVALID_WORKER_INTERACTION("BCE2083", "worker.invalid.worker.interaction"),
WORKER_INTERACTIONS_ONLY_ALLOWED_BETWEEN_PEERS("BCE2084", "worker.interactions.only.allowed.between.peers"),
// VACANT_ERROR("BCE2085", ""),
STREAM_RECEIVE_ACTION_NOT_YET_SUPPORTED("BCE2085", "stream.receive.action.not.yet.supported"),
// VACANT_ERROR("BCE2086", ""),
EXPLICIT_WORKER_CANNOT_BE_DEFAULT("BCE2087", "explicit.worker.cannot.be.default"),
INVALID_MULTIPLE_FORK_JOIN_SEND("BCE2088", "worker.multiple.fork.join.send"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.wso2.ballerinalang.compiler.parser;

import io.ballerina.compiler.syntax.tree.AlternateReceiveWorkerNode;
import io.ballerina.compiler.syntax.tree.AlternateReceiveNode;
import io.ballerina.compiler.syntax.tree.AnnotAccessExpressionNode;
import io.ballerina.compiler.syntax.tree.AnnotationAttachPointNode;
import io.ballerina.compiler.syntax.tree.AnnotationDeclarationNode;
Expand Down Expand Up @@ -203,12 +203,14 @@
import io.ballerina.compiler.syntax.tree.SeparatedNodeList;
import io.ballerina.compiler.syntax.tree.ServiceDeclarationNode;
import io.ballerina.compiler.syntax.tree.SimpleNameReferenceNode;
import io.ballerina.compiler.syntax.tree.SingleReceiveNode;
import io.ballerina.compiler.syntax.tree.SingletonTypeDescriptorNode;
import io.ballerina.compiler.syntax.tree.SpecificFieldNode;
import io.ballerina.compiler.syntax.tree.SpreadFieldNode;
import io.ballerina.compiler.syntax.tree.SpreadMemberNode;
import io.ballerina.compiler.syntax.tree.StartActionNode;
import io.ballerina.compiler.syntax.tree.StatementNode;
import io.ballerina.compiler.syntax.tree.StreamReceiveNode;
import io.ballerina.compiler.syntax.tree.StreamTypeDescriptorNode;
import io.ballerina.compiler.syntax.tree.StreamTypeParamsNode;
import io.ballerina.compiler.syntax.tree.SyncSendActionNode;
Expand Down Expand Up @@ -2527,25 +2529,23 @@ public BLangNode transform(ReceiveActionNode receiveActionNode) {
Location receiveActionPos = getPosition(receiveActionNode);
Node receiveWorkers = receiveActionNode.receiveWorkers();

if (receiveWorkers.kind() == SyntaxKind.SIMPLE_NAME_REFERENCE) {
BLangWorkerReceive singleWorkerRecv =
createSimpleWorkerReceive(((SimpleNameReferenceNode) receiveWorkers).name());
if (receiveWorkers.kind() == SyntaxKind.SINGLE_RECEIVE) {
SingleReceiveNode singleReceiveNode = (SingleReceiveNode) receiveWorkers;
BLangWorkerReceive singleWorkerRecv = createSimpleWorkerReceive(singleReceiveNode.worker().name());
singleWorkerRecv.pos = receiveActionPos;
return singleWorkerRecv;
}

if (receiveWorkers.kind() == SyntaxKind.ALTERNATE_RECEIVE_WORKER) {
if (receiveWorkers.kind() == SyntaxKind.ALTERNATE_RECEIVE) {
SeparatedNodeList<SimpleNameReferenceNode> alternateWorkers =
((AlternateReceiveWorkerNode) receiveWorkers).workers();
List<BLangWorkerReceive> workerReceives = new ArrayList<>(alternateWorkers.size());
for (SimpleNameReferenceNode w : alternateWorkers) {
workerReceives.add(createSimpleWorkerReceive(w.name()));
}
((AlternateReceiveNode) receiveWorkers).workers();
return createAlternateWorkerReceive(alternateWorkers, receiveActionPos);
}

BLangAlternateWorkerReceive alternateWorkerRecv = TreeBuilder.createAlternateWorkerReceiveNode();
alternateWorkerRecv.setWorkerReceives(workerReceives);
alternateWorkerRecv.pos = receiveActionPos;
return alternateWorkerRecv;
if (receiveWorkers.kind() == SyntaxKind.STREAM_RECEIVE) {
dlog.error(receiveActionPos, DiagnosticErrorCode.STREAM_RECEIVE_ACTION_NOT_YET_SUPPORTED);
// mock rest of the flow as an alternative receive
return createAlternateWorkerReceive(((StreamReceiveNode) receiveWorkers).workers(), receiveActionPos);
}

ReceiveFieldsNode receiveFieldsNode = (ReceiveFieldsNode) receiveWorkers;
Expand All @@ -2571,6 +2571,19 @@ public BLangNode transform(ReceiveActionNode receiveActionNode) {
return multipleWorkerRv;
}

private BLangAlternateWorkerReceive createAlternateWorkerReceive(
SeparatedNodeList<SimpleNameReferenceNode> alternateWorkers, Location receiveActionPos) {
List<BLangWorkerReceive> workerReceives = new ArrayList<>(alternateWorkers.size());
for (SimpleNameReferenceNode w : alternateWorkers) {
workerReceives.add(createSimpleWorkerReceive(w.name()));
}

BLangAlternateWorkerReceive alternateWorkerRecv = TreeBuilder.createAlternateWorkerReceiveNode();
alternateWorkerRecv.setWorkerReceives(workerReceives);
alternateWorkerRecv.pos = receiveActionPos;
return alternateWorkerRecv;
}

private BLangWorkerReceive createSimpleWorkerReceive(Token workerRef) {
BLangWorkerReceive workerReceiveExpr = (BLangWorkerReceive) TreeBuilder.createWorkerReceiveNode();
workerReceiveExpr.setWorkerName(createIdentifier(workerRef));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2021,3 +2021,6 @@ error.inferred.query.construct.type.as.stream=\

error.invalid.binding.pattern.in.on.fail=\
invalid binding pattern in ''on fail'' clause: only a capture binding pattern or an error binding pattern is allowed

error.stream.receive.action.not.yet.supported=\
stream receive action not yet supported
Original file line number Diff line number Diff line change
Expand Up @@ -13076,63 +13076,147 @@ private STNode parseSyncSendToken() {
* multiple-receive-action := <- { receive-field (, receive-field)* }
* <br></br>
* alternate-receive-action := <- peer-worker (| peer-worker)*
* <br></br>
* stream-receive-action := <- stream `(` peer-worker (| peer-worker)* `)`
* </code>
*
* @return Receive action
*/
private STNode parseReceiveAction() {
STNode leftArrow = parseLeftArrowToken();
STNode receiveWorkers = parseReceiveWorkers();
return STNodeFactory.createReceiveActionNode(leftArrow, receiveWorkers);
STNode receiveRhs = parseReceiveActionRhs();
return STNodeFactory.createReceiveActionNode(leftArrow, receiveRhs);
}

private STNode parseReceiveWorkers() {
private STNode parseReceiveActionRhs() {
switch (peek().kind) {
case FUNCTION_KEYWORD:
case IDENTIFIER_TOKEN:
return parseSingleOrAlternateReceiveWorkers();
return parseSingleOrAlternateReceiveRhs();
case OPEN_BRACE_TOKEN:
return parseMultipleReceiveWorkers();
return parseMultipleReceiveRhs();
case STREAM_KEYWORD:
return parseStreamReceiveRhs();
default:
recover(peek(), ParserRuleContext.RECEIVE_WORKERS);
return parseReceiveWorkers();
recover(peek(), ParserRuleContext.RECEIVE_ACTION_RHS);
return parseReceiveActionRhs();
}
}

private STNode parseSingleOrAlternateReceiveWorkers() {
startContext(ParserRuleContext.SINGLE_OR_ALTERNATE_WORKER);
List<STNode> workers = new ArrayList<>();
// Parse first peer worker name, that has no leading comma
STNode peerWorker = parsePeerWorkerName();
workers.add(peerWorker);

/**
* Parse single or alternate receive action rhs.
* <p>
* <p><code>
* single-receive-rhs := peer-worker
* <br></br>
* alternate-receive-rhs := peer-worker (| peer-worker)*
* </code>
*
* @return Receive action
*/
private STNode parseSingleOrAlternateReceiveRhs() {
startContext(ParserRuleContext.SINGLE_OR_ALTERNATE_RECEIVE_RHS);
STNode firstPeerWorker = parsePeerWorkerName();
STToken nextToken = peek();
if (nextToken.kind != SyntaxKind.PIPE_TOKEN) {
endContext();
return peerWorker;
return STNodeFactory.createSingleReceiveNode(firstPeerWorker);
}

STNode peerWorkers = parsePeerWorkers(firstPeerWorker);
endContext();
return STNodeFactory.createAlternateReceiveNode(peerWorkers);
}

/**
* Parse peer worker list separated by `|`.
*
* @param firstPeerWorker first peer worker node
* @return Parsed node
*/
private STNode parsePeerWorkers(STNode firstPeerWorker) {
List<STNode> workers = new ArrayList<>();
workers.add(firstPeerWorker);

// Parse the remaining peer worker names
STToken nextToken = peek();
while (nextToken.kind == SyntaxKind.PIPE_TOKEN) {
STNode pipeToken = consume();
workers.add(pipeToken);
peerWorker = parsePeerWorkerName();
STNode peerWorker = parsePeerWorkerName();
workers.add(peerWorker);
nextToken = peek();
}

return STNodeFactory.createNodeList(workers);
}

/**
* Parse stream receive action rhs.
* <p>
* <code>
* stream-receive-rhs := stream `(` peer-worker (| peer-worker)* `)`
* </code>
*
* @return Parsed node
*/
private STNode parseStreamReceiveRhs() {
startContext(ParserRuleContext.STREAM_RECEIVE_RHS);
STNode streamKeyword = parseStreamKeyword();
STNode openParen = parseOpenParenthesis();
STNode workers = parseStreamPeerWorkers();
STNode closeParen = parseCloseParenthesis();
endContext();
return STNodeFactory.createAlternateReceiveWorkerNode(STNodeFactory.createNodeList(workers));
return STNodeFactory.createStreamReceiveNode(streamKeyword, openParen, workers, closeParen);
}

/**
* Parse stream-keyword.
*
* @return Parsed node
*/
private STNode parseStreamKeyword() {
STToken token = peek();
if (token.kind == SyntaxKind.STREAM_KEYWORD) {
return consume();
} else {
recover(token, ParserRuleContext.STREAM_KEYWORD);
return parseStreamKeyword();
}
}

/**
* Parse multiple worker receivers.
* Parse stream receive peer worker list.
* <p>
* <code>{ receive-field (, receive-field)* }</code>
* <code>
* stream-receive-peer-workers := peer-worker (| peer-worker)*
* </code>
*
* @return Multiple worker receiver node
* @return Parsed node
*/
private STNode parseStreamPeerWorkers() {
STNode firstPeerWorker = parsePeerWorkerName();

STToken nextToken = peek();
if (nextToken.kind != SyntaxKind.PIPE_TOKEN) {
return STNodeFactory.createNodeList(new ArrayList<>(Collections.singletonList(firstPeerWorker)));
}

return parsePeerWorkers(firstPeerWorker);
}

/**
* Parse multiple receive action rhs.
* <p>
* <code>
* multiple-receive-rhs := { receive-field (, receive-field)* }
* <br></br>
* receive-field := peer-worker | field-name : peer-worker
* </code>
*
* @return Parsed node
*/
private STNode parseMultipleReceiveWorkers() {
private STNode parseMultipleReceiveRhs() {
startContext(ParserRuleContext.MULTI_RECEIVE_WORKERS);
STNode openBrace = parseOpenBrace();
STNode receiveFields = parseReceiveFields();
Expand Down
Loading

0 comments on commit 1dfe02c

Please sign in to comment.