diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..ed33076 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,76 @@ +name: Lint + +on: + pull_request: + push: + branches: + - main + +jobs: + check: + timeout-minutes: 1 + runs-on: ubuntu-latest + outputs: + run_job: ${{ steps.check.outputs.run_job }} + steps: + - name: Checkout code + uses: actions/checkout@v2 + with: + fetch-depth: 2 + + - name: check modified files + id: check + run: | + echo "=============== list modified files ===============" + git diff --name-only HEAD^ HEAD + + echo "========== check paths of modified files ==========" + git diff --name-only HEAD^ HEAD > files.txt + while IFS= read -r file + do + echo $file + if [[ $file == *.go ]]; then + echo "run job" + echo "::set-output name=run_job::true" + break + fi + done < files.txt + + lint: + name: Lint + needs: check + if: needs.check.outputs.run_job == 'true' + runs-on: ubuntu-latest + env: + GO_VERSION: '1.16.7' + steps: + - name: Checkout code + uses: actions/checkout@v2 + with: + fetch-depth: 2 + + - name: Setup go + id: go + uses: actions/setup-go@v2 + with: + go-version: ${{ env.GO_VERSION }} + + - name: Cache go + uses: actions/cache@v2.1.6 + with: + path: | + ~/.cache/go-build + ~/go/pkg/mod + key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-go- + + - name: Golangci lint + uses: golangci/golangci-lint-action@v2 + with: + version: v1.41.1 + skip-go-installation: true + skip-pkg-cache: true + skip-build-cache: true + only-new-issues: true + args: -v ./... diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..b024ff0 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,46 @@ +# This file contains all available configuration options +# with their default values. + +# options for analysis running +run: + # timeout for analysis, e.g. 30s, 5m, default is 1m + timeout: 5m + + # include test files or not, default is true + tests: false + +linters: + disable: + - scopelint + enable: + - errcheck + - goimports + - gofmt + - revive + - exportloopref + - prealloc + - lll + - staticcheck + - govet + - whitespace + - unconvert + - goconst + - gocritic + presets: + - bugs + - unused + +linters-settings: + lll: + line-length: 160 + revive: + ignore-generated-header: true + rules: + - name: unexported-return + disabled: true + +issues: + exclude-rules: + - linters: + - lll + source: "^//go:generate " \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..369bea2 --- /dev/null +++ b/LICENSE @@ -0,0 +1,28 @@ +Copyright (c) 2016, Actionpay +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following disclaimer + in the documentation and/or other materials provided with the distribution. + + * Neither the name of the Actionpay nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/README.md b/README.md index 83a4f87..0a2ab07 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,11 @@ # PostmanQ +[![license](https://img.shields.io/github/license/halfi/postmanq?style=flat-square)](https://github.com/Halfi/postmanq/blob/main/LICENSE) +[![go version](https://img.shields.io/github/go-mod/go-version/Halfi/postmanq?style=flat-square)](https://github.com/Halfi/postmanq/) +[![go version](https://goreportcard.com/badge/github.com/Halfi/postmanq?style=flat-square)](https://goreportcard.com/report/github.com/Halfi/postmanq) + +--- + PostmanQ - это высокопроизводительный почтовый сервер(MTA). На сервере под управлением Ubuntu 12.04 с 8-ми ядерным процессором и 32ГБ оперативной памяти PostmanQ рассылает более 300 писем в секунду. diff --git a/analyser/aggregate.go b/analyser/aggregate.go index 328acca..1be6191 100644 --- a/analyser/aggregate.go +++ b/analyser/aggregate.go @@ -1,6 +1,6 @@ package analyser -// автор таблиц, агрегирующий отчеты по ключу, например по коду ошибки +// KeyAggregateTableWriter автор таблиц, агрегирующий отчеты по ключу, например по коду ошибки type KeyAggregateTableWriter struct { *AbstractTableWriter } @@ -12,7 +12,7 @@ func newKeyAggregateTableWriter(fields []interface{}) TableWriter { } } -// записывает данные в таблицу +// Show записывает данные в таблицу func (t *KeyAggregateTableWriter) Show() { t.Clean() for key, ids := range t.ids { @@ -21,7 +21,7 @@ func (t *KeyAggregateTableWriter) Show() { t.Print() } -// автор таблиц, агрегирующий данные +// AggregateTableWriter автор таблиц, агрегирующий данные type AggregateTableWriter struct { *AbstractTableWriter } @@ -33,7 +33,7 @@ func newAggregateTableWriter(fields []interface{}) TableWriter { } } -// записывает данные в таблицу +// Show записывает данные в таблицу func (a *AggregateTableWriter) Show() { a.Clean() for _, row := range a.rows { diff --git a/analyser/detail.go b/analyser/detail.go index 801ac13..759b812 100644 --- a/analyser/detail.go +++ b/analyser/detail.go @@ -2,9 +2,10 @@ package analyser import ( "fmt" - "github.com/Halfi/postmanq/common" "regexp" "strings" + + "github.com/Halfi/postmanq/common" ) // автор таблиц, выводящий детализированные отчеты об ошибке diff --git a/analyser/rows.go b/analyser/rows.go index 5a18c97..9a36c63 100644 --- a/analyser/rows.go +++ b/analyser/rows.go @@ -1,33 +1,34 @@ package analyser import ( - "github.com/byorty/clitable" "regexp" "time" + + "github.com/byorty/clitable" ) -// отчет об ошибке +// Report отчет об ошибке type Report struct { - // идентификатор + // Id идентификатор Id int - // отправитель + // Envelope отправитель Envelope string - // получатель + // Recipient получатель Recipient string - // код ошибки + // Code код ошибки Code int - // сообщение об ошибке + // Message сообщение об ошибке Message string - // даты отправок + // CreatedDates даты отправок CreatedDates []time.Time } -// записывает отчет в таблицу +// Write записывает отчет в таблицу func (r Report) Write(table *clitable.Table, valueRegex *regexp.Regexp) { if valueRegex == nil || (valueRegex != nil && @@ -44,10 +45,10 @@ func (r Report) Write(table *clitable.Table, valueRegex *regexp.Regexp) { } } -// агрегированная строка +// AggregateRow агрегированная строка type AggregateRow []int -// записывает строку в таблицу +// Write записывает строку в таблицу func (a AggregateRow) Write(table *clitable.Table, valueRegex *regexp.Regexp) { table.AddRow(a[0], a[1], a[2], a[3]) } diff --git a/analyser/writer.go b/analyser/writer.go index 9770832..a410e1f 100644 --- a/analyser/writer.go +++ b/analyser/writer.go @@ -1,9 +1,10 @@ package analyser import ( - "github.com/byorty/clitable" "regexp" "sort" + + "github.com/byorty/clitable" ) // автор таблиц diff --git a/application/abstract.go b/application/abstract.go index 0b8e89c..20d5a8c 100644 --- a/application/abstract.go +++ b/application/abstract.go @@ -30,7 +30,7 @@ var ( } ) -// базовое приложение +// Abstract базовое приложение type Abstract struct { // путь до конфигурационного файла configFilename string @@ -49,7 +49,7 @@ type Abstract struct { CommonTimeout common.Timeout `yaml:"timeouts"` } -// проверяет валидность пути к файлу с настройками +// IsValidConfigFilename проверяет валидность пути к файлу с настройками func (a *Abstract) IsValidConfigFilename(filename string) bool { return len(filename) > 0 && filename != common.ExampleConfigYaml } @@ -84,12 +84,12 @@ func (a Abstract) GetConfigFilename() string { return a.configFilename } -// устанавливает путь к файлу с настройками +// SetConfigFilename устанавливает путь к файлу с настройками func (a *Abstract) SetConfigFilename(configFilename string) { a.configFilename = configFilename } -// устанавливает канал событий приложения +// SetEvents устанавливает канал событий приложения func (a *Abstract) SetEvents(events chan *common.ApplicationEvent) { a.events = events } @@ -125,7 +125,7 @@ func (a *Abstract) SendEvents(ev *common.ApplicationEvent) bool { return true } -// возвращает канал завершения приложения +// Done возвращает канал завершения приложения func (a *Abstract) Done() <-chan bool { return a.done } @@ -137,33 +137,33 @@ func (a *Abstract) Close() { } } -// возвращает сервисы, используемые приложением +// Services возвращает сервисы, используемые приложением func (a *Abstract) Services() []interface{} { return a.services } -// инициализирует сервисы +// FireInit инициализирует сервисы func (a *Abstract) FireInit(event *common.ApplicationEvent, abstractService interface{}) { service := abstractService.(common.Service) service.OnInit(event) } -// инициализирует приложение +// Init инициализирует приложение func (a *Abstract) Init(event *common.ApplicationEvent) {} -// запускает приложение +// Run запускает приложение func (a *Abstract) Run() {} -// запускает приложение с аргументами +// RunWithArgs запускает приложение с аргументами func (a *Abstract) RunWithArgs(args ...interface{}) {} -// запускает сервисы приложения +// FireRun запускает сервисы приложения func (a *Abstract) FireRun(event *common.ApplicationEvent, abstractService interface{}) {} -// останавливает сервисы приложения +// FireFinish останавливает сервисы приложения func (a *Abstract) FireFinish(event *common.ApplicationEvent, abstractService interface{}) {} -// возвращает таймауты приложения +// Timeout возвращает таймауты приложения func (a *Abstract) Timeout() common.Timeout { return a.CommonTimeout } diff --git a/application/grep.go b/application/grep.go index b2571a4..4572700 100644 --- a/application/grep.go +++ b/application/grep.go @@ -5,17 +5,17 @@ import ( "github.com/Halfi/postmanq/grep" ) -// приложение, ищущее логи по адресату или получателю +// Grep приложение, ищущее логи по адресату или получателю type Grep struct { Abstract } -// создает новое приложение +// NewGrep создает новое приложение func NewGrep() common.Application { return new(Grep) } -// запускает приложение с аргументами +// RunWithArgs запускает приложение с аргументами func (g *Grep) RunWithArgs(args ...interface{}) { common.App = g g.services = []interface{}{ @@ -31,7 +31,7 @@ func (g *Grep) RunWithArgs(args ...interface{}) { g.run(g, event) } -// запускает сервисы приложения +// FireRun запускает сервисы приложения func (g *Grep) FireRun(event *common.ApplicationEvent, abstractService interface{}) { service := abstractService.(common.GrepService) go service.OnGrep(event) diff --git a/application/post.go b/application/post.go index ea2980a..778cba2 100644 --- a/application/post.go +++ b/application/post.go @@ -3,6 +3,8 @@ package application import ( "runtime" + "gopkg.in/yaml.v3" + "github.com/Halfi/postmanq/common" "github.com/Halfi/postmanq/connector" "github.com/Halfi/postmanq/consumer" @@ -10,23 +12,22 @@ import ( "github.com/Halfi/postmanq/limiter" "github.com/Halfi/postmanq/logger" "github.com/Halfi/postmanq/mailer" - "gopkg.in/yaml.v3" ) -// приложение, рассылающее письма +// Post приложение, рассылающее письма type Post struct { Abstract - // количество отправителей + // Workers количество отправителей Workers int `yaml:"workers"` } -// создает новое приложение +// NewPost создает новое приложение func NewPost() common.Application { return new(Post) } -// запускает приложение +// Run запускает приложение func (p *Post) Run() { common.App = p common.Services = []interface{}{ @@ -42,12 +43,11 @@ func (p *Post) Run() { limiter.Inst(), connector.Inst(), mailer.Inst(), - //recipient.Inst(), } p.run(p, common.NewApplicationEvent(common.InitApplicationEventKind)) } -// инициализирует приложение +// Init инициализирует приложение func (p *Post) Init(event *common.ApplicationEvent) { // получаем настройки err := yaml.Unmarshal(event.Data, p) @@ -61,13 +61,13 @@ func (p *Post) Init(event *common.ApplicationEvent) { } } -// запускает сервисы приложения +// FireRun запускает сервисы приложения func (p *Post) FireRun(event *common.ApplicationEvent, abstractService interface{}) { service := abstractService.(common.SendingService) go service.OnRun() } -// останавливает сервисы приложения +// FireFinish останавливает сервисы приложения func (p *Post) FireFinish(event *common.ApplicationEvent, abstractService interface{}) { service := abstractService.(common.SendingService) go service.OnFinish() diff --git a/application/publish.go b/application/publish.go index b972a2c..3c86609 100644 --- a/application/publish.go +++ b/application/publish.go @@ -5,17 +5,17 @@ import ( "github.com/Halfi/postmanq/consumer" ) -// приложение, перекладывающее письма из очереди в очередь +// Publish приложение, перекладывающее письма из очереди в очередь type Publish struct { Abstract } -// создает новое приложение +// NewPublish создает новое приложение func NewPublish() common.Application { return new(Publish) } -// запускает приложение с аргументами +// RunWithArgs запускает приложение с аргументами func (p *Publish) RunWithArgs(args ...interface{}) { common.App = p p.services = []interface{}{ @@ -32,7 +32,7 @@ func (p *Publish) RunWithArgs(args ...interface{}) { p.run(p, event) } -// запускает сервисы приложения +// FireRun запускает сервисы приложения func (p *Publish) FireRun(event *common.ApplicationEvent, abstractService interface{}) { service := abstractService.(common.PublishService) go service.OnPublish(event) diff --git a/cmd/postmanq/main.go b/cmd/postmanq/main.go index 1bdb306..aa1432f 100644 --- a/cmd/postmanq/main.go +++ b/cmd/postmanq/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "github.com/Halfi/postmanq/application" "github.com/Halfi/postmanq/common" ) diff --git a/cmd/tools/pmq-grep/main.go b/cmd/tools/pmq-grep/main.go index 017c37b..134b439 100644 --- a/cmd/tools/pmq-grep/main.go +++ b/cmd/tools/pmq-grep/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "github.com/Halfi/postmanq/application" "github.com/Halfi/postmanq/common" ) diff --git a/cmd/tools/pmq-report/main.go b/cmd/tools/pmq-report/main.go index da3a8bc..9721a0d 100644 --- a/cmd/tools/pmq-report/main.go +++ b/cmd/tools/pmq-report/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "github.com/Halfi/postmanq/application" "github.com/Halfi/postmanq/common" ) diff --git a/common/application.go b/common/application.go index 0abebe4..9efb4ba 100644 --- a/common/application.go +++ b/common/application.go @@ -8,44 +8,44 @@ import ( ) const ( - // используется в примерах использования + // ExampleConfigYaml используется в примерах использования ExampleConfigYaml = "/path/to/config/file.yaml" - // невалидная строка, введенная пользователем + // InvalidInputString невалидная строка, введенная пользователем InvalidInputString = "" - // невалидное число, введенное пользователем + // InvalidInputInt невалидное число, введенное пользователем InvalidInputInt = 0 ) var ( - // объект текущего приложения, иногда необходим сервисам, для отправки событий приложению + // App объект текущего приложения, иногда необходим сервисам, для отправки событий приложению App Application - // сервисы, используются для создания итератора + // Services сервисы, используются для создания итератора Services []interface{} - // количество goroutine, может измениться для инициализации приложения + // DefaultWorkersCount количество goroutine, может измениться для инициализации приложения DefaultWorkersCount = runtime.NumCPU() - // используется в нескольких пакетах, поэтому вынес сюда + // FilenameRegex используется в нескольких пакетах, поэтому вынес сюда FilenameRegex = regexp.MustCompile(`[^\\/]+\.[^\\/]+`) - // печает аргументы, используемые приложением + // PrintUsage печает аргументы, используемые приложением PrintUsage = func(f *flag.Flag) { format := " -%s %s\n" fmt.Printf(format, f.Name, f.Usage) } ) -// проект содержит несколько приложений: pmq-grep, pmq-publish, pmq-report, postmanq и т.д. +// Application проект содержит несколько приложений: pmq-grep, pmq-publish, pmq-report, postmanq и т.д. // чтобы упростить и стандартизировать приложения, разработан этот интерфейс type Application interface { GetConfigFilename() string - // устанавливает путь к файлу с настройками + // SetConfigFilename устанавливает путь к файлу с настройками SetConfigFilename(string) - // проверяет валидность пути к файлу с настройками + // IsValidConfigFilename проверяет валидность пути к файлу с настройками IsValidConfigFilename(string) bool // InitChannels init channels @@ -60,33 +60,33 @@ type Application interface { // SendEvents send event to the channel SendEvents(ev *ApplicationEvent) bool - // возвращает канал завершения приложения + // Done возвращает канал завершения приложения Done() <-chan bool // Close main app Close() - // возвращает сервисы, используемые приложением + // Services возвращает сервисы, используемые приложением Services() []interface{} - // инициализирует сервисы + // FireInit инициализирует сервисы FireInit(*ApplicationEvent, interface{}) - // запускает сервисы приложения + // FireRun запускает сервисы приложения FireRun(*ApplicationEvent, interface{}) - // останавливает сервисы приложения + // FireFinish останавливает сервисы приложения FireFinish(*ApplicationEvent, interface{}) - // инициализирует приложение + // Init инициализирует приложение Init(*ApplicationEvent) - // запускает приложение + // Run запускает приложение Run() - // запускает приложение с аргументами + // RunWithArgs запускает приложение с аргументами RunWithArgs(...interface{}) - // возвращает таймауты приложения + // Timeout возвращает таймауты приложения Timeout() Timeout } diff --git a/common/client.go b/common/client.go index c86740e..7e57869 100644 --- a/common/client.go +++ b/common/client.go @@ -1,63 +1,68 @@ package common import ( + "fmt" "net" "net/smtp" "time" ) -// статус клиента почтового сервера +// SmtpClientStatus статус клиента почтового сервера type SmtpClientStatus int const ( - // отсылает письмо + // WorkingSmtpClientStatus отсылает письмо WorkingSmtpClientStatus SmtpClientStatus = iota - // ожидает письма + // WaitingSmtpClientStatus ожидает письма WaitingSmtpClientStatus - // отсоединен + // DisconnectedSmtpClientStatus отсоединен DisconnectedSmtpClientStatus ) -// клиент почтового сервера +// SmtpClient клиент почтового сервера type SmtpClient struct { // идертификатор клиента для удобства в логах Id int - // соединение к почтовому серверу + // Conn соединение к почтовому серверу Conn net.Conn - // реальный smtp клиент + // Worker реальный smtp клиент Worker *smtp.Client - // дата создания или изменения статуса клиента + // ModifyDate дата создания или изменения статуса клиента ModifyDate time.Time - // статус + // Status статус Status SmtpClientStatus // таймер, по истечении которого, соединение к почтовому сервису будет разорвано timer *time.Timer } -// сстанавливайт таймаут на чтение и запись соединения -func (s *SmtpClient) SetTimeout(timeout time.Duration) { - s.Conn.SetDeadline(time.Now().Add(timeout)) +// SetTimeout останавливает таймаут на чтение и запись соединения +func (s *SmtpClient) SetTimeout(timeout time.Duration) error { + err := s.Conn.SetDeadline(time.Now().Add(timeout)) + if err != nil { + return fmt.Errorf("can't set smtp connection timeout: %w", err) + } + return nil } -// переводит клиента в ожидание +// Wait переводит клиента в ожидание // после окончания ожидания соединение разрывается, а статус меняется на отсоединенный func (s *SmtpClient) Wait() { s.Status = WaitingSmtpClientStatus s.timer = time.AfterFunc(App.Timeout().Waiting, func() { s.Status = DisconnectedSmtpClientStatus - s.Worker.Quit() + _ = s.Worker.Quit() s.timer = nil }) } -// переводит клиента в рабочее состояние +// Wakeup переводит клиента в рабочее состояние // если клиент был в ожидании, ожидание прерывается func (s *SmtpClient) Wakeup() { s.Status = WorkingSmtpClientStatus diff --git a/common/event.go b/common/event.go index ed1ae8f..12352ae 100644 --- a/common/event.go +++ b/common/event.go @@ -1,98 +1,100 @@ package common -import "time" +import ( + "time" +) -// тип события приложения +// ApplicationEventKind тип события приложения type ApplicationEventKind int const ( - // инициализации сервисов + // InitApplicationEventKind инициализации сервисов InitApplicationEventKind ApplicationEventKind = iota - // запуск сервисов + // RunApplicationEventKind запуск сервисов RunApplicationEventKind - // завершение сервисов + // FinishApplicationEventKind завершение сервисов FinishApplicationEventKind ) -// событие приложения +// ApplicationEvent событие приложения type ApplicationEvent struct { - // тип события + // Kind тип события Kind ApplicationEventKind - // данные из файла настроек + // Data данные из файла настроек Data []byte - // аргументы командной строки + // Args аргументы командной строки Args map[string]interface{} } -// возвращает аргумент, как булевый тип +// GetBoolArg возвращает аргумент, как булевый тип func (e *ApplicationEvent) GetBoolArg(key string) bool { return e.Args[key].(bool) } -// возвращает аргумент, как число +// GetIntArg возвращает аргумент, как число func (e *ApplicationEvent) GetIntArg(key string) int { return e.Args[key].(int) } -// возвращает аргумент, как строку +// GetStringArg возвращает аргумент, как строку func (e *ApplicationEvent) GetStringArg(key string) string { return e.Args[key].(string) } -// создает событие с указанным типом +// NewApplicationEvent создает событие с указанным типом func NewApplicationEvent(kind ApplicationEventKind) *ApplicationEvent { return &ApplicationEvent{Kind: kind} } -// результат отправки письма +// SendEventResult результат отправки письма type SendEventResult int const ( - // успех + // SuccessSendEventResult успех SuccessSendEventResult SendEventResult = iota - // превышение лимита + // OverlimitSendEventResult превышение лимита OverlimitSendEventResult - // ошибка + // ErrorSendEventResult ошибка ErrorSendEventResult - // повторная отправка через некоторое время + // DelaySendEventResult повторная отправка через некоторое время DelaySendEventResult - // отмена отправки + // RevokeSendEventResult отмена отправки RevokeSendEventResult ) -// событие отправки письма +// SendEvent событие отправки письма type SendEvent struct { - // елиент для отправки писем + // Client клиент для отправки писем Client *SmtpClient - // письмо, полученное из очереди + // Message письмо, полученное из очереди Message *MailMessage - // дата создания необходима при получении подключения к почтовому сервису + // CreateDate дата создания необходима при получении подключения к почтовому сервису CreateDate time.Time - // результат + // Result результат Result chan SendEventResult - // количество попыток отправок письма + // TryCount количество попыток отправок письма TryCount int - // итератор сервисов, участвующих в отправке письма + // Iterator итератор сервисов, участвующих в отправке письма Iterator *Iterator - // очередь, в которую необходимо будет положить клиента после отправки письма + // Queue очередь, в которую необходимо будет положить клиента после отправки письма Queue *LimitedQueue } -// создает событие отправки сообщения +// NewSendEvent создает событие отправки сообщения func NewSendEvent(message *MailMessage) *SendEvent { event := new(SendEvent) event.Message = message diff --git a/common/iterator.go b/common/iterator.go index ea0363c..7b8444e 100644 --- a/common/iterator.go +++ b/common/iterator.go @@ -1,6 +1,6 @@ package common -// итератор, используется для слабой связи между сервисами приложения +// Iterator итератор, используется для слабой связи между сервисами приложения type Iterator struct { // элементы items []interface{} @@ -9,17 +9,17 @@ type Iterator struct { current int } -// создает итератор +// NewIterator создает итератор func NewIterator(items []interface{}) *Iterator { return &Iterator{items: items, current: -1} } -// отдает первый элемент +// First отдает первый элемент func (i Iterator) First() interface{} { return i.items[0] } -// отдает следующий элемент +// Next отдает следующий элемент func (i *Iterator) Next() interface{} { var item interface{} i.current++ @@ -34,7 +34,7 @@ func (i *Iterator) isValidCurrent() bool { return i.current < len(i.items) } -// отдает текущий элемент +// Current отдает текущий элемент func (i Iterator) Current() interface{} { var item interface{} if i.isValidCurrent() { @@ -43,7 +43,7 @@ func (i Iterator) Current() interface{} { return item } -// сигнализирует об окончании итерации +// IsDone сигнализирует об окончании итерации func (i Iterator) IsDone() bool { return i.current >= len(i.items) } diff --git a/common/post.go b/common/post.go index 31dfb27..787756c 100644 --- a/common/post.go +++ b/common/post.go @@ -14,7 +14,8 @@ const ( ) var ( - // Регулярка для проверки адреса почты, сразу компилируем, чтобы при отправке не терять на этом время + // EmailRegexp Регулярка для проверки адреса почты, сразу компилируем, чтобы при отправке не терять на этом время + //nolint:lll EmailRegexp = regexp.MustCompile("^(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|\"(?:[\\x01-\\x08\\x0b\\x0c\\x0e-\\x1f\\x21\\x23-\\x5b\\x5d-\\x7f]|\\\\[\\x01-\\x09\\x0b\\x0c\\x0e-\\x7f])*\")@((?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\\[(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?|[a-z0-9-]*[a-z0-9]:(?:[\\x01-\\x08\\x0b\\x0c\\x0e-\\x1f\\x21-\\x5a\\x53-\\x7f]|\\\\[\\x01-\\x09\\x0b\\x0c\\x0e-\\x7f])+)\\]))$") EmptyStrSlice = []string{} ) diff --git a/common/queue.go b/common/queue.go index d1bb089..1abf425 100644 --- a/common/queue.go +++ b/common/queue.go @@ -2,7 +2,7 @@ package common import "sync" -// потоко-безопасная очередь +// Queue потоко-безопасная очередь type Queue struct { // флаг, сигнализирующий, что очередь пуста empty bool @@ -14,7 +14,7 @@ type Queue struct { mutex *sync.Mutex } -// создает новую очередь +// NewQueue создает новую очередь func NewQueue() *Queue { return &Queue{ empty: true, @@ -23,7 +23,7 @@ func NewQueue() *Queue { } } -// добавляет элемент в конец очереди +// Push добавляет элемент в конец очереди func (q *Queue) Push(item interface{}) { q.mutex.Lock() if q.empty { @@ -33,7 +33,7 @@ func (q *Queue) Push(item interface{}) { q.mutex.Unlock() } -// достает первый элемент из очереди +// Pop достает первый элемент из очереди func (q *Queue) Pop() interface{} { var item interface{} q.mutex.Lock() @@ -51,7 +51,7 @@ func (q *Queue) Pop() interface{} { return item } -// сигнализирует, что очередь пуста +// Empty сигнализирует, что очередь пуста func (q *Queue) Empty() bool { var empty bool q.mutex.Lock() @@ -60,7 +60,7 @@ func (q *Queue) Empty() bool { return empty } -// возвращает длину очереди +// Len возвращает длину очереди func (q *Queue) Len() int { var itemsLen int q.mutex.Lock() @@ -80,7 +80,7 @@ const ( unlimitedQueueStatus ) -// лимитированная очередь, в ней будут храниться клиенты к почтовым сервисам +// LimitedQueue лимитированная очередь, в ней будут храниться клиенты к почтовым сервисам type LimitedQueue struct { *Queue @@ -91,7 +91,7 @@ type LimitedQueue struct { maxLen int } -// создает новую лимитированную очередь +// NewLimitQueue создает новую лимитированную очередь func NewLimitQueue() *LimitedQueue { return &LimitedQueue{ Queue: NewQueue(), @@ -99,7 +99,7 @@ func NewLimitQueue() *LimitedQueue { } } -// сигнализирует, что очередь имеет лимит +// HasLimit сигнализирует, что очередь имеет лимит func (l *LimitedQueue) HasLimit() bool { l.mutex.Lock() hasLimit := l.status == limitedQueueStatus @@ -107,14 +107,14 @@ func (l *LimitedQueue) HasLimit() bool { return hasLimit } -// устанавливает лимит очереди +// HasLimitOn устанавливает лимит очереди func (l *LimitedQueue) HasLimitOn() { if l.MaxLen() > 0 && !l.HasLimit() { l.setStatus(limitedQueueStatus) } } -// снимает лимит очереди +// HasLimitOff снимает лимит очереди func (l *LimitedQueue) HasLimitOff() { l.setStatus(unlimitedQueueStatus) } @@ -126,7 +126,7 @@ func (l *LimitedQueue) setStatus(status queueStatus) { l.mutex.Unlock() } -// максимальная длина очереди до того момента, как был установлен лимит +// MaxLen максимальная длина очереди до того момента, как был установлен лимит func (l *LimitedQueue) MaxLen() int { l.mutex.Lock() maxLen := l.maxLen @@ -134,7 +134,7 @@ func (l *LimitedQueue) MaxLen() int { return maxLen } -// увеличивает максимальную длину очереди +// AddMaxLen увеличивает максимальную длину очереди func (l *LimitedQueue) AddMaxLen() { l.mutex.Lock() l.maxLen++ diff --git a/common/service.go b/common/service.go index dca4665..5cb6f57 100644 --- a/common/service.go +++ b/common/service.go @@ -1,46 +1,46 @@ package common -// программа отправки почты получилась довольно сложной, т.к. она выполняет обработку и отправку писем, +// Программа отправки почты получилась довольно сложной, т.к. она выполняет обработку и отправку писем, // работает с диском и с сетью, ведет логирование и проверяет ограничения перед отправкой // из - за такого насыщенного функционала, было принято решение разбить программу на логические части - сервисы // сервис - это модуль программы, отвечающий за выполнение одной конкретной задачи, например логирование // сервис может сам выполнять эту задачу, либо передавать выполнение задачи внутренним обработчикам -// сервис требующий инициализиции -// данные для инициализиции берутся из файла настроек +// Service сервис требующий инициализации +// данные для инициализации берутся из файла настроек type Service interface { OnInit(*ApplicationEvent) OnFinish() } -// сервис получающий событие отправки письма +// EventService сервис получающий событие отправки письма // используется сервисами для передачи события друг другу type EventService interface { Event(ev *SendEvent) bool } -// сервис принимающий участие в отправке письма +// SendingService сервис принимающий участие в отправке письма type SendingService interface { Service EventService OnRun() } -// сервис принимающий участие в агрегации и выводе в консоль писем с ошибками +// ReportService сервис принимающий участие в агрегации и выводе в консоль писем с ошибками type ReportService interface { Service EventService OnShowReport() } -// сервис перекладывающий письма из очереди в очередь +// PublishService сервис перекладывающий письма из очереди в очередь type PublishService interface { Service EventService OnPublish(*ApplicationEvent) } -// сервис ищущий записи в логе по письму +// GrepService сервис ищущий записи в логе по письму type GrepService interface { Service OnGrep(*ApplicationEvent) diff --git a/connector/preparer.go b/connector/preparer.go index ca5014d..5537d1a 100644 --- a/connector/preparer.go +++ b/connector/preparer.go @@ -3,10 +3,11 @@ package connector import ( "errors" "fmt" + "time" + "github.com/Halfi/postmanq/common" "github.com/Halfi/postmanq/logger" "github.com/Halfi/postmanq/mailer" - "time" ) // заготовщик, подготавливает событие соединения @@ -56,11 +57,9 @@ connectToMailServer: errors.New(fmt.Sprintf("511 preparer#%d-%d can't lookup %s", p.id, event.Message.Id, event.Message.HostnameTo)), ) } - return waitLookup: logger.By(event.Message.HostnameFrom).Debug("preparer#%d-%d wait ending look up mail server %s...", p.id, event.Message.Id, event.Message.HostnameTo) time.Sleep(common.App.Timeout().Sleep) goto connectToMailServer - return } diff --git a/connector/seeker.go b/connector/seeker.go index 90eca96..7731f08 100644 --- a/connector/seeker.go +++ b/connector/seeker.go @@ -1,10 +1,11 @@ package connector import ( - "github.com/Halfi/postmanq/logger" "net" "strings" "sync" + + "github.com/Halfi/postmanq/logger" ) var ( diff --git a/connector/server.go b/connector/server.go index 051252c..83fc4ee 100644 --- a/connector/server.go +++ b/connector/server.go @@ -1,8 +1,9 @@ package connector import ( - "github.com/Halfi/postmanq/common" "net" + + "github.com/Halfi/postmanq/common" ) // статус почтового сервис @@ -39,9 +40,6 @@ type MxServer struct { // ip сервера ips []net.IP - // клиенты сервера - clients []*common.SmtpClient - // А запись сервера realServerName string diff --git a/connector/service.go b/connector/service.go index 77202f8..3c25de1 100644 --- a/connector/service.go +++ b/connector/service.go @@ -37,7 +37,7 @@ var ( } ) -// сервис, управляющий соединениями к почтовым сервисам +// Service сервис, управляющий соединениями к почтовым сервисам // письма могут отсылаться в несколько потоков, почтовый сервис может разрешить несколько подключений с одного IP // количество подключений может быть не равно количеству отсылающих потоков // если доверить управление подключениями отправляющим потокам, тогда это затруднит общее управление подключениями @@ -49,7 +49,7 @@ type Service struct { Configs map[string]*Config `yaml:"postmans"` } -// создает новый сервис соединений +// Inst создает новый сервис соединений func Inst() *Service { if service == nil { service = new(Service) @@ -57,7 +57,7 @@ func Inst() *Service { return service } -// инициализирует сервис соединений +// OnInit инициализирует сервис соединений func (s *Service) OnInit(event *common.ApplicationEvent) { err := yaml.Unmarshal(event.Data, s) if err == nil { @@ -121,7 +121,7 @@ func (s *Service) init(conf *Config, hostname string) { } } -// запускает горутины +// OnRun запускает горутины func (s *Service) OnRun() { for i := 0; i < s.ConnectorsCount; i++ { id := i + 1 @@ -141,7 +141,7 @@ func (s *Service) Event(ev *common.SendEvent) bool { return true } -// завершает работу сервиса соединений +// OnFinish завершает работу сервиса соединений func (s *Service) OnFinish() { if !eventsClosed { eventsClosed = true @@ -151,14 +151,6 @@ func (s *Service) OnFinish() { func (s Service) getTlsConfig(hostname string) *tls.Config { if conf, ok := s.Configs[hostname]; ok { - //tlsConfig := new(tls.Config) - //tlsConfig.Certificates = conf.certs - //tlsConfig.RootCAs = conf.pool - //tlsConfig.ClientCAs = conf.pool - //tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert - //tlsConfig.CipherSuites = cipherSuites - //tlsConfig.MinVersion = tls.VersionTLS12 - //tlsConfig.SessionTicketsDisabled = true return conf.tlsConfig } else { logger.By(hostname).Err("connection service can't make tls config by %s", hostname) @@ -193,7 +185,7 @@ func (s Service) getHostname(hostname string) string { } } -// событие создания соединения +// ConnectionEvent событие создания соединения type ConnectionEvent struct { *common.SendEvent @@ -211,16 +203,16 @@ type ConnectionEvent struct { } type Config struct { - // путь до файла с закрытым ключом + // PrivateKeyFilename путь до файла с закрытым ключом PrivateKeyFilename string `yaml:"privateKey"` - // путь до файла с сертификатом + // CertFilename путь до файла с сертификатом CertFilename string `yaml:"certificate"` - // ip с которых будем рассылать письма + // Addresses ip с которых будем рассылать письма Addresses []string `yaml:"ips"` - // hostname, на котором будет слушаться 25 порт + // MXHostname hostname, на котором будет слушаться 25 порт MXHostname string `yaml:"mxHostname"` // количество ip diff --git a/consumer/binding.go b/consumer/binding.go index 6f1de78..3abb620 100644 --- a/consumer/binding.go +++ b/consumer/binding.go @@ -2,10 +2,12 @@ package consumer import ( "fmt" + "time" + + "github.com/streadway/amqp" + "github.com/Halfi/postmanq/common" "github.com/Halfi/postmanq/logger" - "github.com/streadway/amqp" - "time" ) // тип точки обмена @@ -13,8 +15,8 @@ type ExchangeType string const ( DirectExchangeType ExchangeType = "direct" - FanoutExchangeType = "fanout" - TopicExchangeType = "topic" + FanoutExchangeType ExchangeType = "fanout" + TopicExchangeType ExchangeType = "topic" ) // тип точки обмена для неотправленного письма @@ -210,6 +212,4 @@ type AssistantBinding struct { Binding Binding `yaml:",inline"` Dest map[string]string `yaml:"dest"` - - destBindings []*Binding } diff --git a/consumer/consumer.go b/consumer/consumer.go index 8edc981..eb90b32 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -23,10 +23,9 @@ var ( // получатель сообщений из очереди type Consumer struct { - id int - connect *amqp.Connection - binding *Binding - deliveries <-chan amqp.Delivery + id int + connect *amqp.Connection + binding *Binding } // создает нового получателя @@ -111,7 +110,7 @@ func (c *Consumer) consumeDeliveries(id int, channel *amqp.Channel, deliveries < event = nil } else { failureBinding := c.binding.failureBindings[TechnicalFailureBindingType] - err = channel.Publish( + err := channel.Publish( failureBinding.Exchange, failureBinding.Routing, false, @@ -122,29 +121,31 @@ func (c *Consumer) consumeDeliveries(id int, channel *amqp.Channel, deliveries < DeliveryMode: amqp.Transient, }, ) - logger.All().Warn("consumer#%d can't unmarshal delivery body, body should be json, %s given", c.id, string(delivery.Body)) + logger.All().WarnWithErr(err, "consumer#%d can't unmarshal delivery body, body should be json, %s given", c.id, string(delivery.Body)) } // всегда подтверждаем получение сообщения // даже если во время отправки письма возникли ошибки, // мы уже положили это письмо в другую очередь - delivery.Ack(true) + _ = delivery.Ack(true) } } // обрабатывает письма, которые не удалось отправить func (c *Consumer) handleErrorSend(channel *amqp.Channel, message *common.MailMessage) { - // если есть ошибка при отправке, значит мы попали в серый список https://ru.wikipedia.org/wiki/%D0%A1%D0%B5%D1%80%D1%8B%D0%B9_%D1%81%D0%BF%D0%B8%D1%81%D0%BE%D0%BA + // если есть ошибка при отправке, значит мы попали в серый список + // https://ru.wikipedia.org/wiki/%D0%A1%D0%B5%D1%80%D1%8B%D0%B9_%D1%81%D0%BF%D0%B8%D1%81%D0%BE%D0%BA // или получили какую то ошибку от почтового сервиса, что он не может // отправить письмо указанному адресату или выполнить какую то команду var failureBinding *Binding // если ошибка связана с невозможностью отправить письмо адресату // перекладываем письмо в очередь для плохих писем // и пусть отправители сами с ними разбираются - if message.Error.Code >= 500 && message.Error.Code < 600 { + switch { + case message.Error.Code >= 500 && message.Error.Code < 600: failureBinding = c.binding.failureBindings[errorSignsMap.BindingType(message)] - } else if message.Error.Code == 450 || message.Error.Code == 451 { // мы точно попали в серый список, надо повторить отправку письма попозже + case message.Error.Code == 450 || message.Error.Code == 451: failureBinding = delayedBindings[common.ThirtyMinutesDelayedBinding] - } else { + default: failureBinding = c.binding.failureBindings[UnknownFailureBindingType] } jsonMessage, err := json.Marshal(message) @@ -176,7 +177,7 @@ func (c *Consumer) handleErrorSend(channel *amqp.Channel, message *common.MailMe logger. By(message.HostnameFrom). Debug( - "consumer#%d-%d can't publish failure mail to queue %s, message: %s, code: %d, publish error% %v", + "consumer#%d-%d can't publish failure mail to queue %s, message: %s, code: %d, publish error %v", c.id, message.Id, failureBinding.Queue, @@ -256,7 +257,7 @@ func (c *Consumer) publishDelayedMessage(channel *amqp.Channel, bindingType comm false, amqp.Publishing{ ContentType: "text/plain", - Body: []byte(jsonMessage), + Body: jsonMessage, DeliveryMode: amqp.Transient, }, ) @@ -366,9 +367,15 @@ func (c *Consumer) consumeAndPublishMessages(event *common.ApplicationEvent, gro }, ) if err == nil { - delivery.Ack(true) + err := delivery.Ack(true) + if err != nil { + logger.All().WarnWithErr(err, "can't acknowledge message") + } } else { - delivery.Nack(true, true) + err := delivery.Nack(true, true) + if err != nil { + logger.All().WarnWithErr(err, "can't not acknowledge message") + } } } group.Done() diff --git a/consumer/service.go b/consumer/service.go index 2714cc0..8c74cf5 100644 --- a/consumer/service.go +++ b/consumer/service.go @@ -145,7 +145,6 @@ func (s *Service) notifyCloseError(config *Config, closeErrors chan *amqp.Error) connect, err := amqp.Dial(config.URI) if err == nil { s.connections[config.URI] = connect - closeErrors = nil if apps, ok := s.consumers[config.URI]; ok { for _, app := range apps { app.connect = connect diff --git a/consumer/sign.go b/consumer/sign.go index bf06d7d..c2dc95c 100644 --- a/consumer/sign.go +++ b/consumer/sign.go @@ -1,8 +1,9 @@ package consumer import ( - "github.com/Halfi/postmanq/common" "strings" + + "github.com/Halfi/postmanq/common" ) var ( diff --git a/consumer/waiter.go b/consumer/waiter.go index b42aa31..4d17afb 100644 --- a/consumer/waiter.go +++ b/consumer/waiter.go @@ -5,7 +5,7 @@ import ( "time" ) -// ожидающий +// Waiter ожидающий type Waiter struct { *time.Ticker } diff --git a/go.mod b/go.mod index b1b52ca..1af451f 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/Halfi/postmanq -go 1.15 +go 1.16 require ( github.com/byorty/clitable v0.0.0-20150722055417-9f60651b8308 diff --git a/guardian/guardian.go b/guardian/guardian.go index 43caed6..ff0cda8 100644 --- a/guardian/guardian.go +++ b/guardian/guardian.go @@ -5,7 +5,7 @@ import ( "github.com/Halfi/postmanq/logger" ) -// защитник, блокирует отправку на указанные почтовые сервисы +// Guardian защитник, блокирует отправку на указанные почтовые сервисы type Guardian struct { // идентификатор для логов id int diff --git a/guardian/service.go b/guardian/service.go index 34ce4e4..a32db89 100644 --- a/guardian/service.go +++ b/guardian/service.go @@ -8,23 +8,23 @@ import ( ) var ( - // сервис блокирующий отправку писем + // Сервис блокирующий отправку писем service *Service - // канал для приема событий отправки писем + // Канал для приема событий отправки писем events = make(chan *common.SendEvent) eventsClosed bool ) -// сервис блокирующий отправку писем +// Service сервис блокирующий отправку писем type Service struct { - // количество горутин блокирующий отправку писем к почтовым сервисам + // GuardiansCount количество горутин блокирующий отправку писем к почтовым сервисам GuardiansCount int `yaml:"workers"` Configs map[string]*Config `yaml:"postmans"` } -// создает новый сервис блокировок +// Inst создает новый сервис блокировок func Inst() common.SendingService { if service == nil { service = new(Service) @@ -32,7 +32,7 @@ func Inst() common.SendingService { return service } -// инициализирует сервис блокировок +// OnInit инициализирует сервис блокировок func (s *Service) OnInit(event *common.ApplicationEvent) { logger.All().Debug("init guardians...") err := yaml.Unmarshal(event.Data, s) @@ -45,7 +45,7 @@ func (s *Service) OnInit(event *common.ApplicationEvent) { } } -// запускает горутины +// OnRun запускает горутины func (s *Service) OnRun() { for i := 0; i < s.GuardiansCount; i++ { go newGuardian(i + 1) @@ -62,7 +62,7 @@ func (s *Service) Event(ev *common.SendEvent) bool { return true } -// завершает работу сервиса соединений +// OnFinish завершает работу сервиса соединений func (s *Service) OnFinish() { if !eventsClosed { eventsClosed = true @@ -73,12 +73,11 @@ func (s *Service) OnFinish() { func (s Service) getExcludes(hostname string) []string { if conf, ok := s.Configs[hostname]; ok { return conf.Excludes - } else { - return common.EmptyStrSlice } + return common.EmptyStrSlice } type Config struct { - // хосты, на которую блокируется отправка писем + // Excludes хосты, на которую блокируется отправка писем Excludes []string `yaml:"exclude"` } diff --git a/limiter/cleaner.go b/limiter/cleaner.go index 83429b9..733a895 100644 --- a/limiter/cleaner.go +++ b/limiter/cleaner.go @@ -5,7 +5,7 @@ import ( "time" ) -// чистильщик, проверяет значения ограничений и обнуляет значения ограничений +// Cleaner чистильщик, проверяет значения ограничений и обнуляет значения ограничений type Cleaner struct{} // создает нового чистильщика diff --git a/limiter/limit.go b/limiter/limit.go index 2043be7..f100bae 100644 --- a/limiter/limit.go +++ b/limiter/limit.go @@ -1,8 +1,9 @@ package limiter import ( - "github.com/Halfi/postmanq/common" "time" + + "github.com/Halfi/postmanq/common" ) // тип ограничения @@ -10,9 +11,9 @@ type Kind string const ( SecondKind Kind = "second" - MinuteKind = "minute" - HourKind = "hour" - DayKind = "day" + MinuteKind Kind = "minute" + HourKind Kind = "hour" + DayKind Kind = "day" ) var ( diff --git a/limiter/limiter.go b/limiter/limiter.go index 535d14b..4f3e11b 100644 --- a/limiter/limiter.go +++ b/limiter/limiter.go @@ -7,7 +7,7 @@ import ( "github.com/Halfi/postmanq/logger" ) -// ограничитель, проверяет количество отправленных писем почтовому сервису +// Limiter ограничитель, проверяет количество отправленных писем почтовому сервису type Limiter struct { // идентификатор для логов id int diff --git a/limiter/service.go b/limiter/service.go index e21115a..9300ab7 100644 --- a/limiter/service.go +++ b/limiter/service.go @@ -21,15 +21,15 @@ var ( eventsClosed bool ) -// сервис ограничений, следит за тем, чтобы почтовым сервисам не отправилось больше писем, чем нужно +// Service сервис ограничений, следит за тем, чтобы почтовым сервисам не отправилось больше писем, чем нужно type Service struct { - // количество горутин проверяющих количество отправленных писем + // LimitersCount количество горутин проверяющих количество отправленных писем LimitersCount int `yaml:"workers"` Configs map[string]*Config `yaml:"postmans"` } -// создает сервис ограничений +// Inst создает сервис ограничений func Inst() common.SendingService { if service == nil { service = new(Service) @@ -38,7 +38,7 @@ func Inst() common.SendingService { return service } -// инициализирует сервис +// OnInit инициализирует сервис func (s *Service) OnInit(event *common.ApplicationEvent) { logger.All().Debug("init limits...") err := yaml.Unmarshal(event.Data, s) @@ -66,7 +66,7 @@ func (s *Service) init(conf *Config, hostname string) { } } -// запускает проверку ограничений и очистку значений лимитов +// OnRun запускает проверку ограничений и очистку значений лимитов func (s *Service) OnRun() { // сразу запускаем проверку значений ограничений go newCleaner() @@ -85,7 +85,7 @@ func (s *Service) Event(ev *common.SendEvent) bool { return true } -// завершает работу сервиса соединений +// OnFinish завершает работу сервиса соединений func (s *Service) OnFinish() { if !eventsClosed { eventsClosed = true diff --git a/logger/message.go b/logger/message.go index e1b7f6d..acdbacf 100644 --- a/logger/message.go +++ b/logger/message.go @@ -3,10 +3,14 @@ package logger import ( "runtime/debug" + "github.com/rs/zerolog" + "github.com/Halfi/postmanq/common" ) -// запись логирования +var logger zerolog.Logger + +// Message запись логирования type Message struct { Hostname string @@ -23,20 +27,26 @@ func By(hostname string) *Message { } } -// пишет ошибку в лог +// Err пишет ошибку в лог func (m *Message) Err(message string, args ...interface{}) { go func() { logger.Error().Str("hostname", m.Hostname).Str("stack", string(debug.Stack())).Msgf(message, args...) }() } -// пишет произвольную ошибку в лог и завершает программу +func (m *Message) ErrErr(err error) { + go func() { + logger.Error().Str("hostname", m.Hostname).Str("stack", string(debug.Stack())).Interface("error", err).Err(err).Send() + }() +} + +// FailExit пишет произвольную ошибку в лог и завершает программу func (m *Message) FailExit(message string, args ...interface{}) { m.Err(message, args...) common.App.SendEvents(common.NewApplicationEvent(common.FinishApplicationEventKind)) } -// пишет ошибку с сообщением в лог и завершает программу +// FailExitWithErr пишет ошибку с сообщением в лог и завершает программу func (m *Message) FailExitWithErr(err error, message string, args ...interface{}) { go func() { l := logger.Error().Str("hostname", m.Hostname).Str("stack", string(debug.Stack())) @@ -49,7 +59,7 @@ func (m *Message) FailExitWithErr(err error, message string, args ...interface{} common.App.SendEvents(common.NewApplicationEvent(common.FinishApplicationEventKind)) } -// пишет системную ошибку в лог и завершает программу +// FailExitErr пишет системную ошибку в лог и завершает программу func (m *Message) FailExitErr(err error) { go func() { l := logger.Error().Str("hostname", m.Hostname).Str("stack", string(debug.Stack())) @@ -63,14 +73,14 @@ func (m *Message) FailExitErr(err error) { common.App.SendEvents(common.NewApplicationEvent(common.FinishApplicationEventKind)) } -// пишет произвольное предупреждение +// Warn пишет произвольное предупреждение func (m *Message) Warn(message string, args ...interface{}) { go func() { logger.Warn().Str("hostname", m.Hostname).Msgf(message, args...) }() } -// пишет системное предупреждение +// WarnErr пишет системное предупреждение func (m *Message) WarnErr(err error) { go func() { l := logger.Warn().Str("hostname", m.Hostname) @@ -81,7 +91,7 @@ func (m *Message) WarnErr(err error) { }() } -// пишет ошибку с сообщением +// WarnWithErr пишет ошибку с сообщением func (m *Message) WarnWithErr(err error, message string, args ...interface{}) { go func() { l := logger.Warn().Str("hostname", m.Hostname) @@ -92,12 +102,12 @@ func (m *Message) WarnWithErr(err error, message string, args ...interface{}) { }() } -// пишет информационное сообщение +// Info пишет информационное сообщение func (m *Message) Info(message string, args ...interface{}) { go func() { logger.Info().Str("hostname", m.Hostname).Msgf(message, args...) }() } -// пишет сообщение для отладки +// Debug пишет сообщение для отладки func (m *Message) Debug(message string, args ...interface{}) { go func() { logger.Debug().Str("hostname", m.Hostname).Msgf(message, args...) diff --git a/logger/service.go b/logger/service.go index 3f527c9..f0574f7 100644 --- a/logger/service.go +++ b/logger/service.go @@ -13,7 +13,6 @@ import ( var ( service *Service - logger zerolog.Logger ) type Config struct { @@ -61,9 +60,6 @@ func (s *Service) OnInit(event *common.ApplicationEvent) { // callerMarshalFunc adds only last 2 parts zerolog.CallerMarshalFunc = callerMarshalFunc log.Logger = log.With().Caller().Logger() - - logger = log.With().Logger() - } else { All().FailExitErr(err) } diff --git a/mailer/mailer.go b/mailer/mailer.go index 58ae3f9..aa7cb45 100644 --- a/mailer/mailer.go +++ b/mailer/mailer.go @@ -1,17 +1,18 @@ package mailer import ( - "errors" "fmt" - "github.com/Halfi/postmanq/common" - "github.com/Halfi/postmanq/logger" - "github.com/byorty/dkim" "io" "strconv" "strings" + + "github.com/byorty/dkim" + + "github.com/Halfi/postmanq/common" + "github.com/Halfi/postmanq/logger" ) -// отправитель письма +// Mailer отправитель письма type Mailer struct { // идентификатор для логов id int @@ -37,7 +38,7 @@ func (m *Mailer) sendMail(event *common.SendEvent) { m.prepare(message) m.send(event) } else { - ReturnMail(event, errors.New(fmt.Sprintf("511 service#%d can't send mail#%d, envelope or ricipient is invalid", m.id, message.Id))) + ReturnMail(event, fmt.Errorf("511 service#%d can't send mail#%d, envelope or ricipient is invalid", m.id, message.Id)) } } @@ -63,28 +64,45 @@ func (m *Mailer) prepare(message *common.MailMessage) { func (m *Mailer) send(event *common.SendEvent) { message := event.Message worker := event.Client.Worker + logger.By(event.Message.HostnameFrom).Info("mailer#%d-%d begin sending mail", m.id, message.Id) logger.By(message.HostnameFrom).Debug("mailer#%d-%d receive smtp client#%d", m.id, message.Id, event.Client.Id) success := false - event.Client.SetTimeout(common.App.Timeout().Mail) + toErr := event.Client.SetTimeout(common.App.Timeout().Mail) + if toErr != nil { + logger.By(message.HostnameFrom).ErrErr(toErr) + } + err := worker.Mail(message.Envelope) if err == nil { logger.By(message.HostnameFrom).Debug("mailer#%d-%d send command MAIL FROM: %s", m.id, message.Id, message.Envelope) - event.Client.SetTimeout(common.App.Timeout().Rcpt) + + toErr := event.Client.SetTimeout(common.App.Timeout().Rcpt) + if toErr != nil { + logger.By(message.HostnameFrom).ErrErr(toErr) + } + err = worker.Rcpt(message.Recipient) if err == nil { logger.By(message.HostnameFrom).Debug("mailer#%d-%d send command RCPT TO: %s", m.id, message.Id, message.Recipient) - event.Client.SetTimeout(common.App.Timeout().Data) + + toErr := event.Client.SetTimeout(common.App.Timeout().Data) + if toErr != nil { + logger.By(message.HostnameFrom).ErrErr(toErr) + } + var wc io.WriteCloser wc, err = worker.Data() if err == nil { logger.By(message.HostnameFrom).Debug("mailer#%d-%d send command DATA", m.id, message.Id) + _, err = wc.Write(message.Body) if err == nil { _ = wc.Close() logger.By(message.HostnameFrom).Debug("%s", message.Body) logger.By(message.HostnameFrom).Debug("mailer#%d-%d send command .", m.id, message.Id) + // стараемся слать письма через уже созданное соединение, // поэтому после отправки письма не закрываем соединение err = worker.Reset() @@ -122,7 +140,7 @@ func ReturnMail(event *common.SendEvent, err error) { // и создать ошибку // письмо с ошибкой вернется в другую очередь, отличную от письмо без ошибки if e == nil { - event.Message.Error = &common.MailError{errorMessage, code} + event.Message.Error = &common.MailError{Message: errorMessage, Code: code} } } else { logger.All().Err("can't get err code from error: %s", err) diff --git a/recipient/recipient.go b/recipient/recipient.go deleted file mode 100644 index 4d4b79c..0000000 --- a/recipient/recipient.go +++ /dev/null @@ -1,99 +0,0 @@ -package recipient - -import ( - "net" - "net/textproto" -) - -//type RecipientState int -// -//const ( -// Ehlo RecipientState = iota -// Helo -// Mail -// Rcpt -// Data -// Rset -// Noop -// Vrfy -// Quit -//) -// -//var ( -// nexts = map[RecipientState][]RecipientState { -// Ehlo: []RecipientState{Mail, Rset, Noop, Quit, Vrfy}, -// Helo: []RecipientState{Mail, Rset, Noop, Quit, Vrfy}, -// Mail: []RecipientState{Rcpt, Rset, Noop, Quit}, -// Rcpt: []RecipientState{Data, Rcpt, Rset, Noop, Quit}, -// Data: []RecipientState{Mail, Rset, Noop, Quit}, -// Rset: []RecipientState{Mail, Rset, Noop, Quit}, -// Noop: []RecipientState{}, -// Vrfy: []RecipientState{}, -// Quit: []RecipientState{}, -// } -//) -// -//func (r RecipientState) getName() []byte {} - -type Recipient struct { - id int - state State - conn net.Conn - txt *textproto.Conn -} - -func newRecipient(id int, events chan *Event) { - mail := new(MailState) - mail.SetPossibles([]State{}) - - ehlo := new(EhloState) - ehlo.SetNext(mail) - ehlo.SetPossibles([]State{}) - - conn := new(ConnectState) - conn.SetNext(ehlo) - conn.SetPossibles([]State{}) - - recipient := &Recipient{ - id: id, - state: conn, - } - for event := range events { - recipient.handle(event) - } -} - -func (r *Recipient) handle(event *Event) { - r.txt = textproto.NewConn(event.conn) - - for { - r.state.SetEvent(event) - status := r.state.Read(r.txt) - goto handleStatus - - handleStatus: - switch status { - case SuccessStatus: - r.state.Write(r.txt) - r.state = r.state.GetNext() - - case FailureStatus: - r.state.Write(r.txt) - - case PossibleStatus: - var possibleStatus StateStatus - for _, possible := range r.state.GetPossibles() { - possible.SetEvent(event) - possibleStatus = possible.Read(r.txt) - if possibleStatus == SuccessStatus { - r.state = possible - status = possibleStatus - goto handleStatus - } - } - r.txt.Cmd("500 Syntax error, command unrecognized") - - } - return - } -} diff --git a/recipient/service.go b/recipient/service.go deleted file mode 100644 index 403602a..0000000 --- a/recipient/service.go +++ /dev/null @@ -1,117 +0,0 @@ -package recipient - -import ( - "fmt" - "net" - "strings" - - "gopkg.in/yaml.v3" - - "github.com/Halfi/postmanq/common" - "github.com/Halfi/postmanq/logger" -) - -var ( - service *Service -) - -func Inst() common.SendingService { - if service == nil { - service = new(Service) - } - return service -} - -type Config struct { - ListenerCount int `yaml:"listenerCount"` - Inbox string `yaml:"inbox"` - - // hostname, на котором будет слушаться 25 порт - MXHostname string `yaml:"mxHostname"` - - mxHostnames []string -} - -type Event struct { - serverHostname string - serverMxHostname string - clientHostname []byte - clientAddr net.Addr - conn *net.TCPConn - message *common.MailMessage -} - -type Service struct { - Configs map[string]*Config `yaml:"postmans"` -} - -func (s *Service) OnInit(event *common.ApplicationEvent) { - err := yaml.Unmarshal(event.Data, s) - if err == nil { - for name, config := range s.Configs { - if config.MXHostname != "" { - name = config.MXHostname - } - s.init(config, name) - } - } else { - logger.All().FailExitErr(err) - } -} - -func (s *Service) init(conf *Config, hostname string) { - mxes, err := net.LookupMX(hostname) - if err == nil { - conf.mxHostnames = make([]string, len(mxes)) - for i, mx := range mxes { - conf.mxHostnames[i] = strings.TrimRight(mx.Host, ".") - } - if conf.ListenerCount == 0 { - conf.ListenerCount = common.DefaultWorkersCount - } - } else { - logger.By(hostname).FailExit("recipient service - can't lookup mx for %s", hostname) - } -} - -func (s *Service) OnRun() { - for hostname, conf := range s.Configs { - for _, mxHostname := range conf.mxHostnames { - tcpAddr := fmt.Sprintf("%s:25", mxHostname) - addr, err := net.ResolveTCPAddr("tcp", tcpAddr) - if err == nil { - logger.By(hostname).Info("recipient service - resolve %s success", tcpAddr) - listener, err := net.ListenTCP("tcp", addr) - if err == nil { - go s.run(hostname, mxHostname, conf, listener) - } else { - logger.By(hostname).WarnWithErr(err, "recipient service - can't listen %s", tcpAddr) - } - } else { - logger.By(hostname).WarnWithErr(err, "recipient service - can't resolve %s", tcpAddr) - } - } - } -} - -func (s *Service) run(hostname, mxHostname string, conf *Config, listener *net.TCPListener) { - events := make(chan *Event) - for i := 0; i < conf.ListenerCount; i++ { - go newRecipient(i, events) - } - for { - conn, err := listener.AcceptTCP() - if err == nil { - events <- &Event{serverHostname: hostname, serverMxHostname: mxHostname, clientAddr: conn.RemoteAddr(), conn: conn} - } else { - logger.By(hostname).WarnWithErr(err, "recipient service - can't accept %s", hostname) - } - } -} - -// Event send event -func (s *Service) Event(_ *common.SendEvent) bool { - return true -} - -func (s *Service) OnFinish() {} diff --git a/recipient/state.go b/recipient/state.go deleted file mode 100644 index b3f7770..0000000 --- a/recipient/state.go +++ /dev/null @@ -1,128 +0,0 @@ -package recipient - -import ( - "bytes" - "net/textproto" - "regexp" -) - -type StateStatus int - -const ( - SuccessStatus StateStatus = iota - FailureStatus - PossibleStatus -) - -type StateError struct { - code int - message string -} - -type State interface { - SetEvent(*Event) - GetNext() State - SetNext(State) - GetPossibles() []State - SetPossibles([]State) - Read(*textproto.Conn) StateStatus - Write(*textproto.Conn) -} - -var ( - crlf = []byte("\r\n") - sp = []byte(" ") - - hostnameRegex = regexp.MustCompile(`^[\w\d\.\-]+\.\w{2,5}$`) - - ehlo = []byte("EHLO") - ehloLen = len(ehlo) - helo = []byte("HELO") - heloLen = len(helo) - mailCmd = []byte("MAIL FROM:") - mailCmdLen = len(mailCmd) -) - -type BaseState struct { - event *Event - next State - possibles []State - error *StateError -} - -func (b *BaseState) SetEvent(event *Event) { - b.event = event -} - -func (b BaseState) GetNext() State { - return b.next -} - -func (b *BaseState) SetNext(next State) { - b.next = next -} - -func (b BaseState) GetPossibles() []State { - return b.possibles -} - -func (b *BaseState) SetPossibles(possibles []State) { - b.possibles = possibles -} - -type ConnectState struct { - BaseState -} - -func (c *ConnectState) Read(conn *textproto.Conn) StateStatus { - return SuccessStatus -} - -func (c *ConnectState) Write(conn *textproto.Conn) { - conn.Cmd("220 %s ESMTP", c.event.serverHostname) -} - -type EhloState struct { - BaseState - useEhlo bool -} - -func (e *EhloState) Read(conn *textproto.Conn) StateStatus { - line, err := conn.ReadLineBytes() - if err == nil { - if bytes.Equal(ehlo, line[:ehloLen]) { - e.useEhlo = true - if hostnameRegex.Match(line[ehloLen:]) { - e.event.clientHostname = line[ehloLen:] - } - return SuccessStatus - } else if bytes.Equal(helo, line[:heloLen]) { - return SuccessStatus - } - } - return FailureStatus -} - -func (e *EhloState) Write(conn *textproto.Conn) { - if e.error == nil { - if e.useEhlo { - - } else { - - } - } else { - conn.Cmd(e.error.message, e.error.code) - } -} - -type MailState struct { - BaseState -} - -func (m *MailState) Read(conn *textproto.Conn) StateStatus { - return FailureStatus -} - -func (m *MailState) Write(conn *textproto.Conn) { - -}