Merge branch 'beta'
Main change that fixes the memory leak:
queue/inmemory: replace the backing slice with a ring buffer implementation.
Key change preventing memory leaks: set the reference to a removed event to nil so its memory can be released.
User Recognition:
separate pipelines for anonymous and identified events.
aggregate identified events before processing to speed up the recognition phase.
reduce users_recognition.pool.size to 5 to increase the effectiveness of identifier aggregation.
don't copy every anonymous event to push it into the queue; serialize to compressed bytes instead.
don't mutate the event for User-Agent bot detection.
Optimize memory allocations while manipulating objects.
Don't apply Babel to builtin transforms, to avoid goja initialization.
Batch processing: do not copy event data for the first destination mapped to a token.
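
The queue/inmemory change itself is not shown on this page, so the following is only a minimal sketch of the idea the message describes: a FIFO queue backed by a ring buffer that sets a slot to nil when an element is removed. The type `ringQueue` and its methods are hypothetical names chosen for illustration, not Jitsu's actual implementation.

```go
package main

// ringQueue is a hypothetical FIFO queue backed by a ring buffer.
// It only illustrates the technique named in the commit message.
type ringQueue struct {
	buf   []interface{}
	head  int // index of the oldest element
	count int // number of stored elements
}

func newRingQueue(capacity int) *ringQueue {
	if capacity < 1 {
		capacity = 1
	}
	return &ringQueue{buf: make([]interface{}, capacity)}
}

// Push appends v, doubling the buffer when it is full.
func (q *ringQueue) Push(v interface{}) {
	if q.count == len(q.buf) {
		q.grow()
	}
	q.buf[(q.head+q.count)%len(q.buf)] = v
	q.count++
}

// Pop removes and returns the oldest element; ok is false when the queue is empty.
func (q *ringQueue) Pop() (v interface{}, ok bool) {
	if q.count == 0 {
		return nil, false
	}
	v = q.buf[q.head]
	// Clear the slot: otherwise the buffer keeps the event reachable
	// and the garbage collector cannot reclaim it.
	q.buf[q.head] = nil
	q.head = (q.head + 1) % len(q.buf)
	q.count--
	return v, true
}

// Len returns the number of queued elements.
func (q *ringQueue) Len() int { return q.count }

// grow copies the elements, oldest first, into a buffer twice as large.
func (q *ringQueue) grow() {
	bigger := make([]interface{}, 2*len(q.buf))
	for i := 0; i < q.count; i++ {
		bigger[i] = q.buf[(q.head+i)%len(q.buf)]
	}
	q.buf = bigger
	q.head = 0
}
```

With a plain slice used as a queue (`q = q[1:]`), the removed element can stay reachable through the shared backing array; clearing the slot on removal avoids that kind of retention.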
absorbb committed Feb 7, 2022
2 parents e08b900 + da0ff17 commit 79d1347
Showing 42 changed files with 384 additions and 341 deletions.
4 changes: 3 additions & 1 deletion documentation/changelog/index.mdx
@@ -10,7 +10,9 @@ We maintain two branches of Jitsu: `beta` and `master` (release). Please, see ou
A detailed change log is [available on GitHub](https://github.com/jitsucom/jitsu/releases). Here we publish only major releases with major issues description.
Starting from 1.36 we publish release announcements on [our blog](/blog):

<LargeLink href="/blog/jitsu-1.38">Jitsu v1.38 '<b>DeLorean 🚘️</b>' release</LargeLink>
<LargeLink href="/blog/jitsu-1.39">🌱 Jitsu v1.39 "<b>Foundation</b>" release</LargeLink>

<LargeLink href="/blog/jitsu-1.38">🚘️ Jitsu v1.38 '<b>DeLorean</b>' release</LargeLink>

<LargeLink href="/blog/jitsu-1.37">Jitsu v1.37 release</LargeLink>

2 changes: 1 addition & 1 deletion documentation/changelog/release-cycle.mdx
@@ -31,7 +31,7 @@ We make major version (`1.[version]`) once a month. Each version is announced on

Subscribe to any of those channels to follow the news!

Also, list of all major releases is available on a [changelog](http://localhost:3000/docs/changelog) page
Also, list of all major releases is available on a [changelog](/docs/changelog) page

## Beta

11 changes: 7 additions & 4 deletions documentation/configurator-configuration/index.mdx
@@ -1,9 +1,12 @@
# Configuration File
# Configurator

If **Jitsu Configurator** is deployed in a standlone mode, it should be configured via single YAML file
(read more about deployment options [here](/docs/deployment/)).
**Jitsu Configurator** is a service that's provide a UI and API for configuring Jitsu. Configurator is an optional, but highly recommended
services (read more about `@jitsucom/server` and `@jitsucom/configurator` relations on a [docker deployment page](/docs/deployment/deploy-with-docker))

Jitsu Configurator config file consists of the following sections:
If **Jitsu Configurator** is deployed in a standalone mode (not a part of [joint image](/docs/deployment/deploy-with-docker/joint-image)), it should be configured via single YAML file
or with [env variables](/docs/deployment/deploy-with-docker/jitsu-configurator) if you're deploying it with Docker

The file consists of the following sections:

* `server` — General configuration parameters such as port, application logs configuration, CORS, etc.
* `jitsu` — Jitsu Server address configuration. Configurator UI proxies some requests from UI to Jitsu Server via Configurator e.g. getting Live events and Statistics data or Destinations Test requests.
4 changes: 2 additions & 2 deletions documentation/extending/destination-plugins.mdx
@@ -1,8 +1,8 @@
---
title: Destination Plugins
title: Destination Extensions
---

# Destination Plugin
# Destination Extensions (Plugins)

1. [Overview](#overview)
2. [Quickstart](#quickstart)
12 changes: 0 additions & 12 deletions documentation/extending/index.mdx

This file was deleted.

17 changes: 17 additions & 0 deletions documentation/extending/overview.mdx
@@ -0,0 +1,17 @@
---
title: Overview
---

# Extending Jitsu

With [Jitsu SDK](https://github.com/jitsucom/jitsu) you can implement extension for Jitsu using Typescript. Each extension
is a separate node package that could be published to public or private npm repository. Jitsu server downloads extension code and executes in within internal
V8 JS virtual machine

Extensions can have several purposes:

* **[Destination](/docs/extending/destination-plugins)**. An external service where Jitsu distributes
incoming events based on configuration. A good example of the destination extension will be [Mixpanel](https://github.com/jitsucom/jitsu-mixpanel).
* **Source (coming soon)**. Source (or pull-source) is typically a service or platform from which Jitsu pulls the data. Later, the data is being
sent to destination database or data-warehouse. See the list of [all supported source](https://cloud.jitsu.com/sources). The support of sources in
Jitsu SDK is coming in **Mar 2022**
24 changes: 21 additions & 3 deletions documentation/other-features/jitsu-api/index.mdx
@@ -1,6 +1,24 @@
# OpenAPI
# API Specs

Jitsu exposes following HTTP API

## Configuration API

This API is for configuring Jitsu: adding/removing/editing sources, destinations, api keys and etc.
All operations available in Jitsu UI can be done with the API too. In fact, UI is talking with backend
using the very same API

<LargeLink
href="/configurator-openapi.html"
title="Jitsu Configurator API described in openapi specification"
/>
title="Jitsu Configurator API"
/>

## Jitsu Server API

Jitsu Server API is documentation is coming. We're in the process of migrating documentation
to Open API spec. So far the documentation is scattered across different sections of documentation:

* [Cluster Administration API](/docs/other-features/admin-endpoints)
* [Events Cache API](/docs/other-features/events-cache)
* [Event API](/docs/sending-data/api)

70 changes: 2 additions & 68 deletions documentation/root.json
@@ -1,69 +1,3 @@
{
"sections": [
{
"name": "🚀 Quick Start",
"pages": [
"deployment",
"changelog"
]
},
{
"name": "Jitsu Internals",
"pages": [
"internals/jitsu-server"
]
},
{
"name": "Sending data",
"pages": [
"sending-data/js-sdk",
"sending-data/node-js",
"sending-data/mobile-apps",
"sending-data/api",
"sending-data/bulk-api",
"sending-data/segment-api",
"sending-data/segment-integration",
"sending-data/gif-pixel-api",
"sending-data/mobile-api",
"sending-data/javascript-reference"
]
},
{
"name": "Server Configuration",
"pages": [
"configuration",
"destinations-configuration",
"sources-configuration",
"sources"
]
},
{
"name": "Extending Jitsu",
"pages": [
"extending/destination-plugins"
]
},
{
"name": "Configurator UI",
"pages": [
"configurator-configuration"
]
},
{
"name": "❤️ Features",
"pages": [
"other-features/segment-compatibility",
"other-features/dbt-cloud-integration",
"other-features/dry-run-events",
"other-features/retroactive-user-recognition",
"other-features/events-cache",
"other-features/geo-data-resolution",
"other-features/typecast",
"other-features/admin-endpoints",
"other-features/application-metrics",
"other-features/cli",
"other-features/jitsu-api"
]
}
]
}
"__MOVED__": "This file has been moved to jitsu.com/components/documentation/DocumentationTableOfContents.json. Edit it there"
}
2 changes: 1 addition & 1 deletion server/appconfig/appconfig.go
@@ -102,7 +102,7 @@ func setDefaultParams(containerized bool) {
viper.SetDefault("users_recognition.enabled", false)
viper.SetDefault("users_recognition.anonymous_id_node", "/eventn_ctx/user/anonymous_id||/user/anonymous_id||/eventn_ctx/user/hashed_anonymous_id||/user/hashed_anonymous_id")
viper.SetDefault("users_recognition.identification_nodes", []string{"/eventn_ctx/user/id||/user/id", "/eventn_ctx/user/email||/user/email", "/eventn_ctx/user/internal_id||/user/internal_id"}) // internal_id is DEPRECATED and is set for backward compatibility
viper.SetDefault("users_recognition.pool.size", 10)
viper.SetDefault("users_recognition.pool.size", 5)

viper.SetDefault("singer-bridge.python", "python3")
viper.SetDefault("singer-bridge.install_taps", true)
8 changes: 4 additions & 4 deletions server/caching/events_cache.go
@@ -107,7 +107,7 @@ func (ec *EventsCache) Put(disabled bool, destinationID, eventID string, seriali
select {
case ec.eventsChannel <- &statusEvent{eventType: "put", destinationID: destinationID, eventID: eventID, serializedPayload: serializedPayload}:
default:
if rand.Int31n(10) == 0 {
if rand.Int31n(1000) == 0 {
logging.Warnf("[events cache] queue overflow. Live Events UI may show inaccurate results. Consider increasing config variable: server.cache.pool.size (current value: %d)", ec.poolSize)
}
}
@@ -120,7 +120,7 @@ func (ec *EventsCache) Succeed(eventContext *adapters.EventContext) {
select {
case ec.eventsChannel <- &statusEvent{eventType: "succeed", eventContext: eventContext}:
default:
if rand.Int31n(10) == 0 {
if rand.Int31n(1000) == 0 {
logging.Warnf("[events cache] queue overflow. Live Events UI may show inaccurate results. Consider increasing config variable: server.cache.pool.size (current value: %d)", ec.poolSize)
}
}
@@ -133,7 +133,7 @@ func (ec *EventsCache) Error(disabled bool, destinationID, eventID string, errMs
select {
case ec.eventsChannel <- &statusEvent{eventType: "error", destinationID: destinationID, eventID: eventID, error: errMsg}:
default:
if rand.Int31n(10) == 0 {
if rand.Int31n(1000) == 0 {
logging.Warnf("[events cache] queue overflow. Live Events UI may show inaccurate results. Consider increasing config variable: server.cache.pool.size (current value: %d)", ec.poolSize)
}
}
@@ -146,7 +146,7 @@ func (ec *EventsCache) Skip(disabled bool, destinationID, eventID string, errMsg
select {
case ec.eventsChannel <- &statusEvent{eventType: "skip", destinationID: destinationID, eventID: eventID, error: errMsg}:
default:
if rand.Int31n(10) == 0 {
if rand.Int31n(1000) == 0 {
logging.Warnf("[events cache] queue overflow. Live Events UI may show inaccurate results. Consider increasing config variable: server.cache.pool.size (current value: %d)", ec.poolSize)
}
}
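
The four hunks above share one pattern: a non-blocking send on a channel, with the overflow warning emitted for only a random sample of dropped events (1 in 10 before this commit, 1 in 1000 after). A minimal sketch of that pattern follows; `offer` and `shouldWarn` are hypothetical helper names, not functions from the Jitsu code base.

```go
package main

import "math/rand"

// offer tries to enqueue ev without blocking and reports whether it was accepted.
// When the channel buffer is full, the default branch runs and the event is dropped.
func offer(ch chan<- string, ev string) bool {
	select {
	case ch <- ev:
		return true
	default:
		return false
	}
}

// shouldWarn returns true for roughly one call in n, so a burst of
// dropped events does not flood the log. n must be positive,
// because rand.Int31n panics otherwise.
func shouldWarn(n int32) bool {
	return rand.Int31n(n) == 0
}
```

A caller would log only when `!offer(ch, ev) && shouldWarn(1000)`. The trade-off is that a short overflow may produce no warning at all.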
4 changes: 4 additions & 0 deletions server/enrichment/user_agent_parse.go
@@ -67,6 +67,10 @@ func (uap *UserAgentParseRule) Execute(event map[string]interface{}) {
return
}
uap.mutex.Lock()
if len(uap.cache) > 100_000 {
logging.Infof("Cleaning up user-agent's cache.")
uap.cache = map[string]map[string]interface{}{}
}
uap.cache[ua] = parsedUAMap
uap.mutex.Unlock()
}
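
The hunk above caps the user-agent cache by discarding the whole map once it grows past 100,000 entries. The same idea in isolation, as a sketch under assumed names (`boundedCache`, `maxSize`, and string values are illustrative; the real cache stores parsed user-agent maps):

```go
package main

import "sync"

// boundedCache is a hypothetical mutex-guarded cache that is reset
// wholesale when it grows past maxSize, instead of evicting entries one by one.
type boundedCache struct {
	mu      sync.Mutex
	maxSize int
	entries map[string]string
}

func newBoundedCache(maxSize int) *boundedCache {
	return &boundedCache{maxSize: maxSize, entries: make(map[string]string)}
}

// Put stores v under k, dropping all previous entries first if the
// cache already holds more than maxSize of them.
func (c *boundedCache) Put(k, v string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.entries) > c.maxSize {
		c.entries = make(map[string]string)
	}
	c.entries[k] = v
}

// Get returns the cached value for k, if any.
func (c *boundedCache) Get(k string) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	v, ok := c.entries[k]
	return v, ok
}

// Len returns the current number of entries.
func (c *boundedCache) Len() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.entries)
}
```

A full reset is cruder than LRU eviction and causes a burst of cache misses afterwards, but it needs no extra bookkeeping and bounds memory use.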
10 changes: 6 additions & 4 deletions server/handlers/bulk.go
@@ -47,6 +47,8 @@ func (bh *BulkHandler) BulkLoadingHandler(c *gin.Context) {
return
}

needCopyEvent := len(storageProxies) > 1

eventObjects, err := extractBulkEvents(c)
if err != nil {
c.JSON(http.StatusBadRequest, middleware.ErrResponse(err.Error(), nil))
@@ -62,8 +64,8 @@ func (bh *BulkHandler) BulkLoadingHandler(c *gin.Context) {

rowsCount := len(eventObjects)

for _, storageProxy := range storageProxies {
if err := bh.upload(storageProxy, eventObjects); err != nil {
for i, storageProxy := range storageProxies {
if err := bh.upload(storageProxy, eventObjects, needCopyEvent && i > 0); err != nil {

metrics.ErrorTokenEvents(tokenID, storageProxy.Type(), storageProxy.ID(), rowsCount)
metrics.ErrorTokenObjects(tokenID, rowsCount)
@@ -112,7 +114,7 @@ func extractBulkEvents(c *gin.Context) ([]map[string]interface{}, error) {
return objects, nil
}

func (bh *BulkHandler) upload(storageProxy storages.StorageProxy, objects []map[string]interface{}) error {
func (bh *BulkHandler) upload(storageProxy storages.StorageProxy, objects []map[string]interface{}, needCopyEvent bool) error {
storage, ok := storageProxy.Get()
if !ok {
return fmt.Errorf("Destination [%s] hasn't been initialized yet", storage.ID())
@@ -122,7 +124,7 @@ func (bh *BulkHandler) upload(storageProxy storages.StorageProxy, objects []map[
"cannot be used to store data (only available for dry-run)", storage.ID())
}

return storage.SyncStore(nil, objects, "", true)
return storage.SyncStore(nil, objects, "", true, needCopyEvent)
}

//readFileBytes reads file from form data and returns byte payload or err if occurred
2 changes: 1 addition & 1 deletion server/handlers/templates.go
@@ -141,7 +141,7 @@ func (h *EventTemplateHandler) evaluate(req *EvaluateTemplateRequest) (response
response.UserResult = string(jsonBytes)
}

envls, err := storage.Processor().ProcessEvent(req.Object)
envls, err := storage.Processor().ProcessEvent(req.Object, false)
if err != nil {
if err == schema.ErrSkipObject {
response.Result = "SKIPPED"
2 changes: 1 addition & 1 deletion server/jsonutils/single_path.go
@@ -180,7 +180,7 @@ func (jp *SingleJSONPath) setWithInnerCreation(obj map[string]interface{}, value
return fmt.Errorf("Value %d wasn't set into %s: %s node isn't an object", value, jp.String(), key)
}
} else if createInnerObjects {
subMap := map[string]interface{}{}
subMap := make(map[string]interface{}, 1)
obj[key] = subMap
obj = subMap
} else {
11 changes: 6 additions & 5 deletions server/logfiles/uploader.go
@@ -79,12 +79,12 @@ func (u *PeriodicUploader) Start() {
for _, filePath := range files {
fileName := filepath.Base(filePath)

b, err := ioutil.ReadFile(filePath)
fileBytes, err := ioutil.ReadFile(filePath)
if err != nil {
logging.SystemErrorf("Error reading file [%s] with events: %v", filePath, err)
continue
}
if len(b) == 0 {
if len(fileBytes) == 0 {
os.Remove(filePath)
continue
}
@@ -101,8 +101,9 @@ func (u *PeriodicUploader) Start() {
logging.Warnf("Destination storages weren't found for file [%s] and token [%s]", filePath, tokenID)
continue
}
needCopyEvent := len(storageProxies) > 1

objects, parsingErrors, err := parsers.ParseJSONFileWithFuncFallback(b, parsers.ParseJSON)
objects, parsingErrors, err := parsers.ParseJSONFileWithFuncFallback(fileBytes, parsers.ParseJSON)
if err != nil {
logging.SystemErrorf("Error parsing JSON file [%s] with events: %v", filePath, err)
continue
@@ -119,7 +120,7 @@ func (u *PeriodicUploader) Start() {

//flag for archiving file if all storages don't have errors while storing this file
archiveFile := true
for _, storageProxy := range storageProxies {
for i, storageProxy := range storageProxies {
storage, ok := storageProxy.Get()
if !ok {
archiveFile = false
@@ -134,7 +135,7 @@ func (u *PeriodicUploader) Start() {
}
}

resultPerTable, failedEvents, skippedEvents, err := storage.Store(fileName, objects, alreadyUploadedTables)
resultPerTable, failedEvents, skippedEvents, err := storage.Store(fileName, objects, alreadyUploadedTables, needCopyEvent && i > 0)

if !skippedEvents.IsEmpty() {
metrics.SkipTokenEvents(tokenID, storage.Type(), storage.ID(), len(skippedEvents.Events))
2 changes: 1 addition & 1 deletion server/main.go
@@ -375,7 +375,7 @@ func main() {
logging.Fatalf("Error initializing users recognition storage: %v", err)
}

usersRecognitionService, err := users.NewRecognitionService(userRecognitionStorage, destinationsService, globalRecognitionConfiguration)
usersRecognitionService, err := users.NewRecognitionService(userRecognitionStorage, destinationsService, globalRecognitionConfiguration, viper.GetString("server.fields_configuration.user_agent_path"))
if err != nil {
logging.Fatal(err)
}
4 changes: 2 additions & 2 deletions server/maputils/copy.go
@@ -2,7 +2,7 @@ package maputils

//CopyMap returns copy of input map with all sub objects
func CopyMap(m map[string]interface{}) map[string]interface{} {
cp := make(map[string]interface{})
cp := make(map[string]interface{}, len(m))
for k, v := range m {
vm, ok := v.(map[string]interface{})
if ok {
@@ -17,7 +17,7 @@ func CopyMap(m map[string]interface{}) map[string]interface{} {

//CopySet returns copy of input set
func CopySet(m map[string]bool) map[string]bool {
cs := make(map[string]bool)
cs := make(map[string]bool, len(m))
for k, v := range m {
cs[k] = v
}
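
The preallocation in `CopyMap` ties in with the `needCopyEvent && i > 0` flag in `bulk.go` and `uploader.go`: the first destination receives the original events, and only later destinations receive copies. A rough sketch of how the two fit together is below. `copyMap` mirrors the visible part of `CopyMap` (nested maps are copied recursively, other values are shared), while `fanOut` is a hypothetical helper; the commit itself passes a boolean down into `Store`/`SyncStore` rather than copying up front.

```go
package main

// copyMap returns a copy of m in which nested map[string]interface{} values
// are copied recursively. The result is sized up front with len(m) so the
// map does not have to grow while it is being filled.
func copyMap(m map[string]interface{}) map[string]interface{} {
	cp := make(map[string]interface{}, len(m))
	for k, v := range m {
		if vm, ok := v.(map[string]interface{}); ok {
			cp[k] = copyMap(vm)
		} else {
			cp[k] = v
		}
	}
	return cp
}

// fanOut (hypothetical) delivers event to every destination. The first
// destination gets the original map; every other one gets its own copy.
// Copies are taken before any destination runs, so a destination that
// mutates its input cannot affect what the others see.
func fanOut(event map[string]interface{}, destinations []func(map[string]interface{})) {
	payloads := make([]map[string]interface{}, len(destinations))
	for i := range destinations {
		if i == 0 {
			payloads[i] = event
		} else {
			payloads[i] = copyMap(event)
		}
	}
	for i, dest := range destinations {
		dest(payloads[i])
	}
}
```

With a single destination no copy is made at all, which is the common case this optimization targets.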