Skip to content

Commit b27c8b9

Browse files
authored
Merge pull request #238 from mayanayza/feat/open-ports-service
feat: unclaimed ports + additional generic services
2 parents 76e7a36 + ca14b42 commit b27c8b9

File tree

28 files changed

+668
-127
lines changed

28 files changed

+668
-127
lines changed

backend/src/bin/daemon.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,16 @@ use tower_http::{
1919
};
2020
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
2121

22-
#[tokio::main]
23-
async fn main() -> anyhow::Result<()> {
22+
fn main() -> anyhow::Result<()> {
23+
let runtime = tokio::runtime::Builder::new_multi_thread()
24+
.thread_stack_size(4 * 1024 * 1024) // 4MB stack for deep async scanning
25+
.enable_all()
26+
.build()?;
27+
28+
runtime.block_on(async_main())
29+
}
30+
31+
async fn async_main() -> anyhow::Result<()> {
2432
// Parse CLI and load config
2533
let cli = DaemonCli::parse();
2634
let config = AppConfig::load(cli)?;

backend/src/daemon/discovery/service/base.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::{
1515
discovery::r#impl::types::{DiscoveryType, HostNamingFallback},
1616
groups::r#impl::base::Group,
1717
services::{
18-
definitions::docker_container::DockerContainer,
18+
definitions::{docker_container::DockerContainer, open_ports::OpenPorts},
1919
r#impl::{
2020
base::{
2121
DiscoverySessionServiceMatchParams, ServiceMatchBaselineParams,
@@ -462,15 +462,16 @@ pub trait DiscoversNetworkedEntities:
462462
sorted_service_definitions.sort_by_key(|s| {
463463
if !ServiceDefinitionExt::is_generic(s) {
464464
0 // Highest priority - non-generic services
465-
} else if ServiceDefinitionExt::is_generic(s)
466-
&& s.id() != DockerContainer.id()
467-
&& s.id() != Gateway.id()
468-
{
469-
1 // Generic services that aren't Docker Container or Gateway
470-
} else {
471-
// Docker Containers and Gateways need to go last
465+
} else if s.id() == OpenPorts.id() {
466+
// Catch-all for open ports, should be dead last
467+
3
468+
} else if s.id() == DockerContainer.id() || s.id() == Gateway.id() {
469+
// Docker Containers and Gateways need to go second to last last
472470
// Other generic services should be able to get matched first
473471
2
472+
} else {
473+
// Generic services that aren't Docker Container or Gateway
474+
1
474475
}
475476
});
476477

backend/src/daemon/discovery/service/network.rs

Lines changed: 53 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::daemon::discovery::service::base::{
33
};
44
use crate::daemon::discovery::types::base::{DiscoveryCriticalError, DiscoverySessionUpdate};
55
use crate::daemon::utils::scanner::{
6-
arp_scan_host, scan_endpoints, scan_tcp_ports, scan_udp_ports,
6+
arp_scan_host, can_arp_scan, scan_endpoints, scan_tcp_ports, scan_udp_ports,
77
};
88
use crate::server::discovery::r#impl::types::{DiscoveryType, HostNamingFallback};
99
use crate::server::hosts::r#impl::{
@@ -195,14 +195,21 @@ impl DiscoveryRunner<NetworkScanDiscovery> {
195195
.map(|p| p.number())
196196
.collect();
197197

198-
// Partition IPs by whether their subnet is interfaced
199-
let (interfaced_ips, non_interfaced_ips): (Vec<_>, Vec<_>) =
198+
// Check ARP capability once before partitioning
199+
let arp_available = can_arp_scan();
200+
201+
// Partition IPs - only use ARP path if we have capability
202+
let (interfaced_ips, non_interfaced_ips): (Vec<_>, Vec<_>) = if arp_available {
200203
all_ips_with_subnets.into_iter().partition(|(_, subnet)| {
201204
subnet_cidr_to_mac
202205
.get(&subnet.base.cidr)
203206
.and_then(|m| *m)
204207
.is_some()
205-
});
208+
})
209+
} else {
210+
// No ARP capability - treat all as non-interfaced (port scan only)
211+
(Vec::new(), all_ips_with_subnets)
212+
};
206213

207214
// =============================================================
208215
// PHASE 1: Responsiveness check (0-50%)
@@ -374,45 +381,46 @@ impl DiscoveryRunner<NetworkScanDiscovery> {
374381

375382
let phase2_batches_done = Arc::new(AtomicUsize::new(0));
376383

384+
// In scan_and_process_hosts, box the deep scan futures
377385
let results = stream::iter(responsive_hosts)
378-
.map(|(ip, subnet, mac, phase1_ports)| {
379-
let cancel = cancel.clone();
380-
let gateway_ips = gateway_ips.clone();
381-
let phase2_batches_done = phase2_batches_done.clone();
382-
383-
async move {
384-
let result = self
385-
.deep_scan_host(DeepScanParams{
386-
ip,
387-
subnet: &subnet,
388-
mac,
389-
phase1_ports,
390-
cancel,
391-
port_scan_batch_size: ports_per_host_batch,
392-
gateway_ips: &gateway_ips,
393-
batches_done: &phase2_batches_done,
394-
total_batches,
395-
})
396-
.await;
397-
398-
match result {
399-
Ok(Some(host)) => Some(host),
400-
Ok(None) => None,
401-
Err(e) => {
402-
if DiscoveryCriticalError::is_critical_error(e.to_string()) {
403-
tracing::error!(ip = %ip, error = %e, "Critical error in deep scan");
404-
} else {
405-
tracing::warn!(ip = %ip, error = %e, "Deep scan failed");
406-
}
407-
None
386+
.map(|(ip, subnet, mac, phase1_ports)| {
387+
let cancel = cancel.clone();
388+
let gateway_ips = gateway_ips.clone();
389+
let batches_done = phase2_batches_done.clone();
390+
391+
Box::pin(async move {
392+
let result = self
393+
.deep_scan_host(DeepScanParams {
394+
ip,
395+
subnet: &subnet,
396+
mac,
397+
phase1_ports,
398+
cancel,
399+
port_scan_batch_size: ports_per_host_batch,
400+
gateway_ips: &gateway_ips,
401+
batches_done: &batches_done,
402+
total_batches,
403+
})
404+
.await;
405+
406+
match result {
407+
Ok(Some(host)) => Some(host),
408+
Ok(None) => None,
409+
Err(e) => {
410+
if DiscoveryCriticalError::is_critical_error(e.to_string()) {
411+
tracing::error!(ip = %ip, error = %e, "Critical error in deep scan");
412+
} else {
413+
tracing::warn!(ip = %ip, error = %e, "Deep scan failed");
408414
}
415+
None
409416
}
410417
}
411418
})
412-
.buffer_unordered(deep_scan_concurrency)
413-
.filter_map(|x| async { x })
414-
.collect::<Vec<Host>>()
415-
.await;
419+
})
420+
.buffer_unordered(deep_scan_concurrency)
421+
.filter_map(|x| async { x })
422+
.collect::<Vec<Host>>()
423+
.await;
416424

417425
self.report_discovery_update(DiscoverySessionUpdate::scanning(100))
418426
.await?;
@@ -513,7 +521,7 @@ impl DiscoveryRunner<NetworkScanDiscovery> {
513521
all_tcp_ports.extend(open_ports);
514522

515523
let done = batches_done.fetch_add(1, Ordering::Relaxed) + 1;
516-
let pct = (50 + done * 50 / total_batches.max(1)) as u8;
524+
let pct = (50 + done * 40 / total_batches.max(1)) as u8; // 50-90%
517525
let _ = self.report_scanning_progress(pct).await;
518526
}
519527

@@ -539,6 +547,9 @@ impl DiscoveryRunner<NetworkScanDiscovery> {
539547
.await?;
540548
open_ports.extend(udp_ports);
541549

550+
self.report_discovery_update(DiscoverySessionUpdate::scanning(95))
551+
.await?;
552+
542553
let mut ports_to_check = open_ports.clone();
543554
let endpoint_only_ports = Service::endpoint_only_ports();
544555
ports_to_check.extend(endpoint_only_ports);
@@ -554,6 +565,9 @@ impl DiscoveryRunner<NetworkScanDiscovery> {
554565
)
555566
.await?;
556567

568+
self.report_discovery_update(DiscoverySessionUpdate::scanning(98))
569+
.await?;
570+
557571
for endpoint_response in &endpoint_responses {
558572
let port = endpoint_response.endpoint.port_base;
559573
if !open_ports.contains(&port) {

backend/src/daemon/utils/scanner.rs

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use rsntp::AsyncSntpClient;
2121
use snmp2::{AsyncSession, Oid};
2222
use std::collections::HashMap;
2323
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
24+
use std::pin::Pin;
2425
use std::time::Duration;
2526
use tokio::net::UdpSocket;
2627
use tokio::{net::TcpStream, time::timeout};
@@ -57,22 +58,22 @@ where
5758
{
5859
let mut results = Vec::new();
5960
let mut item_iter = items.into_iter();
60-
let mut futures = FuturesUnordered::new();
6161

62-
// Fill initial batch
62+
let mut futures: FuturesUnordered<Pin<Box<dyn Future<Output = Option<O>> + Send>>> =
63+
FuturesUnordered::new();
64+
6365
for _ in 0..batch_size {
6466
if cancel.is_cancelled() {
6567
break;
6668
}
6769

6870
if let Some(item) = item_iter.next() {
69-
futures.push(scan_fn(item));
71+
futures.push(Box::pin(scan_fn(item)));
7072
} else {
7173
break;
7274
}
7375
}
7476

75-
// Process results and maintain constant parallelism
7677
while let Some(result) = futures.next().await {
7778
if cancel.is_cancelled() {
7879
break;
@@ -82,11 +83,9 @@ where
8283
results.push(output);
8384
}
8485

85-
// Immediately add next item(s) to maintain batch size
86-
// Keep adding until we're back at batch_size or out of items
8786
while futures.len() < batch_size && !cancel.is_cancelled() {
8887
if let Some(item) = item_iter.next() {
89-
futures.push(scan_fn(item));
88+
futures.push(Box::pin(scan_fn(item)));
9089
} else {
9190
break;
9291
}
@@ -96,6 +95,52 @@ where
9695
results
9796
}
9897

98+
/// Check if ARP scanning is available (requires elevated privileges on some OSes)
99+
pub fn can_arp_scan() -> bool {
100+
// Try to open a datalink channel on any suitable interface
101+
let interfaces = datalink::interfaces();
102+
103+
let suitable_interface = interfaces
104+
.into_iter()
105+
.find(|iface| iface.is_up() && !iface.is_loopback() && iface.mac.is_some());
106+
107+
let Some(interface) = suitable_interface else {
108+
tracing::debug!("No suitable interface found for ARP capability check");
109+
return false;
110+
};
111+
112+
let config = pnet::datalink::Config {
113+
read_timeout: Some(Duration::from_millis(100)),
114+
..Default::default()
115+
};
116+
117+
match datalink::channel(&interface, config) {
118+
Ok(_) => {
119+
tracing::debug!(interface = %interface.name, "ARP scanning available");
120+
true
121+
}
122+
Err(e) => {
123+
let err_str = e.to_string().to_lowercase();
124+
if err_str.contains("permission")
125+
|| err_str.contains("operation not permitted")
126+
|| err_str.contains("access denied")
127+
|| err_str.contains("requires root")
128+
{
129+
tracing::info!(
130+
error = %e,
131+
"ARP scanning unavailable (insufficient privileges), falling back to port scanning"
132+
);
133+
} else {
134+
tracing::warn!(
135+
error = %e,
136+
"ARP scanning unavailable, falling back to port scanning"
137+
);
138+
}
139+
false
140+
}
141+
}
142+
}
143+
99144
/// Send ARP request to a single IP and wait for response
100145
/// Returns the MAC address if the host responds, None otherwise
101146
pub async fn arp_scan_host(

backend/src/server/daemons/service.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,31 @@ impl DaemonService {
122122
path: "/api/discovery/initiate".to_string(),
123123
};
124124

125+
tracing::info!(
126+
daemon_id = %daemon_id,
127+
endpoint = %endpoint,
128+
session_id = %request.session_id,
129+
"Attempting to send discovery request to daemon"
130+
);
131+
125132
let response = self
126133
.client
127134
.post(format!("{}", endpoint))
128135
.json(&request)
129136
.send()
130-
.await?;
137+
.await
138+
.map_err(|e| {
139+
tracing::error!(
140+
daemon_id = %daemon_id,
141+
endpoint = %endpoint,
142+
error = %e,
143+
error_debug = ?e,
144+
is_connect = %e.is_connect(),
145+
is_timeout = %e.is_timeout(),
146+
"Failed to connect to daemon"
147+
);
148+
e
149+
})?;
131150

132151
if !response.status().is_success() {
133152
anyhow::bail!(
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
use crate::server::hosts::r#impl::ports::PortBase;
2+
use crate::server::services::definitions::{ServiceDefinitionFactory, create_service};
3+
use crate::server::services::r#impl::categories::ServiceCategory;
4+
use crate::server::services::r#impl::definitions::ServiceDefinition;
5+
use crate::server::services::r#impl::patterns::Pattern;
6+
7+
#[derive(Default, Clone, Eq, PartialEq, Hash)]
8+
pub struct Amqp;
9+
10+
impl ServiceDefinition for Amqp {
11+
fn name(&self) -> &'static str {
12+
"AMQP"
13+
}
14+
fn description(&self) -> &'static str {
15+
"Advanced Message Queuing Protocol"
16+
}
17+
fn category(&self) -> ServiceCategory {
18+
ServiceCategory::MessageQueue
19+
}
20+
fn discovery_pattern(&self) -> Pattern<'_> {
21+
Pattern::AnyOf(vec![
22+
Pattern::Port(PortBase::AMQP),
23+
Pattern::Port(PortBase::AMQPTls),
24+
])
25+
}
26+
fn is_generic(&self) -> bool {
27+
true
28+
}
29+
}
30+
31+
inventory::submit!(ServiceDefinitionFactory::new(create_service::<Amqp>));

backend/src/server/services/definitions/docker_container.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ impl ServiceDefinition for DockerContainer {
4646
_ => true,
4747
})
4848
},
49+
|_| Vec::new(),
4950
"No other services with this container's ID have been matched",
5051
"A service with this container's ID has already been matched",
5152
MatchConfidence::Low,

0 commit comments

Comments
 (0)