When we configure NServiceBus to run with multiple threads (MaximumConcurrencyLevel), duplicate records are sometimes inserted. This happens when the endpoint has been down for maintenance and the queue has filled up with messages.
Repro
This is not ideal, but on a multi-core machine the issue can be reproduced with a test. The context talks to a real database, so this is an integration test.
The pseudo code in the Handler is:
- Find the existing record
- If found update the record
- Else insert a record
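The steps above form a check-then-act sequence, and that is where the race comes from: two threads can both run the Find before either of them has inserted. Below is a minimal sketch of that shape, assuming an in-memory list as a stand-in for the database table. `Record`, `FakeTable` and `UnsafeHandler` are hypothetical names for illustration, not the real handler or context.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a database row.
public class Record
{
    public int Value { get; set; }
    public int Updates { get; set; }
}

// Hypothetical stand-in for the table; not the real database context.
public class FakeTable
{
    public readonly List<Record> Rows = new List<Record>();
}

public class UnsafeHandler
{
    private readonly FakeTable _table;

    public UnsafeHandler(FakeTable table)
    {
        _table = table;
    }

    // Check-then-act: nothing stops a second thread from running its own
    // Find between this thread's Find and this thread's Insert.
    public void Handle(int value)
    {
        var existing = _table.Rows.FirstOrDefault(r => r.Value == value); // Find
        if (existing != null)
        {
            existing.Updates++;                                 // Update
        }
        else
        {
            _table.Rows.Add(new Record { Value = value });      // Insert
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var table = new FakeTable();
        var handler = new UnsafeHandler(table);
        handler.Handle(1); // not found, so a row is inserted
        handler.Handle(1); // found, so the row is updated
        Console.WriteLine(table.Rows.Count);
    }
}
```

The sketch is deliberately called from a single thread only, where it leaves one row. The gap between the Find and the Insert is the window that concurrent handlers can fall into.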
The test creates 2 messages and 2 tasks, but you can create more; our repro used 4 messages/tasks. The Handler should insert a record if none is found in the database and update it otherwise, so the test expects exactly one record.
    [Fact]
    public void MyHandler_Concurrency_repro()
    {
        var message1 = new MyMessage { Value = 1 };
        var message2 = new MyMessage { Value = 1 };
        // repeat ...

        var T1 = Task.Factory.StartNew(() =>
            Test.Handler<MyHandler>(bus => new MyHandler(CreateContext()))
                .OnMessage(message1)
        );
        var T2 = Task.Factory.StartNew(() =>
            Test.Handler<MyHandler>(bus => new MyHandler(CreateContext()))
                .OnMessage(message2)
        );
        // repeat ...

        Task.WaitAll(T1, T2, ...);

        var context = CreateContext();
        var messageCount = context.Messages.Where(x => x.Value == 1).Count();
        Assert.Equal(1, messageCount);
    }
Solutions
First we tried to start a transaction before the Find and commit it after the update/insert. Unfortunately, a deadlock occurred:
System.Data.SqlClient.SqlException : Transaction (Process ID 54) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
We could add a unique index to the database to disallow inserting the same record, and a Timestamp column for update concurrency. But the column is nullable. Even the trick with a computed column described here is not foolproof: we don’t know that the identity column value would be unique.
The code works when it is executed on a single thread. Why not force that?
Locking
We introduced a static object for locking and forced the threads to wait their turn.
    public class MyHandler : IHandleMessages<MyMessage>
    {
        // Shared by all handler instances, so only one thread at a time
        // can be inside the lock block below.
        static readonly object _concurrencySolution;

        static MyHandler()
        {
            _concurrencySolution = new object();
        }

        public void Handle(MyMessage message)
        {
            lock (_concurrencySolution)
            {
                // Find
                // Insert or Update
            }
        }
    }
This defeats the purpose of multiple threads, as they have to wait for the other threads to release the lock. But at least there are no duplicates in the database.
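To see the effect of the static lock in isolation, here is a minimal self-contained sketch that runs several handlers in parallel for the same value. It assumes an in-memory list in place of the database; `InMemoryStore`, `LockedHandler` and `Demo` are hypothetical names, and the update branch is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for the table; the real handler talks to a database.
public class InMemoryStore
{
    public readonly List<int> Values = new List<int>();
}

public class LockedHandler
{
    // One lock object shared by every handler instance in this process.
    private static readonly object Gate = new object();
    private readonly InMemoryStore _store;

    public LockedHandler(InMemoryStore store)
    {
        _store = store;
    }

    public void Handle(int value)
    {
        lock (Gate)
        {
            // Find and Insert run as one unit; other threads wait here.
            if (!_store.Values.Contains(value))
            {
                _store.Values.Add(value);
            }
            // else: update the existing record (omitted in this sketch)
        }
    }
}

public static class Demo
{
    // Runs `count` handlers in parallel for the same value
    // and returns the number of stored rows afterwards.
    public static int Run(int count)
    {
        var store = new InMemoryStore();
        var tasks = Enumerable.Range(0, count)
            .Select(_ => Task.Run(() => new LockedHandler(store).Handle(1)))
            .ToArray();
        Task.WaitAll(tasks);
        return store.Values.Count;
    }

    public static void Main()
    {
        Console.WriteLine(Run(4)); // prints 1
    }
}
```

Because every access to the list happens inside the lock, the first handler inserts and the remaining ones find the existing value. Note that a `lock` only coordinates threads inside one process; if the endpoint were ever run as several processes, this approach alone would not stop them from racing each other.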