Skip to content

Commit

Permalink
Thread and Worker class for blocks(threading=True)
Browse files Browse the repository at this point in the history
Blockchain
* Check whether all blocks could be fetched added
* Thread and Worker class for block fetching added
  • Loading branch information
holgern committed Jun 15, 2018
1 parent 8137a6a commit 3c0f1cc
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 52 deletions.
2 changes: 2 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ Changelog
* "time" and "expiration" are parsed to a datetime object inside all block objects
* The json() function returns the original not parsed json dict. It is available for Account, Block, BlockHeader, Comment, Vote and Witness
* json_transactions and json_operations added to Block, for returning all dates as string
* Issues #27 and #28 fixed (thanks to crokkon for reporting)
* Thread and Worker class for blockchain.blocks(threading=True)

0.19.37
-------
Expand Down
208 changes: 175 additions & 33 deletions beem/blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
from builtins import str
from builtins import range
from builtins import object
import sys
import time
import hashlib
import json
import math
from threading import Thread, Event
from time import sleep
import logging
from datetime import datetime, timedelta
from .utils import formatTimeString, addTzInfo
Expand All @@ -20,13 +23,139 @@
from beem.instance import shared_steem_instance
from .amount import Amount
log = logging.getLogger(__name__)
FUTURES_MODULE = None
if not FUTURES_MODULE:
try:
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
FUTURES_MODULE = "futures"
except ImportError:
FUTURES_MODULE = None
if sys.version_info < (3, 0):
from Queue import Queue
else:
from queue import Queue


# default exception handler. if you want to take some action on failed tasks
# maybe add the task back into the queue, then make your own handler and pass it in
def default_handler(name, exception, *args, **kwargs):
    """Print a diagnostic line for a failed worker task and return ``None``.

    :param str name: name of the worker thread that ran the task
    :param Exception exception: the exception the task raised
    :param args: extra positional context (Worker passes the task's
        args tuple and kwargs dict here)
    :param kwargs: extra keyword context
    """
    print('%s raised %s with args %s and kwargs %s' % (name, str(exception), repr(args), repr(kwargs)))


class Worker(Thread):
    """Thread executing tasks from a given tasks queue.

    The thread starts itself on construction (daemonized) and runs until
    ``abort`` is set. Tasks are ``(func, args, kwargs)`` tuples taken from
    ``queue``; any non-``None`` return value is pushed onto ``results``.
    Exceptions raised by a task are routed to ``exception_handler`` as
    ``handler(name, exception, args, kwargs)``.
    """
    def __init__(self, name, queue, results, abort, idle, exception_handler):
        Thread.__init__(self)
        self.name = name
        self.queue = queue
        self.results = results
        self.abort = abort
        self.idle = idle
        self.exception_handler = exception_handler
        self.daemon = True
        self.start()

    def run(self):
        """Thread work loop calling the function with the params"""
        # keep running until told to abort
        while not self.abort.is_set():
            try:
                # get a task and raise immediately if none available
                func, args, kwargs = self.queue.get(False)
                self.idle.clear()
            except Exception:
                # no work to do: flag ourselves idle and back off briefly
                # so an empty queue does not busy-spin a CPU core
                self.idle.set()
                sleep(0.01)
                continue

            try:
                # the function may raise
                result = func(*args, **kwargs)
                if result is not None:
                    self.results.put(result)
            except Exception as e:
                # so we move on and handle it in whatever way the caller wanted
                self.exception_handler(self.name, e, args, kwargs)
            finally:
                # task complete no matter what happened
                self.queue.task_done()


# class for thread pool
class Pool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, thread_count, batch_mode=False, exception_handler=default_handler):
        """
        :param int thread_count: number of worker threads started by :func:`run`
        :param bool batch_mode: when True, :func:`enqueue` blocks once
            ``thread_count`` tasks are already waiting (bounded queue)
        :param exception_handler: callable invoked as
            ``handler(name, exception, args, kwargs)`` for failed tasks
        """
        # batch mode means block when adding tasks if no threads available to process
        self.queue = Queue(thread_count if batch_mode else 0)
        self.resultQueue = Queue(0)
        self.thread_count = thread_count
        self.exception_handler = exception_handler
        self.aborts = []
        self.idles = []
        self.threads = []

    def __del__(self):
        """Tell my threads to quit"""
        # NOTE(review): __del__ may run late or not at all; call abort()
        # explicitly when deterministic shutdown is required.
        self.abort()

    def run(self, block=False):
        """Start the threads, or restart them if you've aborted"""
        # either wait for them to finish or return false if some arent
        if block:
            while self.alive():
                sleep(1)
        elif self.alive():
            return False

        # go start them
        self.aborts = []
        self.idles = []
        self.threads = []
        for n in range(self.thread_count):
            abort = Event()
            idle = Event()
            self.aborts.append(abort)
            self.idles.append(idle)
            self.threads.append(Worker('thread-%d' % n, self.queue, self.resultQueue, abort, idle, self.exception_handler))
        return True

    def enqueue(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.queue.put((func, args, kargs))

    def join(self):
        """Wait for completion of all the tasks in the queue"""
        self.queue.join()

    def abort(self, block=False):
        """Tell each worker that its done working"""
        # tell the threads to stop after they are done with what they are currently doing
        for a in self.aborts:
            a.set()
        # wait for them to finish if requested
        while block and self.alive():
            sleep(1)

    def alive(self):
        """Returns True if any threads are currently running"""
        return any(t.is_alive() for t in self.threads)

    def idle(self):
        """Returns True if all threads are waiting for work"""
        return all(i.is_set() for i in self.idles)

    def done(self):
        """Returns True if no tasks are left to be completed"""
        return self.queue.empty()

    def results(self, sleep_time=0):
        """Get the set of results that have been processed, repeatedly call until done"""
        sleep(sleep_time)
        results = []
        try:
            while True:
                # get a result, raises empty exception immediately if none available
                results.append(self.resultQueue.get(False))
                self.resultQueue.task_done()
        except Exception:
            # result queue drained (queue.Empty) -- return what we collected
            pass
        return results


@python_2_unicode_compatible
Expand Down Expand Up @@ -208,7 +337,7 @@ def block_timestamp(self, block_num):
).time()
return int(time.mktime(block_time.timetuple()))

def blocks(self, start=None, stop=None, max_batch_size=None, threading=False, thread_num=8, only_ops=False, only_virtual_ops=False):
def blocks(self, start=None, stop=None, max_batch_size=None, threading=False, thread_num=8, thread_limit=1000, only_ops=False, only_virtual_ops=False):
""" Yields blocks starting from ``start``.
:param int start: Starting block
Expand All @@ -217,6 +346,7 @@ def blocks(self, start=None, stop=None, max_batch_size=None, threading=False, th
Cannot be combined with threading
:param bool threading: Enables threading. Cannot be combined with batch calls
:param int thread_num: Defines the number of threads, when `threading` is set.
:param int thread_limit: Thread queue size (Default 1000)
:param bool only_ops: Only yield operations (default: False).
Cannot be combined with ``only_virtual_ops=True``.
:param bool only_virtual_ops: Only yield virtual operations (default: False)
Expand All @@ -242,42 +372,53 @@ def blocks(self, start=None, stop=None, max_batch_size=None, threading=False, th
else:
current_block_num = self.get_current_block_num()
head_block = current_block_num
if threading and FUTURES_MODULE and not head_block_reached:
pool = ThreadPoolExecutor(max_workers=thread_num + 1)
if threading and not head_block_reached:
# pool = ThreadPoolExecutor(max_workers=thread_num + 1)
pool = Pool(thread_num + 1)
# disable autoclean
auto_clean = current_block.get_cache_auto_clean()
current_block.set_cache_auto_clean(False)
latest_block = 0
for blocknum in range(start, head_block + 1, thread_num):
futures = []
i = blocknum
latest_block = start
for blocknum in range(start, head_block + 1, thread_limit):
# futures = []
i = 0
block_num_list = []

while i < blocknum + thread_num and i <= head_block:
block_num_list.append(i)
futures.append(pool.submit(Block, i, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem))
results = []
while i < thread_limit and blocknum + i <= head_block:
block_num_list.append(blocknum + i)
pool.enqueue(Block, blocknum + i, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem)
i += 1
try:
results = [r.result() for r in as_completed(futures)]
except Exception as e:
log.warning(str(e))
pool.run(True)
results = []
while not pool.done() or not pool.idle():
for result in pool.results():
results.append(result)
for result in pool.results():
results.append(result)
pool.abort()

result_block_nums = []
checked_results = []
for b in results:
result_block_nums.append(int(b.identifier))
if latest_block < int(b.identifier):
latest_block = int(b.identifier)
if isinstance(b, dict) and "transactions" in b:
if len(b.operations) > 0:
checked_results.append(b)
result_block_nums.append(int(b.identifier))

missing_block_num = list(set(block_num_list).difference(set(result_block_nums)))
if len(missing_block_num) > 0:
for blocknum in missing_block_num:
block = Block(blocknum, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem)
results.append(block)
checked_results.append(block)
from operator import itemgetter
blocks = sorted(results, key=itemgetter('id'))
blocks = sorted(checked_results, key=itemgetter('id'))
for b in blocks:
if latest_block < int(b.identifier):
latest_block = int(b.identifier)
yield b
current_block.clear_cache_from_expired_items()
if latest_block < head_block:
for blocknum in range(latest_block, head_block + 1):
for blocknum in range(latest_block + 1, head_block + 1):
block = Block(blocknum, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem)
yield block
current_block.set_cache_auto_clean(auto_clean)
Expand Down Expand Up @@ -391,7 +532,7 @@ def ops(self, start=None, stop=None, only_virtual_ops=False, **kwargs):
"""
raise DeprecationWarning('Blockchain.ops() is deprecated. Please use Blockchain.stream() instead.')

def ops_statistics(self, start, stop=None, add_to_ops_stat=None, verbose=False):
def ops_statistics(self, start, stop=None, add_to_ops_stat=None, with_virtual_ops=True, verbose=False):
""" Generates statistics for all operations (including virtual operations) starting from
``start``.
Expand Down Expand Up @@ -419,10 +560,11 @@ def ops_statistics(self, start, stop=None, add_to_ops_stat=None, verbose=False):
if verbose:
print(block["identifier"] + " " + block["timestamp"])
ops_stat = block.ops_statistics(add_to_ops_stat=ops_stat)
for block in self.blocks(start=start, stop=stop, only_ops=True, only_virtual_ops=True):
if verbose:
print(block["identifier"] + " " + block["timestamp"])
ops_stat = block.ops_statistics(add_to_ops_stat=ops_stat)
if with_virtual_ops:
for block in self.blocks(start=start, stop=stop, only_ops=True, only_virtual_ops=True):
if verbose:
print(block["identifier"] + " " + block["timestamp"])
ops_stat = block.ops_statistics(add_to_ops_stat=ops_stat)
return ops_stat

def stream(self, opNames=[], raw_ops=False, *args, **kwargs):
Expand Down
8 changes: 4 additions & 4 deletions examples/benchmark_beem.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
how_many_hours = 1
nodes = NodeList()
if node_setup == 0:
stm = Steem(node=nodes.get_nodes(normal=False, wss=False), num_retries=10)
stm = Steem(node=nodes.get_nodes(normal=True, wss=True), num_retries=10)
max_batch_size = None
threading = False
thread_num = 8
elif node_setup == 1:
stm = Steem(node=nodes.get_nodes(normal=False, wss=False), num_retries=10)
stm = Steem(node=nodes.get_nodes(normal=True, wss=True), num_retries=10)
max_batch_size = None
threading = True
thread_num = 8
Expand Down Expand Up @@ -60,9 +60,9 @@
for op in tx["operations"]:
total_transaction += 1
if "block" in entry:
block_time = parse_time(entry["block"]["timestamp"])
block_time = (entry["block"]["timestamp"])
else:
block_time = parse_time(entry["timestamp"])
block_time = (entry["timestamp"])

if block_time > stopTime:
total_duration = formatTimedelta(datetime.now() - startTime)
Expand Down
30 changes: 15 additions & 15 deletions tests/beem/test_blockchain_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,30 @@
from beem.instance import set_shared_steem_instance
from beem.nodelist import NodeList

wif = "5KQwrPbwdL6PhXujxW37FSSQZ1JiwsST4cqQzDeyXtP79zkvFD3"


class Testcases(unittest.TestCase):
@classmethod
def setUpClass(cls):
nodelist = NodeList()
nodelist.update_nodes(steem_instance=Steem(node=nodelist.get_nodes(normal=True, appbase=True), num_retries=10))
cls.bts = Steem(
node=nodelist.get_nodes(appbase=False),
node=nodelist.get_nodes(),
nobroadcast=True,
timeout=30,
num_retries=10,
keys={"active": wif},
)
# from getpass import getpass
# self.bts.wallet.unlock(getpass())
set_shared_steem_instance(cls.bts)
cls.bts.set_default_account("test")

b = Blockchain(steem_instance=cls.bts)
num = b.get_current_block_num()
# b = Blockchain(steem_instance=cls.bts)
# num = b.get_current_block_num()
num = 23346630
cls.start = num - 100
cls.stop = num
cls.N_transfer = 121
cls.N_vote = 2825

def test_stream_threading(self):
bts = self.bts
Expand All @@ -48,10 +48,11 @@ def test_stream_threading(self):

for op in b.stream(opNames=opNames, start=self.start, stop=self.stop, threading=True, thread_num=8):
ops_stream.append(op)
self.assertEqual(self.N_transfer + self.N_vote, len(ops_stream))

# op_stat = b.ops_statistics(start=self.start, stop=self.stop)
# self.assertEqual(op_stat["vote"] + op_stat["transfer"], len(ops_stream))

self.assertTrue(len(ops_stream) > 0)
op_stat = b.ops_statistics(start=self.start, stop=self.stop)
self.assertEqual(op_stat["vote"] + op_stat["transfer"], len(ops_stream))
ops_blocks = []
last_id = self.start - 1
for op in b.blocks(start=self.start, stop=self.stop, threading=True, thread_num=8):
Expand All @@ -61,11 +62,10 @@ def test_stream_threading(self):
op_stat4 = {"transfer": 0, "vote": 0}
self.assertTrue(len(ops_blocks) > 0)
for block in ops_blocks:
for tran in block["transactions"]:
for op in tran['operations']:
if op[0] in opNames:
op_stat4[op[0]] += 1
for op in block.operations:
if op[0] in opNames:
op_stat4[op[0]] += 1
self.assertTrue(block.identifier >= self.start)
self.assertTrue(block.identifier <= self.stop)
self.assertEqual(op_stat["transfer"], op_stat4["transfer"])
self.assertEqual(op_stat["vote"], op_stat4["vote"])
self.assertEqual(self.N_transfer, op_stat4["transfer"])
self.assertEqual(self.N_vote, op_stat4["vote"])

0 comments on commit 3c0f1cc

Please sign in to comment.