Skip to content

Commit

Permalink
Update worker
Browse files Browse the repository at this point in the history
  • Loading branch information
wyhaya committed Sep 9, 2022
1 parent 84658e8 commit 0adacc2
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 84 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "lok"
version = "0.2.1"
version = "0.2.2"
edition = "2021"
authors = ["wyhaya <wyhaya@gmail.com>"]
description = "Count the number of codes"
Expand All @@ -24,6 +24,6 @@ strip = "symbols"
[dependencies]
bright = "0.4.1"
clap = "2.34.0"
crossbeam-deque = "0.8.1"
crossbeam-deque = "0.8.2"
glob = "0.3.0"
walkdir = "2.3.2"
5 changes: 3 additions & 2 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::CONFIG;
use clap::{crate_name, crate_version, value_t_or_exit, App, AppSettings, Arg, SubCommand};
use glob::Pattern;
use std::path::PathBuf;
use std::str::FromStr;

pub struct Args {
pub work_dir: PathBuf,
Expand All @@ -25,7 +26,7 @@ pub enum Sort {
Size,
}

impl std::str::FromStr for Sort {
impl FromStr for Sort {
type Err = ();
fn from_str(value: &str) -> Result<Self, Self::Err> {
match value {
Expand Down Expand Up @@ -147,7 +148,7 @@ fn print_language_list() {

for language in CONFIG.languages() {
let ext = language
.extension
.extensions
.iter()
.map(|e| format!(".{}", e))
.collect::<Vec<String>>()
Expand Down
10 changes: 4 additions & 6 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub struct Config(&'static [Language]);
#[derive(Debug)]
pub struct Language {
pub name: &'static str,
pub extension: &'static [&'static str],
pub extensions: &'static [&'static str],
pub single: &'static [&'static str],
pub multi: &'static [(&'static str, &'static str)],
}
Expand All @@ -13,7 +13,7 @@ macro_rules! language {
($name: expr, $ext: expr, $single: expr, $multi: expr) => {
Language {
name: $name,
extension: $ext,
extensions: $ext,
single: $single,
multi: $multi,
}
Expand All @@ -28,10 +28,8 @@ impl Config {
// Get language configuration by extension
pub fn get(&self, extension: &str) -> Option<&Language> {
for item in self.0 {
for ext in item.extension {
if *ext == extension {
return Some(item);
}
if item.extensions.contains(&extension) {
return Some(item);
}
}
None
Expand Down
91 changes: 20 additions & 71 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ mod config;
mod output;
mod parse;
mod util;
mod workder;

use cli::{Args, Sort};
use config::{Language, CONFIG};
use crossbeam_deque::{Stealer, Worker};
use config::CONFIG;
use output::Output;
use parse::{parser, Data, Value};
use std::path::PathBuf;
use util::num_cpus;
use walkdir::WalkDir;
use workder::Tasks;

fn main() {
let Args {
Expand All @@ -24,22 +23,6 @@ fn main() {
extension,
} = cli::parse();

let worker = Worker::new_fifo();
let cpus = num_cpus();
let mut threads = Vec::with_capacity(cpus);

// Created thread
for _ in 0..cpus {
let stealer = worker.stealer().clone();
threads.push(std::thread::spawn(move || {
let task = Task {
stealer,
print_error,
};
task.start()
}));
}

let files = WalkDir::new(work_dir).into_iter().filter_map(|item| {
let entry = match item {
Ok(entry) => entry,
Expand Down Expand Up @@ -94,23 +77,30 @@ fn main() {
.map(|config| (entry.path().to_path_buf(), config))
});

for (path, config) in files {
worker.push(Work::Parse(path, config));
}
let tasks = Tasks::new();

for _ in 0..cpus {
worker.push(Work::Quit);
for (path, config) in files {
tasks.push(move || {
match parser(path, config) {
Value::Ok(data) => return Some(data),
Value::Err(kind, p) => {
if print_error {
print_error!(kind, p)
}
}
Value::Invalid => {}
};
None
});
}

// Summary of all data
let mut total = Vec::new();

for thread in threads {
let task_data = thread.join().unwrap_or_else(|err| {
for rst in tasks.result() {
let datas = rst.unwrap_or_else(|err| {
exit!("Thread exits abnormally\n{:#?}", err);
});

for data in task_data {
for data in datas {
let find = total
.iter_mut()
.find(|item: &&mut Detail| item.language == data.language);
Expand Down Expand Up @@ -166,44 +156,3 @@ impl Detail {
self.file += 1;
}
}

enum Work<'a> {
Parse(PathBuf, &'a Language),
Quit,
}

struct Task<'a> {
stealer: Stealer<Work<'a>>,
print_error: bool,
}

impl<'a> Task<'a> {
fn start(self) -> Vec<Data> {
let mut result = Vec::new();

loop {
// Receive message
let work = match self.stealer.steal().success() {
Some(work) => work,
None => continue,
};

match work {
Work::Parse(path, config) => {
match parser(path, config) {
Value::Ok(data) => result.push(data),
Value::Err(kind, p) => {
if self.print_error {
print_error!(kind, p)
}
}
Value::Invalid => continue,
};
}
Work::Quit => break,
}
}

result
}
}
68 changes: 68 additions & 0 deletions src/workder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use crate::util::num_cpus;
use crossbeam_deque::{Stealer, Worker};
use std::any::Any;
use std::thread::{self, JoinHandle};

#[derive(Debug)]
pub struct Tasks<T, R> {
worker: Worker<Work<T>>,
threads: Vec<JoinHandle<Vec<R>>>,
}

#[derive(Debug)]
enum Work<T> {
Exit,
Task(T),
}

impl<T, R> Tasks<T, R>
where
T: FnOnce() -> Option<R>,
T: Send + 'static,
R: Send + 'static,
{
pub fn new() -> Self {
let cpus = num_cpus();
let worker = Worker::new_fifo();
let mut threads = Vec::with_capacity(cpus);

for _ in 0..cpus {
let stealer: Stealer<Work<T>> = worker.stealer().clone();
threads.push(thread::spawn(move || {
let mut result = Vec::new();
loop {
let work = match stealer.steal().success() {
Some(work) => work,
None => continue,
};

match work {
Work::Task(task) => match task() {
Some(data) => result.push(data),
None => continue,
},
Work::Exit => break,
}
}
result
}));
}

Self { worker, threads }
}

pub fn push(&self, f: T) {
self.worker.push(Work::Task(f));
}

pub fn result(self) -> Vec<Result<Vec<R>, Box<dyn Any + Send + 'static>>> {
let mut rst = Vec::with_capacity(self.threads.len());
for _ in 0..self.threads.len() {
self.worker.push(Work::Exit);
}
for thread in self.threads {
rst.push(thread.join());
}
rst
}
}

0 comments on commit 0adacc2

Please sign in to comment.