-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPrime Numbers Counting Algorithm
More file actions
133 lines (104 loc) · 5.14 KB
/
Prime Numbers Counting Algorithm
File metadata and controls
133 lines (104 loc) · 5.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import numpy as np
from multiprocessing import Pool
from functools import lru_cache
from typing import Set, List, Generator
import time
# Оптимизирана проверка за простота с кеширане
@lru_cache(maxsize=10000)
def is_prime(n: int) -> bool:
"""Кеширана проверка за простота."""
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
for i in range(3, int(np.sqrt(n)) + 1, 2):
if n % i == 0:
return False
return True
def generate_primes_numpy(limit: int) -> np.ndarray:
"""Генерира прости числа използвайки numpy."""
sieve = np.ones(limit, dtype=bool)
sieve[0] = sieve[1] = False
for i in range(2, int(np.sqrt(limit)) + 1):
if sieve[i]:
sieve[i*i::i] = False
return np.nonzero(sieve)[0]
def process_prime_chunk(args: tuple) -> Set[int]:
"""Обработва част от простите числа за паралелизация."""
p_chunk, q_primes, limit = args
results = set()
for p in p_chunk:
p_squared = p * p
if p_squared >= limit:
break
q_values = q_primes[q_primes * q_primes * 4 + p_squared < limit]
results_array = p_squared + 4 * np.square(q_values)
for result in results_array:
if is_prime(int(result)):
results.add(int(result))
return results
class PrimeNumberFinder:
def __init__(self, limit: int, num_processes: int = 4):
self.limit = limit
self.num_processes = num_processes
self.primes_cache = None
def generate_primes(self) -> np.ndarray:
"""Кеширано генериране на прости числа."""
if self.primes_cache is None:
self.primes_cache = generate_primes_numpy(int(np.sqrt(self.limit)) + 1)
return self.primes_cache
def find_p2_plus_4q2_primes_parallel(self) -> Set[int]:
"""Паралелно намиране на специални прости числа."""
primes = self.generate_primes()
# Разделяме простите числа на chunks за паралелна обработка
chunk_size = max(1, len(primes) // self.num_processes)
chunks = [primes[i:i + chunk_size] for i in range(0, len(primes), chunk_size)]
# Подготвяме аргументите за паралелна обработка
args = [(chunk, primes, self.limit) for chunk in chunks]
# Паралелна обработка
with Pool(processes=self.num_processes) as pool:
results = pool.map(process_prime_chunk, args)
# Обединяваме резултатите
return set().union(*results)
def benchmark_comparison(limit: int):
"""Сравнява производителността на различни методи."""
# Тест на паралелната версия
start_time = time.time()
finder = PrimeNumberFinder(limit)
parallel_results = finder.find_p2_plus_4q2_primes_parallel()
parallel_time = time.time() - start_time
print(f"\nРезултати до {limit}:")
print(f"Брой намерени числа: {len(parallel_results)}")
print(f"Време за паралелно изпълнение: {parallel_time:.2f} секунди")
print(f"Първите 10 намерени числа: {sorted(list(parallel_results))[:10]}")
# Генератор версия с numpy оптимизация
def generate_p2_plus_4q2_primes_stream() -> Generator[int, None, None]:
"""Оптимизиран генератор за специални прости числа."""
seen = set()
primes = generate_primes_numpy(1000) # Начален набор от прости числа
for p in primes:
p_squared = p * p
q_values = primes[primes < np.sqrt(10**6 - p_squared) // 2]
results = p_squared + 4 * np.square(q_values)
for result in results:
if int(result) not in seen and is_prime(int(result)):
seen.add(int(result))
yield int(result)
# Пример за използване
if __name__ == '__main__':
# Тестване на различни граници
for limit in [1000, 10000, 100000]:
benchmark_comparison(limit)
print("\nГенериране на поток от специални прости числа:")
generator = generate_p2_plus_4q2_primes_stream()
first_10 = [next(generator) for _ in range(10)]
print(f"Първите 10 числа от генератора: {first_10}")
#########################################################
# Бързо намиране на числа до определена граница
finder = PrimeNumberFinder(10000)
results = finder.find_p2_plus_4q2_primes_parallel()
# Генериране на безкраен поток от такива числа
generator = generate_p2_plus_4q2_primes_stream()
next_number = next(generator)