Skip to content

Commit 727ec43

Browse files
committed
Added ElasticSearch left fold projections
Cleaned EventStoreDB README links Minor updates in other places
1 parent e624f4b commit 727ec43

File tree

21 files changed

+172
-71
lines changed

21 files changed

+172
-71
lines changed

Core.ElasticSearch/Config.cs

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
using System;
2+
using Microsoft.Extensions.Configuration;
3+
using Microsoft.Extensions.DependencyInjection;
4+
using Nest;
5+
6+
namespace Core.ElasticSearch
7+
{
8+
public class ElasticSearchConfig
9+
{
10+
public string Url { get; set; } = default!;
11+
public string DefaultIndex { get; set; } = default!;
12+
}
13+
14+
public static class ElasticSearchConfigExtensions
15+
{
16+
private const string DefaultConfigKey = "ElasticSearch";
17+
public static void AddElasticsearch(
18+
this IServiceCollection services, IConfiguration configuration, Action<ConnectionSettings>? config = null)
19+
{
20+
var elasticSearchConfig = configuration.GetSection(DefaultConfigKey).Get<ElasticSearchConfig>();
21+
22+
var settings = new ConnectionSettings(new Uri(elasticSearchConfig.Url))
23+
.DefaultIndex(elasticSearchConfig.DefaultIndex);
24+
25+
config?.Invoke(settings);
26+
27+
var client = new ElasticClient(settings);
28+
29+
services.AddSingleton<IElasticClient>(client);
30+
}
31+
}
32+
}

Core.ElasticSearch/Core.ElasticSearch.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
<ItemGroup>
1111
<PackageReference Include="NEST" Version="7.13.0" />
1212
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="5.0.0" />
13+
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="5.0.0" />
1314
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.1" />
1415
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
1516
</ItemGroup>

Core.ElasticSearch/ElasticSearchConfig.cs

-26
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Linq;
4+
5+
namespace Core.ElasticSearch.Indices
6+
{
7+
public class IndexNameMapper
8+
{
9+
private static readonly IndexNameMapper Instance = new();
10+
11+
private readonly ConcurrentDictionary<Type, string> typeNameMap = new();
12+
13+
public static void AddCustomMap<TStream>(string mappedStreamName) =>
14+
AddCustomMap(typeof(TStream), mappedStreamName);
15+
16+
public static void AddCustomMap(Type streamType, string mappedStreamName)
17+
{
18+
Instance.typeNameMap.AddOrUpdate(streamType, mappedStreamName, (_, _) => mappedStreamName);
19+
}
20+
21+
public static string ToIndexPrefix<TStream>() => ToIndexPrefix(typeof(TStream));
22+
23+
public static string ToIndexPrefix(Type streamType) => Instance.typeNameMap.GetOrAdd(streamType, (_) =>
24+
{
25+
var modulePrefix = streamType.Namespace!.Split(".").First();
26+
return $"{modulePrefix}-{streamType.Name}".ToLower();
27+
});
28+
29+
public static string ToIndexName<TStream>(object? tenantId = null) =>
30+
ToIndexName(typeof(TStream));
31+
32+
public static string ToIndexName(Type streamType, object? tenantId = null)
33+
{
34+
var tenantPrefix = tenantId != null ? $"{tenantId}-" : "";
35+
36+
return $"{tenantPrefix}{ToIndexPrefix(streamType)}".ToLower();
37+
}
38+
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Core.ElasticSearch.Indices;
5+
using Core.Events;
6+
using Core.Projections;
7+
using MediatR;
8+
using Microsoft.Extensions.DependencyInjection;
9+
using Nest;
10+
11+
namespace Core.ElasticSearch.Projections
12+
{
13+
public class ElasticSearchProjection<TEvent, TView> : IEventHandler<TEvent>
14+
where TView : class, IProjection
15+
where TEvent : IEvent
16+
{
17+
private readonly IElasticClient elasticClient;
18+
private readonly Func<TEvent, string> getId;
19+
20+
public ElasticSearchProjection(
21+
IElasticClient elasticClient,
22+
Func<TEvent, string> getId
23+
)
24+
{
25+
this.elasticClient = elasticClient ?? throw new ArgumentNullException(nameof(elasticClient));
26+
this.getId = getId ?? throw new ArgumentNullException(nameof(getId));
27+
}
28+
29+
public async Task Handle(TEvent @event, CancellationToken ct)
30+
{
31+
string id = getId(@event);
32+
33+
var entity = (await elasticClient.GetAsync<TView>(id, ct: ct))?.Source
34+
?? (TView) Activator.CreateInstance(typeof(TView), true)!;
35+
36+
entity.When(@event);
37+
38+
var result = await elasticClient.UpdateAsync<TView>(id,
39+
u => u.Doc(entity).Upsert(entity).Index(IndexNameMapper.ToIndexName<TView>()),
40+
ct
41+
);
42+
}
43+
}
44+
45+
public static class ElasticSearchProjectionConfig
46+
{
47+
public static IServiceCollection Project<TEvent, TView>(this IServiceCollection services,
48+
Func<TEvent, string> getId)
49+
where TView : class, IProjection
50+
where TEvent : IEvent
51+
{
52+
services.AddTransient<INotificationHandler<TEvent>>(sp =>
53+
{
54+
var session = sp.GetRequiredService<IElasticClient>();
55+
56+
return new ElasticSearchProjection<TEvent, TView>(session, getId);
57+
});
58+
59+
return services;
60+
}
61+
}
62+
}

Core.ElasticSearch/Repository/ElasticSearchRepository.cs

+8-7
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
using System;
22
using System.Threading;
33
using System.Threading.Tasks;
4-
using Core.Aggregates;
4+
using Core.ElasticSearch.Indices;
55
using Core.Events;
6-
using Core.Repositories;
6+
using Nest;
7+
using IAggregate = Core.Aggregates.IAggregate;
78

89
namespace Core.ElasticSearch.Repository
910
{
10-
public class ElasticSearchRepository<T>: IRepository<T> where T : class, IAggregate, new()
11+
public class ElasticSearchRepository<T>: Repositories.IRepository<T> where T : class, IAggregate, new()
1112
{
12-
private readonly Nest.IElasticClient elasticClient;
13+
private readonly IElasticClient elasticClient;
1314
private readonly IEventBus eventBus;
1415

1516
public ElasticSearchRepository(
16-
Nest.IElasticClient elasticClient,
17+
IElasticClient elasticClient,
1718
IEventBus eventBus
1819
)
1920
{
@@ -29,12 +30,12 @@ IEventBus eventBus
2930

3031
public Task Add(T aggregate, CancellationToken cancellationToken)
3132
{
32-
return elasticClient.IndexAsync(aggregate, i => i.Id(aggregate.Id), cancellationToken);
33+
return elasticClient.IndexAsync(aggregate, i => i.Id(aggregate.Id).Index(IndexNameMapper.ToIndexName<T>()), cancellationToken);
3334
}
3435

3536
public Task Update(T aggregate, CancellationToken cancellationToken)
3637
{
37-
return elasticClient.UpdateAsync<T>(aggregate.Id, i => i.Doc(aggregate), cancellationToken);
38+
return elasticClient.UpdateAsync<T>(aggregate.Id, i => i.Doc(aggregate).Index(IndexNameMapper.ToIndexName<T>()), cancellationToken);
3839
}
3940

4041
public Task Delete(T aggregate, CancellationToken cancellationToken)

Core.EventStoreDB/Config.cs

-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System;
22
using Core.EventStoreDB.Subscriptions;
3-
using Core.Subscriptions;
43
using EventStore.Client;
54
using Microsoft.Extensions.DependencyInjection;
65
using Microsoft.Extensions.Configuration;

Core.EventStoreDB/Serialization/EventStoreDBSerializer.cs

-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System.Text;
22
using Core.Events;
3-
using Core.Reflection;
43
using EventStore.Client;
54
using Newtonsoft.Json;
65

Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionCheckpointRepository.cs

-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
using System.Threading.Tasks;
55
using Core.Events;
66
using Core.EventStoreDB.Serialization;
7-
using Core.Subscriptions;
87
using EventStore.Client;
98

109
namespace Core.EventStoreDB.Subscriptions

Core/Subscriptions/ISubscriptionCheckpointRepository.cs renamed to Core.EventStoreDB/Subscriptions/ISubscriptionCheckpointRepository.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
using System.Threading;
22
using System.Threading.Tasks;
33

4-
namespace Core.Subscriptions
4+
namespace Core.EventStoreDB.Subscriptions
55
{
66
public interface ISubscriptionCheckpointRepository
77
{

Core/Subscriptions/InMemorySubscriptionCheckpointRepository.cs renamed to Core.EventStoreDB/Subscriptions/InMemorySubscriptionCheckpointRepository.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
using System.Threading;
33
using System.Threading.Tasks;
44

5-
namespace Core.Subscriptions
5+
namespace Core.EventStoreDB.Subscriptions
66
{
77
public class InMemorySubscriptionCheckpointRepository: ISubscriptionCheckpointRepository
88
{

Core.EventStoreDB/Subscriptions/SubscribeToAllBackgroundWorker.cs

-5
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,13 @@
11
using System;
2-
using System.Text;
32
using System.Threading;
43
using System.Threading.Tasks;
54
using Core.Events;
6-
using Core.Events.External;
75
using Core.EventStoreDB.Serialization;
8-
using Core.Reflection;
9-
using Core.Subscriptions;
106
using Core.Threading;
117
using EventStore.Client;
128
using Microsoft.Extensions.DependencyInjection;
139
using Microsoft.Extensions.Hosting;
1410
using Microsoft.Extensions.Logging;
15-
using Newtonsoft.Json;
1611

1712
namespace Core.EventStoreDB.Subscriptions
1813
{

Core.Testing/TestContext.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ public async Task PublishInternalEvent(IEvent @event)
105105

106106
public void Dispose()
107107
{
108-
server?.Dispose();
109-
Client?.Dispose();
108+
server.Dispose();
109+
Client.Dispose();
110110
}
111111

112112
public IReadOnlyCollection<TEvent> PublishedInternalEventsOfType<TEvent>()

Core/Threading/NoSynchronizationContextScope.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@ public static Disposable Enter()
1414

1515
public struct Disposable: IDisposable
1616
{
17-
private readonly SynchronizationContext? _synchronizationContext;
17+
private readonly SynchronizationContext? synchronizationContext;
1818

1919
public Disposable(SynchronizationContext? synchronizationContext)
2020
{
21-
_synchronizationContext = synchronizationContext;
21+
this.synchronizationContext = synchronizationContext;
2222
}
2323

2424
public void Dispose() =>
25-
SynchronizationContext.SetSynchronizationContext(_synchronizationContext);
25+
SynchronizationContext.SetSynchronizationContext(synchronizationContext);
2626
}
2727
}
2828
}
+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
namespace Carts.Tests.Carts.InitializingCart
1313
{
14-
public class InitializeCardCommandHandlerTests
14+
public class InitializeCartCommandHandlerTests
1515
{
1616
[Fact]
1717
public async Task ForInitCardCommand_ShouldAddNewCart()

Sample/EventStoreDB/ECommerce/Carts/Carts/Carts/GettingCartAtVersion/GetCartAtVersion.cs

-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
using Core.Exceptions;
88
using Core.Queries;
99
using EventStore.Client;
10-
using Marten;
1110

1211
namespace Carts.Carts.GettingCartAtVersion
1312
{

Sample/EventStoreDB/ECommerce/README.md

+15-15
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ Sample is showing the typical flow of the Event Sourcing app with [EventStoreDB]
2121
- Login: `[email protected]`, Password: `admin`
2222
- To connect to server Use host: `postgres`, user: `postgres`, password: `Password12!`
2323
4. Open, build and run `ECommerce.sln` solution.
24-
- Swagger should be available at: http://localhost:5000/index.html
24+
- Swagger should be available at: http://localhost:5000/index.html
2525

2626

2727
## Overview
@@ -31,35 +31,35 @@ It uses:
3131
- Stores events from Aggregate method results to EventStoreDB,
3232
- Builds read models using [Subscription to `$all`](https://developers.eventstore.com/clients/grpc/subscribing-to-streams/#subscribing-to-all).
3333
- Read models are stored as [Marten](https://martendb.io/) documents.
34-
- App has Swagger and predefined [docker-compose](https://github.com/oskardudycz/EventSourcing.NetCore/pull/49/files#diff-bd9579f0d00fbcbca25416ada9698a7f38fdd91b710c1651e0849d56843a6b45) to run and play with samples.
34+
- App has Swagger and predefined [docker-compose](./docker/docker-compose.yml) to run and play with samples.
3535

3636
## Write Model
3737

3838
- Most of the write model infrastructure was reused from other samples,
3939
- Added new project `Core.EventStoreDB` for specific EventStoreDB code,
40-
- Added [EventStoreDBRepository](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core.EventStoreDB/Repository/EventStoreDBRepository.cs) repository to load and store aggregate state,
41-
- Added separate [IProjection](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core/Projections/IProjection.cs) interface to handle the same way stream aggregation and materialised projections,
42-
- Thanks to that added dedicated [AggregateStream](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core.EventStoreDB/Events/AggregateStreamExtensions.cs#L12) method for stream aggregation
40+
- Added [EventStoreDBRepository](./Core/Core.EventStoreDB/Repository/EventStoreDBRepository.cs) repository to load and store aggregate state,
41+
- Added separate [IProjection](./Core/Core/Projections/IProjection.cs) interface to handle the same way stream aggregation and materialised projections,
42+
- Thanks to that added dedicated [AggregateStream](./Core/Core.EventStoreDB/Events/AggregateStreamExtensions.cs#L12) method for stream aggregation
4343
- See [sample Aggregate](./Carts/Carts/Carts/Cart.cs)
4444

4545
## Read Model
4646
- Read models are rebuilt with eventual consistency using subscribe to all EventStoreDB feature,
47-
- Added hosted service [SubscribeToAllBackgroundWorker](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core.EventStoreDB/Subscriptions/SubscribeToAllBackgroundWorker.cs) to handle subscribing to all. It handles checkpointing and simple retries if the connection was dropped.
48-
- Added [ISubscriptionCheckpointRepository](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core/Subscriptions/ISubscriptionCheckpointRepository.cs) for handling Subscription checkpointing.
49-
- Added checkpointing to EventStoreDB stream with [EventStoreDBSubscriptionCheckpointRepository](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionCheckpointRepository.cs) and dummy in-memory checkpointer [InMemorySubscriptionCheckpointRepository](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core/Subscriptions/InMemorySubscriptionCheckpointRepository.cs),
50-
- Added [MartenExternalProjection](https://github.com/oskardudycz/EventSourcing.NetCore/pull/49/files#diff-6d8dadf8ab81a9441836a5403632ef3616a1dc42788b5feae1c56a4f2321d4eeR12) as a sample how to project with [`left-fold`](https://en.wikipedia.org/wiki/Fold_(higher-order_function)) into external storage. Another (e.g. ElasticSearch, EntityFramework) can be implemented the same way.
47+
- Added hosted service [SubscribeToAllBackgroundWorker](./Core/Core.EventStoreDB/Subscriptions/SubscribeToAllBackgroundWorker.cs) to handle subscribing to all. It handles checkpointing and simple retries if the connection was dropped.
48+
- Added [ISubscriptionCheckpointRepository](./Core/Core.EventStoreDB/Subscriptions/ISubscriptionCheckpointRepository.cs) for handling Subscription checkpointing.
49+
- Added checkpointing to EventStoreDB stream with [EventStoreDBSubscriptionCheckpointRepository](./Core/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionCheckpointRepository.cs) and dummy in-memory checkpointer [InMemorySubscriptionCheckpointRepository](./Core/Core.EventStoreDB/Subscriptions/InMemorySubscriptionCheckpointRepository.cs),
50+
- Added [MartenExternalProjection](./Core/Core.Marten/ExternalProjections/MartenExternalProjection.cs) as a sample how to project with [`left-fold`](https://en.wikipedia.org/wiki/Fold_(higher-order_function)) into external storage. Another (e.g. ElasticSearch, EntityFramework) can be implemented the same way.
5151

5252
## Tests
5353
- Added sample of unit testing in [`Carts.Tests`](./Carts/Carts.Tests):
54-
- [Aggregate unit tests](./Carts/Carts.Tests/Carts/InitCartTests.cs)
55-
- [Command handler unit tests](./Carts/Carts.Tests/Carts/CommandHandlers/InitCardCommandHandlerTests.cs)
54+
- [Aggregate unit tests](./Carts/Carts.Tests/Carts/InitializingCart/InitializeCartTests.cs)
55+
- [Command handler unit tests](./Carts/Carts.Tests/Carts/InitializingCart/InitializeCartCommandHandlerTests.cs)
5656
- Added sample of integration testing in [`Carts.Api.Tests`](./Carts/Carts.Api.Tests)
57-
- [API integration tests](./Carts/Carts.Api.Tests/Carts/InitCartTests.cs)
57+
- [API integration tests](./Carts/Carts.Api.Tests/Carts/InitializingCart/InitializeCartTests.cs)
5858

5959
## Other
60-
- Added [EventTypeMapper](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core/Events/EventTypeMapper.cs) class to allow both convention-based mapping (by the .NET type name) and custom to handle event versioning,
61-
- Added [StreamNameMapper](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core/Events/StreamNameMapper.cs) class for convention-based id (and optional tenant) mapping based on the stream type and module,
62-
- IoC [registration helpers for EventStoreDB configuration](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core.EventStoreDB/Config.cs),
60+
- Added [EventTypeMapper](./Core/Core/Events/EventTypeMapper.cs) class to allow both convention-based mapping (by the .NET type name) and custom to handle event versioning,
61+
- Added [StreamNameMapper](./Core/Core/Events/StreamNameMapper.cs) class for convention-based id (and optional tenant) mapping based on the stream type and module,
62+
- IoC [registration helpers for EventStoreDB configuration](./Core/Core.EventStoreDB/Config.cs),
6363

6464

6565
## Trivia

Sample/MeetingsManagement/MeetingsSearch.Api/appsettings.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
},
77
"AllowedHosts": "*",
88

9-
"elasticsearch": {
10-
"index": "meetings",
11-
"url": "http://localhost:9200/"
9+
"Elasticsearch": {
10+
"DefaultIndex": "meetings",
11+
"Url": "http://localhost:9200/"
1212
},
1313

1414
"KafkaConsumer": {

0 commit comments

Comments
 (0)