Solving database concurrency issues in NServiceBus

When we configure NServiceBus to run with multiple threads (MaximumConcurrencyLevel), duplicate records are sometimes inserted. This happens when the endpoint has been down for maintenance and the queue has filled up with messages.

Repro

The test below is not ideal, but on a multi-core machine it reproduces the issue. The context talks to a real database, so this is an integration test.

The pseudocode of the handler is:

  1. Find the existing record
  2. If found, update the record
  3. Else, insert a record
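
The steps above are a check-then-act sequence: nothing stops a second thread from running step 1 between another thread's step 1 and step 3. The sketch below illustrates that interleaving under stated assumptions. FakeTable is a hypothetical in-memory stand-in for the database, not the real context, and the Barrier only exists to force the unlucky timing on every run.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for the database table (not the real context).
public class FakeTable
{
    private readonly List<int> _rows = new List<int>();
    private readonly object _sync = new object();

    public bool Exists(int value) { lock (_sync) return _rows.Contains(value); }
    public void Insert(int value) { lock (_sync) _rows.Add(value); }
    public int Count(int value) { lock (_sync) return _rows.Count(v => v == value); }
}

public static class RaceDemo
{
    // Runs find-then-insert on two threads and returns the number of rows with Value 1.
    public static int Run()
    {
        var table = new FakeTable();
        // Both threads must finish the find before either one inserts.
        using (var bothHaveLooked = new Barrier(2))
        {
            Action handle = () =>
            {
                bool found = table.Exists(1);     // 1. find the existing record
                bothHaveLooked.SignalAndWait();   // both threads have seen "not found"
                if (!found) table.Insert(1);      // 3. else insert a record
                // 2. (update) is left out: there is nothing to update in this sketch
            };
            Task.WaitAll(Task.Run(handle), Task.Run(handle));
        }
        return table.Count(1);
    }

    public static void Main()
    {
        Console.WriteLine(RaceDemo.Run());
    }
}
```

Each individual call on FakeTable is thread-safe, yet Run returns 2: the duplicate comes from the gap between the find and the insert, not from the data access itself.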

The test creates 2 messages and 2 tasks, but you can create more; our repro used 4 messages/tasks. The handler should insert a record if none is found in the database, and update it otherwise.

[Fact]
public void MyHandler_Concurrency_repro()
{
  // Two messages that target the same record
  var message1 = new MyMessage { Value = 1 };
  var message2 = new MyMessage { Value = 1 };
  // repeat ...
  // Each task handles one message with its own handler and context
  var task1 = Task.Factory.StartNew(() =>
     Test.Handler<MyHandler>(bus => new MyHandler(CreateContext()))
         .OnMessage(message1)
  );
  var task2 = Task.Factory.StartNew(() =>
     Test.Handler<MyHandler>(bus => new MyHandler(CreateContext()))
         .OnMessage(message2)
  );
  // repeat ...
  Task.WaitAll(task1, task2 /* , ... */);
  // Exactly one record should exist, however many messages were handled
  var context = CreateContext();
  var messageCount = context.Messages.Count(x => x.Value == 1);
  Assert.Equal(1, messageCount);
}

Solutions

First we tried starting a transaction before the Find and committing it after the update/insert. Unfortunately, a deadlock occurred:

System.Data.SqlClient.SqlException : Transaction (Process ID 54) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
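
The error message itself suggests rerunning the transaction. A minimal sketch of such a retry could look like the code below. Retry.Execute and its parameters are hypothetical names for illustration, and the TimeoutException only simulates the failure. Against SQL Server, the deadlock victim error is reported as a SqlException with Number 1205, so the isTransient check would test for that instead.

```csharp
using System;

public static class Retry
{
    // Hypothetical helper: runs work, retrying while isTransient accepts the failure.
    // Returns the number of attempts that were needed.
    public static int Execute(Action work, Func<Exception, bool> isTransient, int maxAttempts)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                work();
                return attempt;
            }
            catch (Exception ex) when (attempt < maxAttempts && isTransient(ex))
            {
                // Transient failure with attempts left: loop and run the work again.
            }
        }
    }
}

public static class RetryDemo
{
    public static void Main()
    {
        int calls = 0;
        int attempts = Retry.Execute(
            () =>
            {
                calls++;
                // Stand-in for "begin transaction, find, update or insert, commit".
                if (calls == 1) throw new TimeoutException("simulated deadlock victim");
            },
            ex => ex is TimeoutException,   // assumption: replace with a check for error 1205
            maxAttempts: 3);
        Console.WriteLine(attempts);
    }
}
```

The whole transaction, including the Find, has to run again on each attempt, so that the retry sees the record the winning transaction inserted and updates it instead of inserting a second one.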

We could add a unique index to the database to disallow inserting the same record, and a timestamp column for update concurrency. But the column is nullable, and even the trick with a computed column described here is not foolproof: we don’t know that the identity column value would be unique.

The code works when it is executed on a single thread. Why not force that?

Locking

We introduced a static object for locking and forced the threads to wait their turn.

public class MyHandler : IHandleMessages<MyMessage> {
   // Shared by all handler instances, so only one thread at a time gets past the lock
   static readonly object _concurrencySolution = new object();
   public void Handle(MyMessage message) {
     lock(_concurrencySolution) {
       // Find
       // Insert or Update
     }
   }
}

This defeats the purpose of multiple threads, because each thread has to wait for the others to release the lock. But at least there are no duplicates in the database.
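
If messages for different records should still be handled in parallel, the lock could be narrowed to one lock object per key. This is a sketch of that alternative, not what we run in production. KeyedLock is a hypothetical helper, the key is assumed to be the message Value, and the ConcurrentBag stands in for the database table.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class KeyedLock
{
    // One lock object per key. Entries are never removed in this sketch.
    private static readonly ConcurrentDictionary<int, object> Locks =
        new ConcurrentDictionary<int, object>();

    // Runs work while holding the lock that belongs to the given key.
    public static void Run(int key, Action work)
    {
        lock (Locks.GetOrAdd(key, _ => new object()))
        {
            work();
        }
    }
}

public static class KeyedLockDemo
{
    // Handles four messages (two per value) and returns the number of rows inserted.
    public static int Run()
    {
        var rows = new ConcurrentBag<int>();   // hypothetical stand-in for the table
        var values = new[] { 1, 1, 2, 2 };
        var tasks = values.Select(v => Task.Run(() =>
            KeyedLock.Run(v, () =>
            {
                // Find, else insert. The update branch is left out of this sketch.
                if (!rows.Contains(v)) rows.Add(v);
            }))).ToArray();
        Task.WaitAll(tasks);
        return rows.Count;
    }

    public static void Main()
    {
        Console.WriteLine(KeyedLockDemo.Run());
    }
}
```

Run returns 2: one row per distinct value, while threads working on different values do not block each other. Like the static lock, this only synchronizes threads inside a single process, so it does not help when several instances of the endpoint read from the same queue.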

About erictummers

Working in a DevOps team is the best thing that happened to me. I like challenges and sharing the solutions with others. On my blog I’ll mostly post about my work, but expect an occasional home project, productivity tip and tooling review.