zitadel/internal/logstore/emitter.go
Elio Bischof 1a49b7d298
perf: project quotas and usages (#6441)
* project quota added

* project quota removed

* add periods table

* make log record generic

* accumulate usage

* query usage

* count action run seconds

* fix filter in ReportQuotaUsage

* fix existing tests

* fix logstore tests

* fix typo

* fix: add quota unit tests command side

* fix: add quota unit tests command side

* fix: add quota unit tests command side

* move notifications into debouncer and improve limit querying

* cleanup

* comment

* fix: add quota unit tests command side

* fix remaining quota usage query

* implement InmemLogStorage

* cleanup and linting

* improve test

* fix: add quota unit tests command side

* fix: add quota unit tests command side

* fix: add quota unit tests command side

* fix: add quota unit tests command side

* action notifications and fixes for notifications query

* revert console prefix

* fix: add quota unit tests command side

* fix: add quota integration tests

* improve accountable requests

* improve accountable requests

* fix: add quota integration tests

* fix: add quota integration tests

* fix: add quota integration tests

* comment

* remove ability to store logs in db and other changes requested from review

* changes requested from review

* changes requested from review

* Update internal/api/http/middleware/access_interceptor.go

Co-authored-by: Silvan <silvan.reusser@gmail.com>

* tests: fix quotas integration tests

* improve incrementUsageStatement

* linting

* fix: delete e2e tests as integration tests cover functionality

* Update internal/api/http/middleware/access_interceptor.go

Co-authored-by: Silvan <silvan.reusser@gmail.com>

* backup

* fix conflict

* create rc

* create prerelease

* remove issue release labeling

* fix tracing

---------

Co-authored-by: Livio Spring <livio.a@gmail.com>
Co-authored-by: Stefan Benz <stefan@caos.ch>
Co-authored-by: adlerhurst <silvan.reusser@gmail.com>
2023-09-15 16:58:45 +02:00

85 lines
1.8 KiB
Go

package logstore
import (
"context"
"time"
"github.com/benbjohnson/clock"
)
// EmitterConfig holds the settings NewEmitter reads when building an emitter.
type EmitterConfig struct {
// Enabled switches emitting on. NewEmitter treats a nil config the same as
// Enabled == false, and a disabled emitter's Emit returns nil without
// forwarding the record.
Enabled bool
// Debounce is optional. NewEmitter only creates a debouncer when it is
// non-nil and its MinFrequency or MaxBulkSize is greater than zero.
Debounce *DebouncerConfig
}
// emitter forwards records of type T to a LogEmitter, either one at a time
// or through a debouncer. Instances are built by NewEmitter.
type emitter[T LogRecord[T]] struct {
// enabled is false when the config passed to NewEmitter was nil or had
// Enabled unset; Emit is a no-op in that case.
enabled bool
// ctx is the context handed to NewEmitter.
ctx context.Context
// debouncer is nil unless debouncing was configured; when set, Emit hands
// records to it instead of calling the underlying emitter directly.
debouncer *debouncer[T]
// emitter is the underlying sink that receives the records.
emitter LogEmitter[T]
// clock is the clock handed to NewEmitter.
clock clock.Clock
}
// LogRecord is the constraint every record type handled by this package has
// to satisfy.
type LogRecord[T any] interface {
// Normalize returns a value of type T.
// NOTE(review): presumably a cleaned-up form of the record; what
// "normalized" means is up to the implementation - confirm there.
Normalize() T
}
// LogRecordFunc lets a plain func() T provide the Normalize method.
type LogRecordFunc[T any] func() T

// Normalize calls the wrapped function once and returns whatever it produced.
func (fn LogRecordFunc[T]) Normalize() T {
	normalized := fn()
	return normalized
}
// LogEmitter is a sink that accepts a batch of records of type T.
type LogEmitter[T LogRecord[T]] interface {
// Emit receives a bulk of records and reports an error if it could not
// handle them.
Emit(ctx context.Context, bulk []T) error
}
// LogEmitterFunc adapts an ordinary function with the matching signature so
// that it can be used wherever a LogEmitter is expected.
type LogEmitterFunc[T LogRecord[T]] func(ctx context.Context, bulk []T) error

// Emit passes ctx and bulk straight to the wrapped function and returns its
// error unchanged.
func (fn LogEmitterFunc[T]) Emit(ctx context.Context, bulk []T) error {
	emitErr := fn(ctx, bulk)
	return emitErr
}
// LogCleanupper is a LogEmitter that additionally offers a Cleanup method.
type LogCleanupper[T LogRecord[T]] interface {
// Cleanup takes a retention duration.
// NOTE(review): presumably removes records older than keep - confirm
// against the implementations.
Cleanup(ctx context.Context, keep time.Duration) error
LogEmitter[T]
}
// NewEmitter builds an emitter that forwards records to logger.
//
// The clock parameter is a Clock from github.com/benbjohnson/clock so that
// unit tests can control timers and tickers.
//
// A nil cfg or cfg.Enabled == false yields a disabled emitter without a
// debouncer. Otherwise a debouncer is attached only if cfg.Debounce is set
// and asks for a positive MinFrequency or MaxBulkSize. The returned error is
// always nil in this implementation.
func NewEmitter[T LogRecord[T]](ctx context.Context, clock clock.Clock, cfg *EmitterConfig, logger LogEmitter[T]) (*emitter[T], error) {
	active := cfg != nil && cfg.Enabled
	e := &emitter[T]{
		enabled: active,
		ctx:     ctx,
		emitter: logger,
		clock:   clock,
	}
	if !active {
		return e, nil
	}

	// Debouncing is opt-in: both the config section and at least one positive
	// threshold are required.
	if debounceCfg := cfg.Debounce; debounceCfg != nil {
		if debounceCfg.MinFrequency > 0 || debounceCfg.MaxBulkSize > 0 {
			e.debouncer = newDebouncer[T](ctx, *debounceCfg, clock, newStorageBulkSink(e.emitter))
		}
	}
	return e, nil
}
// Emit hands a single record to the configured sink.
//
// A disabled emitter drops the record and returns nil. With a debouncer
// attached, the record is added to the debouncer and nil is returned; ctx is
// not passed along on that path. Without a debouncer, the record is sent to
// the underlying emitter as a one-element bulk and that call's error is
// returned.
func (s *emitter[T]) Emit(ctx context.Context, record T) (err error) {
	switch {
	case !s.enabled:
		return nil
	case s.debouncer != nil:
		s.debouncer.add(record)
		return nil
	default:
		single := []T{record}
		return s.emitter.Emit(ctx, single)
	}
}
// newStorageBulkSink wraps emitter in a bulkSinkFunc that forwards every bulk
// it receives to emitter.Emit and returns that call's error.
func newStorageBulkSink[T LogRecord[T]](emitter LogEmitter[T]) bulkSinkFunc[T] {
	var sink bulkSinkFunc[T] = func(ctx context.Context, records []T) error {
		return emitter.Emit(ctx, records)
	}
	return sink
}