From bb756482c7917a1e1ded926e444eec4be2f2ade8 Mon Sep 17 00:00:00 2001 From: Elio Bischof Date: Thu, 6 Jul 2023 08:38:13 +0200 Subject: [PATCH] feat: push telemetry (#6027) * document analytics config * rework configuration and docs * describe HandleActiveInstances better * describe active instances on quotas better * only projected events are considered * cleanup * describe changes at runtime * push milestones * stop tracking events * calculate and push 4 in 6 milestones * reduce milestone pushed * remove docs * fix scheduled pseudo event projection * push 5 in 6 milestones * push 6 in 6 milestones * ignore client ids * fix text array contains * push human readable milestone type * statement unit tests * improve dev and db performance * organize imports * cleanup * organize imports * test projection * check rows.Err() * test search query * pass linting * review * test 4 milestones * simplify milestone by instance ids query * use type NamespacedCondition * cleanup * lint * lint * dont overwrite original error * no opt-in in examples * cleanup * prerelease * enable request headers * make limit configurable * review fixes * only requeue special handlers secondly * include integration tests * Revert "include integration tests" This reverts commit 96db9504ecdb4e73451f09554fd749cd7c27341f. * pass reducers * test handlers * fix unit test * feat: increment version * lint * remove prerelease * fix integration tests --- .github/workflows/integration.yml | 2 +- .gitignore | 1 + CONTRIBUTING.md | 20 +- build/zitadel/Dockerfile | 2 +- cmd/defaults.yaml | 43 +- cmd/start/config.go | 2 + cmd/start/start.go | 4 +- docs/docs/self-hosting/deploy/knative.mdx | 2 +- docs/docs/self-hosting/manage/production.md | 20 + .../session/v2/session_integration_test.go | 2 +- .../api/grpc/user/v2/user_integration_test.go | 6 +- internal/command/command.go | 2 + internal/command/milestone.go | 22 + internal/command/quota_report.go | 4 +- .../eventstore/handler/crdb/handler_stmt.go | 55 ++- .../handler/crdb/handler_stmt_test.go | 54 ++- internal/eventstore/handler/crdb/statement.go | 69 +-- .../eventstore/handler/crdb/statement_test.go | 164 ++++--- .../eventstore/handler/handler_projection.go | 74 ++-- .../handler/handler_projection_test.go | 1 + internal/eventstore/handler/statement.go | 16 +- internal/integration/client.go | 29 ++ .../integration/config/system-user-key.pem | 27 ++ internal/integration/config/zitadel.yaml | 16 + internal/integration/integration.go | 82 +++- internal/integration/rand.go | 20 + .../notification/channels/webhook/channel.go | 11 +- .../notification/channels/webhook/config.go | 2 + .../notification/handlers/already_handled.go | 5 +- .../handlers/handlers_integration_test.go | 30 ++ .../{quotanotifier.go => quota_notifier.go} | 2 +- .../notification/handlers/telemetry_pusher.go | 150 +++++++ .../telemetry_pusher_integration_test.go | 89 ++++ .../{usernotifier.go => user_notifier.go} | 6 +- internal/notification/projections.go | 13 + internal/query/milestone.go | 146 +++++++ internal/query/milestone_test.go | 189 ++++++++ internal/query/projection/assert.go | 14 + internal/query/projection/assert_test.go | 52 +++ internal/query/projection/event_test.go | 16 +- internal/query/projection/label_policy.go | 8 +- .../query/projection/label_policy_test.go | 4 +- internal/query/projection/milestones.go | 295 +++++++++++++ internal/query/projection/milestones_test.go | 404 ++++++++++++++++++ internal/query/projection/projection.go | 6 +- internal/query/search_query.go | 21 + internal/repository/milestone/aggregate.go | 30 ++ internal/repository/milestone/events.go | 54 +++ internal/repository/milestone/eventstore.go | 9 + internal/repository/milestone/type.go | 59 +++ internal/repository/milestone/type_string.go | 30 ++ internal/repository/pseudo/aggregate.go | 21 + internal/repository/pseudo/events.go | 40 ++ 53 files changed, 2214 insertions(+), 231 deletions(-) create mode 100644 internal/command/milestone.go create mode 100644 internal/integration/config/system-user-key.pem create mode 100644 internal/integration/rand.go create mode 100644 internal/notification/handlers/handlers_integration_test.go rename internal/notification/handlers/{quotanotifier.go => quota_notifier.go} (96%) create mode 100644 internal/notification/handlers/telemetry_pusher.go create mode 100644 internal/notification/handlers/telemetry_pusher_integration_test.go rename internal/notification/handlers/{usernotifier.go => user_notifier.go} (99%) create mode 100644 internal/query/milestone.go create mode 100644 internal/query/milestone_test.go create mode 100644 internal/query/projection/assert.go create mode 100644 internal/query/projection/assert_test.go create mode 100644 internal/query/projection/milestones.go create mode 100644 internal/query/projection/milestones_test.go create mode 100644 internal/repository/milestone/aggregate.go create mode 100644 internal/repository/milestone/events.go create mode 100644 internal/repository/milestone/eventstore.go create mode 100644 internal/repository/milestone/type.go create mode 100644 internal/repository/milestone/type_string.go create mode 100644 internal/repository/pseudo/aggregate.go create mode 100644 internal/repository/pseudo/events.go diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 23e35ceb66..0bc56314ca 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -43,7 +43,7 @@ jobs: go run main.go init --config internal/integration/config/zitadel.yaml --config internal/integration/config/${INTEGRATION_DB_FLAVOR}.yaml go run main.go setup --masterkeyFromEnv --config internal/integration/config/zitadel.yaml --config internal/integration/config/${INTEGRATION_DB_FLAVOR}.yaml - name: Run integration tests - run: go test -tags=integration -race -p 1 -v -coverprofile=profile.cov -coverpkg=./internal/...,./cmd/... ./internal/integration ./internal/api/grpc/... + run: go test -tags=integration -race -p 1 -v -coverprofile=profile.cov -coverpkg=./internal/...,./cmd/... ./internal/integration ./internal/api/grpc/... ./internal/notification/handlers/... - name: Publish go coverage uses: codecov/codecov-action@v3.1.0 with: diff --git a/.gitignore b/.gitignore index 75d5b034ff..f9793af1e6 100644 --- a/.gitignore +++ b/.gitignore @@ -59,6 +59,7 @@ docs/docs/apis/auth docs/docs/apis/admin docs/docs/apis/mgmt docs/docs/apis/system +docs/docs/apis/proto # local build/local/*.env diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 06be1797a1..32de6b633c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,3 +1,4 @@ + # Contributing to ZITADEL ## Introduction @@ -34,7 +35,11 @@ Follow [@zitadel](https://twitter.com/zitadel) on twitter We strongly recommend to [talk to us](https://zitadel.com/contact) before you start contributing to streamline our and your work. -We accept contributions through pull requests. You need a github account for that. If you are unfamiliar with git have a look at Github's documentation on [creating forks](https://help.github.com/articles/fork-a-repo) and [creating pull requests](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request-from-a-fork). Please draft the pull request as soon as possible. Go through the following checklist before you submit the final pull request: +We accept contributions through pull requests. +You need a github account for that. +If you are unfamiliar with git have a look at Github's documentation on [creating forks](https://help.github.com/articles/fork-a-repo) and [creating pull requests](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request-from-a-fork). +Please draft the pull request as soon as possible. +Go through the following checklist before you submit the final pull request: ### Submit a pull request (PR) @@ -61,7 +66,8 @@ We accept contributions through pull requests. You need a github account for tha ### Review a pull request -The reviewers will provide you feedback and approve your changes as soon as they are satisfied. If we ask you for changes in the code, you can follow the [GitHub Guide](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/reviewing-changes-in-pull-requests/incorporating-feedback-in-your-pull-request) to incorporate feedback in your pull request. +The reviewers will provide you feedback and approve your changes as soon as they are satisfied. +If we ask you for changes in the code, you can follow the [GitHub Guide](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/reviewing-changes-in-pull-requests/incorporating-feedback-in-your-pull-request) to incorporate feedback in your pull request. @@ -88,6 +94,16 @@ This is optional to indicate which component is affected. In doubt, leave blank Provide a brief description of the change. +### Quality assurance + +Please make sure you cover your changes with tests before marking a Pull Request as ready for review: + +- [ ] Integration tests against the gRPC server ensure that one or multiple API calls that belong together return the expected results. +- [ ] Integration tests against the gRPC server ensure that probable good and bad read and write permissions are tested. +- [ ] Integration tests against the gRPC server ensure that the API is easily usable despite eventual consistency. +- [ ] Integration tests against the gRPC server ensure that all probable login and registration flows are covered." +- [ ] Integration tests ensure that certain commands send expected notifications. + ## Contribute The code consists of the following parts: diff --git a/build/zitadel/Dockerfile b/build/zitadel/Dockerfile index 7c0f7bdcad..18d3dc118e 100644 --- a/build/zitadel/Dockerfile +++ b/build/zitadel/Dockerfile @@ -97,7 +97,7 @@ RUN rm -r cockroach-${COCKROACH_VERSION}.linux-amd64 # Migrations for cockroach-secure RUN go install github.com/rakyll/statik \ - && go test -race -v -coverprofile=profile.cov $(go list ./... | grep -v /operator/) + && go test -race -coverprofile=profile.cov $(go list ./... | grep -v /operator/) ####################### ## Go test results diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index 4cc990b34b..b15317fcc6 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -14,6 +14,29 @@ Tracing: Fraction: 1.0 MetricPrefix: zitadel +Telemetry: + # As long as Enabled is true, ZITADEL tries to send usage data to the configured Telemetry.Endpoints. + # Data is projected by ZITADEL even if Enabled is false. + # This means that switching this to true makes ZITADEL try to send past data. + Enabled: false + # Push telemetry data to all these endpoints at least once using an HTTP POST request. + # If one endpoint returns an unsuccessful response code or times out, + # ZITADEL retries to push the data point to all configured endpoints until it succeeds. + # Configure delivery guarantees and intervals in the section Projections.Customizations.Telemetry + # The endpoints can be reconfigured at runtime. + # Ten redirects are followed. + # If you change this configuration at runtime, remaining data that is not successfully delivered to the old endpoints is sent to the new endpoints. + Endpoints: + - https://httpbin.org/post + # These headers are sent with every request to the configured endpoints. + Headers: + # single-value: "single-value" + # multi-value: + # - "multi-value-1" + # - "multi-value-2" + # The maximum number of data points that are queried before they are sent to the configured endpoints. + Limit: 100 # ZITADEL_TELEMETRY_LIMIT + # Port ZITADEL will listen on Port: 8080 # Port ZITADEL is exposed on, it can differ from port e.g. if you proxy the traffic @@ -169,17 +192,29 @@ Projections: BulkLimit: 2000 # The Notifications projection is used for sending emails and SMS to users Notifications: - # As notification projections don't result in database statements, retries don't have an effect + # As notification projections don't result in database statements, retries don't have any effects MaxFailureCount: 0 # The NotificationsQuotas projection is used for calling quota webhooks NotificationsQuotas: - # Delivery guarantee requirements are probably higher for quota webhooks + # In case of failed deliveries, ZITADEL retries to send the data points to the configured endpoints, but only for active instances. + # An instance is active, as long as there are projected events on the instance, that are not older than the HandleActiveInstances duration. + # Delivery guarantee requirements are higher for quota webhooks # Defaults to 45 days HandleActiveInstances: 1080h - # As quota notification projections don't result in database statements, retries don't have an effect + # As quota notification projections don't result in database statements, retries don't have any effects MaxFailureCount: 0 - # Quota notifications are not so time critical. Setting RequeueEvery every five minutes doesn't annoy the db too much. + # Quota notifications are not so time critical. Setting RequeueEvery every five minutes doesn't annoy the database too much. RequeueEvery: 300s + Telemetry: + # In case of failed deliveries, ZITADEL retries to send the data points to the configured endpoints, but only for active instances. + # An instance is active, as long as there are projected events on the instance, that are not older than the HandleActiveInstances duration. + # Telemetry delivery guarantee requirements are a bit higher than normal data projections, as they are not interactively retryable. + # Defaults to 15 days + HandleActiveInstances: 360h + # As sending telemetry data doesn't result in database statements, retries don't have any effects + MaxFailureCount: 0 + # Telemetry data synchronization is not time critical. Setting RequeueEvery to 55 minutes doesn't annoy the database too much. + RequeueEvery: 3300s Auth: SearchLimit: 1000 diff --git a/cmd/start/config.go b/cmd/start/config.go index 3517ef7bb2..ac8a9f75db 100644 --- a/cmd/start/config.go +++ b/cmd/start/config.go @@ -25,6 +25,7 @@ import ( "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/id" "github.com/zitadel/zitadel/internal/logstore" + "github.com/zitadel/zitadel/internal/notification/handlers" "github.com/zitadel/zitadel/internal/query/projection" static_config "github.com/zitadel/zitadel/internal/static/config" metrics "github.com/zitadel/zitadel/internal/telemetry/metrics/config" @@ -65,6 +66,7 @@ type Config struct { Eventstore *eventstore.Config LogStore *logstore.Configs Quotas *QuotasConfig + Telemetry *handlers.TelemetryPusherConfig } type QuotasConfig struct { diff --git a/cmd/start/start.go b/cmd/start/start.go index e91b628d9d..f970698bf2 100644 --- a/cmd/start/start.go +++ b/cmd/start/start.go @@ -207,14 +207,14 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error return err } - usageReporter := logstore.UsageReporterFunc(commands.ReportUsage) + usageReporter := logstore.UsageReporterFunc(commands.ReportQuotaUsage) actionsLogstoreSvc := logstore.New(queries, usageReporter, actionsExecutionDBEmitter, actionsExecutionStdoutEmitter) if actionsLogstoreSvc.Enabled() { logging.Warn("execution logs are currently in beta") } actions.SetLogstoreService(actionsLogstoreSvc) - notification.Start(ctx, config.Projections.Customizations["notifications"], config.Projections.Customizations["notificationsquotas"], config.ExternalPort, config.ExternalSecure, commands, queries, eventstoreClient, assets.AssetAPIFromDomain(config.ExternalSecure, config.ExternalPort), config.SystemDefaults.Notifications.FileSystemPath, keys.User, keys.SMTP, keys.SMS) + notification.Start(ctx, config.Projections.Customizations["notifications"], config.Projections.Customizations["notificationsquotas"], config.Projections.Customizations["telemetry"], *config.Telemetry, config.ExternalPort, config.ExternalSecure, commands, queries, eventstoreClient, assets.AssetAPIFromDomain(config.ExternalSecure, config.ExternalPort), config.SystemDefaults.Notifications.FileSystemPath, keys.User, keys.SMTP, keys.SMS) router := mux.NewRouter() tlsConfig, err := config.TLS.Config() diff --git a/docs/docs/self-hosting/deploy/knative.mdx b/docs/docs/self-hosting/deploy/knative.mdx index 22da0ce9d1..e2a67ccb6c 100644 --- a/docs/docs/self-hosting/deploy/knative.mdx +++ b/docs/docs/self-hosting/deploy/knative.mdx @@ -33,7 +33,7 @@ kn service create zitadel \ --env ZITADEL_EXTERNALPORT=80 \ --env ZITADEL_TLS_ENABLED=false \ --env ZITADEL_EXTERNALDOMAIN=zitadel.default.127.0.0.1.sslip.io \ ---arg "start-from-init" --arg "--masterkey" --arg "MasterkeyNeedsToHave32Characters" +--arg "start-from-init" --arg "--masterkey" --arg "MasterkeyNeedsToHave32Characters" ``` ### Knavite yaml diff --git a/docs/docs/self-hosting/manage/production.md b/docs/docs/self-hosting/manage/production.md index 0f3169fa02..f00e7fe0ef 100644 --- a/docs/docs/self-hosting/manage/production.md +++ b/docs/docs/self-hosting/manage/production.md @@ -61,6 +61,26 @@ Instead, your execution environment should provide tooling for managing logs in This includes tasks like rotating files, routing, collecting, archiving and cleaning-up. For example, systemd has journald and kubernetes has fluentd and fluentbit. +## Telemetry + +If you want to have some data about reached usage milestones pushed to external systems, enable telemetry in the ZITADEL configuration. + +The following table describes the milestones that are sent to the endpoints: + +| Trigger | Description | +|-----------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------| +| A virtual instance is created. | This data point is also sent when the first instance is automatically created during the ZITADEL binaries setup phase in a self-hosting scenario. | +| An authentication succeeded for the first time on an instance. | This is the first authentication with the instances automatically created admin user during the instance setup, which can be a human or a machine. | +| A project is created for the first time in a virtual instance. | The ZITADEL project that is automatically created during the instance setup is omitted. | +| An application is created for the first time in a virtual instance. | The applications in the ZITADEL project that are automatically created during the instance setup are omitted. | +| An authentication succeeded for the first time in a virtal instances application. | This is the first authentication using a ZITADEL application that is not created during the instance setup phase. | +| A virtual instance is deleted. | This data point is sent when a virtual instance is deleted via ZITADELs system API | + + +ZITADEL pushes the metrics by projecting certain events. +Therefore, you can configure delivery guarantees not in the Telemetry section of the ZITADEL configuration, +but in the Projections.Customizations.Telemetry section + ## Database ### Prefer CockroachDB diff --git a/internal/api/grpc/session/v2/session_integration_test.go b/internal/api/grpc/session/v2/session_integration_test.go index 6c3c2720dd..82833d2983 100644 --- a/internal/api/grpc/session/v2/session_integration_test.go +++ b/internal/api/grpc/session/v2/session_integration_test.go @@ -36,7 +36,7 @@ func TestMain(m *testing.M) { defer Tester.Done() Client = Tester.Client.SessionV2 - CTX, _ = Tester.WithSystemAuthorization(ctx, integration.OrgOwner), errCtx + CTX, _ = Tester.WithAuthorization(ctx, integration.OrgOwner), errCtx User = Tester.CreateHumanUser(CTX) Tester.RegisterUserPasskey(CTX, User.GetUserId()) return m.Run() diff --git a/internal/api/grpc/user/v2/user_integration_test.go b/internal/api/grpc/user/v2/user_integration_test.go index 40efe61950..e43b6a2c93 100644 --- a/internal/api/grpc/user/v2/user_integration_test.go +++ b/internal/api/grpc/user/v2/user_integration_test.go @@ -38,7 +38,7 @@ func TestMain(m *testing.M) { Tester = integration.NewTester(ctx) defer Tester.Done() - CTX, ErrCTX = Tester.WithSystemAuthorization(ctx, integration.OrgOwner), errCtx + CTX, ErrCTX = Tester.WithAuthorization(ctx, integration.OrgOwner), errCtx Client = Tester.Client.UserV2 return m.Run() }()) @@ -454,7 +454,7 @@ func TestServer_AddIDPLink(t *testing.T) { args: args{ CTX, &user.AddIDPLinkRequest{ - UserId: Tester.Users[integration.OrgOwner].ID, + UserId: Tester.Users[integration.FirstInstanceUsersKey][integration.OrgOwner].ID, IdpLink: &user.IDPLink{ IdpId: "idpID", UserId: "userID", @@ -470,7 +470,7 @@ func TestServer_AddIDPLink(t *testing.T) { args: args{ CTX, &user.AddIDPLinkRequest{ - UserId: Tester.Users[integration.OrgOwner].ID, + UserId: Tester.Users[integration.FirstInstanceUsersKey][integration.OrgOwner].ID, IdpLink: &user.IDPLink{ IdpId: idpID, UserId: "userID", diff --git a/internal/command/command.go b/internal/command/command.go index 3c804576be..5065bd2fd2 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -18,6 +18,7 @@ import ( "github.com/zitadel/zitadel/internal/repository/idpintent" instance_repo "github.com/zitadel/zitadel/internal/repository/instance" "github.com/zitadel/zitadel/internal/repository/keypair" + "github.com/zitadel/zitadel/internal/repository/milestone" "github.com/zitadel/zitadel/internal/repository/org" proj_repo "github.com/zitadel/zitadel/internal/repository/project" "github.com/zitadel/zitadel/internal/repository/quota" @@ -124,6 +125,7 @@ func StartCommands( quota.RegisterEventMappers(repo.eventstore) session.RegisterEventMappers(repo.eventstore) idpintent.RegisterEventMappers(repo.eventstore) + milestone.RegisterEventMappers(repo.eventstore) repo.userPasswordAlg = crypto.NewBCrypt(defaults.SecretGenerators.PasswordSaltCost) repo.machineKeySize = int(defaults.SecretGenerators.MachineKeySize) diff --git a/internal/command/milestone.go b/internal/command/milestone.go new file mode 100644 index 0000000000..f01ec6d158 --- /dev/null +++ b/internal/command/milestone.go @@ -0,0 +1,22 @@ +package command + +import ( + "context" + + "github.com/zitadel/zitadel/internal/repository/milestone" +) + +// MilestonePushed writes a new milestone.PushedEvent with a new milestone.Aggregate to the eventstore +func (c *Commands) MilestonePushed( + ctx context.Context, + msType milestone.Type, + endpoints []string, + primaryDomain string, +) error { + id, err := c.idGenerator.Next() + if err != nil { + return err + } + _, err = c.eventstore.Push(ctx, milestone.NewPushedEvent(ctx, milestone.NewAggregate(ctx, id), msType, endpoints, primaryDomain, c.externalDomain)) + return err +} diff --git a/internal/command/quota_report.go b/internal/command/quota_report.go index b18b9e9b51..19855452ed 100644 --- a/internal/command/quota_report.go +++ b/internal/command/quota_report.go @@ -7,8 +7,8 @@ import ( "github.com/zitadel/zitadel/internal/repository/quota" ) -// ReportUsage calls notification hooks and emits the notified events -func (c *Commands) ReportUsage(ctx context.Context, dueNotifications []*quota.NotificationDueEvent) error { +// ReportQuotaUsage writes a slice of *quota.NotificationDueEvent directly to the eventstore +func (c *Commands) ReportQuotaUsage(ctx context.Context, dueNotifications []*quota.NotificationDueEvent) error { cmds := make([]eventstore.Command, len(dueNotifications)) for idx, notification := range dueNotifications { cmds[idx] = notification diff --git a/internal/eventstore/handler/crdb/handler_stmt.go b/internal/eventstore/handler/crdb/handler_stmt.go index 8f6d9481f0..e49fe007be 100644 --- a/internal/eventstore/handler/crdb/handler_stmt.go +++ b/internal/eventstore/handler/crdb/handler_stmt.go @@ -11,6 +11,7 @@ import ( "github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/handler" + "github.com/zitadel/zitadel/internal/repository/pseudo" ) var ( @@ -49,6 +50,8 @@ type StatementHandler struct { initialized chan bool bulkLimit uint64 + + reduceScheduledPseudoEvent bool } func NewStatementHandler( @@ -57,30 +60,40 @@ func NewStatementHandler( ) StatementHandler { aggregateTypes := make([]eventstore.AggregateType, 0, len(config.Reducers)) reduces := make(map[eventstore.EventType]handler.Reduce, len(config.Reducers)) + reduceScheduledPseudoEvent := false for _, aggReducer := range config.Reducers { aggregateTypes = append(aggregateTypes, aggReducer.Aggregate) + if aggReducer.Aggregate == pseudo.AggregateType { + reduceScheduledPseudoEvent = true + if len(config.Reducers) != 1 || + len(aggReducer.EventRedusers) != 1 || + aggReducer.EventRedusers[0].Event != pseudo.ScheduledEventType { + panic("if a pseudo.AggregateType is reduced, exactly one event reducer for pseudo.ScheduledEventType is supported and no other aggregate can be reduced") + } + } for _, eventReducer := range aggReducer.EventRedusers { reduces[eventReducer.Event] = eventReducer.Reduce } } h := StatementHandler{ - client: config.Client, - sequenceTable: config.SequenceTable, - maxFailureCount: config.MaxFailureCount, - currentSequenceStmt: fmt.Sprintf(currentSequenceStmtFormat, config.SequenceTable), - updateSequencesBaseStmt: fmt.Sprintf(updateCurrentSequencesStmtFormat, config.SequenceTable), - failureCountStmt: fmt.Sprintf(failureCountStmtFormat, config.FailedEventsTable), - setFailureCountStmt: fmt.Sprintf(setFailureCountStmtFormat, config.FailedEventsTable), - aggregates: aggregateTypes, - reduces: reduces, - bulkLimit: config.BulkLimit, - Locker: NewLocker(config.Client.DB, config.LockTable, config.ProjectionName), - initCheck: config.InitCheck, - initialized: make(chan bool), + client: config.Client, + sequenceTable: config.SequenceTable, + maxFailureCount: config.MaxFailureCount, + currentSequenceStmt: fmt.Sprintf(currentSequenceStmtFormat, config.SequenceTable), + updateSequencesBaseStmt: fmt.Sprintf(updateCurrentSequencesStmtFormat, config.SequenceTable), + failureCountStmt: fmt.Sprintf(failureCountStmtFormat, config.FailedEventsTable), + setFailureCountStmt: fmt.Sprintf(setFailureCountStmtFormat, config.FailedEventsTable), + aggregates: aggregateTypes, + reduces: reduces, + bulkLimit: config.BulkLimit, + Locker: NewLocker(config.Client.DB, config.LockTable, config.ProjectionName), + initCheck: config.InitCheck, + initialized: make(chan bool), + reduceScheduledPseudoEvent: reduceScheduledPseudoEvent, } - h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.SearchQuery, h.Lock, h.Unlock, h.initialized) + h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.searchQuery, h.Lock, h.Unlock, h.initialized, reduceScheduledPseudoEvent) return h } @@ -88,10 +101,19 @@ func NewStatementHandler( func (h *StatementHandler) Start() { h.initialized <- true close(h.initialized) - h.Subscribe(h.aggregates...) + if !h.reduceScheduledPseudoEvent { + h.Subscribe(h.aggregates...) + } } -func (h *StatementHandler) SearchQuery(ctx context.Context, instanceIDs []string) (*eventstore.SearchQueryBuilder, uint64, error) { +func (h *StatementHandler) searchQuery(ctx context.Context, instanceIDs []string) (*eventstore.SearchQueryBuilder, uint64, error) { + if h.reduceScheduledPseudoEvent { + return nil, 1, nil + } + return h.dbSearchQuery(ctx, instanceIDs) +} + +func (h *StatementHandler) dbSearchQuery(ctx context.Context, instanceIDs []string) (*eventstore.SearchQueryBuilder, uint64, error) { sequences, err := h.currentSequences(ctx, h.client.QueryContext, instanceIDs) if err != nil { return nil, 0, err @@ -115,7 +137,6 @@ func (h *StatementHandler) SearchQuery(ctx context.Context, instanceIDs []string InstanceID(instanceID) } } - return queryBuilder, h.bulkLimit, nil } diff --git a/internal/eventstore/handler/crdb/handler_stmt_test.go b/internal/eventstore/handler/crdb/handler_stmt_test.go index 8c0d1537f4..b72db40eb1 100644 --- a/internal/eventstore/handler/crdb/handler_stmt_test.go +++ b/internal/eventstore/handler/crdb/handler_stmt_test.go @@ -18,6 +18,7 @@ import ( "github.com/zitadel/zitadel/internal/eventstore/repository" es_repo_mock "github.com/zitadel/zitadel/internal/eventstore/repository/mock" "github.com/zitadel/zitadel/internal/id" + "github.com/zitadel/zitadel/internal/repository/pseudo" ) var ( @@ -60,7 +61,7 @@ func TestProjectionHandler_SearchQuery(t *testing.T) { type fields struct { sequenceTable string projectionName string - aggregates []eventstore.AggregateType + reducers []handler.AggregateReducer bulkLimit uint64 } type args struct { @@ -77,7 +78,7 @@ func TestProjectionHandler_SearchQuery(t *testing.T) { fields: fields{ sequenceTable: "my_sequences", projectionName: "my_projection", - aggregates: []eventstore.AggregateType{"testAgg"}, + reducers: failingAggregateReducers("testAgg"), bulkLimit: 5, }, args: args{ @@ -99,7 +100,7 @@ func TestProjectionHandler_SearchQuery(t *testing.T) { fields: fields{ sequenceTable: "my_sequences", projectionName: "my_projection", - aggregates: []eventstore.AggregateType{"testAgg"}, + reducers: failingAggregateReducers("testAgg"), bulkLimit: 5, }, args: args{ @@ -129,7 +130,7 @@ func TestProjectionHandler_SearchQuery(t *testing.T) { fields: fields{ sequenceTable: "my_sequences", projectionName: "my_projection", - aggregates: []eventstore.AggregateType{"testAgg"}, + reducers: failingAggregateReducers("testAgg"), bulkLimit: 5, }, args: args{ @@ -158,6 +159,32 @@ func TestProjectionHandler_SearchQuery(t *testing.T) { Limit(5), }, }, + { + name: "scheduled pseudo event", + fields: fields{ + sequenceTable: "my_sequences", + projectionName: "my_projection", + reducers: []handler.AggregateReducer{{ + Aggregate: pseudo.AggregateType, + EventRedusers: []handler.EventReducer{ + { + Event: pseudo.ScheduledEventType, + Reduce: testReduceErr(errors.New("should not be called")), + }, + }, + }}, + bulkLimit: 5, + }, + args: args{ + instanceIDs: []string{"instanceID1", "instanceID2"}, + }, + want: want{ + limit: 1, + isErr: func(err error) bool { + return err == nil + }, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -177,15 +204,14 @@ func TestProjectionHandler_SearchQuery(t *testing.T) { Client: &database.DB{ DB: client, }, + Reducers: tt.fields.reducers, }) - h.aggregates = tt.fields.aggregates - for _, expectation := range tt.want.expectations { expectation(mock) } - query, limit, err := h.SearchQuery(context.Background(), tt.args.instanceIDs) + query, limit, err := h.searchQuery(context.Background(), tt.args.instanceIDs) if !tt.want.isErr(err) { t.Errorf("ProjectionHandler.prepareBulkStmts() error = %v", err) return @@ -1768,3 +1794,17 @@ func testReduceErr(err error) handler.Reduce { return nil, err } } + +func failingAggregateReducers(aggregates ...eventstore.AggregateType) []handler.AggregateReducer { + reducers := make([]handler.AggregateReducer, len(aggregates)) + for idx := range aggregates { + reducers[idx] = handler.AggregateReducer{ + Aggregate: aggregates[idx], + EventRedusers: []handler.EventReducer{{ + Event: "any.event", + Reduce: testReduceErr(errors.New("should not be called")), + }}, + } + } + return reducers +} diff --git a/internal/eventstore/handler/crdb/statement.go b/internal/eventstore/handler/crdb/statement.go index 591751e663..efcc4952e0 100644 --- a/internal/eventstore/handler/crdb/statement.go +++ b/internal/eventstore/handler/crdb/statement.go @@ -235,12 +235,6 @@ func AddDeleteStatement(conditions []handler.Condition, opts ...execOption) func } } -func AddCopyStatement(conflict, from, to []handler.Column, conditions []handler.Condition, opts ...execOption) func(eventstore.Event) Exec { - return func(event eventstore.Event) Exec { - return NewCopyStatement(event, conflict, from, to, conditions, opts...).Execute - } -} - func NewArrayAppendCol(column string, value interface{}) handler.Column { return handler.Column{ Name: column, @@ -286,12 +280,30 @@ func NewCopyCol(column, from string) handler.Column { } func NewLessThanCond(column string, value interface{}) handler.Condition { - return handler.Condition{ - Name: column, - Value: value, - ParameterOpt: func(placeholder string) string { - return " < " + placeholder - }, + return func(param string) (string, interface{}) { + return column + " < " + param, value + } +} + +func NewIsNullCond(column string) handler.Condition { + return func(param string) (string, interface{}) { + return column + " IS NULL", nil + } +} + +// NewTextArrayContainsCond returns a handler.Condition that checks if the column that stores an array of text contains the given value +func NewTextArrayContainsCond(column string, value string) handler.Condition { + return func(param string) (string, interface{}) { + return column + " @> " + param, database.StringArray{value} + } +} + +// Not is a function and not a method, so that calling it is well readable +// For example conditions := []handler.Condition{ Not(NewTextArrayContainsCond())} +func Not(condition handler.Condition) handler.Condition { + return func(param string) (string, interface{}) { + cond, value := condition(param) + return "NOT (" + cond + ")", value } } @@ -300,7 +312,7 @@ func NewLessThanCond(column string, value interface{}) handler.Condition { // if the value of a col is empty the data will be copied from the selected row // if the value of a col is not empty the data will be set by the static value // conds represent the conditions for the selection subquery -func NewCopyStatement(event eventstore.Event, conflictCols, from, to []handler.Column, conds []handler.Condition, opts ...execOption) *handler.Statement { +func NewCopyStatement(event eventstore.Event, conflictCols, from, to []handler.Column, nsCond []handler.NamespacedCondition, opts ...execOption) *handler.Statement { columnNames := make([]string, len(to)) selectColumns := make([]string, len(from)) updateColumns := make([]string, len(columnNames)) @@ -319,13 +331,12 @@ func NewCopyStatement(event eventstore.Event, conflictCols, from, to []handler.C } } - - wheres := make([]string, len(conds)) - for i, cond := range conds { - argCounter++ - wheres[i] = "copy_table." + cond.Name + " = $" + strconv.Itoa(argCounter) - args = append(args, cond.Value) + cond := make([]handler.Condition, len(nsCond)) + for i := range nsCond { + cond[i] = nsCond[i]("copy_table") } + wheres, values := conditionsToWhere(cond, len(args)) + args = append(args, values...) conflictTargets := make([]string, len(conflictCols)) for i, conflictCol := range conflictCols { @@ -340,7 +351,7 @@ func NewCopyStatement(event eventstore.Event, conflictCols, from, to []handler.C config.err = handler.ErrNoValues } - if len(conds) == 0 { + if len(cond) == 0 { config.err = handler.ErrNoCondition } @@ -394,18 +405,16 @@ func columnsToQuery(cols []handler.Column) (names []string, parameters []string, return names, parameters, values[:parameterIndex] } -func conditionsToWhere(cols []handler.Condition, paramOffset int) (wheres []string, values []interface{}) { - wheres = make([]string, len(cols)) - values = make([]interface{}, len(cols)) - - for i, col := range cols { - wheres[i] = "(" + col.Name + " = $" + strconv.Itoa(i+1+paramOffset) + ")" - if col.ParameterOpt != nil { - wheres[i] = "(" + col.Name + col.ParameterOpt("$"+strconv.Itoa(i+1+paramOffset)) + ")" +func conditionsToWhere(conditions []handler.Condition, paramOffset int) (wheres []string, values []interface{}) { + wheres = make([]string, len(conditions)) + values = make([]interface{}, 0, len(conditions)) + for i, conditionFunc := range conditions { + condition, value := conditionFunc("$" + strconv.Itoa(i+1+paramOffset)) + wheres[i] = "(" + condition + ")" + if value != nil { + values = append(values, value) } - values[i] = col.Value } - return wheres, values } diff --git a/internal/eventstore/handler/crdb/statement_test.go b/internal/eventstore/handler/crdb/statement_test.go index fd9f451d94..e023f0079e 100644 --- a/internal/eventstore/handler/crdb/statement_test.go +++ b/internal/eventstore/handler/crdb/statement_test.go @@ -6,6 +6,7 @@ import ( "reflect" "testing" + "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/handler" ) @@ -420,10 +421,7 @@ func TestNewUpdateStatement(t *testing.T) { }, }, conditions: []handler.Condition{ - { - Name: "col2", - Value: 1, - }, + handler.NewCond("col2", 1), }, }, want: want{ @@ -450,10 +448,7 @@ func TestNewUpdateStatement(t *testing.T) { }, values: []handler.Column{}, conditions: []handler.Condition{ - { - Name: "col2", - Value: 1, - }, + handler.NewCond("col2", 1), }, }, want: want{ @@ -515,10 +510,7 @@ func TestNewUpdateStatement(t *testing.T) { }, }, conditions: []handler.Condition{ - { - Name: "col2", - Value: 1, - }, + handler.NewCond("col2", 1), }, }, want: want{ @@ -560,10 +552,7 @@ func TestNewUpdateStatement(t *testing.T) { }, }, conditions: []handler.Condition{ - { - Name: "col2", - Value: 1, - }, + handler.NewCond("col2", 1), }, }, want: want{ @@ -630,10 +619,7 @@ func TestNewDeleteStatement(t *testing.T) { previousSequence: 0, }, conditions: []handler.Condition{ - { - Name: "col2", - Value: 1, - }, + handler.NewCond("col2", 1), }, }, want: want{ @@ -683,10 +669,7 @@ func TestNewDeleteStatement(t *testing.T) { aggregateType: "agg", }, conditions: []handler.Condition{ - { - Name: "col1", - Value: 1, - }, + handler.NewCond("col1", 1), }, }, want: want{ @@ -842,11 +825,9 @@ func TestNewMultiStatement(t *testing.T) { execs: []func(eventstore.Event) Exec{ AddDeleteStatement( []handler.Condition{ - { - Name: "col1", - Value: 1, - }, - }), + handler.NewCond("col1", 1), + }, + ), AddCreateStatement( []handler.Column{ { @@ -876,11 +857,9 @@ func TestNewMultiStatement(t *testing.T) { }, }, []handler.Condition{ - { - Name: "col1", - Value: 1, - }, - }), + handler.NewCond("col1", 1), + }, + ), }, }, want: want{ @@ -942,7 +921,7 @@ func TestNewCopyStatement(t *testing.T) { conflictingCols []handler.Column from []handler.Column to []handler.Column - conds []handler.Condition + conds []handler.NamespacedCondition } type want struct { aggregateType eventstore.AggregateType @@ -966,11 +945,8 @@ func TestNewCopyStatement(t *testing.T) { sequence: 1, previousSequence: 0, }, - conds: []handler.Condition{ - { - Name: "col2", - Value: 1, - }, + conds: []handler.NamespacedCondition{ + handler.NewNamespacedCondition("col2", 1), }, }, want: want{ @@ -995,7 +971,7 @@ func TestNewCopyStatement(t *testing.T) { sequence: 1, previousSequence: 0, }, - conds: []handler.Condition{}, + conds: []handler.NamespacedCondition{}, from: []handler.Column{ { Name: "col", @@ -1029,7 +1005,7 @@ func TestNewCopyStatement(t *testing.T) { sequence: 1, previousSequence: 0, }, - conds: []handler.Condition{}, + conds: []handler.NamespacedCondition{}, from: []handler.Column{ { Name: "col", @@ -1066,10 +1042,8 @@ func TestNewCopyStatement(t *testing.T) { sequence: 1, previousSequence: 0, }, - conds: []handler.Condition{ - { - Name: "col", - }, + conds: []handler.NamespacedCondition{ + handler.NewNamespacedCondition("col2", nil), }, from: []handler.Column{}, }, @@ -1124,15 +1098,9 @@ func TestNewCopyStatement(t *testing.T) { Name: "col_b", }, }, - conds: []handler.Condition{ - { - Name: "id", - Value: 2, - }, - { - Name: "state", - Value: 3, - }, + conds: []handler.NamespacedCondition{ + handler.NewNamespacedCondition("id", 2), + handler.NewNamespacedCondition("state", 3), }, }, want: want{ @@ -1143,7 +1111,7 @@ func TestNewCopyStatement(t *testing.T) { executer: &wantExecuter{ params: []params{ { - query: "INSERT INTO my_table (state, id, col_a, col_b) SELECT $1, id, col_a, col_b FROM my_table AS copy_table WHERE copy_table.id = $2 AND copy_table.state = $3 ON CONFLICT () DO UPDATE SET (state, id, col_a, col_b) = ($1, EXCLUDED.id, EXCLUDED.col_a, EXCLUDED.col_b)", + query: "INSERT INTO my_table (state, id, col_a, col_b) SELECT $1, id, col_a, col_b FROM my_table AS copy_table WHERE (copy_table.id = $2) AND (copy_table.state = $3) ON CONFLICT () DO UPDATE SET (state, id, col_a, col_b) = ($1, EXCLUDED.id, EXCLUDED.col_a, EXCLUDED.col_b)", args: []interface{}{1, 2, 3}, }, }, @@ -1191,15 +1159,9 @@ func TestNewCopyStatement(t *testing.T) { Name: "col_d", }, }, - conds: []handler.Condition{ - { - Name: "id", - Value: 2, - }, - { - Name: "state", - Value: 3, - }, + conds: []handler.NamespacedCondition{ + handler.NewNamespacedCondition("id", 2), + handler.NewNamespacedCondition("state", 3), }, }, want: want{ @@ -1210,7 +1172,7 @@ func TestNewCopyStatement(t *testing.T) { executer: &wantExecuter{ params: []params{ { - query: "INSERT INTO my_table (state, id, col_c, col_d) SELECT $1, id, col_a, col_b FROM my_table AS copy_table WHERE copy_table.id = $2 AND copy_table.state = $3 ON CONFLICT () DO UPDATE SET (state, id, col_c, col_d) = ($1, EXCLUDED.id, EXCLUDED.col_a, EXCLUDED.col_b)", + query: "INSERT INTO my_table (state, id, col_c, col_d) SELECT $1, id, col_a, col_b FROM my_table AS copy_table WHERE (copy_table.id = $2) AND (copy_table.state = $3) ON CONFLICT () DO UPDATE SET (state, id, col_c, col_d) = ($1, EXCLUDED.id, EXCLUDED.col_a, EXCLUDED.col_b)", args: []interface{}{1, 2, 3}, }, }, @@ -1395,7 +1357,7 @@ func Test_columnsToQuery(t *testing.T) { } } -func Test_columnsToWhere(t *testing.T) { +func Test_conditionsToWhere(t *testing.T) { type args struct { conds []handler.Condition paramOffset int @@ -1421,10 +1383,7 @@ func Test_columnsToWhere(t *testing.T) { name: "no offset", args: args{ conds: []handler.Condition{ - { - Name: "col1", - Value: "val1", - }, + handler.NewCond("col1", "val1"), }, paramOffset: 0, }, @@ -1437,14 +1396,8 @@ func Test_columnsToWhere(t *testing.T) { name: "multiple cols", args: args{ conds: []handler.Condition{ - { - Name: "col1", - Value: "val1", - }, - { - Name: "col2", - Value: "val2", - }, + handler.NewCond("col1", "val1"), + handler.NewCond("col2", "val2"), }, paramOffset: 0, }, @@ -1457,10 +1410,7 @@ func Test_columnsToWhere(t *testing.T) { name: "2 offset", args: args{ conds: []handler.Condition{ - { - Name: "col1", - Value: "val1", - }, + handler.NewCond("col1", "val1"), }, paramOffset: 2, }, @@ -1469,6 +1419,54 @@ func Test_columnsToWhere(t *testing.T) { values: []interface{}{"val1"}, }, }, + { + name: "less than", + args: args{ + conds: []handler.Condition{ + NewLessThanCond("col1", "val1"), + }, + }, + want: want{ + wheres: []string{"(col1 < $1)"}, + values: []interface{}{"val1"}, + }, + }, + { + name: "is null", + args: args{ + conds: []handler.Condition{ + NewIsNullCond("col1"), + }, + }, + want: want{ + wheres: []string{"(col1 IS NULL)"}, + values: []interface{}{}, + }, + }, + { + name: "text array contains", + args: args{ + conds: []handler.Condition{ + NewTextArrayContainsCond("col1", "val1"), + }, + }, + want: want{ + wheres: []string{"(col1 @> $1)"}, + values: []interface{}{database.StringArray{"val1"}}, + }, + }, + { + name: "not", + args: args{ + conds: []handler.Condition{ + Not(handler.NewCond("col1", "val1")), + }, + }, + want: want{ + wheres: []string{"(NOT (col1 = $1))"}, + values: []interface{}{"val1"}, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/internal/eventstore/handler/handler_projection.go b/internal/eventstore/handler/handler_projection.go index 808da1dbb8..c8c3096905 100644 --- a/internal/eventstore/handler/handler_projection.go +++ b/internal/eventstore/handler/handler_projection.go @@ -10,6 +10,7 @@ import ( "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/repository/pseudo" ) const ( @@ -49,19 +50,20 @@ type NowFunc func() time.Time type ProjectionHandler struct { Handler - ProjectionName string - reduce Reduce - update Update - searchQuery SearchQuery - triggerProjection *time.Timer - lock Lock - unlock Unlock - requeueAfter time.Duration - retryFailedAfter time.Duration - retries int - concurrentInstances int - handleActiveInstances time.Duration - nowFunc NowFunc + ProjectionName string + reduce Reduce + update Update + searchQuery SearchQuery + triggerProjection *time.Timer + lock Lock + unlock Unlock + requeueAfter time.Duration + retryFailedAfter time.Duration + retries int + concurrentInstances int + handleActiveInstances time.Duration + nowFunc NowFunc + reduceScheduledPseudoEvent bool } func NewProjectionHandler( @@ -73,32 +75,35 @@ func NewProjectionHandler( lock Lock, unlock Unlock, initialized <-chan bool, + reduceScheduledPseudoEvent bool, ) *ProjectionHandler { concurrentInstances := int(config.ConcurrentInstances) if concurrentInstances < 1 { concurrentInstances = 1 } h := &ProjectionHandler{ - Handler: NewHandler(config.HandlerConfig), - ProjectionName: config.ProjectionName, - reduce: reduce, - update: update, - searchQuery: query, - lock: lock, - unlock: unlock, - requeueAfter: config.RequeueEvery, - triggerProjection: time.NewTimer(0), // first trigger is instant on startup - retryFailedAfter: config.RetryFailedAfter, - retries: int(config.Retries), - concurrentInstances: concurrentInstances, - handleActiveInstances: config.HandleActiveInstances, - nowFunc: time.Now, + Handler: NewHandler(config.HandlerConfig), + ProjectionName: config.ProjectionName, + reduce: reduce, + update: update, + searchQuery: query, + lock: lock, + unlock: unlock, + requeueAfter: config.RequeueEvery, + triggerProjection: time.NewTimer(0), // first trigger is instant on startup + retryFailedAfter: config.RetryFailedAfter, + retries: int(config.Retries), + concurrentInstances: concurrentInstances, + handleActiveInstances: config.HandleActiveInstances, + nowFunc: time.Now, + reduceScheduledPseudoEvent: reduceScheduledPseudoEvent, } go func() { <-initialized - go h.subscribe(ctx) - + if !h.reduceScheduledPseudoEvent { + go h.subscribe(ctx) + } go h.schedule(ctx) }() @@ -158,6 +163,13 @@ func (h *ProjectionHandler) Process(ctx context.Context, events ...eventstore.Ev // FetchEvents checks the current sequences and filters for newer events func (h *ProjectionHandler) FetchEvents(ctx context.Context, instances ...string) ([]eventstore.Event, bool, error) { + if h.reduceScheduledPseudoEvent { + return h.fetchPseudoEvents(ctx, instances...) + } + return h.fetchDBEvents(ctx, instances...) +} + +func (h *ProjectionHandler) fetchDBEvents(ctx context.Context, instances ...string) ([]eventstore.Event, bool, error) { eventQuery, eventsLimit, err := h.searchQuery(ctx, instances) if err != nil { return nil, false, err @@ -169,6 +181,10 @@ func (h *ProjectionHandler) FetchEvents(ctx context.Context, instances ...string return events, int(eventsLimit) == len(events), err } +func (h *ProjectionHandler) fetchPseudoEvents(ctx context.Context, instances ...string) ([]eventstore.Event, bool, error) { + return []eventstore.Event{pseudo.NewScheduledEvent(ctx, time.Now(), instances...)}, false, nil +} + func (h *ProjectionHandler) subscribe(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) defer func() { diff --git a/internal/eventstore/handler/handler_projection_test.go b/internal/eventstore/handler/handler_projection_test.go index 3ab88ef4a1..349799e46a 100644 --- a/internal/eventstore/handler/handler_projection_test.go +++ b/internal/eventstore/handler/handler_projection_test.go @@ -342,6 +342,7 @@ func TestProjectionHandler_Process(t *testing.T) { nil, nil, nil, + false, ) index, err := h.Process(tt.args.ctx, tt.args.events...) diff --git a/internal/eventstore/handler/statement.go b/internal/eventstore/handler/statement.go index ae7bec92e1..1eecd78f23 100644 --- a/internal/eventstore/handler/statement.go +++ b/internal/eventstore/handler/statement.go @@ -4,7 +4,6 @@ import ( "database/sql" "encoding/json" "errors" - "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/eventstore" @@ -62,11 +61,18 @@ func NewJSONCol(name string, value interface{}) Column { return NewCol(name, marshalled) } -type Condition Column +type Condition func(param string) (string, interface{}) + +type NamespacedCondition func(namespace string) Condition func NewCond(name string, value interface{}) Condition { - return Condition{ - Name: name, - Value: value, + return func(param string) (string, interface{}) { + return name + " = " + param, value + } +} + +func NewNamespacedCondition(name string, value interface{}) NamespacedCondition { + return func(namespace string) Condition { + return NewCond(namespace+"."+name, value) } } diff --git a/internal/integration/client.go b/internal/integration/client.go index 86b78bb87e..fecfecac44 100644 --- a/internal/integration/client.go +++ b/internal/integration/client.go @@ -20,6 +20,7 @@ import ( mgmt "github.com/zitadel/zitadel/pkg/grpc/management" object "github.com/zitadel/zitadel/pkg/grpc/object/v2alpha" session "github.com/zitadel/zitadel/pkg/grpc/session/v2alpha" + "github.com/zitadel/zitadel/pkg/grpc/system" user "github.com/zitadel/zitadel/pkg/grpc/user/v2alpha" ) @@ -29,6 +30,7 @@ type Client struct { Mgmt mgmt.ManagementServiceClient UserV2 user.UserServiceClient SessionV2 session.SessionServiceClient + System system.SystemServiceClient } func newClient(cc *grpc.ClientConn) Client { @@ -38,9 +40,36 @@ func newClient(cc *grpc.ClientConn) Client { Mgmt: mgmt.NewManagementServiceClient(cc), UserV2: user.NewUserServiceClient(cc), SessionV2: session.NewSessionServiceClient(cc), + System: system.NewSystemServiceClient(cc), } } +func (t *Tester) UseIsolatedInstance(iamOwnerCtx, systemCtx context.Context) (primaryDomain, instanceId string, authenticatedIamOwnerCtx context.Context) { + primaryDomain = randString(5) + ".integration" + instance, err := t.Client.System.CreateInstance(systemCtx, &system.CreateInstanceRequest{ + InstanceName: "testinstance", + CustomDomain: primaryDomain, + Owner: &system.CreateInstanceRequest_Machine_{ + Machine: &system.CreateInstanceRequest_Machine{ + UserName: "owner", + Name: "owner", + PersonalAccessToken: &system.CreateInstanceRequest_PersonalAccessToken{}, + }, + }, + }) + if err != nil { + panic(err) + } + t.createClientConn(iamOwnerCtx, grpc.WithAuthority(primaryDomain)) + instanceId = instance.GetInstanceId() + t.Users[instanceId] = map[UserType]User{ + IAMOwner: { + Token: instance.GetPat(), + }, + } + return primaryDomain, instanceId, t.WithInstanceAuthorization(iamOwnerCtx, IAMOwner, instanceId) +} + func (s *Tester) CreateHumanUser(ctx context.Context) *user.AddHumanUserResponse { resp, err := s.Client.UserV2.AddHumanUser(ctx, &user.AddHumanUserRequest{ Organisation: &object.Organisation{ diff --git a/internal/integration/config/system-user-key.pem b/internal/integration/config/system-user-key.pem new file mode 100644 index 0000000000..2c7a6e7f1f --- /dev/null +++ b/internal/integration/config/system-user-key.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEowIBAAKCAQEAzi+FFSJL7f5yw4KTwzgMP34ePGycm/M+kT0M7V4Cgx5V3EaD +IvTQKTLfBaEB45zb9LtjIXzDw0rXRoS2hO6th+CYQCz3KCvh09C0IzxZiB2IS3H/ +aT+5Bx9EFY+vnAkZjccbyG5YNRvmtOlnvIeIH7qZ0tEwkPfF5GEZNPJPtmy3UGV7 +iofdVQS1xRj73+aMw5rvH4D8IdyiAC3VekIbpt0Vj0SUX3DwKtog337BzTiPk3aX +RF0sbFhQoqdJRI8NqgZjCwjq9yfI5tyxYswn+JGzHGdHvW3idODlmwEt5K2pasiR +IWK2OGfq+w0EcltQHabuqEPgZlmhCkRdNfixBwIDAQABAoIBAA9jNoBkRdxmH/R9 +Wz+3gBqA9Aq4ZFuzJJk8QCm62V8ltWyyCnliYeKhPEm0QWrWOwghr/1AzW9Wt4g4 +wVJcabD5TwODF5L0626eZcM3bsscwR44TMJzEgD5EWC2j3mKqFCPaoBj08tq4KXh +wW8tgjgz+eTk3cYD583qfTIZX1+SzSMBpetTBsssQtGhhOB/xPiuL7hi+fXmV2rh +8mc9X6+wJ5u3zepsyK0vBeEDmurD4ZUIXFrZ0WCB/wNkSW9VKyoH+RC1asQAgqTz +glJ/NPbDJSKGvSBQydoKkqoXx7MVJ8VObFddfgo4dtOoz6YCfUVBHt8qy+E5rz5y +CICjL/kCgYEA9MnHntVVKNXtEFZPo02xgCwS3eG27ZwjYgJ1ZkCHM5BuL4MS7qbr +743/POs1Ctaok0udHl1PFB4uAG0URnmkUnWzcoJYb6Plv03F0LRdsnfuhehfIxLP +nWvxSm5n21H4ytfxm0BWY09JkLDnJZtXrgTILbuqb9Wy6TmAvUaF2YUCgYEA16Ec +ywSaLVdqPaVpsTxi7XpRJAB2Isjp6RffNEecta4S0LL7s/IO3QXDH9SYpgmgCTah +3aXhpT4hIFlpg3eBjVfbOwgqub8DgirnSQyQt99edUtHIK+K8nMdGxz6X6pfTKzK +asSH7qPlt5tz1621vC0ocXSZR7zm99/FgwILwBsCgYBOsP8nJFV4By1qbxSy3qsN +FR4LjiAMSoFlZHzxHhVYkjmZtH1FkwuNuwwuPT6T+WW/1DLyK/Tb9se7A1XdQgV9 +LLE/Qn/Dg+C7mvjYmuL0GHHpQkYzNDzh0m2DC/L/Il7kdn8I9anPyxFPHk9wW3vY +SVlAum+T/BLDvuSP9DfbMQKBgCc1j7PG8XYfOB1fj7l/volqPYjrYI/wssAE7Dxo +bTGIJrm2YhiVgmhkXNfT47IFfAlQ2twgBsjyZDmqqIoUWAVonV+9m29NMYkg3g+l +bkdRIa74ckWaRgzSK8+7VDfDFjMuFFyXwhP9z460gLsORkaie4Et75Vg3yrhkNvC +qnpTAoGBAMguDSWBbCewXnHlKGFpm+LH+OIvVKGEhtCSvfZojtNrg/JBeBebSL1n +mmT1cONO+0O5bz7uVaRd3JdnH2JFevY698zFfhVsjVCrm+fz31i5cxAgC39G2Lfl +YkTaa1AFLstnf348ZjuvBN3USUYZo3X3mxnS+uluVuRSGwIKsN0a +-----END RSA PRIVATE KEY----- diff --git a/internal/integration/config/zitadel.yaml b/internal/integration/config/zitadel.yaml index 60389d6de0..ba52818bce 100644 --- a/internal/integration/config/zitadel.yaml +++ b/internal/integration/config/zitadel.yaml @@ -4,6 +4,16 @@ Log: TLS: Enabled: false +Telemetry: + Enabled: true + Endpoints: + - http://localhost:8081 + Headers: + single-value: "single-value" + multi-value: + - "multi-value-1" + - "multi-value-2" + FirstInstance: Org: Human: @@ -31,7 +41,13 @@ Projections: Customizations: NotificationsQuotas: RequeueEvery: 1s + Telemetry: + RequeueEvery: 5s DefaultInstance: LoginPolicy: MfaInitSkipLifetime: "0" + +SystemAPIUsers: + - tester: + KeyData: "LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF6aStGRlNKTDdmNXl3NEtUd3pnTQpQMzRlUEd5Y20vTStrVDBNN1Y0Q2d4NVYzRWFESXZUUUtUTGZCYUVCNDV6YjlMdGpJWHpEdzByWFJvUzJoTzZ0CmgrQ1lRQ3ozS0N2aDA5QzBJenhaaUIySVMzSC9hVCs1Qng5RUZZK3ZuQWtaamNjYnlHNVlOUnZtdE9sbnZJZUkKSDdxWjB0RXdrUGZGNUdFWk5QSlB0bXkzVUdWN2lvZmRWUVMxeFJqNzMrYU13NXJ2SDREOElkeWlBQzNWZWtJYgpwdDBWajBTVVgzRHdLdG9nMzM3QnpUaVBrM2FYUkYwc2JGaFFvcWRKUkk4TnFnWmpDd2pxOXlmSTV0eXhZc3duCitKR3pIR2RIdlczaWRPRGxtd0V0NUsycGFzaVJJV0syT0dmcSt3MEVjbHRRSGFidXFFUGdabG1oQ2tSZE5maXgKQndJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==" diff --git a/internal/integration/integration.go b/internal/integration/integration.go index 6ffec11e1d..d21cb36d2d 100644 --- a/internal/integration/integration.go +++ b/internal/integration/integration.go @@ -15,6 +15,7 @@ import ( "github.com/spf13/viper" "github.com/zitadel/logging" + "github.com/zitadel/oidc/v2/pkg/client" "github.com/zitadel/oidc/v2/pkg/oidc" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -23,6 +24,7 @@ import ( "github.com/zitadel/zitadel/cmd" "github.com/zitadel/zitadel/cmd/start" "github.com/zitadel/zitadel/internal/api/authz" + http_util "github.com/zitadel/zitadel/internal/api/http" z_oidc "github.com/zitadel/zitadel/internal/api/oidc" "github.com/zitadel/zitadel/internal/command" "github.com/zitadel/zitadel/internal/domain" @@ -40,6 +42,8 @@ var ( cockroachYAML []byte //go:embed config/postgres.yaml postgresYAML []byte + //go:embed config/system-user-key.pem + systemUserKey []byte ) // UserType provides constants that give @@ -53,6 +57,12 @@ type UserType int const ( Unspecified UserType = iota OrgOwner + IAMOwner + SystemUser // SystemUser is a user with access to the system service. +) + +const ( + FirstInstanceUsersKey = "first" ) // User information with a Personal Access Token. @@ -67,7 +77,7 @@ type Tester struct { Instance authz.Instance Organisation *query.Org - Users map[UserType]User + Users map[string]map[UserType]User Client Client WebAuthN *webauthn.Client @@ -80,11 +90,12 @@ func (s *Tester) Host() string { return fmt.Sprintf("%s:%d", s.Config.ExternalDomain, s.Config.Port) } -func (s *Tester) createClientConn(ctx context.Context) { +func (s *Tester) createClientConn(ctx context.Context, opts ...grpc.DialOption) { target := s.Host() - cc, err := grpc.DialContext(ctx, target, - grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()), - ) + cc, err := grpc.DialContext(ctx, target, append(opts, + grpc.WithBlock(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + )...) if err != nil { s.Shutdown <- os.Interrupt s.wg.Wait() @@ -124,10 +135,10 @@ func (s *Tester) pollHealth(ctx context.Context) (err error) { } const ( - SystemUser = "integration" + MachineUser = "integration" ) -func (s *Tester) createSystemUser(ctx context.Context) { +func (s *Tester) createMachineUser(ctx context.Context, instanceId string) { var err error s.Instance, err = s.Queries.InstanceByHost(ctx, s.Host()) @@ -137,7 +148,7 @@ func (s *Tester) createSystemUser(ctx context.Context) { s.Organisation, err = s.Queries.OrgByID(ctx, true, s.Instance.DefaultOrganisationID()) logging.OnError(err).Fatal("query organisation") - query, err := query.NewUserUsernameSearchQuery(SystemUser, query.TextEquals) + query, err := query.NewUserUsernameSearchQuery(MachineUser, query.TextEquals) logging.OnError(err).Fatal("user query") user, err := s.Queries.GetUser(ctx, true, true, query) @@ -146,8 +157,8 @@ func (s *Tester) createSystemUser(ctx context.Context) { ObjectRoot: models.ObjectRoot{ ResourceOwner: s.Organisation.ID, }, - Username: SystemUser, - Name: SystemUser, + Username: MachineUser, + Name: MachineUser, Description: "who cares?", AccessTokenType: domain.OIDCTokenTypeJWT, }) @@ -168,16 +179,43 @@ func (s *Tester) createSystemUser(ctx context.Context) { _, err = s.Commands.AddPersonalAccessToken(ctx, pat) logging.OnError(err).Fatal("add pat") - s.Users = map[UserType]User{ - OrgOwner: { - User: user, - Token: pat.Token, - }, + if s.Users == nil { + s.Users = make(map[string]map[UserType]User) + } + if s.Users[instanceId] == nil { + s.Users[instanceId] = make(map[UserType]User) + } + s.Users[instanceId][OrgOwner] = User{ + User: user, + Token: pat.Token, } } -func (s *Tester) WithSystemAuthorization(ctx context.Context, u UserType) context.Context { - return metadata.AppendToOutgoingContext(ctx, "Authorization", fmt.Sprintf("Bearer %s", s.Users[u].Token)) +func (s *Tester) WithAuthorization(ctx context.Context, u UserType) context.Context { + return s.WithInstanceAuthorization(ctx, u, FirstInstanceUsersKey) +} + +func (s *Tester) WithInstanceAuthorization(ctx context.Context, u UserType, instanceID string) context.Context { + if u == SystemUser { + s.ensureSystemUser() + } + return metadata.AppendToOutgoingContext(ctx, "Authorization", fmt.Sprintf("Bearer %s", s.Users[instanceID][u].Token)) +} + +func (s *Tester) ensureSystemUser() { + const ISSUER = "tester" + if s.Users[FirstInstanceUsersKey] == nil { + s.Users[FirstInstanceUsersKey] = make(map[UserType]User) + } + if _, ok := s.Users[FirstInstanceUsersKey][SystemUser]; ok { + return + } + audience := http_util.BuildOrigin(s.Host(), s.Server.Config.ExternalSecure) + signer, err := client.NewSignerFromPrivateKeyByte(systemUserKey, "") + logging.OnError(err).Fatal("system key signer") + jwt, err := client.SignedJWTProfileAssertion(ISSUER, []string{audience}, time.Hour, signer) + logging.OnError(err).Fatal("system key jwt") + s.Users[FirstInstanceUsersKey][SystemUser] = User{Token: jwt} } // Done send an interrupt signal to cleanly shutdown the server. @@ -224,7 +262,11 @@ func NewTester(ctx context.Context) *Tester { } logging.OnError(err).Fatal() - tester := new(Tester) + tester := Tester{ + Users: map[string]map[UserType]User{ + FirstInstanceUsersKey: make(map[UserType]User), + }, + } tester.wg.Add(1) go func(wg *sync.WaitGroup) { logging.OnError(cmd.Execute()).Fatal() @@ -237,10 +279,10 @@ func NewTester(ctx context.Context) *Tester { logging.OnError(ctx.Err()).Fatal("waiting for integration tester server") } tester.createClientConn(ctx) - tester.createSystemUser(ctx) + tester.createMachineUser(ctx, FirstInstanceUsersKey) tester.WebAuthN = webauthn.NewClient(tester.Config.WebAuthNName, tester.Config.ExternalDomain, "https://"+tester.Host()) - return tester + return &tester } func Contexts(timeout time.Duration) (ctx, errCtx context.Context, cancel context.CancelFunc) { diff --git a/internal/integration/rand.go b/internal/integration/rand.go new file mode 100644 index 0000000000..4425c97c8c --- /dev/null +++ b/internal/integration/rand.go @@ -0,0 +1,20 @@ +package integration + +import ( + "math/rand" + "time" +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +var letterRunes = []rune("abcdefghijklmnopqrstuvwxyz") + +func randString(n int) string { + b := make([]rune, n) + for i := range b { + b[i] = letterRunes[rand.Intn(len(letterRunes))] + } + return string(b) +} diff --git a/internal/notification/channels/webhook/channel.go b/internal/notification/channels/webhook/channel.go index 6dbed74eb8..c587f593b0 100644 --- a/internal/notification/channels/webhook/channel.go +++ b/internal/notification/channels/webhook/channel.go @@ -21,10 +21,8 @@ func InitChannel(ctx context.Context, cfg Config) (channels.NotificationChannel, logging.Debug("successfully initialized webhook json channel") return channels.HandleMessageFunc(func(message channels.Message) error { - requestCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - msg, ok := message.(*messages.JSON) if !ok { return errors.ThrowInternal(nil, "WEBH-K686U", "message is not JSON") @@ -33,27 +31,24 @@ func InitChannel(ctx context.Context, cfg Config) (channels.NotificationChannel, if err != nil { return err } - req, err := http.NewRequestWithContext(requestCtx, cfg.Method, cfg.CallURL, strings.NewReader(payload)) if err != nil { return err } - + if cfg.Headers != nil { + req.Header = cfg.Headers + } req.Header.Set("Content-Type", "application/json") - resp, err := http.DefaultClient.Do(req) if err != nil { return err } - if err = resp.Body.Close(); err != nil { return err } - if resp.StatusCode < 200 || resp.StatusCode >= 300 { return errors.ThrowUnknown(fmt.Errorf("calling url %s returned %s", cfg.CallURL, resp.Status), "WEBH-LBxU0", "webhook didn't return a success status") } - logging.WithFields("calling_url", cfg.CallURL, "method", cfg.Method).Debug("webhook called") return nil }), nil diff --git a/internal/notification/channels/webhook/config.go b/internal/notification/channels/webhook/config.go index 4af0c15402..9772151d88 100644 --- a/internal/notification/channels/webhook/config.go +++ b/internal/notification/channels/webhook/config.go @@ -1,12 +1,14 @@ package webhook import ( + "net/http" "net/url" ) type Config struct { CallURL string Method string + Headers http.Header } func (w *Config) Validate() error { diff --git a/internal/notification/handlers/already_handled.go b/internal/notification/handlers/already_handled.go index abf7eb901b..556a3e33c9 100644 --- a/internal/notification/handlers/already_handled.go +++ b/internal/notification/handlers/already_handled.go @@ -4,16 +4,15 @@ import ( "context" "github.com/zitadel/zitadel/internal/eventstore" - "github.com/zitadel/zitadel/internal/repository/user" ) -func (n *NotificationQueries) IsAlreadyHandled(ctx context.Context, event eventstore.Event, data map[string]interface{}, eventTypes ...eventstore.EventType) (bool, error) { +func (n *NotificationQueries) IsAlreadyHandled(ctx context.Context, event eventstore.Event, data map[string]interface{}, aggregateType eventstore.AggregateType, eventTypes ...eventstore.EventType) (bool, error) { events, err := n.es.Filter( ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). InstanceID(event.Aggregate().InstanceID). AddQuery(). - AggregateTypes(user.AggregateType). + AggregateTypes(aggregateType). AggregateIDs(event.Aggregate().ID). SequenceGreater(event.Sequence()). EventTypes(eventTypes...). diff --git a/internal/notification/handlers/handlers_integration_test.go b/internal/notification/handlers/handlers_integration_test.go new file mode 100644 index 0000000000..37f6a344c9 --- /dev/null +++ b/internal/notification/handlers/handlers_integration_test.go @@ -0,0 +1,30 @@ +//go:build integration + +package handlers_test + +import ( + "context" + "os" + "testing" + "time" + + "github.com/zitadel/zitadel/internal/integration" +) + +var ( + CTX context.Context + SystemCTX context.Context + Tester *integration.Tester +) + +func TestMain(m *testing.M) { + os.Exit(func() int { + ctx, _, cancel := integration.Contexts(5 * time.Minute) + CTX = ctx + defer cancel() + Tester = integration.NewTester(ctx) + SystemCTX = Tester.WithAuthorization(ctx, integration.SystemUser) + defer Tester.Done() + return m.Run() + }()) +} diff --git a/internal/notification/handlers/quotanotifier.go b/internal/notification/handlers/quota_notifier.go similarity index 96% rename from internal/notification/handlers/quotanotifier.go rename to internal/notification/handlers/quota_notifier.go index e53998490a..21daa92ffc 100644 --- a/internal/notification/handlers/quotanotifier.go +++ b/internal/notification/handlers/quota_notifier.go @@ -68,7 +68,7 @@ func (u *quotaNotifier) reduceNotificationDue(event eventstore.Event) (*handler. return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-DLxdE", "reduce.wrong.event.type %s", quota.NotificationDueEventType) } ctx := HandlerContext(event.Aggregate()) - alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"dueEventID": e.ID}, quota.NotifiedEventType) + alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"dueEventID": e.ID}, quota.AggregateType, quota.NotifiedEventType) if err != nil { return nil, err } diff --git a/internal/notification/handlers/telemetry_pusher.go b/internal/notification/handlers/telemetry_pusher.go new file mode 100644 index 0000000000..8a0de424f1 --- /dev/null +++ b/internal/notification/handlers/telemetry_pusher.go @@ -0,0 +1,150 @@ +package handlers + +import ( + "context" + "fmt" + "net/http" + + "github.com/zitadel/logging" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/api/call" + "github.com/zitadel/zitadel/internal/command" + "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/eventstore/handler" + "github.com/zitadel/zitadel/internal/eventstore/handler/crdb" + "github.com/zitadel/zitadel/internal/notification/channels/webhook" + _ "github.com/zitadel/zitadel/internal/notification/statik" + "github.com/zitadel/zitadel/internal/notification/types" + "github.com/zitadel/zitadel/internal/query" + "github.com/zitadel/zitadel/internal/query/projection" + "github.com/zitadel/zitadel/internal/repository/milestone" + "github.com/zitadel/zitadel/internal/repository/pseudo" +) + +const ( + TelemetryProjectionTable = "projections.telemetry" +) + +type TelemetryPusherConfig struct { + Enabled bool + Endpoints []string + Headers http.Header + Limit uint64 +} + +type telemetryPusher struct { + crdb.StatementHandler + cfg TelemetryPusherConfig + commands *command.Commands + queries *NotificationQueries + metricSuccessfulDeliveriesJSON string + metricFailedDeliveriesJSON string +} + +func NewTelemetryPusher( + ctx context.Context, + telemetryCfg TelemetryPusherConfig, + handlerCfg crdb.StatementHandlerConfig, + commands *command.Commands, + queries *NotificationQueries, + metricSuccessfulDeliveriesJSON, + metricFailedDeliveriesJSON string, +) *telemetryPusher { + p := new(telemetryPusher) + handlerCfg.ProjectionName = TelemetryProjectionTable + handlerCfg.Reducers = p.reducers() + p.cfg = telemetryCfg + p.StatementHandler = crdb.NewStatementHandler(ctx, handlerCfg) + p.commands = commands + p.queries = queries + p.metricSuccessfulDeliveriesJSON = metricSuccessfulDeliveriesJSON + p.metricFailedDeliveriesJSON = metricFailedDeliveriesJSON + projection.TelemetryPusherProjection = p + return p +} + +func (t *telemetryPusher) reducers() []handler.AggregateReducer { + return []handler.AggregateReducer{{ + Aggregate: pseudo.AggregateType, + EventRedusers: []handler.EventReducer{{ + Event: pseudo.ScheduledEventType, + Reduce: t.pushMilestones, + }}, + }} +} + +func (t *telemetryPusher) pushMilestones(event eventstore.Event) (*handler.Statement, error) { + ctx := call.WithTimestamp(context.Background()) + scheduledEvent, ok := event.(*pseudo.ScheduledEvent) + if !ok { + return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-lDTs5", "reduce.wrong.event.type %s", event.Type()) + } + + isReached, err := query.NewNotNullQuery(query.MilestoneReachedDateColID) + if err != nil { + return nil, err + } + isNotPushed, err := query.NewIsNullQuery(query.MilestonePushedDateColID) + if err != nil { + return nil, err + } + hasPrimaryDomain, err := query.NewNotNullQuery(query.MilestonePrimaryDomainColID) + if err != nil { + return nil, err + } + unpushedMilestones, err := t.queries.Queries.SearchMilestones(ctx, scheduledEvent.InstanceIDs, &query.MilestonesSearchQueries{ + SearchRequest: query.SearchRequest{ + Limit: t.cfg.Limit, + SortingColumn: query.MilestoneReachedDateColID, + Asc: true, + }, + Queries: []query.SearchQuery{isReached, isNotPushed, hasPrimaryDomain}, + }) + if err != nil { + return nil, err + } + var errs int + for _, ms := range unpushedMilestones.Milestones { + if err = t.pushMilestone(ctx, scheduledEvent, ms); err != nil { + errs++ + logging.Warnf("pushing milestone %+v failed: %s", *ms, err.Error()) + } + } + if errs > 0 { + return nil, fmt.Errorf("pushing %d of %d milestones failed", errs, unpushedMilestones.Count) + } + + return crdb.NewNoOpStatement(scheduledEvent), nil +} + +func (t *telemetryPusher) pushMilestone(ctx context.Context, event *pseudo.ScheduledEvent, ms *query.Milestone) error { + ctx = authz.WithInstanceID(ctx, ms.InstanceID) + alreadyHandled, err := t.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"type": ms.Type.String()}, milestone.AggregateType, milestone.PushedEventType) + if err != nil { + return err + } + if alreadyHandled { + return nil + } + for _, endpoint := range t.cfg.Endpoints { + if err := types.SendJSON( + ctx, + webhook.Config{ + CallURL: endpoint, + Method: http.MethodPost, + Headers: t.cfg.Headers, + }, + t.queries.GetFileSystemProvider, + t.queries.GetLogProvider, + ms, + event, + t.metricSuccessfulDeliveriesJSON, + t.metricFailedDeliveriesJSON, + ).WithoutTemplate(); err != nil { + return err + } + } + return t.commands.MilestonePushed(ctx, ms.Type, t.cfg.Endpoints, ms.PrimaryDomain) +} diff --git a/internal/notification/handlers/telemetry_pusher_integration_test.go b/internal/notification/handlers/telemetry_pusher_integration_test.go new file mode 100644 index 0000000000..86ed3d106d --- /dev/null +++ b/internal/notification/handlers/telemetry_pusher_integration_test.go @@ -0,0 +1,89 @@ +//go:build integration + +package handlers_test + +import ( + "bytes" + "encoding/json" + "io" + "net" + "net/http" + "net/http/httptest" + "reflect" + "testing" + "time" + + "github.com/zitadel/zitadel/pkg/grpc/management" + "github.com/zitadel/zitadel/pkg/grpc/system" +) + +func TestServer_TelemetryPushMilestones(t *testing.T) { + bodies := make(chan []byte, 0) + mockServer := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + t.Error(err) + } + if r.Header.Get("single-value") != "single-value" { + t.Error("single-value header not set") + } + if reflect.DeepEqual(r.Header.Get("multi-value"), "multi-value-1,multi-value-2") { + t.Error("single-value header not set") + } + bodies <- body + w.WriteHeader(http.StatusOK) + })) + listener, err := net.Listen("tcp", "localhost:8081") + if err != nil { + t.Fatal(err) + } + mockServer.Listener = listener + mockServer.Start() + t.Cleanup(mockServer.Close) + primaryDomain, instanceID, iamOwnerCtx := Tester.UseIsolatedInstance(CTX, SystemCTX) + t.Log("testing against instance with primary domain", primaryDomain) + awaitMilestone(t, bodies, primaryDomain, "InstanceCreated") + project, err := Tester.Client.Mgmt.AddProject(iamOwnerCtx, &management.AddProjectRequest{Name: "integration"}) + if err != nil { + t.Fatal(err) + } + awaitMilestone(t, bodies, primaryDomain, "ProjectCreated") + if _, err = Tester.Client.Mgmt.AddOIDCApp(iamOwnerCtx, &management.AddOIDCAppRequest{ + ProjectId: project.GetId(), + Name: "integration", + }); err != nil { + t.Fatal(err) + } + awaitMilestone(t, bodies, primaryDomain, "ApplicationCreated") + // TODO: trigger and await milestone AuthenticationSucceededOnInstance + // TODO: trigger and await milestone AuthenticationSucceededOnApplication + if _, err = Tester.Client.System.RemoveInstance(SystemCTX, &system.RemoveInstanceRequest{InstanceId: instanceID}); err != nil { + t.Fatal(err) + } + awaitMilestone(t, bodies, primaryDomain, "InstanceDeleted") +} + +func awaitMilestone(t *testing.T, bodies chan []byte, primaryDomain, expectMilestoneType string) { + for { + select { + case body := <-bodies: + plain := new(bytes.Buffer) + if err := json.Indent(plain, body, "", " "); err != nil { + t.Fatal(err) + } + t.Log("received milestone", plain.String()) + milestone := struct { + Type string + PrimaryDomain string + }{} + if err := json.Unmarshal(body, &milestone); err != nil { + t.Error(err) + } + if milestone.Type == expectMilestoneType && milestone.PrimaryDomain == primaryDomain { + return + } + case <-time.After(60 * time.Second): + t.Fatalf("timed out waiting for milestone %s in domain %s", expectMilestoneType, primaryDomain) + } + } +} diff --git a/internal/notification/handlers/usernotifier.go b/internal/notification/handlers/user_notifier.go similarity index 99% rename from internal/notification/handlers/usernotifier.go rename to internal/notification/handlers/user_notifier.go index cd2a48f09d..d726fb62a1 100644 --- a/internal/notification/handlers/usernotifier.go +++ b/internal/notification/handlers/user_notifier.go @@ -337,7 +337,7 @@ func (u *userNotifier) reduceDomainClaimed(event eventstore.Event) (*handler.Sta return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Drh5w", "reduce.wrong.event.type %s", user.UserDomainClaimedType) } ctx := HandlerContext(event.Aggregate()) - alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, + alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.AggregateType, user.UserDomainClaimedType, user.UserDomainClaimedSentType) if err != nil { return nil, err @@ -465,7 +465,7 @@ func (u *userNotifier) reducePasswordChanged(event eventstore.Event) (*handler.S return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Yko2z8", "reduce.wrong.event.type %s", user.HumanPasswordChangedType) } ctx := HandlerContext(event.Aggregate()) - alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.HumanPasswordChangeSentType) + alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.AggregateType, user.HumanPasswordChangeSentType) if err != nil { return nil, err } @@ -594,5 +594,5 @@ func (u *userNotifier) checkIfCodeAlreadyHandledOrExpired(ctx context.Context, e if event.CreationDate().Add(expiry).Before(time.Now().UTC()) { return true, nil } - return u.queries.IsAlreadyHandled(ctx, event, data, eventTypes...) + return u.queries.IsAlreadyHandled(ctx, event, data, user.AggregateType, eventTypes...) } diff --git a/internal/notification/projections.go b/internal/notification/projections.go index f4abf3b7bc..a2b06aadd4 100644 --- a/internal/notification/projections.go +++ b/internal/notification/projections.go @@ -29,6 +29,8 @@ func Start( ctx context.Context, userHandlerCustomConfig projection.CustomConfig, quotaHandlerCustomConfig projection.CustomConfig, + telemetryHandlerCustomConfig projection.CustomConfig, + telemetryCfg handlers.TelemetryPusherConfig, externalPort uint16, externalSecure bool, commands *command.Commands, @@ -74,4 +76,15 @@ func Start( metricSuccessfulDeliveriesJSON, metricFailedDeliveriesJSON, ).Start() + if telemetryCfg.Enabled { + handlers.NewTelemetryPusher( + ctx, + telemetryCfg, + projection.ApplyCustomConfig(telemetryHandlerCustomConfig), + commands, + q, + metricSuccessfulDeliveriesJSON, + metricFailedDeliveriesJSON, + ).Start() + } } diff --git a/internal/query/milestone.go b/internal/query/milestone.go new file mode 100644 index 0000000000..b4767495b8 --- /dev/null +++ b/internal/query/milestone.go @@ -0,0 +1,146 @@ +package query + +import ( + "context" + "database/sql" + "time" + + sq "github.com/Masterminds/squirrel" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/api/call" + "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/query/projection" + "github.com/zitadel/zitadel/internal/repository/milestone" + "github.com/zitadel/zitadel/internal/telemetry/tracing" +) + +type Milestones struct { + SearchResponse + Milestones []*Milestone +} + +type Milestone struct { + InstanceID string + Type milestone.Type + ReachedDate time.Time + PushedDate time.Time + PrimaryDomain string +} + +type MilestonesSearchQueries struct { + SearchRequest + Queries []SearchQuery +} + +func (q *MilestonesSearchQueries) toQuery(query sq.SelectBuilder) sq.SelectBuilder { + query = q.SearchRequest.toQuery(query) + for _, q := range q.Queries { + query = q.toQuery(query) + } + return query +} + +var ( + milestonesTable = table{ + name: projection.MilestonesProjectionTable, + instanceIDCol: projection.MilestoneColumnInstanceID, + } + MilestoneInstanceIDColID = Column{ + name: projection.MilestoneColumnInstanceID, + table: milestonesTable, + } + MilestoneTypeColID = Column{ + name: projection.MilestoneColumnType, + table: milestonesTable, + } + MilestonePrimaryDomainColID = Column{ + name: projection.MilestoneColumnPrimaryDomain, + table: milestonesTable, + } + MilestoneReachedDateColID = Column{ + name: projection.MilestoneColumnReachedDate, + table: milestonesTable, + } + MilestonePushedDateColID = Column{ + name: projection.MilestoneColumnPushedDate, + table: milestonesTable, + } +) + +// SearchMilestones tries to defer the instanceID from the passed context if no instanceIDs are passed +func (q *Queries) SearchMilestones(ctx context.Context, instanceIDs []string, queries *MilestonesSearchQueries) (_ *Milestones, err error) { + ctx, span := tracing.NewSpan(ctx) + defer func() { span.EndWithError(err) }() + query, scan := prepareMilestonesQuery(ctx, q.client) + if len(instanceIDs) == 0 { + instanceIDs = []string{authz.GetInstance(ctx).InstanceID()} + } + stmt, args, err := queries.toQuery(query).Where(sq.Eq{MilestoneInstanceIDColID.identifier(): instanceIDs}).ToSql() + if err != nil { + return nil, errors.ThrowInternal(err, "QUERY-A9i5k", "Errors.Query.SQLStatement") + } + rows, err := q.client.QueryContext(ctx, stmt, args...) + if err != nil { + return nil, err + } + defer func() { + if closeErr := rows.Close(); closeErr != nil && err == nil { + err = errors.ThrowInternal(closeErr, "QUERY-CK9mI", "Errors.Query.CloseRows") + } + }() + milestones, err := scan(rows) + if err != nil { + return nil, err + } + if err = rows.Err(); err != nil { + return nil, errors.ThrowInternal(err, "QUERY-asLsI", "Errors.Internal") + } + milestones.LatestSequence, err = q.latestSequence(ctx, milestonesTable) + return milestones, err + +} + +func prepareMilestonesQuery(ctx context.Context, db prepareDatabase) (sq.SelectBuilder, func(*sql.Rows) (*Milestones, error)) { + return sq.Select( + MilestoneInstanceIDColID.identifier(), + MilestonePrimaryDomainColID.identifier(), + MilestoneReachedDateColID.identifier(), + MilestonePushedDateColID.identifier(), + MilestoneTypeColID.identifier(), + countColumn.identifier(), + ). + From(milestonesTable.identifier() + db.Timetravel(call.Took(ctx))). + PlaceholderFormat(sq.Dollar), + func(rows *sql.Rows) (*Milestones, error) { + milestones := make([]*Milestone, 0) + var count uint64 + for rows.Next() { + m := new(Milestone) + reachedDate := sql.NullTime{} + pushedDate := sql.NullTime{} + primaryDomain := sql.NullString{} + err := rows.Scan( + &m.InstanceID, + &primaryDomain, + &reachedDate, + &pushedDate, + &m.Type, + &count, + ) + if err != nil { + return nil, err + } + m.PrimaryDomain = primaryDomain.String + m.ReachedDate = reachedDate.Time + m.PushedDate = pushedDate.Time + milestones = append(milestones, m) + } + return &Milestones{ + Milestones: milestones, + SearchResponse: SearchResponse{ + Count: count, + }, + }, nil + } +} diff --git a/internal/query/milestone_test.go b/internal/query/milestone_test.go new file mode 100644 index 0000000000..1c1852a59d --- /dev/null +++ b/internal/query/milestone_test.go @@ -0,0 +1,189 @@ +package query + +import ( + "database/sql" + "database/sql/driver" + "errors" + "fmt" + "regexp" + "testing" +) + +var ( + expectedMilestoneQuery = regexp.QuoteMeta(` + SELECT projections.milestones.instance_id, + projections.milestones.primary_domain, + projections.milestones.reached_date, + projections.milestones.last_pushed_date, + projections.milestones.type, + COUNT(*) OVER () + FROM projections.milestones AS OF SYSTEM TIME '-1 ms' + `) + + milestoneCols = []string{ + "instance_id", + "primary_domain", + "reached_date", + "last_pushed_date", + "type", + "ignore_client_ids", + } +) + +func Test_MilestonesPrepare(t *testing.T) { + type want struct { + sqlExpectations sqlExpectation + err checkErr + } + tests := []struct { + name string + prepare interface{} + want want + object interface{} + }{ + { + name: "prepareMilestonesQuery no result", + prepare: prepareMilestonesQuery, + want: want{ + sqlExpectations: mockQueries( + expectedMilestoneQuery, + nil, + nil, + ), + }, + object: &Milestones{Milestones: []*Milestone{}}, + }, + { + name: "prepareMilestonesQuery", + prepare: prepareMilestonesQuery, + want: want{ + sqlExpectations: mockQueries( + expectedMilestoneQuery, + milestoneCols, + [][]driver.Value{ + { + "instance-id", + "primary.domain", + testNow, + testNow, + 1, + 1, + }, + }, + ), + }, + object: &Milestones{ + SearchResponse: SearchResponse{ + Count: 1, + }, + Milestones: []*Milestone{ + { + InstanceID: "instance-id", + Type: 1, + ReachedDate: testNow, + PushedDate: testNow, + PrimaryDomain: "primary.domain", + }, + }, + }, + }, + { + name: "prepareMilestonesQuery multiple result", + prepare: prepareMilestonesQuery, + want: want{ + sqlExpectations: mockQueries( + expectedMilestoneQuery, + milestoneCols, + [][]driver.Value{ + { + "instance-id", + "primary.domain", + testNow, + testNow, + 1, + 1, + }, + { + "instance-id", + "primary.domain", + testNow, + testNow, + 2, + 2, + }, + { + "instance-id", + "primary.domain", + testNow, + nil, + 3, + 3, + }, + { + "instance-id", + "primary.domain", + nil, + nil, + 4, + 4, + }, + }, + ), + }, + object: &Milestones{ + SearchResponse: SearchResponse{ + Count: 4, + }, + Milestones: []*Milestone{ + { + InstanceID: "instance-id", + Type: 1, + ReachedDate: testNow, + PushedDate: testNow, + PrimaryDomain: "primary.domain", + }, + { + InstanceID: "instance-id", + Type: 2, + ReachedDate: testNow, + PushedDate: testNow, + PrimaryDomain: "primary.domain", + }, + { + InstanceID: "instance-id", + Type: 3, + ReachedDate: testNow, + PrimaryDomain: "primary.domain", + }, + { + InstanceID: "instance-id", + Type: 4, + PrimaryDomain: "primary.domain", + }, + }, + }, + }, + { + name: "prepareMilestonesQuery sql err", + prepare: prepareMilestonesQuery, + want: want{ + sqlExpectations: mockQueryErr( + expectedMilestoneQuery, + sql.ErrConnDone, + ), + err: func(err error) (error, bool) { + if !errors.Is(err, sql.ErrConnDone) { + return fmt.Errorf("err should be sql.ErrConnDone got: %w", err), false + } + return nil, true + }, + }, + object: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assertPrepare(t, tt.prepare, tt.object, tt.want.sqlExpectations, tt.want.err, defaultPrepareArgs...) + }) + } +} diff --git a/internal/query/projection/assert.go b/internal/query/projection/assert.go new file mode 100644 index 0000000000..369a6da698 --- /dev/null +++ b/internal/query/projection/assert.go @@ -0,0 +1,14 @@ +package projection + +import ( + "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/eventstore" +) + +func assertEvent[T eventstore.Event](event eventstore.Event) (T, error) { + e, ok := event.(T) + if !ok { + return e, errors.ThrowInvalidArgumentf(nil, "HANDL-1m9fS", "reduce.wrong.event.type %T", event) + } + return e, nil +} diff --git a/internal/query/projection/assert_test.go b/internal/query/projection/assert_test.go new file mode 100644 index 0000000000..010f0c81a9 --- /dev/null +++ b/internal/query/projection/assert_test.go @@ -0,0 +1,52 @@ +package projection + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/repository/instance" +) + +func Test_assertEvent(t *testing.T) { + type args struct { + event eventstore.Event + assertFunc func(eventstore.Event) (eventstore.Event, error) + } + type testCase struct { + name string + args args + wantErr assert.ErrorAssertionFunc + } + tests := []testCase{ + { + name: "correct event type", + args: args{ + event: instance.NewInstanceAddedEvent(context.Background(), &instance.NewAggregate("instance-id").Aggregate, "instance-name"), + assertFunc: func(event eventstore.Event) (eventstore.Event, error) { + return assertEvent[*instance.InstanceAddedEvent](event) + }, + }, + wantErr: assert.NoError, + }, { + name: "wrong event type", + args: args{ + event: instance.NewInstanceRemovedEvent(context.Background(), &instance.NewAggregate("instance-id").Aggregate, "instance-name", nil), + assertFunc: func(event eventstore.Event) (eventstore.Event, error) { + return assertEvent[*instance.InstanceAddedEvent](event) + }, + }, + wantErr: assert.Error, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := tt.args.assertFunc(tt.args.event) + if !tt.wantErr(t, err) { + return + } + }) + } +} diff --git a/internal/query/projection/event_test.go b/internal/query/projection/event_test.go index 29e32e95cf..40dd2234e1 100644 --- a/internal/query/projection/event_test.go +++ b/internal/query/projection/event_test.go @@ -14,12 +14,26 @@ func testEvent( eventType repository.EventType, aggregateType repository.AggregateType, data []byte, +) *repository.Event { + return timedTestEvent(eventType, aggregateType, data, time.Now()) +} + +func toSystemEvent(event *repository.Event) *repository.Event { + event.EditorService = "SYSTEM" + return event +} + +func timedTestEvent( + eventType repository.EventType, + aggregateType repository.AggregateType, + data []byte, + creationDate time.Time, ) *repository.Event { return &repository.Event{ Sequence: 15, PreviousAggregateSequence: 10, PreviousAggregateTypeSequence: 10, - CreationDate: time.Now(), + CreationDate: creationDate, Type: eventType, AggregateType: aggregateType, Data: data, diff --git a/internal/query/projection/label_policy.go b/internal/query/projection/label_policy.go index b8a84d9221..e62c39b4c5 100644 --- a/internal/query/projection/label_policy.go +++ b/internal/query/projection/label_policy.go @@ -402,10 +402,10 @@ func (p *labelPolicyProjection) reduceActivated(event eventstore.Event) (*handle handler.NewCol(LabelPolicyDarkLogoURLCol, nil), handler.NewCol(LabelPolicyDarkIconURLCol, nil), }, - []handler.Condition{ - handler.NewCond(LabelPolicyIDCol, event.Aggregate().ID), - handler.NewCond(LabelPolicyStateCol, domain.LabelPolicyStatePreview), - handler.NewCond(LabelPolicyInstanceIDCol, event.Aggregate().InstanceID), + []handler.NamespacedCondition{ + handler.NewNamespacedCondition(LabelPolicyIDCol, event.Aggregate().ID), + handler.NewNamespacedCondition(LabelPolicyStateCol, domain.LabelPolicyStatePreview), + handler.NewNamespacedCondition(LabelPolicyInstanceIDCol, event.Aggregate().InstanceID), }), nil } diff --git a/internal/query/projection/label_policy_test.go b/internal/query/projection/label_policy_test.go index f7190f6d52..88a82ea0cd 100644 --- a/internal/query/projection/label_policy_test.go +++ b/internal/query/projection/label_policy_test.go @@ -170,7 +170,7 @@ func TestLabelPolicyProjection_reduces(t *testing.T) { executer: &testExecuter{ executions: []execution{ { - expectedStmt: "INSERT INTO projections.label_policies2 (change_date, sequence, state, creation_date, resource_owner, instance_id, id, is_default, hide_login_name_suffix, font_url, watermark_disabled, should_error_popup, light_primary_color, light_warn_color, light_background_color, light_font_color, light_logo_url, light_icon_url, dark_primary_color, dark_warn_color, dark_background_color, dark_font_color, dark_logo_url, dark_icon_url) SELECT $1, $2, $3, creation_date, resource_owner, instance_id, id, is_default, hide_login_name_suffix, font_url, watermark_disabled, should_error_popup, light_primary_color, light_warn_color, light_background_color, light_font_color, light_logo_url, light_icon_url, dark_primary_color, dark_warn_color, dark_background_color, dark_font_color, dark_logo_url, dark_icon_url FROM projections.label_policies2 AS copy_table WHERE copy_table.id = $4 AND copy_table.state = $5 AND copy_table.instance_id = $6 ON CONFLICT (instance_id, id, state) DO UPDATE SET (change_date, sequence, state, creation_date, resource_owner, instance_id, id, is_default, hide_login_name_suffix, font_url, watermark_disabled, should_error_popup, light_primary_color, light_warn_color, light_background_color, light_font_color, light_logo_url, light_icon_url, dark_primary_color, dark_warn_color, dark_background_color, dark_font_color, dark_logo_url, dark_icon_url) = ($1, $2, $3, EXCLUDED.creation_date, EXCLUDED.resource_owner, EXCLUDED.instance_id, EXCLUDED.id, EXCLUDED.is_default, EXCLUDED.hide_login_name_suffix, EXCLUDED.font_url, EXCLUDED.watermark_disabled, EXCLUDED.should_error_popup, EXCLUDED.light_primary_color, EXCLUDED.light_warn_color, EXCLUDED.light_background_color, EXCLUDED.light_font_color, EXCLUDED.light_logo_url, EXCLUDED.light_icon_url, EXCLUDED.dark_primary_color, EXCLUDED.dark_warn_color, EXCLUDED.dark_background_color, EXCLUDED.dark_font_color, EXCLUDED.dark_logo_url, EXCLUDED.dark_icon_url)", + expectedStmt: "INSERT INTO projections.label_policies2 (change_date, sequence, state, creation_date, resource_owner, instance_id, id, is_default, hide_login_name_suffix, font_url, watermark_disabled, should_error_popup, light_primary_color, light_warn_color, light_background_color, light_font_color, light_logo_url, light_icon_url, dark_primary_color, dark_warn_color, dark_background_color, dark_font_color, dark_logo_url, dark_icon_url) SELECT $1, $2, $3, creation_date, resource_owner, instance_id, id, is_default, hide_login_name_suffix, font_url, watermark_disabled, should_error_popup, light_primary_color, light_warn_color, light_background_color, light_font_color, light_logo_url, light_icon_url, dark_primary_color, dark_warn_color, dark_background_color, dark_font_color, dark_logo_url, dark_icon_url FROM projections.label_policies2 AS copy_table WHERE (copy_table.id = $4) AND (copy_table.state = $5) AND (copy_table.instance_id = $6) ON CONFLICT (instance_id, id, state) DO UPDATE SET (change_date, sequence, state, creation_date, resource_owner, instance_id, id, is_default, hide_login_name_suffix, font_url, watermark_disabled, should_error_popup, light_primary_color, light_warn_color, light_background_color, light_font_color, light_logo_url, light_icon_url, dark_primary_color, dark_warn_color, dark_background_color, dark_font_color, dark_logo_url, dark_icon_url) = ($1, $2, $3, EXCLUDED.creation_date, EXCLUDED.resource_owner, EXCLUDED.instance_id, EXCLUDED.id, EXCLUDED.is_default, EXCLUDED.hide_login_name_suffix, EXCLUDED.font_url, EXCLUDED.watermark_disabled, EXCLUDED.should_error_popup, EXCLUDED.light_primary_color, EXCLUDED.light_warn_color, EXCLUDED.light_background_color, EXCLUDED.light_font_color, EXCLUDED.light_logo_url, EXCLUDED.light_icon_url, EXCLUDED.dark_primary_color, EXCLUDED.dark_warn_color, EXCLUDED.dark_background_color, EXCLUDED.dark_font_color, EXCLUDED.dark_logo_url, EXCLUDED.dark_icon_url)", expectedArgs: []interface{}{ anyArg{}, uint64(15), @@ -631,7 +631,7 @@ func TestLabelPolicyProjection_reduces(t *testing.T) { executer: &testExecuter{ executions: []execution{ { - expectedStmt: "INSERT INTO projections.label_policies2 (change_date, sequence, state, creation_date, resource_owner, instance_id, id, is_default, hide_login_name_suffix, font_url, watermark_disabled, should_error_popup, light_primary_color, light_warn_color, light_background_color, light_font_color, light_logo_url, light_icon_url, dark_primary_color, dark_warn_color, dark_background_color, dark_font_color, dark_logo_url, dark_icon_url) SELECT $1, $2, $3, creation_date, resource_owner, instance_id, id, is_default, hide_login_name_suffix, font_url, watermark_disabled, should_error_popup, light_primary_color, light_warn_color, light_background_color, light_font_color, light_logo_url, light_icon_url, dark_primary_color, dark_warn_color, dark_background_color, dark_font_color, dark_logo_url, dark_icon_url FROM projections.label_policies2 AS copy_table WHERE copy_table.id = $4 AND copy_table.state = $5 AND copy_table.instance_id = $6 ON CONFLICT (instance_id, id, state) DO UPDATE SET (change_date, sequence, state, creation_date, resource_owner, instance_id, id, is_default, hide_login_name_suffix, font_url, watermark_disabled, should_error_popup, light_primary_color, light_warn_color, light_background_color, light_font_color, light_logo_url, light_icon_url, dark_primary_color, dark_warn_color, dark_background_color, dark_font_color, dark_logo_url, dark_icon_url) = ($1, $2, $3, EXCLUDED.creation_date, EXCLUDED.resource_owner, EXCLUDED.instance_id, EXCLUDED.id, EXCLUDED.is_default, EXCLUDED.hide_login_name_suffix, EXCLUDED.font_url, EXCLUDED.watermark_disabled, EXCLUDED.should_error_popup, EXCLUDED.light_primary_color, EXCLUDED.light_warn_color, EXCLUDED.light_background_color, EXCLUDED.light_font_color, EXCLUDED.light_logo_url, EXCLUDED.light_icon_url, EXCLUDED.dark_primary_color, EXCLUDED.dark_warn_color, EXCLUDED.dark_background_color, EXCLUDED.dark_font_color, EXCLUDED.dark_logo_url, EXCLUDED.dark_icon_url)", + expectedStmt: "INSERT INTO projections.label_policies2 (change_date, sequence, state, creation_date, resource_owner, instance_id, id, is_default, hide_login_name_suffix, font_url, watermark_disabled, should_error_popup, light_primary_color, light_warn_color, light_background_color, light_font_color, light_logo_url, light_icon_url, dark_primary_color, dark_warn_color, dark_background_color, dark_font_color, dark_logo_url, dark_icon_url) SELECT $1, $2, $3, creation_date, resource_owner, instance_id, id, is_default, hide_login_name_suffix, font_url, watermark_disabled, should_error_popup, light_primary_color, light_warn_color, light_background_color, light_font_color, light_logo_url, light_icon_url, dark_primary_color, dark_warn_color, dark_background_color, dark_font_color, dark_logo_url, dark_icon_url FROM projections.label_policies2 AS copy_table WHERE (copy_table.id = $4) AND (copy_table.state = $5) AND (copy_table.instance_id = $6) ON CONFLICT (instance_id, id, state) DO UPDATE SET (change_date, sequence, state, creation_date, resource_owner, instance_id, id, is_default, hide_login_name_suffix, font_url, watermark_disabled, should_error_popup, light_primary_color, light_warn_color, light_background_color, light_font_color, light_logo_url, light_icon_url, dark_primary_color, dark_warn_color, dark_background_color, dark_font_color, dark_logo_url, dark_icon_url) = ($1, $2, $3, EXCLUDED.creation_date, EXCLUDED.resource_owner, EXCLUDED.instance_id, EXCLUDED.id, EXCLUDED.is_default, EXCLUDED.hide_login_name_suffix, EXCLUDED.font_url, EXCLUDED.watermark_disabled, EXCLUDED.should_error_popup, EXCLUDED.light_primary_color, EXCLUDED.light_warn_color, EXCLUDED.light_background_color, EXCLUDED.light_font_color, EXCLUDED.light_logo_url, EXCLUDED.light_icon_url, EXCLUDED.dark_primary_color, EXCLUDED.dark_warn_color, EXCLUDED.dark_background_color, EXCLUDED.dark_font_color, EXCLUDED.dark_logo_url, EXCLUDED.dark_icon_url)", expectedArgs: []interface{}{ anyArg{}, uint64(15), diff --git a/internal/query/projection/milestones.go b/internal/query/projection/milestones.go new file mode 100644 index 0000000000..dbbf22a603 --- /dev/null +++ b/internal/query/projection/milestones.go @@ -0,0 +1,295 @@ +package projection + +import ( + "context" + "strconv" + "strings" + + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/eventstore/handler" + "github.com/zitadel/zitadel/internal/eventstore/handler/crdb" + "github.com/zitadel/zitadel/internal/repository/instance" + "github.com/zitadel/zitadel/internal/repository/milestone" + "github.com/zitadel/zitadel/internal/repository/project" + "github.com/zitadel/zitadel/internal/repository/user" +) + +const ( + MilestonesProjectionTable = "projections.milestones" + + MilestoneColumnInstanceID = "instance_id" + MilestoneColumnType = "type" + MilestoneColumnPrimaryDomain = "primary_domain" + MilestoneColumnReachedDate = "reached_date" + MilestoneColumnPushedDate = "last_pushed_date" + MilestoneColumnIgnoreClientIDs = "ignore_client_ids" +) + +type milestoneProjection struct { + crdb.StatementHandler +} + +func newMilestoneProjection(ctx context.Context, config crdb.StatementHandlerConfig) *milestoneProjection { + p := new(milestoneProjection) + config.ProjectionName = MilestonesProjectionTable + config.Reducers = p.reducers() + config.InitCheck = crdb.NewMultiTableCheck( + crdb.NewTable([]*crdb.Column{ + crdb.NewColumn(MilestoneColumnInstanceID, crdb.ColumnTypeText), + crdb.NewColumn(MilestoneColumnType, crdb.ColumnTypeEnum), + crdb.NewColumn(MilestoneColumnReachedDate, crdb.ColumnTypeTimestamp, crdb.Nullable()), + crdb.NewColumn(MilestoneColumnPushedDate, crdb.ColumnTypeTimestamp, crdb.Nullable()), + crdb.NewColumn(MilestoneColumnPrimaryDomain, crdb.ColumnTypeText, crdb.Nullable()), + crdb.NewColumn(MilestoneColumnIgnoreClientIDs, crdb.ColumnTypeTextArray, crdb.Nullable()), + }, + crdb.NewPrimaryKey(MilestoneColumnInstanceID, MilestoneColumnType), + ), + ) + p.StatementHandler = crdb.NewStatementHandler(ctx, config) + return p +} + +func (p *milestoneProjection) reducers() []handler.AggregateReducer { + return []handler.AggregateReducer{ + { + Aggregate: instance.AggregateType, + EventRedusers: []handler.EventReducer{ + { + Event: instance.InstanceAddedEventType, + Reduce: p.reduceInstanceAdded, + }, + { + Event: instance.InstanceDomainPrimarySetEventType, + Reduce: p.reduceInstanceDomainPrimarySet, + }, + { + Event: instance.InstanceRemovedEventType, + Reduce: p.reduceInstanceRemoved, + }, + }, + }, + { + Aggregate: project.AggregateType, + EventRedusers: []handler.EventReducer{ + { + Event: project.ProjectAddedType, + Reduce: p.reduceProjectAdded, + }, + { + Event: project.ApplicationAddedType, + Reduce: p.reduceApplicationAdded, + }, + { + Event: project.OIDCConfigAddedType, + Reduce: p.reduceOIDCConfigAdded, + }, + { + Event: project.APIConfigAddedType, + Reduce: p.reduceAPIConfigAdded, + }, + }, + }, + { + Aggregate: user.AggregateType, + EventRedusers: []handler.EventReducer{ + { + // user.UserTokenAddedType is not emitted on creation of personal access tokens + // PATs have no effect on milestone.AuthenticationSucceededOnApplication or milestone.AuthenticationSucceededOnInstance + Event: user.UserTokenAddedType, + Reduce: p.reduceUserTokenAdded, + }, + }, + }, + { + Aggregate: milestone.AggregateType, + EventRedusers: []handler.EventReducer{ + { + Event: milestone.PushedEventType, + Reduce: p.reduceMilestonePushed, + }, + }, + }, + } +} + +func (p *milestoneProjection) reduceInstanceAdded(event eventstore.Event) (*handler.Statement, error) { + e, err := assertEvent[*instance.InstanceAddedEvent](event) + if err != nil { + return nil, err + } + allTypes := milestone.AllTypes() + statements := make([]func(eventstore.Event) crdb.Exec, 0, len(allTypes)) + for _, msType := range allTypes { + createColumns := []handler.Column{ + handler.NewCol(MilestoneColumnInstanceID, e.Aggregate().InstanceID), + handler.NewCol(MilestoneColumnType, msType), + } + if msType == milestone.InstanceCreated { + createColumns = append(createColumns, handler.NewCol(MilestoneColumnReachedDate, event.CreationDate())) + } + statements = append(statements, crdb.AddCreateStatement(createColumns)) + } + return crdb.NewMultiStatement(e, statements...), nil +} + +func (p *milestoneProjection) reduceInstanceDomainPrimarySet(event eventstore.Event) (*handler.Statement, error) { + e, err := assertEvent[*instance.DomainPrimarySetEvent](event) + if err != nil { + return nil, err + } + return crdb.NewUpdateStatement( + e, + []handler.Column{ + handler.NewCol(MilestoneColumnPrimaryDomain, e.Domain), + }, + []handler.Condition{ + handler.NewCond(MilestoneColumnInstanceID, e.Aggregate().InstanceID), + crdb.NewIsNullCond(MilestoneColumnPushedDate), + }, + ), nil +} + +func (p *milestoneProjection) reduceProjectAdded(event eventstore.Event) (*handler.Statement, error) { + if _, err := assertEvent[*project.ProjectAddedEvent](event); err != nil { + return nil, err + } + return p.reduceReachedIfUserEventFunc(milestone.ProjectCreated)(event) +} + +func (p *milestoneProjection) reduceApplicationAdded(event eventstore.Event) (*handler.Statement, error) { + if _, err := assertEvent[*project.ApplicationAddedEvent](event); err != nil { + return nil, err + } + return p.reduceReachedIfUserEventFunc(milestone.ApplicationCreated)(event) +} + +func (p *milestoneProjection) reduceOIDCConfigAdded(event eventstore.Event) (*handler.Statement, error) { + e, err := assertEvent[*project.OIDCConfigAddedEvent](event) + if err != nil { + return nil, err + } + return p.reduceAppConfigAdded(e, e.ClientID) +} + +func (p *milestoneProjection) reduceAPIConfigAdded(event eventstore.Event) (*handler.Statement, error) { + e, err := assertEvent[*project.APIConfigAddedEvent](event) + if err != nil { + return nil, err + } + return p.reduceAppConfigAdded(e, e.ClientID) +} + +func (p *milestoneProjection) reduceUserTokenAdded(event eventstore.Event) (*handler.Statement, error) { + e, err := assertEvent[*user.UserTokenAddedEvent](event) + if err != nil { + return nil, err + } + statements := []func(eventstore.Event) crdb.Exec{ + crdb.AddUpdateStatement( + []handler.Column{ + handler.NewCol(MilestoneColumnReachedDate, event.CreationDate()), + }, + []handler.Condition{ + handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID), + handler.NewCond(MilestoneColumnType, milestone.AuthenticationSucceededOnInstance), + crdb.NewIsNullCond(MilestoneColumnReachedDate), + }, + ), + } + // We ignore authentications without app, for example JWT profile or PAT + if e.ApplicationID != "" { + statements = append(statements, crdb.AddUpdateStatement( + []handler.Column{ + handler.NewCol(MilestoneColumnReachedDate, event.CreationDate()), + }, + []handler.Condition{ + handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID), + handler.NewCond(MilestoneColumnType, milestone.AuthenticationSucceededOnApplication), + crdb.Not(crdb.NewTextArrayContainsCond(MilestoneColumnIgnoreClientIDs, e.ApplicationID)), + crdb.NewIsNullCond(MilestoneColumnReachedDate), + }, + )) + } + return crdb.NewMultiStatement(e, statements...), nil +} + +func (p *milestoneProjection) reduceInstanceRemoved(event eventstore.Event) (*handler.Statement, error) { + if _, err := assertEvent[*instance.InstanceRemovedEvent](event); err != nil { + return nil, err + } + return p.reduceReachedFunc(milestone.InstanceDeleted)(event) +} + +func (p *milestoneProjection) reduceMilestonePushed(event eventstore.Event) (*handler.Statement, error) { + e, err := assertEvent[*milestone.PushedEvent](event) + if err != nil { + return nil, err + } + if e.MilestoneType != milestone.InstanceDeleted { + return crdb.NewUpdateStatement( + event, + []handler.Column{ + handler.NewCol(MilestoneColumnPushedDate, event.CreationDate()), + }, + []handler.Condition{ + handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID), + handler.NewCond(MilestoneColumnType, e.MilestoneType), + }, + ), nil + } + return crdb.NewDeleteStatement( + event, + []handler.Condition{ + handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID), + }, + ), nil +} + +func (p *milestoneProjection) reduceReachedIfUserEventFunc(msType milestone.Type) func(event eventstore.Event) (*handler.Statement, error) { + return func(event eventstore.Event) (*handler.Statement, error) { + if p.isSystemEvent(event) { + return crdb.NewNoOpStatement(event), nil + } + return p.reduceReachedFunc(msType)(event) + } +} + +func (p *milestoneProjection) reduceReachedFunc(msType milestone.Type) func(event eventstore.Event) (*handler.Statement, error) { + return func(event eventstore.Event) (*handler.Statement, error) { + return crdb.NewUpdateStatement(event, []handler.Column{ + handler.NewCol(MilestoneColumnReachedDate, event.CreationDate()), + }, + []handler.Condition{ + handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID), + handler.NewCond(MilestoneColumnType, msType), + crdb.NewIsNullCond(MilestoneColumnReachedDate), + }), nil + } +} + +func (p *milestoneProjection) reduceAppConfigAdded(event eventstore.Event, clientID string) (*handler.Statement, error) { + if !p.isSystemEvent(event) { + return crdb.NewNoOpStatement(event), nil + } + return crdb.NewUpdateStatement( + event, + []handler.Column{ + crdb.NewArrayAppendCol(MilestoneColumnIgnoreClientIDs, clientID), + }, + []handler.Condition{ + handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID), + handler.NewCond(MilestoneColumnType, milestone.AuthenticationSucceededOnApplication), + crdb.NewIsNullCond(MilestoneColumnReachedDate), + }, + ), nil +} + +func (p *milestoneProjection) isSystemEvent(event eventstore.Event) bool { + if userId, err := strconv.Atoi(event.EditorUser()); err == nil && userId > 0 { + return false + } + lowerEditorService := strings.ToLower(event.EditorService()) + return lowerEditorService == "" || + lowerEditorService == "system" || + lowerEditorService == "system-api" +} diff --git a/internal/query/projection/milestones_test.go b/internal/query/projection/milestones_test.go new file mode 100644 index 0000000000..5138fc1573 --- /dev/null +++ b/internal/query/projection/milestones_test.go @@ -0,0 +1,404 @@ +package projection + +import ( + "testing" + "time" + + "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/eventstore/handler" + "github.com/zitadel/zitadel/internal/eventstore/repository" + "github.com/zitadel/zitadel/internal/repository/instance" + "github.com/zitadel/zitadel/internal/repository/milestone" + "github.com/zitadel/zitadel/internal/repository/project" + "github.com/zitadel/zitadel/internal/repository/user" +) + +func TestMilestonesProjection_reduces(t *testing.T) { + type args struct { + event func(t *testing.T) eventstore.Event + } + now := time.Now() + tests := []struct { + name string + args args + reduce func(event eventstore.Event) (*handler.Statement, error) + want wantReduce + }{ + { + name: "reduceInstanceAdded", + args: args{ + event: getEvent(timedTestEvent( + repository.EventType(instance.InstanceAddedEventType), + instance.AggregateType, + []byte(`{}`), + now, + ), instance.InstanceAddedEventMapper), + }, + reduce: (&milestoneProjection{}).reduceInstanceAdded, + want: wantReduce{ + aggregateType: eventstore.AggregateType("instance"), + sequence: 15, + previousSequence: 10, + executer: &testExecuter{ + executions: []execution{ + { + expectedStmt: "INSERT INTO projections.milestones (instance_id, type, reached_date) VALUES ($1, $2, $3)", + expectedArgs: []interface{}{ + "instance-id", + milestone.InstanceCreated, + now, + }, + }, + { + expectedStmt: "INSERT INTO projections.milestones (instance_id, type) VALUES ($1, $2)", + expectedArgs: []interface{}{ + "instance-id", + milestone.AuthenticationSucceededOnInstance, + }, + }, + { + expectedStmt: "INSERT INTO projections.milestones (instance_id, type) VALUES ($1, $2)", + expectedArgs: []interface{}{ + "instance-id", + milestone.ProjectCreated, + }, + }, + { + expectedStmt: "INSERT INTO projections.milestones (instance_id, type) VALUES ($1, $2)", + expectedArgs: []interface{}{ + "instance-id", + milestone.ApplicationCreated, + }, + }, + { + expectedStmt: "INSERT INTO projections.milestones (instance_id, type) VALUES ($1, $2)", + expectedArgs: []interface{}{ + "instance-id", + milestone.AuthenticationSucceededOnApplication, + }, + }, + { + expectedStmt: "INSERT INTO projections.milestones (instance_id, type) VALUES ($1, $2)", + expectedArgs: []interface{}{ + "instance-id", + milestone.InstanceDeleted, + }, + }, + }, + }, + }, + }, + { + name: "reduceInstancePrimaryDomainSet", + args: args{ + event: getEvent(testEvent( + repository.EventType(instance.InstanceDomainPrimarySetEventType), + instance.AggregateType, + []byte(`{"domain": "my.domain"}`), + ), instance.DomainPrimarySetEventMapper), + }, + reduce: (&milestoneProjection{}).reduceInstanceDomainPrimarySet, + want: wantReduce{ + aggregateType: eventstore.AggregateType("instance"), + sequence: 15, + previousSequence: 10, + executer: &testExecuter{ + executions: []execution{ + { + expectedStmt: "UPDATE projections.milestones SET primary_domain = $1 WHERE (instance_id = $2) AND (last_pushed_date IS NULL)", + expectedArgs: []interface{}{ + "my.domain", + "instance-id", + }, + }, + }, + }, + }, + }, + { + name: "reduceProjectAdded", + args: args{ + event: getEvent(timedTestEvent( + repository.EventType(project.ProjectAddedType), + project.AggregateType, + []byte(`{}`), + now, + ), project.ProjectAddedEventMapper), + }, + reduce: (&milestoneProjection{}).reduceProjectAdded, + want: wantReduce{ + aggregateType: eventstore.AggregateType("project"), + sequence: 15, + previousSequence: 10, + executer: &testExecuter{ + executions: []execution{ + { + expectedStmt: "UPDATE projections.milestones SET reached_date = $1 WHERE (instance_id = $2) AND (type = $3) AND (reached_date IS NULL)", + expectedArgs: []interface{}{ + now, + "instance-id", + milestone.ProjectCreated, + }, + }, + }, + }, + }, + }, + { + name: "reduceApplicationAdded", + args: args{ + event: getEvent(timedTestEvent( + repository.EventType(project.ApplicationAddedType), + project.AggregateType, + []byte(`{}`), + now, + ), project.ApplicationAddedEventMapper), + }, + reduce: (&milestoneProjection{}).reduceApplicationAdded, + want: wantReduce{ + aggregateType: eventstore.AggregateType("project"), + sequence: 15, + previousSequence: 10, + executer: &testExecuter{ + executions: []execution{ + { + expectedStmt: "UPDATE projections.milestones SET reached_date = $1 WHERE (instance_id = $2) AND (type = $3) AND (reached_date IS NULL)", + expectedArgs: []interface{}{ + now, + "instance-id", + milestone.ApplicationCreated, + }, + }, + }, + }, + }, + }, + { + name: "reduceOIDCConfigAdded user event", + args: args{ + event: getEvent(testEvent( + repository.EventType(project.OIDCConfigAddedType), + project.AggregateType, + []byte(`{}`), + ), project.OIDCConfigAddedEventMapper), + }, + reduce: (&milestoneProjection{}).reduceOIDCConfigAdded, + want: wantReduce{ + aggregateType: eventstore.AggregateType("project"), + sequence: 15, + previousSequence: 10, + executer: &testExecuter{}, + }, + }, + { + name: "reduceOIDCConfigAdded system event", + args: args{ + event: getEvent(toSystemEvent(testEvent( + repository.EventType(project.OIDCConfigAddedType), + project.AggregateType, + []byte(`{"clientId": "client-id"}`), + )), project.OIDCConfigAddedEventMapper), + }, + reduce: (&milestoneProjection{}).reduceOIDCConfigAdded, + want: wantReduce{ + aggregateType: eventstore.AggregateType("project"), + sequence: 15, + previousSequence: 10, + executer: &testExecuter{ + executions: []execution{ + { + expectedStmt: "UPDATE projections.milestones SET ignore_client_ids = array_append(ignore_client_ids, $1) WHERE (instance_id = $2) AND (type = $3) AND (reached_date IS NULL)", + expectedArgs: []interface{}{ + "client-id", + "instance-id", + milestone.AuthenticationSucceededOnApplication, + }, + }, + }, + }, + }, + }, + { + name: "reduceAPIConfigAdded user event", + args: args{ + event: getEvent(testEvent( + repository.EventType(project.APIConfigAddedType), + project.AggregateType, + []byte(`{}`), + ), project.APIConfigAddedEventMapper), + }, + reduce: (&milestoneProjection{}).reduceAPIConfigAdded, + want: wantReduce{ + aggregateType: eventstore.AggregateType("project"), + sequence: 15, + previousSequence: 10, + executer: &testExecuter{}, + }, + }, + { + name: "reduceAPIConfigAdded system event", + args: args{ + event: getEvent(toSystemEvent(testEvent( + repository.EventType(project.APIConfigAddedType), + project.AggregateType, + []byte(`{"clientId": "client-id"}`), + )), project.APIConfigAddedEventMapper), + }, + reduce: (&milestoneProjection{}).reduceAPIConfigAdded, + want: wantReduce{ + aggregateType: eventstore.AggregateType("project"), + sequence: 15, + previousSequence: 10, + executer: &testExecuter{ + executions: []execution{ + { + expectedStmt: "UPDATE projections.milestones SET ignore_client_ids = array_append(ignore_client_ids, $1) WHERE (instance_id = $2) AND (type = $3) AND (reached_date IS NULL)", + expectedArgs: []interface{}{ + "client-id", + "instance-id", + milestone.AuthenticationSucceededOnApplication, + }, + }, + }, + }, + }, + }, + { + name: "reduceUserTokenAdded", + args: args{ + event: getEvent(timedTestEvent( + repository.EventType(user.UserTokenAddedType), + user.AggregateType, + []byte(`{"applicationId": "client-id"}`), + now, + ), user.UserTokenAddedEventMapper), + }, + reduce: (&milestoneProjection{}).reduceUserTokenAdded, + want: wantReduce{ + aggregateType: eventstore.AggregateType("user"), + sequence: 15, + previousSequence: 10, + executer: &testExecuter{ + // TODO: This can be optimized to only use one statement with OR + executions: []execution{ + { + expectedStmt: "UPDATE projections.milestones SET reached_date = $1 WHERE (instance_id = $2) AND (type = $3) AND (reached_date IS NULL)", + expectedArgs: []interface{}{ + now, + "instance-id", + milestone.AuthenticationSucceededOnInstance, + }, + }, + { + expectedStmt: "UPDATE projections.milestones SET reached_date = $1 WHERE (instance_id = $2) AND (type = $3) AND (NOT (ignore_client_ids @> $4)) AND (reached_date IS NULL)", + expectedArgs: []interface{}{ + now, + "instance-id", + milestone.AuthenticationSucceededOnApplication, + database.StringArray{"client-id"}, + }, + }, + }, + }, + }, + }, + { + name: "reduceInstanceRemoved", + args: args{ + event: getEvent(timedTestEvent( + repository.EventType(instance.InstanceRemovedEventType), + instance.AggregateType, + []byte(`{}`), + now, + ), instance.InstanceRemovedEventMapper), + }, + reduce: (&milestoneProjection{}).reduceInstanceRemoved, + want: wantReduce{ + aggregateType: eventstore.AggregateType("instance"), + sequence: 15, + previousSequence: 10, + executer: &testExecuter{ + executions: []execution{ + { + expectedStmt: "UPDATE projections.milestones SET reached_date = $1 WHERE (instance_id = $2) AND (type = $3) AND (reached_date IS NULL)", + expectedArgs: []interface{}{ + now, + "instance-id", + milestone.InstanceDeleted, + }, + }, + }, + }, + }, + }, + { + name: "reduceMilestonePushed normal milestone", + args: args{ + event: getEvent(timedTestEvent( + repository.EventType(milestone.PushedEventType), + milestone.AggregateType, + []byte(`{"type": "ProjectCreated"}`), + now, + ), milestone.PushedEventMapper), + }, + reduce: (&milestoneProjection{}).reduceMilestonePushed, + want: wantReduce{ + aggregateType: eventstore.AggregateType("milestone"), + sequence: 15, + previousSequence: 10, + executer: &testExecuter{ + executions: []execution{ + { + expectedStmt: "UPDATE projections.milestones SET last_pushed_date = $1 WHERE (instance_id = $2) AND (type = $3)", + expectedArgs: []interface{}{ + now, + "instance-id", + milestone.ProjectCreated, + }, + }, + }, + }, + }, + }, + { + name: "reduceMilestonePushed instance deleted milestone", + args: args{ + event: getEvent(testEvent( + repository.EventType(milestone.PushedEventType), + milestone.AggregateType, + []byte(`{"type": "InstanceDeleted"}`), + ), milestone.PushedEventMapper), + }, + reduce: (&milestoneProjection{}).reduceMilestonePushed, + want: wantReduce{ + aggregateType: eventstore.AggregateType("milestone"), + sequence: 15, + previousSequence: 10, + executer: &testExecuter{ + executions: []execution{ + { + expectedStmt: "DELETE FROM projections.milestones WHERE (instance_id = $1)", + expectedArgs: []interface{}{ + "instance-id", + }, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + event := baseEvent(t) + got, err := tt.reduce(event) + if !errors.IsErrorInvalidArgument(err) { + t.Errorf("no wrong event mapping: %v, got: %v", err, got) + } + event = tt.args.event(t) + got, err = tt.reduce(event) + assertReduce(t, got, err, MilestonesProjectionTable, tt.want) + }) + } +} diff --git a/internal/query/projection/projection.go b/internal/query/projection/projection.go index 45670b8bac..6da0638347 100644 --- a/internal/query/projection/projection.go +++ b/internal/query/projection/projection.go @@ -64,8 +64,10 @@ var ( NotificationPolicyProjection *notificationPolicyProjection NotificationsProjection interface{} NotificationsQuotaProjection interface{} + TelemetryPusherProjection interface{} DeviceAuthProjection *deviceAuthProjection SessionProjection *sessionProjection + MilestoneProjection *milestoneProjection ) type projection interface { @@ -143,6 +145,7 @@ func Create(ctx context.Context, sqlClient *database.DB, es *eventstore.Eventsto NotificationPolicyProjection = newNotificationPolicyProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["notification_policies"])) DeviceAuthProjection = newDeviceAuthProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["device_auth"])) SessionProjection = newSessionProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["sessions"])) + MilestoneProjection = newMilestoneProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["milestones"])) newProjectionsList() return nil } @@ -191,7 +194,7 @@ func applyCustomConfig(config crdb.StatementHandlerConfig, customConfig CustomCo // as setup and start currently create them individually, we make sure we get the right one // will be refactored when changing to new id based projections // -// NotificationsProjection is not added here, because it does not statement based / has no proprietary projection table +// Event handlers NotificationsProjection, NotificationsQuotaProjection and NotificationsProjection are not added here, because they do not reduce to database statements func newProjectionsList() { projections = []projection{ OrgProjection, @@ -240,5 +243,6 @@ func newProjectionsList() { NotificationPolicyProjection, DeviceAuthProjection, SessionProjection, + MilestoneProjection, } } diff --git a/internal/query/search_query.go b/internal/query/search_query.go index 0f4314dd05..16d37502d2 100644 --- a/internal/query/search_query.go +++ b/internal/query/search_query.go @@ -66,6 +66,27 @@ func (q *NotNullQuery) comp() sq.Sqlizer { return sq.NotEq{q.Column.identifier(): nil} } +type IsNullQuery struct { + Column Column +} + +func NewIsNullQuery(col Column) (*IsNullQuery, error) { + if col.isZero() { + return nil, ErrMissingColumn + } + return &IsNullQuery{ + Column: col, + }, nil +} + +func (q *IsNullQuery) toQuery(query sq.SelectBuilder) sq.SelectBuilder { + return query.Where(q.comp()) +} + +func (q *IsNullQuery) comp() sq.Sqlizer { + return sq.Eq{q.Column.identifier(): nil} +} + type orQuery struct { queries []SearchQuery } diff --git a/internal/repository/milestone/aggregate.go b/internal/repository/milestone/aggregate.go new file mode 100644 index 0000000000..bb9ca99cb3 --- /dev/null +++ b/internal/repository/milestone/aggregate.go @@ -0,0 +1,30 @@ +package milestone + +import ( + "context" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/eventstore" +) + +const ( + AggregateType = "milestone" + AggregateVersion = "v1" +) + +type Aggregate struct { + eventstore.Aggregate +} + +func NewAggregate(ctx context.Context, id string) *Aggregate { + instanceID := authz.GetInstance(ctx).InstanceID() + return &Aggregate{ + Aggregate: eventstore.Aggregate{ + Type: AggregateType, + Version: AggregateVersion, + ID: id, + ResourceOwner: instanceID, + InstanceID: instanceID, + }, + } +} diff --git a/internal/repository/milestone/events.go b/internal/repository/milestone/events.go new file mode 100644 index 0000000000..5ec2f96f93 --- /dev/null +++ b/internal/repository/milestone/events.go @@ -0,0 +1,54 @@ +package milestone + +import ( + "context" + + "github.com/zitadel/zitadel/internal/eventstore" +) + +const ( + eventTypePrefix = eventstore.EventType("milestone.") + PushedEventType = eventTypePrefix + "pushed" +) + +type PushedEvent struct { + *eventstore.BaseEvent `json:"-"` + MilestoneType Type `json:"type"` + ExternalDomain string `json:"externalDomain"` + PrimaryDomain string `json:"primaryDomain"` + Endpoints []string `json:"endpoints"` +} + +func (p *PushedEvent) Data() interface{} { + return p +} + +func (p *PushedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint { + return nil +} + +func (p *PushedEvent) SetBaseEvent(b *eventstore.BaseEvent) { + p.BaseEvent = b +} + +var PushedEventMapper = eventstore.GenericEventMapper[PushedEvent] + +func NewPushedEvent( + ctx context.Context, + aggregate *Aggregate, + msType Type, + endpoints []string, + externalDomain, primaryDomain string, +) *PushedEvent { + return &PushedEvent{ + BaseEvent: eventstore.NewBaseEventForPush( + ctx, + &aggregate.Aggregate, + PushedEventType, + ), + MilestoneType: msType, + Endpoints: endpoints, + ExternalDomain: externalDomain, + PrimaryDomain: primaryDomain, + } +} diff --git a/internal/repository/milestone/eventstore.go b/internal/repository/milestone/eventstore.go new file mode 100644 index 0000000000..4f82b08195 --- /dev/null +++ b/internal/repository/milestone/eventstore.go @@ -0,0 +1,9 @@ +package milestone + +import ( + "github.com/zitadel/zitadel/internal/eventstore" +) + +func RegisterEventMappers(es *eventstore.Eventstore) { + es.RegisterFilterEventMapper(AggregateType, PushedEventType, PushedEventMapper) +} diff --git a/internal/repository/milestone/type.go b/internal/repository/milestone/type.go new file mode 100644 index 0000000000..f57bb032ee --- /dev/null +++ b/internal/repository/milestone/type.go @@ -0,0 +1,59 @@ +//go:generate stringer -type Type + +package milestone + +import ( + "fmt" + "strings" +) + +type Type int + +const ( + unknown Type = iota + + InstanceCreated + AuthenticationSucceededOnInstance + ProjectCreated + ApplicationCreated + AuthenticationSucceededOnApplication + InstanceDeleted + + typesCount +) + +func AllTypes() []Type { + types := make([]Type, typesCount-1) + for i := Type(1); i < typesCount; i++ { + types[i-1] = i + } + return types +} + +func (t *Type) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`"%s"`, t.String())), nil +} + +func (t *Type) UnmarshalJSON(data []byte) error { + *t = typeFromString(strings.Trim(string(data), `"`)) + return nil +} + +func typeFromString(t string) Type { + switch t { + case InstanceCreated.String(): + return InstanceCreated + case AuthenticationSucceededOnInstance.String(): + return AuthenticationSucceededOnInstance + case ProjectCreated.String(): + return ProjectCreated + case ApplicationCreated.String(): + return ApplicationCreated + case AuthenticationSucceededOnApplication.String(): + return AuthenticationSucceededOnApplication + case InstanceDeleted.String(): + return InstanceDeleted + default: + return unknown + } +} diff --git a/internal/repository/milestone/type_string.go b/internal/repository/milestone/type_string.go new file mode 100644 index 0000000000..ce6a98441c --- /dev/null +++ b/internal/repository/milestone/type_string.go @@ -0,0 +1,30 @@ +// Code generated by "stringer -type Type"; DO NOT EDIT. + +package milestone + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[unknown-0] + _ = x[InstanceCreated-1] + _ = x[AuthenticationSucceededOnInstance-2] + _ = x[ProjectCreated-3] + _ = x[ApplicationCreated-4] + _ = x[AuthenticationSucceededOnApplication-5] + _ = x[InstanceDeleted-6] + _ = x[typesCount-7] +} + +const _Type_name = "unknownInstanceCreatedAuthenticationSucceededOnInstanceProjectCreatedApplicationCreatedAuthenticationSucceededOnApplicationInstanceDeletedtypesCount" + +var _Type_index = [...]uint8{0, 7, 22, 55, 69, 87, 123, 138, 148} + +func (i Type) String() string { + if i < 0 || i >= Type(len(_Type_index)-1) { + return "Type(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _Type_name[_Type_index[i]:_Type_index[i+1]] +} diff --git a/internal/repository/pseudo/aggregate.go b/internal/repository/pseudo/aggregate.go new file mode 100644 index 0000000000..811f566615 --- /dev/null +++ b/internal/repository/pseudo/aggregate.go @@ -0,0 +1,21 @@ +package pseudo + +import "github.com/zitadel/zitadel/internal/eventstore" + +const ( + AggregateType = "pseudo" + AggregateVersion = "v1" +) + +type Aggregate struct { + eventstore.Aggregate +} + +func NewAggregate() *Aggregate { + return &Aggregate{ + Aggregate: eventstore.Aggregate{ + Type: AggregateType, + Version: AggregateVersion, + }, + } +} diff --git a/internal/repository/pseudo/events.go b/internal/repository/pseudo/events.go new file mode 100644 index 0000000000..0baaa9368f --- /dev/null +++ b/internal/repository/pseudo/events.go @@ -0,0 +1,40 @@ +// Package pseudo contains virtual events, that are not stored in the eventstore. +package pseudo + +import ( + "context" + "time" + + "github.com/zitadel/zitadel/internal/eventstore" +) + +const ( + eventTypePrefix = eventstore.EventType("pseudo.") + ScheduledEventType = eventTypePrefix + "timestamp" +) + +var _ eventstore.Event = (*ScheduledEvent)(nil) + +type ScheduledEvent struct { + *eventstore.BaseEvent `json:"-"` + Timestamp time.Time `json:"-"` + InstanceIDs []string `json:"-"` +} + +// NewScheduledEvent returns an event that can be processed by event handlers like any other event. +// It receives the current timestamp and an ID list of instances that are active and should be processed. +func NewScheduledEvent( + ctx context.Context, + timestamp time.Time, + instanceIDs ...string, +) *ScheduledEvent { + return &ScheduledEvent{ + BaseEvent: eventstore.NewBaseEventForPush( + ctx, + &NewAggregate().Aggregate, + ScheduledEventType, + ), + Timestamp: timestamp, + InstanceIDs: instanceIDs, + } +}