Skip to content

Commit

Permalink
Merge pull request #83 from leshchenko1979:leshchenko1979/issue82
Browse files Browse the repository at this point in the history
Исключение после нескольких запусков asyncio.run()
  • Loading branch information
leshchenko1979 authored Nov 12, 2020
2 parents 174283d + 06fe347 commit 52fb7ad
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 22 deletions.
47 changes: 26 additions & 21 deletions fast_bitrix24/srh.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, webhook, verbose):
self._stopped_value = None
self.requests_per_second = BITRIX_RPS
self._pool_size = BITRIX_POOL_SIZE

self.session = None
self.tasks = []

Expand All @@ -49,34 +49,39 @@ def _standardize_webhook(self, webhook):

if webhook[-1] != '/':
webhook += '/'

return webhook


def run(self, coroutine):

async def async_wrapper(coroutine):
async with self:
result = await coroutine

if not _SLOW:
self.release_sem_task.cancel()

return result

loop = asyncio.get_event_loop()

try:
loop = asyncio.get_event_loop()
except RuntimeError:
asyncio.set_event_loop(asyncio.new_event_loop())
loop = asyncio.get_event_loop()

result = loop.run_until_complete(async_wrapper(coroutine))

return result


def add_request_task(self, method, params):
self.tasks.append(asyncio.ensure_future(self._single_request(method, params)))


def get_server_serponses(self):
global _SLOW

tasks_to_process = len(self.tasks)

if not _SLOW:
Expand All @@ -91,7 +96,7 @@ def get_server_serponses(self):
else:
yield task
tasks_to_process -= 1


async def __aenter__(self):
global _SLOW
Expand Down Expand Up @@ -132,7 +137,7 @@ async def __aenter__(self):

async def __aexit__(self, a1, a2, a3):
self._stopped_time = time.monotonic()

if _SLOW:
# в slow-режиме обнуляем пул запросов, чтобы после выхода
# не выдать на сервер пачку запросов и не словить отказ
Expand Down Expand Up @@ -177,34 +182,34 @@ async def _acquire(self):
async with self._slow_lock:
# потом ждать основное время "остывания"
await asyncio.sleep(1 / _SLOW_RPS)
return True
return True
else:
return await self._sem.acquire()


async def _single_request(self, method, params=None):
await self._acquire()
async with self.session.post(url = self.webhook + method,
async with self.session.post(url = self.webhook + method,
json = params) as response:
r = await response.json(encoding='utf-8')
return ServerResponse(r)


def get_pbar(self, real_len, real_start):

class MutePBar():

def update(self, i):
pass

def close(self):
pass

if self._verbose:
return tqdm(total = real_len, initial = real_start)
else:
return MutePBar()


##########################################
#
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

setuptools.setup(
name="fast_bitrix24",
version="0.4.6",
version="0.4.7",
author="Alexey Leshchenko",
author_email="leshchenko@gmail.com",
description="API wrapper для быстрого получения данных от Битрикс24 через REST API. Параллельные запросы к серверу, упаковка запросов в батчи, контроль скорости запросов.",
Expand Down

0 comments on commit 52fb7ad

Please sign in to comment.