Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion python/sparknlp/annotator/ner/ner_dl.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,14 @@ class NerDLApproach(AnnotatorApproach, NerApproach, EvaluationDLParams):
"Whether to check F1 Micro-average or F1 Macro-average as a final metric for the best model.",
TypeConverters.toString)

# Number of batches read ahead of the trainer when the memory optimizer
# streams data instead of collecting it; 0 disables prefetching.
# NOTE(review): only consulted when enableMemoryOptimizer is True — confirm against Scala side.
prefetchBatches = Param(Params._dummy(), "prefetchBatches",
"Number of batches to prefetch while training using memory optimizer. Has no effect if memory optimizer is disabled.",
TypeConverters.toInt)

# Whether the dataset is repartitioned before streaming batches, trading a
# shuffle up front for better partition balance during training.
# NOTE(review): only consulted when enableMemoryOptimizer is True — confirm against Scala side.
optimizePartitioning = Param(Params._dummy(), "optimizePartitioning",
"Whether to repartition the dataset before training for optimal performance. Has no effect if memory optimizer is disabled.",
TypeConverters.toBoolean)

def setConfigProtoBytes(self, b):
"""Sets configProto from tensorflow, serialized into byte array.

Expand Down Expand Up @@ -377,6 +385,28 @@ def setBestModelMetric(self, value):
"""
return self._set(bestModelMetric=value)

def setPrefetchBatches(self, value):
    """Sets number of batches to prefetch while training using memory optimizer.
    Has no effect if memory optimizer is disabled.

    Parameters
    ----------
    value : int
        Number of batches to prefetch ahead of the trainer; 0 disables
        prefetching.

    Returns
    -------
    NerDLApproach
        This instance, for setter chaining.
    """
    return self._set(prefetchBatches=value)

def setOptimizePartitioning(self, value):
    """Sets whether to repartition the dataset before training for optimal performance.
    Has no effect if memory optimizer is disabled.

    Parameters
    ----------
    value : bool
        Whether to repartition the dataset before training.

    Returns
    -------
    NerDLApproach
        This instance, for setter chaining.
    """
    return self._set(optimizePartitioning=value)

def _create_model(self, java_model):
    """Wraps the fitted JVM model in its Python :class:`NerDLModel` counterpart.

    Parameters
    ----------
    java_model : py4j.java_gateway.JavaObject
        The trained Scala-side model produced by ``fit``.
    """
    return NerDLModel(java_model=java_model)

Expand All @@ -400,7 +430,9 @@ def __init__(self):
enableOutputLogs=False,
enableMemoryOptimizer=False,
useBestModel=False,
bestModelMetric="f1_micro"
bestModelMetric="f1_micro",
prefetchBatches=0,
optimizePartitioning=True
)


Expand Down
59 changes: 59 additions & 0 deletions python/test/annotator/ner/ner_dl_approach_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright 2017-2025 John Snow Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest

import pytest

from sparknlp.annotator import *
from test.util import SparkSessionForTest


@pytest.mark.fast
class NerDLApproachTestSpec(unittest.TestCase):
    """Round-trip checks for NerDLApproach setters via the extracted param map."""

    def setUp(self):
        # Shared Spark session fixture used by the whole spec.
        self.spark = SparkSessionForTest.spark

    def test_setters(self):
        """Every chained setter should surface its value in extractParamMap()."""
        approach = (
            NerDLApproach()
            .setLr(0.01)
            .setPo(0.005)
            .setBatchSize(16)
            .setDropout(0.01)
            .setGraphFolder("graph_folder")
            .setConfigProtoBytes([])
            .setUseContrib(False)
            .setEnableMemoryOptimizer(True)
            .setIncludeConfidence(True)
            .setIncludeAllConfidenceScores(True)
            .setUseBestModel(True)
            .setPrefetchBatches(20)
            .setOptimizePartitioning(True)
        )

        # (param, expected value) pairs mirroring the setter calls above.
        expected = [
            (approach.lr, 0.01),
            (approach.po, 0.005),
            (approach.batchSize, 16),
            (approach.dropout, 0.01),
            (approach.graphFolder, "graph_folder"),
            (approach.configProtoBytes, []),
            (approach.useContrib, False),
            (approach.enableMemoryOptimizer, True),
            (approach.includeConfidence, True),
            (approach.includeAllConfidenceScores, True),
            (approach.useBestModel, True),
            (approach.prefetchBatches, 20),
            (approach.optimizePartitioning, True),
        ]

        param_map = approach.extractParamMap()
        for param, value in expected:
            self.assertEqual(param_map[param], value)
25 changes: 19 additions & 6 deletions src/main/scala/com/johnsnowlabs/nlp/annotators/common/Tagged.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ trait Tagged[T >: TaggedSentence <: TaggedSentence] extends Annotated[T] {
row.getAs[Seq[Row]](colNum).map(obj => Annotation(obj))
}

protected def getLabelsFromSentences(
/** Deserializes the [[Annotation]]s stored in a named struct column of the row.
  *
  * @param row dataset row holding an annotation struct column
  * @param col name of the column to read annotations from
  * @return the annotations contained in that column
  */
def getAnnotations(row: Row, col: String): Seq[Annotation] = {
  val annotationRows = row.getAs[Seq[Row]](col)
  annotationRows.map(Annotation(_))
}

def getLabelsFromSentences(
sentences: Seq[WordpieceEmbeddingsSentence],
labelAnnotations: Seq[Annotation]): Seq[TextSentenceLabels] = {
val sortedLabels = labelAnnotations.sortBy(a => a.begin).toArray
Expand Down Expand Up @@ -203,16 +207,25 @@ object NerTagged extends Tagged[NerTaggedSentence] {
dataset: Dataset[Row],
sentenceCols: Seq[String],
labelColumn: String,
batchSize: Int): Iterator[Array[(TextSentenceLabels, WordpieceEmbeddingsSentence)]] = {
batchSize: Int,
shuffleInPartition: Boolean = true)
: Iterator[Array[(TextSentenceLabels, WordpieceEmbeddingsSentence)]] = {

new Iterator[Array[(TextSentenceLabels, WordpieceEmbeddingsSentence)]] {
import com.johnsnowlabs.nlp.annotators.common.DatasetHelpers._

// Send batches, don't collect(), only keeping a single batch in memory anytime
val it: util.Iterator[Row] = dataset
.select(labelColumn, sentenceCols: _*)
.randomize // to improve training
.toLocalIterator() // Uses as much memory as the largest partition, potentially all data if not careful
val it: util.Iterator[Row] = {
val selected = dataset
.select(labelColumn, sentenceCols: _*)
(
// to improve training
// NOTE: This might have implications on model performance, partitions are not shuffled
if (shuffleInPartition) selected.randomize
else
selected
).toLocalIterator() // Uses as much memory as the largest partition, potentially all data if not careful
}

// create a batch
override def next(): Array[(TextSentenceLabels, WordpieceEmbeddingsSentence)] = {
Expand Down
Loading
Loading