diff --git a/.chloggen/nslaughter_update-azure-sdk.yaml b/.chloggen/nslaughter_update-azure-sdk.yaml new file mode 100755 index 000000000000..3c8d4ebbba39 --- /dev/null +++ b/.chloggen/nslaughter_update-azure-sdk.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: azureeventhubreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Migrate to recommended Azure SDK modules + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31252] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/go.mod b/go.mod index 2e3992496528..83cee2866fa1 100644 --- a/go.mod +++ b/go.mod @@ -232,7 +232,6 @@ require ( github.com/Azure/go-autorest v14.2.0+incompatible // indirect github.com/Azure/go-autorest/autorest v0.11.29 // indirect github.com/Azure/go-autorest/autorest/adal v0.9.23 // indirect - github.com/Azure/go-autorest/autorest/azure/auth v0.5.12 // indirect github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect diff --git a/go.sum b/go.sum index de053b0b9cd8..dfc4d1351f97 100644 --- a/go.sum +++ b/go.sum @@ -618,8 +618,8 @@ cloud.google.com/go/workflows v1.8.0/go.mod h1:ysGhmEajwZxGn1OhGOGKsTXc5PyxOc0vf cloud.google.com/go/workflows v1.9.0/go.mod h1:ZGkj1aFIOd9c8Gerkjjq7OW7I5+l6cSvT3ujaO/WwSA= cloud.google.com/go/workflows v1.10.0/go.mod h1:fZ8LmRmZQWacon9UCX1r/g/DfAXx5VcPALq2CxzdePw= code.cloudfoundry.org/clock v0.0.0-20180518195852-02e53af36e6c/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8= -code.cloudfoundry.org/clock v1.0.0 h1:kFXWQM4bxYvdBw2X8BbBeXwQNgfoWv1vqAk2ZZyBN2o= -code.cloudfoundry.org/clock v1.0.0/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8= +code.cloudfoundry.org/clock v1.1.0 h1:XLzC6W3Ah/Y7ht1rmZ6+QfPdt1iGWEAAtIZXgiaj57c= +code.cloudfoundry.org/clock v1.1.0/go.mod h1:yA3fxddT9RINQL2XHS7PS+OXxKCGhfrZmlNUCIM6AKo= code.cloudfoundry.org/go-diodes v0.0.0-20211115184647-b584dd5df32c h1:N2GMlHc/SJQk7BkaME/kDHaciVTy4NuRmxVJLhnqKK8= code.cloudfoundry.org/go-diodes v0.0.0-20211115184647-b584dd5df32c/go.mod h1:o7lq/SmHshDVxHdRJ/fMT3VPcoXyE1HcRXbG8QibO3k= code.cloudfoundry.org/go-loggregator v7.4.0+incompatible h1:KqZYloMQWM5Zg/BQKunOIA4OODh7djZbk48qqbowNFI= @@ -686,18 +686,16 @@ github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg6 github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Azure/go-autorest/autorest v0.11.12/go.mod h1:eipySxLmqSyC5s5k1CLupqet0PSENBEDP93LQ9a8QYw= -github.com/Azure/go-autorest/autorest v0.11.24/go.mod h1:G6kyRlFnTuSbEYkQGawPfsCswgme4iYf6rfSKUDzbCc= github.com/Azure/go-autorest/autorest v0.11.29 h1:I4+HL/JDvErx2LjyzaVxllw2lRDB5/BT2Bm4g20iqYw= github.com/Azure/go-autorest/autorest v0.11.29/go.mod h1:ZtEzC4Jy2JDrZLxvWs8LrBWEBycl1hbT1eknI8MtfAs= github.com/Azure/go-autorest/autorest/adal v0.9.5/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A= -github.com/Azure/go-autorest/autorest/adal v0.9.18/go.mod h1:XVVeme+LZwABT8K5Lc3hA4nAe8LDBVle26gTrguhhPQ= github.com/Azure/go-autorest/autorest/adal v0.9.22/go.mod h1:XuAbAEUv2Tta//+voMI038TrJBqjKam0me7qR+L8Cmk= github.com/Azure/go-autorest/autorest/adal v0.9.23 h1:Yepx8CvFxwNKpH6ja7RZ+sKX+DWYNldbLiALMC3BTz8= github.com/Azure/go-autorest/autorest/adal v0.9.23/go.mod h1:5pcMqFkdPhviJdlEy3kC/v1ZLnQl0MH6XA5YCcMhy4c= -github.com/Azure/go-autorest/autorest/azure/auth v0.5.12 h1:wkAZRgT/pn8HhFyzfe9UnqOjJYqlembgCTi72Bm/xKk= -github.com/Azure/go-autorest/autorest/azure/auth v0.5.12/go.mod h1:84w/uV8E37feW2NCJ08uT9VBfjfUHpgLVnG2InYD6cg= -github.com/Azure/go-autorest/autorest/azure/cli v0.4.5 h1:0W/yGmFdTIT77fvdlGZ0LMISoLHFJ7Tx4U0yeB+uFs4= -github.com/Azure/go-autorest/autorest/azure/cli v0.4.5/go.mod h1:ADQAXrkgm7acgWVUNamOgh8YNrv4p27l3Wc55oVfpzg= +github.com/Azure/go-autorest/autorest/azure/auth v0.4.2 h1:iM6UAvjR97ZIeR93qTcwpKNMpV+/FTWjwEbuPD495Tk= +github.com/Azure/go-autorest/autorest/azure/auth v0.4.2/go.mod h1:90gmfKdlmKgfjUpnCEpOJzsUEjrWDSLwHIG73tSXddM= +github.com/Azure/go-autorest/autorest/azure/cli v0.3.1 h1:LXl088ZQlP0SBppGFsRZonW6hSvwgL5gRByMbvUbx8U= +github.com/Azure/go-autorest/autorest/azure/cli v0.3.1/go.mod h1:ZG5p860J94/0kI9mNJVoIoLgXcirM2gF5i2kWloofxw= github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw= github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74= github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= @@ -1385,7 +1383,6 @@ github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzq github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= -github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= @@ -2472,20 +2469,14 @@ golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= -golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= -golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= -golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -2797,10 +2788,9 @@ golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= -golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -3298,42 +3288,10 @@ k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdz k8s.io/kubelet v0.29.3 h1:X9h0ZHzc+eUeNTaksbN0ItHyvGhQ7Z0HPjnQD2oHdwU= k8s.io/kubelet v0.29.3/go.mod h1:jDiGuTkFOUynyBKzOoC1xRSWlgAZ9UPcTYeFyjr6vas= k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= -k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 h1:jgGTlFYnhF1PM1Ax/lAlxUPE+KfCIXHaathvJg1C3ak= -k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -lukechampine.com/uint128 v1.1.1/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= -lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= -modernc.org/cc/v3 v3.36.0/go.mod h1:NFUHyPn4ekoC/JHeZFfZurN6ixxawE1BnVonP/oahEI= -modernc.org/cc/v3 v3.36.2/go.mod h1:NFUHyPn4ekoC/JHeZFfZurN6ixxawE1BnVonP/oahEI= -modernc.org/cc/v3 v3.36.3/go.mod h1:NFUHyPn4ekoC/JHeZFfZurN6ixxawE1BnVonP/oahEI= -modernc.org/ccgo/v3 v3.0.0-20220428102840-41399a37e894/go.mod h1:eI31LL8EwEBKPpNpA4bU1/i+sKOwOrQy8D87zWUcRZc= -modernc.org/ccgo/v3 v3.0.0-20220430103911-bc99d88307be/go.mod h1:bwdAnOoaIt8Ax9YdWGjxWsdkPcZyRPHqrOvJxaKAKGw= -modernc.org/ccgo/v3 v3.16.4/go.mod h1:tGtX0gE9Jn7hdZFeU88slbTh1UtCYKusWOoCJuvkWsQ= -modernc.org/ccgo/v3 v3.16.6/go.mod h1:tGtX0gE9Jn7hdZFeU88slbTh1UtCYKusWOoCJuvkWsQ= -modernc.org/ccgo/v3 v3.16.8/go.mod h1:zNjwkizS+fIFDrDjIAgBSCLkWbJuHF+ar3QRn+Z9aws= -modernc.org/ccgo/v3 v3.16.9/go.mod h1:zNMzC9A9xeNUepy6KuZBbugn3c0Mc9TeiJO4lgvkJDo= -modernc.org/ccorpus v1.11.6/go.mod h1:2gEUTrWqdpH2pXsmTM1ZkjeSrUWDpjMu2T6m29L/ErQ= -modernc.org/httpfs v1.0.6/go.mod h1:7dosgurJGp0sPaRanU53W4xZYKh14wfzX420oZADeHM= -modernc.org/libc v0.0.0-20220428101251-2d5f3daf273b/go.mod h1:p7Mg4+koNjc8jkqwcoFBJx7tXkpj00G77X7A72jXPXA= -modernc.org/libc v1.16.0/go.mod h1:N4LD6DBE9cf+Dzf9buBlzVJndKr/iJHG97vGLHYnb5A= -modernc.org/libc v1.16.1/go.mod h1:JjJE0eu4yeK7tab2n4S1w8tlWd9MxXLRzheaRnAKymU= -modernc.org/libc v1.16.17/go.mod h1:hYIV5VZczAmGZAnG15Vdngn5HSF5cSkbvfz2B7GRuVU= -modernc.org/libc v1.16.19/go.mod h1:p7Mg4+koNjc8jkqwcoFBJx7tXkpj00G77X7A72jXPXA= -modernc.org/libc v1.17.0/go.mod h1:XsgLldpP4aWlPlsjqKRdHPqCxCjISdHfM/yeWC5GyW0= -modernc.org/libc v1.17.1/go.mod h1:FZ23b+8LjxZs7XtFMbSzL/EhPxNbfZbErxEHc7cbD9s= -modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= -modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= -modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= -modernc.org/memory v1.1.1/go.mod h1:/0wo5ibyrQiaoUoH7f9D8dnglAmILJ5/cxZlRECf+Nw= -modernc.org/memory v1.2.0/go.mod h1:/0wo5ibyrQiaoUoH7f9D8dnglAmILJ5/cxZlRECf+Nw= -modernc.org/memory v1.2.1/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU= -modernc.org/opt v0.1.1/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= -modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= -modernc.org/sqlite v1.18.1/go.mod h1:6ho+Gow7oX5V+OiOQ6Tr4xeqbx13UZ6t+Fw9IRUG4d4= -modernc.org/strutil v1.1.1/go.mod h1:DE+MQQ/hjKBZS2zNInV5hhcipt5rLPWkmpbGeW5mmdw= -modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw= -modernc.org/tcl v1.13.1/go.mod h1:XOLfOwzhkljL4itZkK6T72ckMgvj0BDsnKNdZVUOecw= -modernc.org/token v1.0.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= -modernc.org/z v1.5.1/go.mod h1:eWFB510QWW5Th9YGZT81s+LwvaAs3Q2yr4sP0rmLkv8= +k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ= +k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q= +nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/pkg/translator/azure/resourcelogs_to_logs.go b/pkg/translator/azure/resourcelogs_to_logs.go index b2c96d28f568..9585218172fe 100644 --- a/pkg/translator/azure/resourcelogs_to_logs.go +++ b/pkg/translator/azure/resourcelogs_to_logs.go @@ -8,6 +8,7 @@ import ( "encoding/json" "errors" "strconv" + "time" jsoniter "github.com/json-iterator/go" "github.com/relvacode/iso8601" @@ -109,12 +110,18 @@ func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) { log := logs[i] nanos, err := getTimestamp(log) if err != nil { - r.Logger.Warn("Unable to convert timestamp from log", zap.String("timestamp", log.Time)) + r.Logger.Warn("Unable to convert timestamp from log", zap.String("time", log.Time), zap.String("timestamp", log.Timestamp)) + r.Logger.Debug("unable to convert timestamp log record", zap.Any("record", log)) continue } lr := logRecords.AppendEmpty() - lr.SetTimestamp(nanos) + // only set timestamp if the record sent included a valid value + if err == nil { + lr.SetTimestamp(nanos) + } + // always set observed timestamp to time observed by Collector + lr.SetObservedTimestamp(pcommon.Timestamp(time.Now().UnixNano())) if log.Level != nil { severity := asSeverity(*log.Level) diff --git a/pkg/translator/azure/resourcelogs_to_logs_test.go b/pkg/translator/azure/resourcelogs_to_logs_test.go index 37bb67df9987..f243c3a63cf6 100644 --- a/pkg/translator/azure/resourcelogs_to_logs_test.go +++ b/pkg/translator/azure/resourcelogs_to_logs_test.go @@ -462,7 +462,7 @@ func TestUnmarshalLogs(t *testing.T) { logs, err := sut.UnmarshalLogs(data) assert.NoError(t, err) - assert.NoError(t, plogtest.CompareLogs(tt.expected, logs)) + assert.NoError(t, plogtest.CompareLogs(tt.expected, logs, plogtest.IgnoreObservedTimestamp())) }) } } diff --git a/receiver/azureeventhubreceiver/azureresourcelogs_unmarshaler.go b/receiver/azureeventhubreceiver/azureresourcelogs_unmarshaler.go index 1665c9f2d833..01073fcc5fd4 100644 --- a/receiver/azureeventhubreceiver/azureresourcelogs_unmarshaler.go +++ b/receiver/azureeventhubreceiver/azureresourcelogs_unmarshaler.go @@ -4,7 +4,7 @@ package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver" import ( - eventhub "github.com/Azure/azure-event-hubs-go/v3" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" @@ -17,7 +17,6 @@ type AzureResourceLogsEventUnmarshaler struct { } func newAzureResourceLogsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger) eventLogsUnmarshaler { - return AzureResourceLogsEventUnmarshaler{ unmarshaler: &azure.ResourceLogsUnmarshaler{ Version: buildInfo.Version, @@ -32,7 +31,6 @@ func newAzureResourceLogsUnmarshaler(buildInfo component.BuildInfo, logger *zap. // log record appears as fields and attributes in the // OpenTelemetry representation; the bodies of the // OpenTelemetry log records are empty. -func (r AzureResourceLogsEventUnmarshaler) UnmarshalLogs(event *eventhub.Event) (plog.Logs, error) { - - return r.unmarshaler.UnmarshalLogs(event.Data) +func (r AzureResourceLogsEventUnmarshaler) UnmarshalLogs(event *azeventhubs.ReceivedEventData) (plog.Logs, error) { + return r.unmarshaler.UnmarshalLogs(event.Body) } diff --git a/receiver/azureeventhubreceiver/azureresourcelogs_unmarshaler_test.go b/receiver/azureeventhubreceiver/azureresourcelogs_unmarshaler_test.go new file mode 100644 index 000000000000..82a1f2a589ba --- /dev/null +++ b/receiver/azureeventhubreceiver/azureresourcelogs_unmarshaler_test.go @@ -0,0 +1,118 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package azureeventhubreceiver + +import ( + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +// TestUnmarshalLogs_Body should succeed regardless of body content type +func TestUnmarshalLogs_Body(t *testing.T) { + logger := zap.NewNop() + unmarshaler := newRawLogsUnmarshaler(logger) + + testCases := []struct { + name string + body []byte + expect []byte + }{ + { + name: "empty body", + body: []byte(""), + // note that zero length body sets otlp Body to []byte(nil) not []byte{} + expect: []byte(nil), + }, + { + name: "nil body", + body: []byte(nil), + expect: []byte(nil), + }, + { + name: "invalid json", + body: []byte("{malformed-json"), + expect: []byte("{malformed-json"), + }, + { + name: "valid json", + body: []byte(`{"key": "value"}`), + expect: []byte(`{"key": "value"}`), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + event := &azeventhubs.ReceivedEventData{ + EventData: azeventhubs.EventData{ + Body: tc.body, + Properties: map[string]interface{}{"someKey": "someValue"}, + }, + } + logs, err := unmarshaler.UnmarshalLogs(event) + require.NoError(t, err, "Expected no error for valid event data but got one") + require.Equal(t, 1, logs.ResourceLogs().Len(), "Expected 1 ResourceLog") + otlpBody := logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().Bytes().AsRaw() + require.Equal(t, otlpBody, tc.expect) + }) + } +} + +func TestUnmarshalLogs_Attributes(t *testing.T) { + logger := zap.NewNop() + unmarshaler := newRawLogsUnmarshaler(logger) + + testCases := []struct { + name string + properties map[string]any + expect map[string]any + }{ + { + name: "empty properties", + properties: map[string]any{}, + expect: map[string]any(nil), + }, + { + name: "nil properties", + properties: map[string]any(nil), + expect: map[string]any(nil), + }, + { + name: "single property", + properties: map[string]interface{}{"someKey": "someValue"}, + expect: map[string]interface{}{"someKey": "someValue"}, + }, + { + name: "multiple properties", + properties: map[string]interface{}{"someKey": "someValue", "anotherKey": "anotherValue"}, + expect: map[string]interface{}{"someKey": "someValue", "anotherKey": "anotherValue"}, + }, + } + + var et = time.Now() + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + event := &azeventhubs.ReceivedEventData{ + EnqueuedTime: &et, + SystemProperties: map[string]interface{}{"syskey1": "sysval1", "syskey2": "sysval2"}, + EventData: azeventhubs.EventData{ + Body: []byte(""), + Properties: tc.properties, + }, + } + logs, err := unmarshaler.UnmarshalLogs(event) + require.NoError(t, err, "Expected no error for valid event data but got one") + resAttrs := logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw() + // this is because + require.Equal(t, len(resAttrs), len(tc.expect), "Expected %d attributes, got %d", len(tc.expect), len(resAttrs)) + if len(resAttrs) == 0 { + return + } + + require.Equal(t, resAttrs, tc.expect, "Not equal:\nexpected: %v\nactual: %v", tc.expect, resAttrs) + }) + } +} diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go index 16614c680a99..2e739d194ce2 100644 --- a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go +++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go @@ -9,7 +9,7 @@ import ( "strings" "time" - eventhub "github.com/Azure/azure-event-hubs-go/v3" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" jsoniter "github.com/json-iterator/go" "github.com/relvacode/iso8601" "go.opentelemetry.io/collector/component" @@ -49,8 +49,7 @@ type azureMetricRecord struct { Average float64 `json:"average"` } -func newAzureResourceMetricsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger) eventMetricsUnmarshaler { - +func newAzureResourceMetricsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger) azureResourceMetricsUnmarshaler { return azureResourceMetricsUnmarshaler{ buildInfo: buildInfo, logger: logger, @@ -62,12 +61,12 @@ func newAzureResourceMetricsUnmarshaler(buildInfo component.BuildInfo, logger *z // an OpenTelemetry pmetric.Metrics object. The data in the Azure // metric record appears as fields and attributes in the // OpenTelemetry representation; -func (r azureResourceMetricsUnmarshaler) UnmarshalMetrics(event *eventhub.Event) (pmetric.Metrics, error) { +func (r azureResourceMetricsUnmarshaler) UnmarshalMetrics(event *azeventhubs.ReceivedEventData) (pmetric.Metrics, error) { md := pmetric.NewMetrics() var azureMetrics azureMetricRecords - decoder := jsoniter.NewDecoder(bytes.NewReader(event.Data)) + decoder := jsoniter.NewDecoder(bytes.NewReader(event.EventData.Body)) err := decoder.Decode(&azureMetrics) if err != nil { return md, err @@ -150,9 +149,8 @@ func (r azureResourceMetricsUnmarshaler) UnmarshalMetrics(event *eventhub.Event) return md, nil } -// asTimestamp will parse an ISO8601 string into an OpenTelemetry -// nanosecond timestamp. If the string cannot be parsed, it will -// return zero and the error. +// asTimestamp will parse an ISO8601 string into an OpenTelemetry nanosecond timestamp. +// If the string cannot be parsed, it will return zero and the error. func asTimestamp(s string) (pcommon.Timestamp, error) { t, err := iso8601.ParseString(s) if err != nil { diff --git a/receiver/azureeventhubreceiver/eventhubhandler.go b/receiver/azureeventhubreceiver/eventhubhandler.go index 0a8aeadd0d62..e5c1031ef395 100644 --- a/receiver/azureeventhubreceiver/eventhubhandler.go +++ b/receiver/azureeventhubreceiver/eventhubhandler.go @@ -5,13 +5,20 @@ package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry import ( "context" + "errors" + "strings" + "sync" + "time" - eventhub "github.com/Azure/azure-event-hubs-go/v3" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" +) - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" +const ( + batchCount = 100 ) type hubWrapper interface { @@ -20,26 +27,42 @@ type hubWrapper interface { Close(ctx context.Context) error } -type hubWrapperImpl struct { - hub *eventhub.Hub +type consumerClientWrapperImpl struct { + consumerClient *azeventhubs.ConsumerClient +} + +func newConsumerClientWrapperImplementation(cfg *Config) (*consumerClientWrapperImpl, error) { + splits := strings.Split(cfg.Connection, "/") + eventhubName := splits[len(splits)-1] + // if that didn't work it's ok as the SDK will try to parse it to create the client + + consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(cfg.Connection, eventhubName, cfg.ConsumerGroup, nil) + if err != nil { + return nil, err + } + return &consumerClientWrapperImpl{ + consumerClient: consumerClient, + }, nil +} + +func (c *consumerClientWrapperImpl) GetEventHubProperties(ctx context.Context, options *azeventhubs.GetEventHubPropertiesOptions) (azeventhubs.EventHubProperties, error) { + return c.consumerClient.GetEventHubProperties(ctx, options) } -func (h *hubWrapperImpl) GetRuntimeInformation(ctx context.Context) (*eventhub.HubRuntimeInformation, error) { - return h.hub.GetRuntimeInformation(ctx) +func (c *consumerClientWrapperImpl) GetPartitionProperties(ctx context.Context, partitionID string, options *azeventhubs.GetPartitionPropertiesOptions) (azeventhubs.PartitionProperties, error) { + return c.consumerClient.GetPartitionProperties(ctx, partitionID, options) } -func (h *hubWrapperImpl) Receive(ctx context.Context, partitionID string, handler eventhub.Handler, opts ...eventhub.ReceiveOption) (listerHandleWrapper, error) { - l, err := h.hub.Receive(ctx, partitionID, handler, opts...) - return l, err +func (c *consumerClientWrapperImpl) NewConsumer(_ context.Context, _ *azeventhubs.ConsumerClientOptions) (*azeventhubs.ConsumerClient, error) { + return c.consumerClient, nil } -func (h *hubWrapperImpl) Close(ctx context.Context) error { - return h.hub.Close(ctx) +func (c *consumerClientWrapperImpl) NewPartitionClient(partitionID string, options *azeventhubs.PartitionClientOptions) (*azeventhubs.PartitionClient, error) { + return c.consumerClient.NewPartitionClient(partitionID, options) } -type listerHandleWrapper interface { - Done() <-chan struct{} - Err() error +func (c *consumerClientWrapperImpl) Close(ctx context.Context) error { + return c.consumerClient.Close(ctx) } type eventhubHandler struct { @@ -53,122 +76,168 @@ type eventhubHandler struct { func (h *eventhubHandler) run(ctx context.Context, host component.Host) error { ctx, h.cancel = context.WithCancel(ctx) - storageClient, err := adapter.GetStorageClient(ctx, host, h.config.StorageID, h.settings.ID) + processor, err := azeventhubs.NewProcessor(consumerClientImpl.consumerClient, checkpointStore, nil) if err != nil { - h.settings.Logger.Debug("Error connecting to Storage", zap.Error(err)) + h.settings.Logger.Debug("Error creating Processor", zap.Error(err)) return err } - if h.hub == nil { // set manually for testing. - hub, newHubErr := eventhub.NewHubFromConnectionString(h.config.Connection, eventhub.HubWithOffsetPersistence(&storageCheckpointPersister{storageClient: storageClient})) - if newHubErr != nil { - h.settings.Logger.Debug("Error connecting to Event Hub", zap.Error(newHubErr)) - return newHubErr + processorCtx, processorCancel := context.WithCancel(ctx) + go h.dispatchPartitionClients(processor) + defer processorCancel() + + return processor.Run(processorCtx) +} + +func (h *eventhubHandler) dispatchPartitionClients(processor *azeventhubs.Processor) { + var wg sync.WaitGroup + for { + partitionClient := processor.NextPartitionClient(context.TODO()) + + if partitionClient == nil { + break + } + + wg.Add(1) + go func(pc *azeventhubs.ProcessorPartitionClient) { + defer wg.Done() + if err := h.processEventsForPartition(pc); err != nil { + h.settings.Logger.Error("Error processing partition", zap.Error(err)) + } + }(partitionClient) + } + wg.Wait() +} + +func (h *eventhubHandler) processEventsForPartition(partitionClient *azeventhubs.ProcessorPartitionClient) error { + defer partitionClient.Close(context.TODO()) + + for { + receiveCtx, cancelReceive := context.WithTimeout(context.TODO(), time.Minute) + events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil) + cancelReceive() + + if err != nil && !errors.Is(err, context.DeadlineExceeded) { + var eventHubError *azeventhubs.Error + if errors.As(err, &eventHubError) && eventHubError.Code == azeventhubs.ErrorCodeOwnershipLost { + return nil + } + return err + } + + if len(events) == 0 { + continue } - h.hub = &hubWrapperImpl{ - hub: hub, + + for _, event := range events { + if err := h.newMessageHandler(context.TODO(), event); err != nil { + h.settings.Logger.Error("Error handling event", zap.Error(err)) + } + } + + if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil { + h.settings.Logger.Error("Error updating checkpoint", zap.Error(err)) } } +} +func (h *eventhubHandler) runWithConsumerClient(ctx context.Context, _ component.Host) error { + if h.consumerClient == nil { + if err := h.init(ctx); err != nil { + return err + } + } if h.config.Partition == "" { - // listen to each partition of the Event Hub - var runtimeInfo *eventhub.HubRuntimeInformation - runtimeInfo, err = h.hub.GetRuntimeInformation(ctx) + properties, err := h.consumerClient.GetEventHubProperties(ctx, nil) if err != nil { - h.settings.Logger.Debug("Error getting Runtime Information", zap.Error(err)) + h.settings.Logger.Debug("Error getting Event Hub properties", zap.Error(err)) return err } - for _, partitionID := range runtimeInfo.PartitionIDs { - err = h.setUpOnePartition(ctx, partitionID, false) + for _, partitionID := range properties.PartitionIDs { + err = h.setupPartition(ctx, partitionID) if err != nil { h.settings.Logger.Debug("Error setting up partition", zap.Error(err)) return err } } } else { - err = h.setUpOnePartition(ctx, h.config.Partition, true) + err := h.setupPartition(ctx, h.config.Partition) if err != nil { h.settings.Logger.Debug("Error setting up partition", zap.Error(err)) return err } } + return nil +} - if h.hub != nil { - return nil - } - - hub, err := eventhub.NewHubFromConnectionString(h.config.Connection) +func (h *eventhubHandler) setupPartition(ctx context.Context, partitionID string) error { + cc, err := h.consumerClient.NewConsumer(ctx, nil) if err != nil { return err } + if cc == nil { + return errors.New("failed to initialize consumer client") + } + defer cc.Close(ctx) - h.hub = &hubWrapperImpl{ - hub: hub, + pcOpts := &azeventhubs.PartitionClientOptions{ + StartPosition: azeventhubs.StartPosition{ + Earliest: to.Ptr(true), + }, } - runtimeInfo, err := hub.GetRuntimeInformation(ctx) + pc, err := cc.NewPartitionClient(partitionID, pcOpts) if err != nil { return err } - - for _, partitionID := range runtimeInfo.PartitionIDs { - _, err := hub.Receive(ctx, partitionID, h.newMessageHandler, eventhub.ReceiveWithLatestOffset()) - if err != nil { - return err - } + if pc == nil { + return errors.New("failed to initialize partition client") } + defer func() { + if pc != nil { + pc.Close(ctx) + } + }() + + go h.receivePartitionEvents(ctx, pc) return nil } -func (h *eventhubHandler) setUpOnePartition(ctx context.Context, partitionID string, applyOffset bool) error { - - receiverOptions := []eventhub.ReceiveOption{} - if applyOffset && h.config.Offset != "" { - receiverOptions = append(receiverOptions, eventhub.ReceiveWithStartingOffset(h.config.Offset)) - } else { - receiverOptions = append(receiverOptions, eventhub.ReceiveWithLatestOffset()) - } - - if h.config.ConsumerGroup != "" { - receiverOptions = append(receiverOptions, eventhub.ReceiveWithConsumerGroup(h.config.ConsumerGroup)) - } - - handle, err := h.hub.Receive(ctx, partitionID, h.newMessageHandler, receiverOptions...) - if err != nil { - return err - } - go func() { - <-handle.Done() - err := handle.Err() - if err != nil { - h.settings.Logger.Error("Error reported by event hub", zap.Error(err)) +func (h *eventhubHandler) receivePartitionEvents(ctx context.Context, pc *azeventhubs.PartitionClient) { + for { + rcvCtx, rcvCtxCancel := context.WithTimeout(context.TODO(), time.Second*10) + events, err := pc.ReceiveEvents(rcvCtx, batchCount, nil) + rcvCtxCancel() + if err != nil && !errors.Is(err, context.DeadlineExceeded) { + h.settings.Logger.Error("Error receiving events", zap.Error(err)) } - }() - return nil + for _, event := range events { + if err := h.newMessageHandler(ctx, event); err != nil { + h.settings.Logger.Error("Error handling event", zap.Error(err)) + } + } + } } -func (h *eventhubHandler) newMessageHandler(ctx context.Context, event *eventhub.Event) error { - +func (h *eventhubHandler) newMessageHandler(ctx context.Context, event *azeventhubs.ReceivedEventData) error { err := h.dataConsumer.consume(ctx, event) if err != nil { - h.settings.Logger.Error("error decoding message", zap.Error(err)) + h.settings.Logger.Error("Error decoding message", zap.Error(err)) return err } - return nil } func (h *eventhubHandler) close(ctx context.Context) error { - - if h.hub != nil { - err := h.hub.Close(ctx) + if h.consumerClient != nil { + err := h.consumerClient.Close(ctx) if err != nil { return err } - h.hub = nil + h.consumerClient = nil } if h.cancel != nil { h.cancel() @@ -178,7 +247,6 @@ func (h *eventhubHandler) close(ctx context.Context) error { } func (h *eventhubHandler) setDataConsumer(dataConsumer dataConsumer) { - h.dataConsumer = dataConsumer } diff --git a/receiver/azureeventhubreceiver/eventhubhandler_test.go b/receiver/azureeventhubreceiver/eventhubhandler_test.go index 772b6faed724..6226713e845a 100644 --- a/receiver/azureeventhubreceiver/eventhubhandler_test.go +++ b/receiver/azureeventhubreceiver/eventhubhandler_test.go @@ -8,80 +8,80 @@ import ( "testing" "time" - eventhub "github.com/Azure/azure-event-hubs-go/v3" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/mock" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/receiver/receiverhelper" - "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver/internal/metadata" ) -type mockHubWrapper struct { +type MockConsumerClientWrapper struct { + mock.Mock } -func (m mockHubWrapper) GetRuntimeInformation(_ context.Context) (*eventhub.HubRuntimeInformation, error) { - return &eventhub.HubRuntimeInformation{ - Path: "foo", - CreatedAt: time.Now(), - PartitionCount: 1, - PartitionIDs: []string{"foo"}, - }, nil +func (m *MockConsumerClientWrapper) GetEventHubProperties(ctx context.Context, options *azeventhubs.GetEventHubPropertiesOptions) (azeventhubs.EventHubProperties, error) { + args := m.Called(ctx, options) + return args.Get(0).(azeventhubs.EventHubProperties), args.Error(1) } -func (m mockHubWrapper) Receive(ctx context.Context, _ string, _ eventhub.Handler, _ ...eventhub.ReceiveOption) (listerHandleWrapper, error) { - return &mockListenerHandleWrapper{ - ctx: ctx, - }, nil +func (m *MockConsumerClientWrapper) GetPartitionProperties(ctx context.Context, partitionID string, options *azeventhubs.GetPartitionPropertiesOptions) (azeventhubs.PartitionProperties, error) { + args := m.Called(ctx, partitionID, options) + return args.Get(0).(azeventhubs.PartitionProperties), args.Error(1) } -func (m mockHubWrapper) Close(_ context.Context) error { - return nil +func (m *MockConsumerClientWrapper) NewConsumer(ctx context.Context, options *azeventhubs.ConsumerClientOptions) (*azeventhubs.ConsumerClient, error) { + args := m.Called(ctx, options) + return args.Get(0).(*azeventhubs.ConsumerClient), args.Error(1) } -type mockListenerHandleWrapper struct { - ctx context.Context +func (m *MockConsumerClientWrapper) NewPartitionClient(partitionID string, options *azeventhubs.PartitionClientOptions) (*azeventhubs.PartitionClient, error) { + args := m.Called(partitionID, options) + return args.Get(0).(*azeventhubs.PartitionClient), args.Error(1) } -func (m *mockListenerHandleWrapper) Done() <-chan struct{} { - return m.ctx.Done() +func (m *MockConsumerClientWrapper) Close(ctx context.Context) error { + args := m.Called(ctx) + return args.Error(1) } -func (m mockListenerHandleWrapper) Err() error { - return nil -} +func TestEventHubHandler_Start(t *testing.T) { + logger := zap.NewNop() + settings := receiver.CreateSettings{ + TelemetrySettings: component.TelemetrySettings{Logger: logger}, + } + config := &Config{Connection: "Endpoint=sb://namespace.servicebus.windows.net/;EntityPath=hubName", ConsumerGroup: "$Default"} -type mockDataConsumer struct { - logsUnmarshaler eventLogsUnmarshaler - nextLogsConsumer consumer.Logs - obsrecv *receiverhelper.ObsReport -} + mockConsumerClient := new(MockConsumerClientWrapper) -func (m *mockDataConsumer) setNextLogsConsumer(nextLogsConsumer consumer.Logs) { - m.nextLogsConsumer = nextLogsConsumer -} + handler := newEventhubHandler(config, settings) + handler.consumerClient = mockConsumerClient -func (m *mockDataConsumer) setNextMetricsConsumer(_ consumer.Metrics) {} + host := componenttest.NewNopHost() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() -func (m *mockDataConsumer) consume(ctx context.Context, event *eventhub.Event) error { + err := handler.run(ctx, host) + assert.NoError(t, err) - logsContext := m.obsrecv.StartLogsOp(ctx) + mockConsumerClient.AssertExpectations(t) +} - logs, err := m.logsUnmarshaler.UnmarshalLogs(event) - if err != nil { - return err +func TestEventHubHandler_HandleEvent(t *testing.T) { + logger := zap.NewNop() + settings := receiver.CreateSettings{ + TelemetrySettings: component.TelemetrySettings{Logger: logger}, } + config := &Config{Connection: "Endpoint=sb://namespace.servicebus.windows.net/;EntityPath=hubName", ConsumerGroup: "$Default"} - err = m.nextLogsConsumer.ConsumeLogs(logsContext, logs) - m.obsrecv.EndLogsOp(logsContext, metadata.Type.String(), 1, err) - - return err -} + mockConsumerClient := new(MockConsumerClientWrapper) + event := &azeventhubs.ReceivedEventData{ + EventData: azeventhubs.EventData{ + Body: []byte(`{"message":"test"}`), + }, + } func TestEventhubHandler_Start(t *testing.T) { config := createDefaultConfig() @@ -124,26 +124,14 @@ func TestEventhubHandler_newMessageHandler(t *testing.T) { assert.NoError(t, ehHandler.run(context.Background(), componenttest.NewNopHost())) - now := time.Now() - err = ehHandler.newMessageHandler(context.Background(), &eventhub.Event{ - Data: []byte("hello"), - PartitionKey: nil, - Properties: map[string]any{"foo": "bar"}, - ID: "11234", - SystemProperties: &eventhub.SystemProperties{ - SequenceNumber: nil, - EnqueuedTime: &now, - Offset: nil, - PartitionID: nil, - PartitionKey: nil, - Annotations: nil, - }, - }) +func (m *MockDataConsumer) consume(ctx context.Context, event *azeventhubs.ReceivedEventData) error { + args := m.Called(ctx, event) + return args.Error(0) +} - assert.NoError(t, err) - assert.Len(t, sink.AllLogs(), 1) - assert.Equal(t, 1, sink.AllLogs()[0].LogRecordCount()) - assert.Equal(t, []byte("hello"), sink.AllLogs()[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().Bytes().AsRaw()) +func (m *MockDataConsumer) setNextLogsConsumer(c consumer.Logs) { + _ = m.Called(c) +} read, ok := sink.AllLogs()[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().Get("foo") assert.True(t, ok) diff --git a/receiver/azureeventhubreceiver/factory_test.go b/receiver/azureeventhubreceiver/factory_test.go index 3d005c23b62d..6d03ec288eb0 100644 --- a/receiver/azureeventhubreceiver/factory_test.go +++ b/receiver/azureeventhubreceiver/factory_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver" +package azureeventhubreceiver // import "github.com.open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver" import ( "context" diff --git a/receiver/azureeventhubreceiver/go.mod b/receiver/azureeventhubreceiver/go.mod index 9e6404524319..ac1d47285e4c 100644 --- a/receiver/azureeventhubreceiver/go.mod +++ b/receiver/azureeventhubreceiver/go.mod @@ -4,7 +4,8 @@ go 1.21.0 require ( github.com/Azure/azure-amqp-common-go/v4 v4.2.0 - github.com/Azure/azure-event-hubs-go/v3 v3.6.2 + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 + github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.1.0 github.com/json-iterator/go v1.1.12 github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.102.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.102.0 @@ -26,16 +27,9 @@ require ( ) require ( - github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect - github.com/Azure/go-amqp v1.0.2 // indirect - github.com/Azure/go-autorest v14.2.0+incompatible // indirect - github.com/Azure/go-autorest/autorest v0.11.28 // indirect - github.com/Azure/go-autorest/autorest/adal v0.9.21 // indirect - github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect - github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect - github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect - github.com/Azure/go-autorest/logger v0.2.1 // indirect - github.com/Azure/go-autorest/tracing v0.6.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 // indirect + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 // indirect + github.com/Azure/go-amqp v1.0.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -47,13 +41,11 @@ require ( github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang-jwt/jwt/v4 v4.4.3 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/jpillora/backoff v1.0.0 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.1 // indirect @@ -76,6 +68,7 @@ require ( github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/spf13/cobra v1.8.0 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/stretchr/objx v0.5.0 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/valyala/fastjson v1.6.4 // indirect diff --git a/receiver/azureeventhubreceiver/go.sum b/receiver/azureeventhubreceiver/go.sum index 4cc44225885f..8d7151b407d1 100644 --- a/receiver/azureeventhubreceiver/go.sum +++ b/receiver/azureeventhubreceiver/go.sum @@ -1,36 +1,22 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/Azure/azure-amqp-common-go/v4 v4.2.0 h1:q/jLx1KJ8xeI8XGfkOWMN9XrXzAfVTkyvCxPvHCjd2I= github.com/Azure/azure-amqp-common-go/v4 v4.2.0/go.mod h1:GD3m/WPPma+621UaU6KNjKEo5Hl09z86viKwQjTpV0Q= -github.com/Azure/azure-event-hubs-go/v3 v3.6.2 h1:7rNj1/iqS/i3mUKokA2n2eMYO72TB7lO7OmpbKoakKY= -github.com/Azure/azure-event-hubs-go/v3 v3.6.2/go.mod h1:n+ocYr9j2JCLYqUqz9eI+lx/TEAtL/g6rZzyTFSuIpc= -github.com/Azure/azure-sdk-for-go v68.0.0+incompatible h1:fcYLmCpyNYRnvJbPerq7U0hS+6+I79yEDJBqVNcqUzU= -github.com/Azure/azure-sdk-for-go v68.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/go-amqp v1.0.2 h1:zHCHId+kKC7fO8IkwyZJnWMvtRXhYC0VJtD0GYkHc6M= -github.com/Azure/go-amqp v1.0.2/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= -github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= -github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= -github.com/Azure/go-autorest/autorest v0.11.28 h1:ndAExarwr5Y+GaHE6VCaY1kyS/HwwGGyuimVhWsHOEM= -github.com/Azure/go-autorest/autorest v0.11.28/go.mod h1:MrkzG3Y3AH668QyF9KRk5neJnGgmhQ6krbhR8Q5eMvA= -github.com/Azure/go-autorest/autorest/adal v0.9.18/go.mod h1:XVVeme+LZwABT8K5Lc3hA4nAe8LDBVle26gTrguhhPQ= -github.com/Azure/go-autorest/autorest/adal v0.9.21 h1:jjQnVFXPfekaqb8vIsv2G1lxshoW+oGv4MDlhRtnYZk= -github.com/Azure/go-autorest/autorest/adal v0.9.21/go.mod h1:zua7mBUaCc5YnSLKYgGJR/w5ePdMDA6H56upLsHzA9U= -github.com/Azure/go-autorest/autorest/azure/auth v0.4.2 h1:iM6UAvjR97ZIeR93qTcwpKNMpV+/FTWjwEbuPD495Tk= -github.com/Azure/go-autorest/autorest/azure/auth v0.4.2/go.mod h1:90gmfKdlmKgfjUpnCEpOJzsUEjrWDSLwHIG73tSXddM= -github.com/Azure/go-autorest/autorest/azure/cli v0.3.1 h1:LXl088ZQlP0SBppGFsRZonW6hSvwgL5gRByMbvUbx8U= -github.com/Azure/go-autorest/autorest/azure/cli v0.3.1/go.mod h1:ZG5p860J94/0kI9mNJVoIoLgXcirM2gF5i2kWloofxw= -github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw= -github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74= -github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= -github.com/Azure/go-autorest/autorest/mocks v0.4.2 h1:PGN4EDXnuQbojHbU0UWoNvmu9AGVwYHG9/fkDYhtAfw= -github.com/Azure/go-autorest/autorest/mocks v0.4.2/go.mod h1:Vy7OitM9Kei0i1Oj+LvyAWMXJHeKH1MVlzFugfVrmyU= -github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk= -github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE= -github.com/Azure/go-autorest/autorest/validation v0.3.1 h1:AgyqjAd94fwNAoTjl/WQXg4VvFeRFpO+UhNyRXqF1ac= -github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E= -github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg= -github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8= -github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= -github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 h1:E+OJmp2tPvt1W+amx48v1eqbjDYsgN+RzP4q16yV5eM= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1/go.mod h1:a6xsAQUZg+VsS3TJ05SRp524Hs4pZ/AeFSr5ENf0Yjo= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 h1:sO0/P7g68FrryJzljemN+6GTssUXdANk6aJ7T1ZxnsQ= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1/go.mod h1:h8hyGFDsU5HMivxiS2iYFZsgDbU9OnnJ163x5UGVKYo= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 h1:LqbJ/WzJUwBf8UiaSzgX7aMclParm9/5Vgp+TY51uBQ= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2/go.mod h1:yInRyqWXAuaPrgI7p70+lDDgh3mlBohis29jGMISnmc= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.1.0 h1:vEe09cdSBy7evqoVUvuitnsjyozsSzI4TbGgwu01+TI= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.1.0/go.mod h1:PgOlzIlvwIagKI8N6hCsfFDpAijHCmlHqOwA5GsSh9w= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0 h1:BWeAAEzkCnL0ABVJqs+4mYudNch7oFGPtTlSmIWL8ms= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0/go.mod h1:Y3gnVwfaz8h6L1YHar+NfWORtBoVUSB5h4GlGkdeF7Q= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 h1:YUUxeiOWgdAQE3pXt2H7QXzZs0q8UBjgRbl56qo8GYM= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2/go.mod h1:dmXQgZuiSubAecswZE+Sm8jkvEa7kQgTPVRvwL/nd0E= +github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU= +github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 h1:DzHpqpoJVaCgOUdVHxE8QB52S6NiVdDQvGlny1qvPqA= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= @@ -45,10 +31,6 @@ github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA= -github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY= -github.com/dimchansky/utfbom v1.1.0 h1:FcM3g+nofKgUteL8dm/UpdRXNC9KmADgTpLKsu0TRo4= -github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -68,15 +50,15 @@ github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 h1:TQcrn6Wq+sKGkpyPvppOz99zsM github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= -github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= -github.com/golang-jwt/jwt/v4 v4.4.3 h1:Hxl6lhQFj4AnOX6MLrsCb/+7tCj7DxP7VA+2rDIq5AU= -github.com/golang-jwt/jwt/v4 v4.4.3/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= +github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw= +github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= @@ -131,8 +113,6 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= -github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= -github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c h1:cqn374mizHuIWj+OSJCajGr/phAmuMug9qIX3l9CflE= github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= @@ -142,6 +122,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= @@ -166,8 +148,6 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= -github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= -github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= @@ -328,7 +308,6 @@ golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -391,3 +370,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q= +nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= diff --git a/receiver/azureeventhubreceiver/persister.go b/receiver/azureeventhubreceiver/persister.go index 7d9be7390104..9db7cd09ce40 100644 --- a/receiver/azureeventhubreceiver/persister.go +++ b/receiver/azureeventhubreceiver/persister.go @@ -7,20 +7,44 @@ import ( "context" "fmt" - "github.com/Azure/azure-event-hubs-go/v3/persist" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" jsoniter "github.com/json-iterator/go" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension/experimental/storage" + "go.opentelemetry.io/collector/receiver" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" ) +// "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints" +// "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints" + const ( + // storageKeyFormat the format of the key used to store the checkpoint in the storage. storageKeyFormat = "%s/%s/%s/%s" + // StartOfStream is a constant defined to represent the start of a partition stream in EventHub. + StartOfStream = "-1" + + // EndOfStream is a constant defined to represent the current end of a partition stream in EventHub. + // This can be used as an offset argument in receiver creation to start receiving from the latest + // event, instead of a specific offset or point in time. + EndOfStream = "@latest" ) +// The Checkpoint type is now maintained here to eliminate the dependence on the deprecated eventhub SDK for this datatype. +// Preserving the previously used structure and tags keeps the receiver compatible with existing checkpoints and avoids +// any need to migrate data. +type Checkpoint struct { + Offset string `json:"offset"` + SequenceNumber int64 `json:"sequenceNumber"` + EnqueueTime string `json:"enqueueTime"` // ": "0001-01-01T00:00:00Z" +} + type storageCheckpointPersister struct { storageClient storage.Client } -func (s *storageCheckpointPersister) Write(namespace, name, consumerGroup, partitionID string, checkpoint persist.Checkpoint) error { +func (s *storageCheckpointPersister) Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error { b, err := jsoniter.Marshal(checkpoint) if err != nil { return err @@ -28,15 +52,52 @@ func (s *storageCheckpointPersister) Write(namespace, name, consumerGroup, parti return s.storageClient.Set(context.Background(), fmt.Sprintf(storageKeyFormat, namespace, name, consumerGroup, partitionID), b) } -func (s *storageCheckpointPersister) Read(namespace, name, consumerGroup, partitionID string) (persist.Checkpoint, error) { - var checkpoint persist.Checkpoint +func (s *storageCheckpointPersister) Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error) { + var checkpoint Checkpoint bytes, err := s.storageClient.Get(context.Background(), fmt.Sprintf(storageKeyFormat, namespace, name, consumerGroup, partitionID)) if err != nil { - return persist.NewCheckpointFromStartOfStream(), err - } - if len(bytes) == 0 { - return persist.NewCheckpointFromStartOfStream(), err + // error reading checkpoint + return Checkpoint{}, err + } else if len(bytes) == 0 { + // nil or empty checkpoint + return NewCheckpointFromStartOfStream(), nil } err = jsoniter.Unmarshal(bytes, &checkpoint) return checkpoint, err } + +// wrappers and stubs to implement azeventhubs.CheckpointStore +func (s *storageCheckpointPersister) ClaimOwnership(_ context.Context, _ []azeventhubs.Ownership, _ *azeventhubs.ClaimOwnershipOptions) ([]azeventhubs.Ownership, error) { + return nil, nil +} + +func (s *storageCheckpointPersister) ListCheckpoints(_ context.Context, _ string, _ string, _ string, _ *azeventhubs.ListCheckpointsOptions) ([]azeventhubs.Checkpoint, error) { + return nil, nil +} + +func (s *storageCheckpointPersister) ListOwnership(_ context.Context, _ string, _ string, _ string, _ *azeventhubs.ListOwnershipOptions) ([]azeventhubs.Ownership, error) { + return nil, nil +} + +func (s *storageCheckpointPersister) SetCheckpoint(_ context.Context, _ azeventhubs.Checkpoint, _ *azeventhubs.SetCheckpointOptions) error { + return nil +} + +var _ azeventhubs.CheckpointStore = &storageCheckpointPersister{} + +// NewCheckpointFromStartOfStream returns a checkpoint for the start of the stream +func NewCheckpointFromStartOfStream() Checkpoint { + return Checkpoint{ + Offset: StartOfStream, + } +} + +func createCheckpointStore(ctx context.Context, host component.Host, cfg *Config, s receiver.CreateSettings) (*storageCheckpointPersister, error) { + storageClient, err := adapter.GetStorageClient(ctx, host, cfg.StorageID, s.ID) + if err != nil { + return nil, err + } + return &storageCheckpointPersister{ + storageClient: storageClient, + }, nil +} diff --git a/receiver/azureeventhubreceiver/persister_test.go b/receiver/azureeventhubreceiver/persister_test.go index 123cae0fbc93..aa2c52ba9f55 100644 --- a/receiver/azureeventhubreceiver/persister_test.go +++ b/receiver/azureeventhubreceiver/persister_test.go @@ -8,9 +8,7 @@ import ( "errors" "sync" "testing" - "time" - "github.com/Azure/azure-event-hubs-go/v3/persist" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/extension/experimental/storage" ) @@ -28,10 +26,10 @@ func TestStorageOffsetPersisterUnknownCheckpoint(t *testing.T) { func TestStorageOffsetPersisterWithKnownCheckpoint(t *testing.T) { client := newMockClient() s := storageCheckpointPersister{storageClient: client} - checkpoint := persist.Checkpoint{ + checkpoint := Checkpoint{ Offset: "foo", SequenceNumber: 2, - EnqueueTime: time.Now(), + EnqueueTime: "0001-01-01T00:00:00Z", } err := s.Write("foo", "bar", "foobar", "foobarfoo", checkpoint) assert.NoError(t, err) @@ -39,7 +37,7 @@ func TestStorageOffsetPersisterWithKnownCheckpoint(t *testing.T) { assert.NoError(t, err) assert.Equal(t, checkpoint.Offset, read.Offset) assert.Equal(t, checkpoint.SequenceNumber, read.SequenceNumber) - assert.True(t, checkpoint.EnqueueTime.Equal(read.EnqueueTime)) + assert.True(t, checkpoint.EnqueueTime == read.EnqueueTime) } // copied from pkg/stanza/adapter/mocks_test.go diff --git a/receiver/azureeventhubreceiver/rawlogs_unmarshaler.go b/receiver/azureeventhubreceiver/rawlogs_unmarshaler.go index 8d7811086859..672c8cd263a5 100644 --- a/receiver/azureeventhubreceiver/rawlogs_unmarshaler.go +++ b/receiver/azureeventhubreceiver/rawlogs_unmarshaler.go @@ -4,8 +4,8 @@ package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver" import ( - eventhub "github.com/Azure/azure-event-hubs-go/v3" - "go.opentelemetry.io/collector/pdata/pcommon" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + // "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" ) @@ -15,25 +15,21 @@ type rawLogsUnmarshaler struct { } func newRawLogsUnmarshaler(logger *zap.Logger) eventLogsUnmarshaler { - return rawLogsUnmarshaler{ logger: logger, } } -func (r rawLogsUnmarshaler) UnmarshalLogs(event *eventhub.Event) (plog.Logs, error) { - +func (r rawLogsUnmarshaler) UnmarshalLogs(event *azeventhubs.ReceivedEventData) (plog.Logs, error) { + r.logger.Debug("started unmarshaling logs", zap.Any("eventBody", event.Body)) l := plog.NewLogs() lr := l.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() slice := lr.Body().SetEmptyBytes() - slice.Append(event.Data...) - if event.SystemProperties.EnqueuedTime != nil { - lr.SetTimestamp(pcommon.NewTimestampFromTime(*event.SystemProperties.EnqueuedTime)) - } - + slice.Append(event.Body...) if err := lr.Attributes().FromRaw(event.Properties); err != nil { + r.logger.Error("failed extracting attributes from raw event properties", zap.Error(err)) return l, err } - + r.logger.Debug("successfully unmarshaled logs", zap.Any("logRecords", lr)) return l, nil } diff --git a/receiver/azureeventhubreceiver/receiver.go b/receiver/azureeventhubreceiver/receiver.go index 3bfd55500bde..769e41e4c4db 100644 --- a/receiver/azureeventhubreceiver/receiver.go +++ b/receiver/azureeventhubreceiver/receiver.go @@ -8,7 +8,7 @@ import ( "errors" "fmt" - eventhub "github.com/Azure/azure-event-hubs-go/v3" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" @@ -21,17 +21,17 @@ import ( ) type dataConsumer interface { - consume(ctx context.Context, event *eventhub.Event) error + consume(ctx context.Context, event *azeventhubs.ReceivedEventData) error setNextLogsConsumer(nextLogsConsumer consumer.Logs) setNextMetricsConsumer(nextLogsConsumer consumer.Metrics) } type eventLogsUnmarshaler interface { - UnmarshalLogs(event *eventhub.Event) (plog.Logs, error) + UnmarshalLogs(event *azeventhubs.ReceivedEventData) (plog.Logs, error) } type eventMetricsUnmarshaler interface { - UnmarshalMetrics(event *eventhub.Event) (pmetric.Metrics, error) + UnmarshalMetrics(event *azeventhubs.ReceivedEventData) (pmetric.Metrics, error) } type eventhubReceiver struct { @@ -74,7 +74,7 @@ func (receiver *eventhubReceiver) consume(ctx context.Context, event *eventhub.E } } -func (receiver *eventhubReceiver) consumeLogs(ctx context.Context, event *eventhub.Event) error { +func (receiver *eventhubReceiver) consumeLogs(ctx context.Context, event *azeventhubs.ReceivedEventData) error { if receiver.nextLogsConsumer == nil { return nil @@ -98,7 +98,7 @@ func (receiver *eventhubReceiver) consumeLogs(ctx context.Context, event *eventh return err } -func (receiver *eventhubReceiver) consumeMetrics(ctx context.Context, event *eventhub.Event) error { +func (receiver *eventhubReceiver) consumeMetrics(ctx context.Context, event *azeventhubs.ReceivedEventData) error { if receiver.nextMetricsConsumer == nil { return nil