Skip to content

Commit

Permalink
fix: 修改格式
Browse files Browse the repository at this point in the history
  • Loading branch information
googs1025 committed Jul 28, 2023
1 parent 989daae commit 6426f35
Show file tree
Hide file tree
Showing 29 changed files with 34 additions and 44 deletions.
Binary file added .DS_Store
Binary file not shown.
4 changes: 1 addition & 3 deletions concurrent-queue-mode/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,10 @@ func (queue *queue) Pop() (interface{}, error) {
return <-queue.datas, nil
}


func (queue *queue) IsEmpty() bool {
return queue.size == 0
}


func (queue *queue) IsFull() bool {
return queue.size >= queue.maxSize
}
Expand All @@ -62,4 +60,4 @@ func (queue *queue) SetCap(maxSize uint32) {

func (queue *queue) InitQueue() {
queue.datas = make(chan interface{}, queue.maxSize)
}
}
2 changes: 1 addition & 1 deletion concurrent-queue-mode/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func TestConcurrentQueue(t *testing.T) {
}(i)
}

time.Sleep(time.Second*3)
time.Sleep(time.Second * 3)
}
1 change: 0 additions & 1 deletion cron-task-mode/cronfunc/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,4 @@ func CronTry3() {
log.Println("test111")
}, 2*time.Second)


}
2 changes: 1 addition & 1 deletion event-processor-mode/event_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestEventProcessor(t *testing.T) {
defer wg.Done()
event := Event{
Type: Added,
Obj: i,
Obj: i,
}
if i%3 == 0 {
event.Type = Modified
Expand Down
2 changes: 1 addition & 1 deletion forever-mode/forever_time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ func TestRunWithTimeWithContext(t *testing.T) {
fmt.Println("test-with-time-context")
return nil
}, ctx, 10)
}
}
1 change: 0 additions & 1 deletion future-mode/future.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ func RequestFuture(url string) <-chan []byte {
return c
}


// RequestFutureV2 支持返回error结果
func RequestFutureV2(url string) (<-chan []byte, <-chan error) {
c := make(chan []byte, 1)
Expand Down
3 changes: 0 additions & 3 deletions future-mode/future1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"testing"
)



func TestFuture(test *testing.T) {
future := RequestFuture("https://api.github.com/users/octocat/orgs")
// 中间可以实现自己的业务逻辑。。。。。
Expand All @@ -18,7 +16,6 @@ func TestFuture(test *testing.T) {
log.Printf("reponse length: %d", len(body))
}


func TestFutureWithError(test *testing.T) {

res, errC := RequestFutureV2("https://api.github.com/users/octocat/orgs")
Expand Down
Binary file added kubelet-podworker-mode/.DS_Store
Binary file not shown.
2 changes: 1 addition & 1 deletion kubelet-podworker-mode/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ func NewKubeletCommand() *cobra.Command {
},
}
return cmd
}
}
2 changes: 1 addition & 1 deletion kubelet-podworker-mode/kubelet/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,4 @@ func (c *Container) RemoveImage(s string) error {
klog.Infof("remove image %s for container %s", s, c.Name)
c.Status = ImageRemoving
return nil
}
}
6 changes: 3 additions & 3 deletions kubelet-podworker-mode/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Kubelet struct {
// node 节点名,记录本kubelet运行的节点
node string
// podWorkers 工作队列,负责对目前kubelet的pod进行生命周期管理
podWorkers *PodWorkers
podWorkers PodWorkers
// podManager pod记录管理器,负责记录目前kubelet运行的所有pod
podManager Manager

Expand Down Expand Up @@ -106,7 +106,7 @@ var _ SyncHandler = &Kubelet{}
// Run 执行
func (k *Kubelet) Run(updates <-chan PodUpdate) {
// 循环事件
k.syncLoop(context.Background(), updates, k)
go k.syncLoop(context.Background(), updates, k)
}

func (k *Kubelet) syncLoop(ctx context.Context, updates <-chan PodUpdate, handler SyncHandler) {
Expand Down Expand Up @@ -161,4 +161,4 @@ func (k *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan PodUpda
}
return true

}
}
14 changes: 6 additions & 8 deletions kubelet-podworker-mode/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestKubelet(t *testing.T) {
k.podWorkers = newPodWorkers(k.syncPod, k.syncTerminatedPod, k.syncTerminatingPod)
e := make(chan PodUpdate, 10)
// 运行kubelet
go k.Run(e)
k.Run(e)

pods := make([]*Pod, 0)

Expand Down Expand Up @@ -67,26 +67,24 @@ func TestKubelet(t *testing.T) {
}
e <- aa


//aa := PodUpdate{
// Op: ADD,
// Pods: podsa,
//}
//
aaa := PodUpdate{
Op: DELETE,
Pods: pods,
Op: DELETE,
Pods: pods,
}

aaaa := PodUpdate{
Op: UPDATE,
Pods: podsa,
Op: UPDATE,
Pods: podsa,
}


e <- aaa
e <- aaaa
//select {}
time.Sleep(time.Second * 10)

}
}
1 change: 1 addition & 0 deletions kubelet-podworker-mode/kubelet/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Manager interface {
DeletePod(pod *Pod)
}

// basicManager pod的本地缓存
type basicManager struct {
lock sync.RWMutex
// podByFullName 用来存储pod对象
Expand Down
2 changes: 1 addition & 1 deletion kubelet-podworker-mode/kubelet/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ type PodUpdate struct {
type UpdatePodOptions struct {
UpdateType SyncPodType
Pod *Pod
}
}
21 changes: 14 additions & 7 deletions kubelet-podworker-mode/kubelet/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,14 @@ import (
"sync"
)

// PodWorkers 负责所有pod的生命周期
type PodWorkers struct {
// PodWorkers kubelet中主要干活的组件
type PodWorkers interface {
// 所有
UpdatePod(options UpdatePodOptions)
}

// podWorkers 负责所有pod的生命周期
type podWorkers struct {
// 管理每个pod的map
podUpdates map[string]chan PodWork
podLock sync.Mutex
Expand All @@ -20,8 +26,8 @@ type PodWorkers struct {
syncTerminatedPodFn syncTerminatedPodFnType
}

func newPodWorkers(syncPodFn syncPodFnType, syncTerminatingPodFn syncTerminatingPodFnType, syncTerminatedPodFn syncTerminatedPodFnType) *PodWorkers {
return &PodWorkers{
func newPodWorkers(syncPodFn syncPodFnType, syncTerminatingPodFn syncTerminatingPodFnType, syncTerminatedPodFn syncTerminatedPodFnType) *podWorkers {
return &podWorkers{
podUpdates: make(map[string]chan PodWork),
syncPodFn: syncPodFn,
syncTerminatingPodFn: syncTerminatingPodFn,
Expand Down Expand Up @@ -155,13 +161,14 @@ const (
)

// PodWork podWorker管理的对象,也就是pod外面再包装一层
// 记录本次的pod事件类型与pod对象本身
type PodWork struct {
WorkType PodWorkType
Pod *Pod
}

// UpdatePod 管理pod的主要逻辑
func (p *PodWorkers) UpdatePod(options UpdatePodOptions) {
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
pod := options.Pod
p.podLock.Lock()
defer p.podLock.Unlock()
Expand Down Expand Up @@ -204,7 +211,7 @@ func (p *PodWorkers) UpdatePod(options UpdatePodOptions) {

}

func (p *PodWorkers) managePodLoop(podUpdates <-chan PodWork) {
func (p *podWorkers) managePodLoop(podUpdates <-chan PodWork) {

// 不断从chan中取出pod
for update := range podUpdates {
Expand Down Expand Up @@ -241,4 +248,4 @@ func (p *PodWorkers) managePodLoop(podUpdates <-chan PodWork) {

}

}
}
1 change: 0 additions & 1 deletion pipeline-mode/pipeline_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,3 @@ func AnalyzeTask(resultTaskC <-chan *Task) {
fmt.Println(res.Result)
}
}

1 change: 0 additions & 1 deletion pipeline-mode/pipeline_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"testing"
)


func TestTaskPipeline(t *testing.T) {
task1 := NewTask("task1")
task2 := NewTask("task2")
Expand Down
1 change: 0 additions & 1 deletion pipeline-mode/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"
)


func TestPipeline(t *testing.T) {
// 流水线模式
in := producer(1, 2, 3, 4)
Expand Down
Binary file added scheduler-mode/.DS_Store
Binary file not shown.
1 change: 0 additions & 1 deletion scheduler-mode/sample-scheduler/sample_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,3 @@ func (s *scheduler) stop() {
}()

}

2 changes: 1 addition & 1 deletion singleflight-mode/singleflight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,4 @@ func TestExclusiveCallDoExDupSuppress(t *testing.T) {
if got := atomic.LoadInt32(&freshes); got != 1 {
t.Errorf("freshes = %d; want 1", got)
}
}
}
Binary file added task-job-mode/.DS_Store
Binary file not shown.
1 change: 0 additions & 1 deletion timeout-mode/timeout_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"
)


// RetryTimeout 重试超时模式
func RetryTimeout(ctx context.Context, retryInterval time.Duration, execute func(ctx context.Context) error) {
for {
Expand Down
2 changes: 0 additions & 2 deletions worker-pool-mode/pool_limit_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"time"
)



func TestLimitWaitGroup(test *testing.T) {

urls := []string{
Expand Down
1 change: 0 additions & 1 deletion worker-pool-mode/pool_mode1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"
)


func TestTaskPool(t *testing.T) {
p := NewPool(2)
p.Start() // 启动任务
Expand Down
1 change: 0 additions & 1 deletion worker-pool-mode/pool_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"
)


func TestObjPool(t *testing.T) {

pool := NewObjPool(5, false)
Expand Down
2 changes: 1 addition & 1 deletion workqueue-mode/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,4 @@ func (q *queue) IsShutDown() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.close
}
}
2 changes: 1 addition & 1 deletion workqueue-mode/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,4 @@ func TestAddWhileProcessing(t *testing.T) {
producerWG.Wait()
q.ShutDown()
consumerWG.Wait()
}
}

0 comments on commit 6426f35

Please sign in to comment.