|
18 | 18 | package processors_test
|
19 | 19 |
|
20 | 20 | import (
|
| 21 | + "fmt" |
21 | 22 | "testing"
|
22 | 23 | "time"
|
23 | 24 |
|
24 | 25 | "github.com/stretchr/testify/assert"
|
| 26 | + "github.com/stretchr/testify/require" |
25 | 27 |
|
26 | 28 | "github.com/elastic/beats/v7/libbeat/beat"
|
27 | 29 | "github.com/elastic/beats/v7/libbeat/processors"
|
28 | 30 | _ "github.com/elastic/beats/v7/libbeat/processors/actions"
|
29 | 31 | _ "github.com/elastic/beats/v7/libbeat/processors/add_cloud_metadata"
|
| 32 | + _ "github.com/elastic/beats/v7/libbeat/processors/add_process_metadata" |
| 33 | + _ "github.com/elastic/beats/v7/libbeat/processors/convert" |
| 34 | + _ "github.com/elastic/beats/v7/libbeat/processors/decode_csv_fields" |
| 35 | + _ "github.com/elastic/beats/v7/libbeat/processors/dissect" |
| 36 | + _ "github.com/elastic/beats/v7/libbeat/processors/extract_array" |
| 37 | + _ "github.com/elastic/beats/v7/libbeat/processors/urldecode" |
30 | 38 | conf "github.com/elastic/elastic-agent-libs/config"
|
31 | 39 | "github.com/elastic/elastic-agent-libs/logp"
|
32 | 40 | "github.com/elastic/elastic-agent-libs/mapstr"
|
@@ -566,3 +574,216 @@ func TestDropMissingFields(t *testing.T) {
|
566 | 574 |
|
567 | 575 | assert.Equal(t, expectedEvent, processedEvent.Fields)
|
568 | 576 | }
|
| 577 | + |
| 578 | +const ( |
| 579 | + fieldCount = 20 |
| 580 | + depth = 3 |
| 581 | +) |
| 582 | + |
| 583 | +func BenchmarkEventBackups(b *testing.B) { |
| 584 | + // listing all the processors that revert changes in case of an error |
| 585 | + yml := []map[string]interface{}{ |
| 586 | + { |
| 587 | + "append": map[string]interface{}{ |
| 588 | + "target_field": "append_target", |
| 589 | + "values": []interface{}{"third", "fourth"}, |
| 590 | + "fail_on_error": true, |
| 591 | + }, |
| 592 | + }, |
| 593 | + { |
| 594 | + "copy_fields": map[string]interface{}{ |
| 595 | + "fields": []map[string]interface{}{ |
| 596 | + { |
| 597 | + "from": "copy_from", |
| 598 | + "to": "copy.to", |
| 599 | + }, |
| 600 | + }, |
| 601 | + "fail_on_error": true, |
| 602 | + }, |
| 603 | + }, |
| 604 | + { |
| 605 | + "decode_base64_field": map[string]interface{}{ |
| 606 | + "field": map[string]interface{}{ |
| 607 | + "from": "base64_from", |
| 608 | + "to": "base64_to", |
| 609 | + }, |
| 610 | + "fail_on_error": true, |
| 611 | + }, |
| 612 | + }, |
| 613 | + { |
| 614 | + "decompress_gzip_field": map[string]interface{}{ |
| 615 | + "field": map[string]interface{}{ |
| 616 | + "from": "gzip_from", |
| 617 | + "to": "gzip_to", |
| 618 | + }, |
| 619 | + "fail_on_error": true, |
| 620 | + }, |
| 621 | + }, |
| 622 | + { |
| 623 | + "rename": map[string]interface{}{ |
| 624 | + "fields": []map[string]interface{}{ |
| 625 | + { |
| 626 | + "from": "rename_from", |
| 627 | + "to": "rename.to", |
| 628 | + }, |
| 629 | + }, |
| 630 | + "fail_on_error": true, |
| 631 | + }, |
| 632 | + }, |
| 633 | + { |
| 634 | + "replace": map[string]interface{}{ |
| 635 | + "fields": []map[string]interface{}{ |
| 636 | + { |
| 637 | + "field": "replace_test", |
| 638 | + "pattern": "to replace", |
| 639 | + "replacement": "replaced", |
| 640 | + }, |
| 641 | + }, |
| 642 | + "fail_on_error": true, |
| 643 | + }, |
| 644 | + }, |
| 645 | + { |
| 646 | + "truncate_fields": map[string]interface{}{ |
| 647 | + "fields": []interface{}{"to_truncate"}, |
| 648 | + "max_characters": 4, |
| 649 | + "fail_on_error": true, |
| 650 | + }, |
| 651 | + }, |
| 652 | + { |
| 653 | + "convert": map[string]interface{}{ |
| 654 | + "fields": []map[string]interface{}{ |
| 655 | + { |
| 656 | + "from": "convert_from", |
| 657 | + "to": "convert.to", |
| 658 | + "type": "integer", |
| 659 | + }, |
| 660 | + }, |
| 661 | + "fail_on_error": true, |
| 662 | + }, |
| 663 | + }, |
| 664 | + { |
| 665 | + "decode_csv_fields": map[string]interface{}{ |
| 666 | + "fields": map[string]interface{}{ |
| 667 | + "csv_from": "csv.to", |
| 668 | + }, |
| 669 | + "fail_on_error": true, |
| 670 | + }, |
| 671 | + }, |
| 672 | + // it creates a backup unless `ignore_failure` is true |
| 673 | + { |
| 674 | + "dissect": map[string]interface{}{ |
| 675 | + "tokenizer": "%{key1} %{key2}", |
| 676 | + "field": "to_dissect", |
| 677 | + }, |
| 678 | + }, |
| 679 | + { |
| 680 | + "extract_array": map[string]interface{}{ |
| 681 | + "field": "array_test", |
| 682 | + "mappings": map[string]interface{}{ |
| 683 | + "array_first": 0, |
| 684 | + "array_second": 1, |
| 685 | + }, |
| 686 | + "fail_on_error": true, |
| 687 | + }, |
| 688 | + }, |
| 689 | + { |
| 690 | + "urldecode": map[string]interface{}{ |
| 691 | + "fields": []map[string]interface{}{ |
| 692 | + { |
| 693 | + "from": "url_from", |
| 694 | + "to": "url.to", |
| 695 | + }, |
| 696 | + }, |
| 697 | + |
| 698 | + "fail_on_error": true, |
| 699 | + }, |
| 700 | + }, |
| 701 | + } |
| 702 | + |
| 703 | + processors := GetProcessors(b, yml) |
| 704 | + event := &beat.Event{ |
| 705 | + Timestamp: time.Now(), |
| 706 | + Meta: mapstr.M{}, |
| 707 | + Fields: mapstr.M{ |
| 708 | + "append_target": []interface{}{"first", "second"}, |
| 709 | + "copy_from": "to_copy", |
| 710 | + "base64_from": "dmFsdWU=", |
| 711 | + // "decompressed data" |
| 712 | + "gzip_from": string([]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 74, 73, 77, 206, 207, 45, 40, 74, 45, 46, 78, 77, 81, 72, 73, 44, 73, 4, 4, 0, 0, 255, 255, 108, 158, 105, 19, 17, 0, 0, 0}), |
| 713 | + "rename_from": "renamed_value", |
| 714 | + "replace_test": "something to replace", |
| 715 | + "to_truncate": "something very long", |
| 716 | + "convert_from": "42", |
| 717 | + "csv_from": "1,2,3,4", |
| 718 | + "to_dissect": "some words", |
| 719 | + "array_test": []string{"first", "second"}, |
| 720 | + "url_from": "https%3A%2F%2Fwww.elastic.co%3Fsome", |
| 721 | + }, |
| 722 | + } |
| 723 | + |
| 724 | + expFields := mapstr.M{ |
| 725 | + "append_target": []interface{}{"first", "second", "third", "fourth"}, |
| 726 | + "copy_from": "to_copy", |
| 727 | + "copy": mapstr.M{ |
| 728 | + "to": "to_copy", |
| 729 | + }, |
| 730 | + "base64_from": "dmFsdWU=", |
| 731 | + "base64_to": "value", |
| 732 | + "gzip_from": string([]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 74, 73, 77, 206, 207, 45, 40, 74, 45, 46, 78, 77, 81, 72, 73, 44, 73, 4, 4, 0, 0, 255, 255, 108, 158, 105, 19, 17, 0, 0, 0}), |
| 733 | + "gzip_to": "decompressed data", |
| 734 | + "rename": mapstr.M{"to": "renamed_value"}, |
| 735 | + "replace_test": "something replaced", |
| 736 | + "to_truncate": "some", |
| 737 | + "convert_from": "42", |
| 738 | + "convert": mapstr.M{"to": int32(42)}, |
| 739 | + "csv_from": "1,2,3,4", |
| 740 | + "csv": mapstr.M{"to": []string{"1", "2", "3", "4"}}, |
| 741 | + "to_dissect": "some words", |
| 742 | + "dissect": mapstr.M{ |
| 743 | + "key1": "some", |
| 744 | + "key2": "words", |
| 745 | + }, |
| 746 | + "array_test": []string{"first", "second"}, |
| 747 | + "array_first": "first", |
| 748 | + "array_second": "second", |
| 749 | + "url_from": "https%3A%2F%2Fwww.elastic.co%3Fsome", |
| 750 | + "url": mapstr.M{"to": "https://www.elastic.co?some"}, |
| 751 | + } |
| 752 | + |
| 753 | + generateFields(b, event.Meta, fieldCount, depth) |
| 754 | + generateFields(b, event.Fields, fieldCount, depth) |
| 755 | + |
| 756 | + var ( |
| 757 | + result *beat.Event |
| 758 | + clone *beat.Event |
| 759 | + err error |
| 760 | + ) |
| 761 | + |
| 762 | + b.Run("run processors that use backups", func(b *testing.B) { |
| 763 | + for i := 0; i < b.N; i++ { |
| 764 | + clone = event.Clone() // necessary for making and comparing changes |
| 765 | + result, err = processors.Run(clone) |
| 766 | + } |
| 767 | + require.NoError(b, err) |
| 768 | + require.NotNil(b, result) |
| 769 | + }) |
| 770 | + |
| 771 | + require.Equal(b, fmt.Sprintf("%p", clone), fmt.Sprintf("%p", result), "should be the same event") |
| 772 | + for key := range expFields { |
| 773 | + require.Equal(b, expFields[key], clone.Fields[key], fmt.Sprintf("%s does not match", key)) |
| 774 | + } |
| 775 | +} |
| 776 | + |
| 777 | +func generateFields(t require.TestingT, m mapstr.M, count, nesting int) { |
| 778 | + for i := 0; i < count; i++ { |
| 779 | + var err error |
| 780 | + if nesting == 0 { |
| 781 | + _, err = m.Put(fmt.Sprintf("field-%d", i), fmt.Sprintf("value-%d", i)) |
| 782 | + } else { |
| 783 | + nested := mapstr.M{} |
| 784 | + generateFields(t, nested, count, nesting-1) |
| 785 | + _, err = m.Put(fmt.Sprintf("field-%d", i), nested) |
| 786 | + } |
| 787 | + require.NoError(t, err) |
| 788 | + } |
| 789 | +} |
0 commit comments