Skip to content

Commit

Permalink
wip: ch dsl
Browse files Browse the repository at this point in the history
  • Loading branch information
stergiotis committed Jan 5, 2025
1 parent 178c3f8 commit 9704921
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 31 deletions.
4 changes: 4 additions & 0 deletions public/db/clickhouse/dsl/dsl.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (inst *Dsl) Parse(sql string) (err error) {
}
return
}
func (inst *Dsl) LoadDql(dql *ParsedDqlQuery) (err error) {
inst.Exprs = []chparser.Expr{dql.GetAst()}
return
}
func (inst *Dsl) Transform() (err error) {
err = inst.checkParsed()
if err != nil {
Expand Down
51 changes: 35 additions & 16 deletions public/db/clickhouse/dsl/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,28 @@ package dsl

import (
chparser "github.com/AfterShip/clickhouse-sql-parser/parser"
"github.com/stergiotis/boxer/public/containers"
"github.com/stergiotis/boxer/public/observability/eh"
"github.com/stergiotis/boxer/public/observability/eh/eb"
"golang.org/x/exp/maps"
"iter"
)

type ParamSet struct {
type ParamSlotSet struct {
typesLu map[string][]*chparser.QueryParam
paramOccurrences int
}

func NewParamSet() *ParamSet {
return &ParamSet{typesLu: nil}
func NewParamSlotsSet() *ParamSlotSet {
return &ParamSlotSet{
typesLu: nil,
paramOccurrences: 0,
}
}

var ErrIncompatibleParam = eh.Errorf("a param with an incompatible type is already contained in param set")

func (inst *ParamSet) Add(param *chparser.QueryParam) (err error) {
func (inst *ParamSlotSet) Add(param *chparser.QueryParam) (err error) {
if param == nil {
return
}
Expand All @@ -42,7 +46,7 @@ func (inst *ParamSet) Add(param *chparser.QueryParam) (err error) {
inst.paramOccurrences++
return
}
func (inst *ParamSet) UnionMod(other *ParamSet) (err error) {
func (inst *ParamSlotSet) UnionMod(other *ParamSlotSet) (err error) {
for _, ps := range other.All() {
for _, p := range ps {
err = inst.Add(p)
Expand All @@ -54,16 +58,16 @@ func (inst *ParamSet) UnionMod(other *ParamSet) (err error) {
}
return
}
func (inst *ParamSet) TotalParamOccurrences() int {
func (inst *ParamSlotSet) TotalParamOccurrences() int {
return inst.paramOccurrences
}
func (inst *ParamSet) TotalDistinctParams() int {
func (inst *ParamSlotSet) TotalDistinctParams() int {
return len(inst.typesLu)
}
func (inst *ParamSet) IsEmpty() bool {
func (inst *ParamSlotSet) IsEmpty() bool {
return len(inst.typesLu) == 0
}
func (inst *ParamSet) All() iter.Seq2[string, []*chparser.QueryParam] {
func (inst *ParamSlotSet) All() iter.Seq2[string, []*chparser.QueryParam] {
return func(yield func(string, []*chparser.QueryParam) bool) {
for k, vs := range inst.typesLu {
if !yield(k, vs) {
Expand All @@ -72,7 +76,22 @@ func (inst *ParamSet) All() iter.Seq2[string, []*chparser.QueryParam] {
}
}
}
func (inst *ParamSet) Clear() {
func (inst *ParamSlotSet) NamesAndTypes() iter.Seq2[string, *containers.HashSet[string]] {
return func(yield func(string, *containers.HashSet[string]) bool) {
types := containers.NewHashSet[string](32)
defer types.Clear()
for k, vs := range inst.typesLu {
for _, v := range vs {
types.Add(v.Type.String())
}
if !yield(k, types) {
return
}
types.Clear()
}
}
}
func (inst *ParamSlotSet) Clear() {
if len(inst.typesLu) > 0 {
maps.Clear(inst.typesLu)
}
Expand All @@ -82,21 +101,21 @@ func isParamTypeCompatible(t1 string, t2 string) (compatible bool) {
return t1 == t2
}

type paramsDiscoverer struct {
type paramSlotsDiscoverer struct {
chparser.DefaultASTVisitor
params *ParamSet
params *ParamSlotSet
}

func newParamsDiscoverer() *paramsDiscoverer {
return &paramsDiscoverer{
func newParamSlotsDiscoverer() *paramSlotsDiscoverer {
return &paramSlotsDiscoverer{
DefaultASTVisitor: chparser.DefaultASTVisitor{},
params: nil,
}
}
func (inst *paramsDiscoverer) VisitQueryParam(expr *chparser.QueryParam) error {
func (inst *paramSlotsDiscoverer) VisitQueryParam(expr *chparser.QueryParam) error {
return inst.params.Add(expr)
}
func (inst *paramsDiscoverer) discover(ast chparser.Expr, params *ParamSet) (err error) {
func (inst *paramSlotsDiscoverer) discover(ast chparser.Expr, params *ParamSlotSet) (err error) {
if params == nil {
return eh.Errorf("paramset is nil")
}
Expand Down
43 changes: 38 additions & 5 deletions public/db/clickhouse/dsl/parseddqlquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,35 @@ import (
"fmt"
chparser "github.com/AfterShip/clickhouse-sql-parser/parser"
"github.com/fxamacker/cbor/v2"
"github.com/rs/zerolog/log"
"github.com/stergiotis/boxer/public/observability/eh"
"github.com/stergiotis/boxer/public/observability/eh/eb"
"slices"
"strings"
)

type ParsedDqlQuery struct {
inputSql string
ast *chparser.SelectQuery

paramSet *ParamSet
paramSet *ParamSlotSet
paramSetErr error
noParams bool
}

func (inst *ParsedDqlQuery) String() string {
return inst.ast.String()
}
func (inst *ParsedDqlQuery) GetParamSet() (paramSet *ParamSet, err error) {
func (inst *ParsedDqlQuery) GetAst() *chparser.SelectQuery {
return inst.ast
}
func (inst *ParsedDqlQuery) GetParamSlotSet() (paramSet *ParamSlotSet, err error) {
if inst.noParams {
return
}
if inst.paramSet == nil && inst.paramSetErr != nil {
ps := NewParamSet()
d := newParamsDiscoverer()
if inst.paramSet == nil && inst.paramSetErr == nil {
ps := NewParamSlotsSet()
d := newParamSlotsDiscoverer()
err = d.discover(inst.ast, ps)
if err != nil {
err = eh.Errorf("error while discovering paramset: %w", err)
Expand Down Expand Up @@ -56,6 +62,31 @@ func NewParsedDqlQuery(sql string) (inst *ParsedDqlQuery, err error) {
}
return
}
func removeParamSettingsFromExprs(exprs []chparser.Expr) (exprsOut []chparser.Expr) {
for _, expr := range exprs {
switch exprt := expr.(type) {
case *chparser.SetStmt:
items := slices.DeleteFunc(exprt.Settings.Items, func(list *chparser.SettingExprList) bool {
name := list.Name.Name
if strings.HasPrefix(list.Name.Name, "param_") {
log.Info().Str("name", name).Msg("removing set param value expression")
return true
}
return false
})
exprt.Settings.Items = items
break
}
}
exprsOut = slices.DeleteFunc(exprs, func(expr chparser.Expr) bool {
switch exprt := expr.(type) {
case *chparser.SetStmt:
return len(exprt.Settings.Items) == 0
}
return false
})
return
}
func (inst *ParsedDqlQuery) parse() (err error) {
p := chparser.NewParser(inst.inputSql)
var exprs []chparser.Expr
Expand All @@ -64,6 +95,8 @@ func (inst *ParsedDqlQuery) parse() (err error) {
err = eh.Errorf("unable to parse sql: %w", err)
return
}

exprs = removeParamSettingsFromExprs(exprs)
if len(exprs) != 1 {
err = eb.Build().Int("nExprs", len(exprs)).Errorf("sql must contain exactly on expression")
return
Expand Down
16 changes: 9 additions & 7 deletions public/db/clickhouse/dsl/tableidtransformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,16 @@ func (inst *TableIdTransformer) Apply(ast chparser.Expr) (err error) {
}
switch astt := ast.(type) {
case *chparser.SelectQuery:
if astt.With == nil {
astt.With = &chparser.WithClause{
WithPos: 0,
EndPos: 0,
CTEs: inst.ctes,
if len(inst.ctes) > 0 {
if astt.With == nil {
astt.With = &chparser.WithClause{
WithPos: 0,
EndPos: 0,
CTEs: inst.ctes,
}
} else {
astt.With.CTEs = append(inst.ctes, astt.With.CTEs...)
}
} else {
astt.With.CTEs = append(inst.ctes, astt.With.CTEs...)
}
break
default:
Expand Down
2 changes: 2 additions & 0 deletions public/imzero/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"github.com/rs/zerolog/log"
"github.com/stergiotis/boxer/public/db/clickhouse/dsl"
"github.com/stergiotis/boxer/public/dev"
"github.com/stergiotis/boxer/public/fffi/compiletime"
"github.com/stergiotis/boxer/public/imzero/demo"
Expand Down Expand Up @@ -47,6 +48,7 @@ func main() {
dev.DebuggerFlags...),
dev.IoOverrideFlags...),
Commands: []*cli.Command{
dsl.NewCommand(),

Check failure on line 51 in public/imzero/main.go

View workflow job for this annotation

GitHub Actions / gotestsum

undefined: dsl.NewCommand

Check failure on line 51 in public/imzero/main.go

View workflow job for this annotation

GitHub Actions / gotestsum

undefined: dsl.NewCommand
compiletime.NewCommand(nil, nil),
demo.NewCommand(),
{
Expand Down
15 changes: 12 additions & 3 deletions public/observability/logging/flags.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package logging

import (
"encoding/json"
"github.com/fxamacker/cbor/v2"
"github.com/stergiotis/boxer/public/observability/eh"
"github.com/yassinebenaid/godump"
Expand Down Expand Up @@ -92,7 +93,7 @@ var LoggingFlags = []cli.Flag{
var cbordiagmode cbor.DiagMode
var cborencmode cbor.EncMode
var err error
if false {
if true {
cborencmode, err = cbor.CanonicalEncOptions().EncMode()
if err != nil {
log.Warn().Err(err).Msg("unable to create cbor encoder, skipping")
Expand All @@ -109,6 +110,10 @@ var LoggingFlags = []cli.Flag{
MaxArrayElements: 0,
MaxMapPairs: 0,
}.DiagMode()
if err != nil {
log.Warn().Err(err).Msg("unable to create cbor diagmode, skipping")
err = nil
}
}
zerolog.InterfaceMarshalFunc = func(v any) ([]byte, error) {
if cborencmode != nil && cbordiagmode != nil {
Expand All @@ -120,8 +125,12 @@ var LoggingFlags = []cli.Flag{
}
}
}
return []byte(dumper.Sprintln(v)), nil
//return json.MarshalIndent(v, "", " ")
var js []byte
js, err = json.MarshalIndent(v, "", " ")
if err != nil {
return []byte(dumper.Sprintln(v)), nil
}
return js, nil
}
break
case "diag":
Expand Down

0 comments on commit 9704921

Please sign in to comment.