Skip to content

Commit

Permalink
Merge pull request #657 from vertexmc/fiforace
Browse files Browse the repository at this point in the history
Fixes race condition in FIFO resync/put/ack
  • Loading branch information
vertexmc authored Feb 9, 2018
2 parents ea504cc + ff89dd4 commit 24bf21c
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 6 deletions.
11 changes: 6 additions & 5 deletions synapse/lib/fifo.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import synapse.lib.msgpack as s_msgpack
import synapse.lib.atomfile as s_atomfile

logger = logging.getLogger(__file__)
logger = logging.getLogger(__name__)

class Fifo(s_config.Config):

Expand Down Expand Up @@ -86,7 +86,8 @@ def xmit(qent):
fifo.resync(xmit=xmit)
'''
return self.wind.resync(xmit=xmit)
with self.lock:
return self.wind.resync(xmit=xmit)

def _findFifoAtom(self, nseq):
# if they specify an un-aligned sequence, find the prev
Expand Down Expand Up @@ -281,8 +282,9 @@ def resync(self, xmit=None):
if xmit is not None:
self._xmit = xmit

for item in self.dequ:
self._xmit(item)
with self.lock:
for item in self.dequ:
self._xmit(item)

def _run_xmit(self, item):
if self._xmit is not None:
Expand All @@ -300,7 +302,6 @@ def ack(self, seqn):
Args:
seqn (int): The sequence number to acknowledge.
'''

if seqn == -1:
self.resync()
return False
Expand Down
105 changes: 104 additions & 1 deletion synapse/tests/test_lib_fifo.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

from synapse.tests.common import *

import synapse.lib.fifo as s_fifo
Expand Down Expand Up @@ -207,3 +206,107 @@ def test_fifo_puts(self):
fifo.puts(('foo', 'bar'))

self.eq(tuple(sent), ((0, 4, 'foo'), (4, 8, 'bar')))

def test_fifo_resync_race_put(self):
    '''
    Regression test: a puts() fired from another thread while resync()
    is iterating must not corrupt the replay — the mutating entries
    should simply land after the original items, in order.
    '''
    with self.getTestDir() as dirn:

        total = 1000
        conf = {'dir': dirn}

        ready = threading.Event()
        items = ['foo%d' % i for i in range(total)]
        seen = []

        def capture(qent):
            # wake the writer thread, yield the GIL so it can try to
            # mutate mid-iteration, then record the payload
            ready.set()
            time.sleep(0)
            seen.append(qent[2])

        @firethread
        def mutator():
            # wait until resync iteration has started, then put
            ready.wait()
            fifo.puts(('attempting to mutate', 'during iteration'))

        with s_fifo.Fifo(conf) as fifo:
            fifo.puts(items)
            thread = mutator()
            fifo.resync(xmit=capture)
            thread.join()

        # all original items first, then the two racing entries
        self.len(total + 2, seen)
        self.eq(seen[0:total], items)
        self.eq(seen[total:total + 1], ['attempting to mutate'])
        self.eq(seen[total + 1:total + 2], ['during iteration'])

        # reopening and resyncing must replay the same sequence again
        with s_fifo.Fifo(conf) as fifo:
            fifo.resync(xmit=capture)

        self.len(2 * (total + 2), seen)
        self.eq(seen[0:total], items)
        self.eq(seen[total:total + 1], ['attempting to mutate'])
        self.eq(seen[total + 1:total + 2], ['during iteration'])
        self.eq(seen[total + 2:2 * total + 2], items)
        self.eq(seen[2 * total + 2:2 * total + 3], ['attempting to mutate'])
        self.eq(seen[2 * total + 3:2 * total + 4], ['during iteration'])

def test_fifo_resync_race_ack(self):
    '''
    Regression test: an ack() fired from another thread while resync()
    is iterating must not cull entries out from under the iteration.
    '''
    with self.getTestDir() as dirn:

        total = 1000
        conf = {'dir': dirn}

        ready = threading.Event()
        items = ['foo%d' % i for i in range(total)]
        seen = []

        def capture(qent):
            # wake the acking thread, yield the GIL so it can try to
            # ack mid-iteration, then record the payload
            ready.set()
            time.sleep(0)
            seen.append(qent[2])

        @firethread
        def mutator():
            ready.wait()

            # This call to ack will not actually cull anything because
            # it won't run until after iteration has completed.
            fifo.ack(100)

        with s_fifo.Fifo(conf) as fifo:
            fifo.puts(items)
            thread = mutator()
            fifo.resync(xmit=capture)
            thread.join()

        # The end result should be all of the items in order.
        self.len(total, seen)
        self.eq(seen, items)

def test_fifo_resync_race_ack_resync(self):
    '''
    Regression test: an ack(-1) from another thread triggers a nested
    resync; it must serialize after the in-flight iteration, yielding
    the full item sequence twice.
    '''
    with self.getTestDir() as dirn:

        total = 1000
        conf = {'dir': dirn}

        ready = threading.Event()
        items = ['foo%d' % i for i in range(total)]
        seen = []

        def capture(qent):
            # wake the acking thread, yield the GIL so it can try to
            # resync mid-iteration, then record the payload
            ready.set()
            time.sleep(0)
            seen.append(qent[2])

        @firethread
        def mutator():
            ready.wait()

            # This call to ack will not actually cull anything because
            # its seqn is -1.  Instead it calls resync, which won't run
            # until after the first iteration has completed.
            fifo.ack(-1)

        with s_fifo.Fifo(conf) as fifo:
            fifo.puts(items)
            thread = mutator()
            fifo.resync(xmit=capture)
            thread.join()

        # all items in order, then all items in order a second time
        self.len(2 * total, seen)
        self.eq(seen[0:total], items)
        self.eq(seen[total:2 * total], items)

0 comments on commit 24bf21c

Please sign in to comment.