Skip to content

Commit

Permalink
fix: 更新项目格式
Browse files Browse the repository at this point in the history
  • Loading branch information
googs1025 committed Jul 25, 2023
1 parent a3a371e commit 989daae
Show file tree
Hide file tree
Showing 16 changed files with 553 additions and 509 deletions.
6 changes: 3 additions & 3 deletions forever-mode/forever_time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ func TestRunWithTimeForever(t *testing.T) {
}, 3)
}()

<-time.After(time.Second * 20)
<-time.After(time.Second * 10)

}

func TestRunWithTimeWithChannel(t *testing.T) {
stopC := make(chan struct{})

go func() {
<-time.After(time.Second * 20)
<-time.After(time.Second * 10)
close(stopC)
}()

Expand All @@ -35,7 +35,7 @@ func TestRunWithTimeWithChannel(t *testing.T) {
}

func TestRunWithTimeWithContext(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer func() {
cancel()
}()
Expand Down
52 changes: 52 additions & 0 deletions future-mode/future.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package future_mode

import (
"io/ioutil"
"net/http"
)

// RequestFuture 启动一个goroutine,请求http,并把结果放入chan中
func RequestFuture(url string) <-chan []byte {
c := make(chan []byte, 1)
go func() {
var body []byte
defer func() {
c <- body
}()

res, err := http.Get(url)
if err != nil {
return
}
defer res.Body.Close()

body, _ = ioutil.ReadAll(res.Body)
}()

return c
}


// RequestFutureV2 支持返回error结果
func RequestFutureV2(url string) (<-chan []byte, <-chan error) {
c := make(chan []byte, 1)
errC := make(chan error, 1)
go func() {
var body []byte
defer func() {
c <- body
}()

res, err := http.Get(url)
if err != nil {
errC <- err
return
}

defer res.Body.Close()

body, _ = ioutil.ReadAll(res.Body)
}()

return c, errC
}
44 changes: 0 additions & 44 deletions future-mode/future1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,11 @@ package future_mode

import (
"fmt"
"io/ioutil"
"log"
"net/http"
"testing"
)

// RequestFuture 启动一个goroutine,请求http,并把结果放入chan中
func RequestFuture(url string) <-chan []byte {
c := make(chan []byte, 1)
go func() {
var body []byte
defer func() {
c <- body
}()

res, err := http.Get(url)
if err != nil {
return
}
defer res.Body.Close()

body, _ = ioutil.ReadAll(res.Body)
}()

return c
}

func TestFuture(test *testing.T) {
future := RequestFuture("https://api.github.com/users/octocat/orgs")
Expand All @@ -39,29 +18,6 @@ func TestFuture(test *testing.T) {
log.Printf("reponse length: %d", len(body))
}

// RequestFutureV2 支持返回error结果
func RequestFutureV2(url string) (<-chan []byte, <-chan error) {
c := make(chan []byte, 1)
errC := make(chan error, 1)
go func() {
var body []byte
defer func() {
c <- body
}()

res, err := http.Get(url)
if err != nil {
errC <- err
return
}

defer res.Body.Close()

body, _ = ioutil.ReadAll(res.Body)
}()

return c, errC
}

func TestFutureWithError(test *testing.T) {

Expand Down
38 changes: 38 additions & 0 deletions pipeline-mode/pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package pipeline_mode

/*
流水线的特点:
1. 每个阶段把数据通过channel传递给下一个阶段。
2. 每个阶段要创建1个goroutine和1个通道,这个goroutine向里面写数据,函数要返回这个通道。
3. 有1个函数来组织流水线,我们例子中是main函数。
*/

// producer 负责生产数据,返回一个chan,把准备好的数据放入chan中。
func producer(num ...int) <-chan int {
out := make(chan int)

// 异步启goroutine准备数据,并放入chan
go func() {
defer close(out)
for _, n := range num {
out <- n
}
}()

return out
}

// square 执行主要的业务逻辑,从准备好的chan中拿取数据,并执行业务逻辑,执行后放入chan中
func square(inputC <-chan int) <-chan int {

out := make(chan int)
// 异步启goroutine准备数据,并放入chan
go func() {
defer close(out)
for n := range inputC {
out <- n * n
}
}()
return out

}
102 changes: 102 additions & 0 deletions pipeline-mode/pipeline_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package pipeline_mode

import "fmt"

const (
UnExecute = "unexecuted"
Running = "running"
Failed = "failed"
Finished = "finished"
)

type Task struct {
Id int
Action string
Status string
Result string
}

func NewTask(action string) *Task {
task := &Task{
Id: 1,
Action: action,
Status: UnExecute,
}
return task
}

// Execute 执行任务
func (t *Task) Execute() *Task {

// 执行具体任务
err := t.handler()
if err != nil {
t.Status = Failed
return nil
}

// 完成任务
t.Status = Finished
return t
}

// handler 处理不同任务
func (t *Task) handler() error {

// task执行中
t.Status = Running

// 暂时这样写,可以调用具体方法
switch t.Action {
case "task1":
fmt.Println("execute task: ", t.Action)
t.Result = fmt.Sprintf("execute task: %v", t.Action)
case "task2":
fmt.Println("execute task: ", t.Action)
t.Result = fmt.Sprintf("execute task: %v", t.Action)
case "task3":
fmt.Println("execute task: ", t.Action)
t.Result = fmt.Sprintf("execute task: %v", t.Action)
}

return nil
}

// PrepareTask 负责准备任务,返回一个chan,把准备好的数据放入chan中。
func PrepareTask(tasks ...*Task) <-chan Task {
out := make(chan Task)

// 异步启goroutine准备数据,并放入chan
go func() {
defer close(out)
for _, task := range tasks {
// 这里可以执行task的预处理
//
out <- *task
}
}()

return out
}

// ExecuteTask 执行主要的业务逻辑
func ExecuteTask(inputC <-chan Task) <-chan *Task {

out := make(chan *Task)
// 异步启goroutine执行业务逻辑,并放入chan
go func() {
defer close(out)
for task := range inputC {
out <- task.Execute()
}
}()
return out
}

// AnalyzeTask 消费task任务的结果
func AnalyzeTask(resultTaskC <-chan *Task) {
for res := range resultTaskC {
fmt.Println(res.Result)
}
}

98 changes: 0 additions & 98 deletions pipeline-mode/pipeline_task_test.go
Original file line number Diff line number Diff line change
@@ -1,107 +1,9 @@
package pipeline_mode

import (
"fmt"
"testing"
)

const (
UnExecute = "unexecuted"
Running = "running"
Failed = "failed"
Finished = "finished"
)

type Task struct {
Id int
Action string
Status string
Result string
}

func NewTask(action string) *Task {
task := &Task{
Id: 1,
Action: action,
Status: UnExecute,
}
return task
}

// Execute 执行任务
func (t *Task) Execute() *Task {

// 执行具体任务
err := t.handler()
if err != nil {
t.Status = Failed
return nil
}

// 完成任务
t.Status = Finished
return t
}

// handler 处理不同任务
func (t *Task) handler() error {

// task执行中
t.Status = Running

// 暂时这样写,可以调用具体方法
switch t.Action {
case "task1":
fmt.Println("execute task: ", t.Action)
t.Result = fmt.Sprintf("execute task: %v", t.Action)
case "task2":
fmt.Println("execute task: ", t.Action)
t.Result = fmt.Sprintf("execute task: %v", t.Action)
case "task3":
fmt.Println("execute task: ", t.Action)
t.Result = fmt.Sprintf("execute task: %v", t.Action)
}

return nil
}

// PrepareTask 负责准备任务,返回一个chan,把准备好的数据放入chan中。
func PrepareTask(tasks ...*Task) <-chan Task {
out := make(chan Task)

// 异步启goroutine准备数据,并放入chan
go func() {
defer close(out)
for _, task := range tasks {
// 这里可以执行task的预处理
//
out <- *task
}
}()

return out
}

// ExecuteTask 执行主要的业务逻辑
func ExecuteTask(inputC <-chan Task) <-chan *Task {

out := make(chan *Task)
// 异步启goroutine执行业务逻辑,并放入chan
go func() {
defer close(out)
for task := range inputC {
out <- task.Execute()
}
}()
return out
}

// AnalyzeTask 消费task任务的结果
func AnalyzeTask(resultTaskC <-chan *Task) {
for res := range resultTaskC {
fmt.Println(res.Result)
}
}

func TestTaskPipeline(t *testing.T) {
task1 := NewTask("task1")
Expand Down
Loading

0 comments on commit 989daae

Please sign in to comment.