Dealing with Back-Pressure - UI - a database - the ReactiveX way
This post discusses one possible solution to the following requirement: query a large number of web services and selectively project the responses to the UI and to a database.
- For the UI I chose the XAF application framework from DevExpress, because I wanted the tools to validate, secure, display and persist my data while I spent my time designing the service layer.
- For sending the web requests I introduced a service layer in my solution, where I used the HttpClient class and deserialized the responses with Utf8Json.
Here I want to draw your attention to the bad design of HttpClient. The class implements IDisposable, which makes you think that you can create one per request, even in a multithreaded environment, and simply dispose it when done. As it turns out, that is not the case: HttpClient is designed to be re-used for multiple calls, and there is a lot of buzz around this (Link1, Link2, Link3). In short, if you dispose the HttpClient each time a request is done you will end up with endless debugging hours. Those posts offer a few alternatives which, to my bad luck, did not work well for me. I found my solution in the issues of the corefx site.
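A minimal sketch of the reuse pattern, assuming a single shared instance is acceptable for the whole service layer. The `SharedHttp` class, the `GetBytesAsync` helper and the 30 second timeout are hypothetical illustrations, not the fix I found in the corefx issues.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

public static class SharedHttp
{
    // One HttpClient for the lifetime of the process. Its request methods
    // (GetAsync, SendAsync, ...) can be called concurrently from many threads.
    public static readonly HttpClient Client = new HttpClient
    {
        Timeout = TimeSpan.FromSeconds(30) // illustrative value
    };

    // Hypothetical helper: returns the raw body so that a byte-level
    // deserializer can work on it without an intermediate string.
    public static async Task<byte[]> GetBytesAsync(Uri url)
    {
        // Dispose the response, never the shared client.
        using (HttpResponseMessage response = await Client.GetAsync(url).ConfigureAwait(false))
        {
            response.EnsureSuccessStatusCode();
            return await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
        }
    }
}
```

The point of the sketch is only the lifetime: the client lives as long as the application, while each response is disposed as soon as its body has been read.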
Utf8Json is a super fast JSON serializer. It is built by neuecc, the father of ZeroFormatter, and uses the same architecture, which is to do the work at the byte level, thus minimizing memory allocation. In a high back-pressure environment you need to minimize allocated memory as much as possible because GC work is very expensive.
For the same reason
a) I used structures instead of classes.
b) I installed the Heap allocator plugin to get notified while I code in case I miss some expensive boxing operation.
c) I tried to guess the size of my collections and initialize them beforehand, and I reused objects instead of creating new ones.
d) I preferred StringBuilder over interpolation or concatenation.
e) I preferred for over foreach and LINQ in my hot paths.
I think you get the idea by now; you can find a lot more ways if you google the optimization subject.
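The points above can be sketched in a few lines. This is only an illustration under assumptions: the `Quote` struct and the `HotPath` helpers are hypothetical names and have nothing to do with the real service layer.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// a) A struct avoids one heap allocation per item (hypothetical shape).
public readonly struct Quote
{
    public Quote(long id, decimal price) { Id = id; Price = price; }
    public long Id { get; }
    public decimal Price { get; }
}

public static class HotPath
{
    // c) Reused between calls. Not thread safe: use one instance per thread
    // if this is called concurrently.
    private static readonly StringBuilder Builder = new StringBuilder(256);

    public static List<Quote> Parse(long[] ids, decimal[] prices)
    {
        // c) Pre-size the list so it never has to grow and copy.
        var result = new List<Quote>(ids.Length);
        // e) Plain for loop: no enumerator and no LINQ delegate allocations.
        for (int i = 0; i < ids.Length; i++)
            result.Add(new Quote(ids[i], prices[i]));
        return result;
    }

    public static string Describe(List<Quote> quotes)
    {
        // d) Clear and reuse the buffer instead of concatenating strings.
        Builder.Clear();
        for (int i = 0; i < quotes.Count; i++)
        {
            if (i > 0) Builder.Append(';');
            Builder.Append(quotes[i].Id).Append('=').Append(quotes[i].Price);
        }
        return Builder.ToString();
    }
}
```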
To test and design the service layer I used xUnit.net and AutoFixture frameworks. Following is a compact version of a response deserialization test for each of my web requests.
Short description for the above code: The Test_Request method tests the SendAsync method, passing the service URLs, an HttpHandlerType which is responsible for parameterizing the requests per service, and a converter to deserialize the responses.
- For handling exceptions I needed another framework, as web requests are fragile and tend to throw for many reasons. One great framework for such cases is The Polly Project. I created a few strategies, like the basic "worth retrying" one found in the Polly GitHub readme, and refactored the SendAsync method to use them.
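A minimal sketch of such a retry strategy, assuming the Polly 7 style API. The `ResilientHttp` class, the `GetWithRetryAsync` name and the back-off values are hypothetical; the real SendAsync takes more parameters than shown here.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;
using Polly;

public static class ResilientHttp
{
    private static readonly HttpClient Client = new HttpClient();

    // Retries on transport errors and on 5xx responses.
    // The delay function receives the retry attempt number (1, 2, 3, ...).
    public static IAsyncPolicy<HttpResponseMessage> CreateRetryPolicy(
        int retries, Func<int, TimeSpan> delay) =>
        Policy
            .Handle<HttpRequestException>()
            .OrResult<HttpResponseMessage>(r => (int)r.StatusCode >= 500)
            .WaitAndRetryAsync(retries, delay);

    // Three retries with exponential back-off: 2s, 4s, 8s (illustrative).
    private static readonly IAsyncPolicy<HttpResponseMessage> Retry =
        CreateRetryPolicy(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)));

    public static Task<HttpResponseMessage> GetWithRetryAsync(Uri url) =>
        // The delegate runs again on every attempt, so each retry
        // issues a fresh request.
        Retry.ExecuteAsync(() => Client.GetAsync(url));
}
```

Keeping the policy creation in its own method makes it possible to swap the numbers later without touching the request code.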
- For scheduling the work asynchronously Reactive Extensions (RX) from Microsoft was used. The result was a composable observable sequence of objects. Here is what I mean by this:
First I created an array with the parameters to be used by the SendAsync method, then I converted it to an observable using the extension methods of the RX framework. Having an observable, I can now apply any Reactive operator I like to this sequence. In this case I used SelectMany to execute my SendAsync tasks and flatten their responses into one sequence for further processing, as we will discuss later.
Note: An observable sequence has certain similarities with an enumerable. With IEnumerable<T>, if you do not enumerate no work is done; similarly, with IObservable<T>, if you do not subscribe no work is done. For example, we could subscribe to the above sequence and persist each item as it arrives.
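As a rough sketch of that composition, assuming System.Reactive is referenced. The `SendAsync` below is a hypothetical stand-in that only simulates I/O, and the string payload is an assumption.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ServiceQuery
{
    // Hypothetical stand-in for the real SendAsync.
    public static async Task<string> SendAsync(string url)
    {
        await Task.Delay(10).ConfigureAwait(false); // simulate network latency
        return "response from " + url;
    }

    public static IObservable<string> Responses(string[] urls) =>
        urls.ToObservable()
            // FromAsync defers each call until subscription; SelectMany
            // merges the results into one sequence as they complete.
            .SelectMany(url => Observable.FromAsync(() => SendAsync(url)));
}
```

Responses arrive in completion order, not in the order of the input array, and no request is sent until something subscribes to the returned sequence.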
- Rate limiting the work: with RX this was just a matter of applying more operators, Window and, in this case, Distinct to check for duplicate parameters. A pseudo code implementation follows:
In the production code there is a combination of multiple rate limiters, as per scenario, in the same query.
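One way such a limiter could look, as a sketch only. `DistinctLimited` is a hypothetical operator name, and this variant drops the items that exceed the limit instead of queuing them, which may differ from the production limiters.

```csharp
using System;
using System.Reactive.Linq;

public static class RateLimit
{
    // Drops duplicates, then lets at most maxPerWindow items through
    // in each time window. Items over the limit are dropped, not delayed.
    // Note: Distinct remembers every value it has seen, so memory grows
    // with the number of distinct items.
    public static IObservable<T> DistinctLimited<T>(
        this IObservable<T> source, TimeSpan window, int maxPerWindow) =>
        source
            .Distinct()
            .Window(window)                           // split the stream by time
            .SelectMany(w => w.Take(maxPerWindow));   // cap each window
}
```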
- For persisting the responses, although XAF fully supports Entity Framework, I feel more confident with XPO. I think it is much easier to use and has built-in features I wanted to use. As for the database server, SQL Server was my choice because XPO is really well tested against it.
So, moving from the service layer into the data layer I needed to have a super fast data layer as the number of incoming requests was large. For this I made the following choices.
a) I declared my objects with a long primary key as opposed to the default GUID type.
b) Instead of using an XPObjectSpace, which is a heavy object used by XAF, I went for a FastUnitOfWork.
c) I disabled optimistic locking only for this level by modifying the FastUnitOfWork properties.
d) Although I could instruct RX to send all requests to the same thread, I wanted to use all my processors, so I chose to initialize the FastUnitOfWork over a Lazy<ThreadSafeDataLayer> with a CachedDataStoreProvider.
e) I buffered the responses for 2 sec, creating larger commit batches and fewer locks from the ThreadSafeDataLayer.
Finally I subscribed to the products observable built in the previous steps and persisted my objects.
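The buffering in point e) can be sketched with the RX side only. `PersistInBatches` and the `commitBatch` callback are hypothetical; in the real solution the callback would create a unit of work, add the objects and commit once per batch.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class BatchPersistence
{
    public static IDisposable PersistInBatches<T>(
        this IObservable<T> source, TimeSpan span, Action<IList<T>> commitBatch) =>
        source
            .Buffer(span)                      // collect everything arriving within the span
            .Where(batch => batch.Count > 0)   // Buffer emits empty lists in quiet periods
            .Subscribe(
                commitBatch,
                error => Console.Error.WriteLine(error)); // keep failures visible
}
```

A call such as `products.PersistInBatches(TimeSpan.FromSeconds(2), Commit)` then produces one commit per two second batch, where `products` and `Commit` stand for whatever sequence and commit routine the application has.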
- For getting statistics I subscribed again to the products observable, this time using some custom made reactive operators.
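A custom operator of this kind can be as small as the following sketch. `PerSecond` is a hypothetical name, and the `weight` function is an assumption that stands for "how many objects does this response contain".

```csharp
using System;
using System.Reactive.Linq;

public static class Statistics
{
    // Emits, once per period, the average number of weighted items per second.
    // The final batch emitted on completion is still divided by the full
    // period, so its value is only approximate.
    public static IObservable<double> PerSecond<T>(
        this IObservable<T> source, TimeSpan period, Func<T, int> weight) =>
        source
            .Buffer(period)
            .Select(batch =>
            {
                int total = 0;
                for (int i = 0; i < batch.Count; i++)
                    total += weight(batch[i]);
                return total / period.TotalSeconds;
            });
}
```

With a weight of 1 this yields responses per second; with a weight equal to the object count of each response it yields objects per second.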
Responses: 50/sec, Objects: 14000/sec. Those are IMPRESSIVE RESULTS for all of XPO, RX and SQL Server.
- For the application data layer, at the beginning I thought that since I already had a thread-safe cached data layer doing my persistence I could use it. Further testing though showed UI hiccups and unresponsiveness, so I ended up initializing a separate data layer exactly as suggested by default by XAF. This layer covers all CRUD operations of the other objects in the solution, but for displaying the fast-arriving objects from the observable stream I aimed to go for a NonPersistentObjectSpace, which is a lighter object than the XPObjectSpace.
Note here that XAF does not fully support nested non-persistent list views. However that is not a problem; we can easily extend it to support them and test how it works. To do so I created a controller and attached it to the XafApplication CreateCustomPropertyCollectionSource event, which allowed me to replace the default PropertyCollectionSource with a custom one that behaved as I wanted.
In my case, since this controller already exists in the eXpandFramework, I didn't need the above code; I just had to enable the functionality from the model editor.
The NonPersistentObjectSpace I enabled, though, does not know how to get the data to display. Since we are nested, we are going to use the MasterObjectViewController for, let's say, our Company-Product relation. Once you derive from that abstract controller, the first method to implement is UpdateMasterObject, which is called when the master object, the Company in our case, changes. Here is the code for what I just explained.
This method is the appropriate place to subscribe to the NonPersistentObjectSpace ObjectsGetting event, which is used to feed the object space. We also need to keep track of the created data source so that we can display it again when the end user returns to the same company. Doing so, the controller changes to
Next we need to subscribe again to our service layer's observable sequence of products to filter (Where), throttle (Buffer, SampleResponsive) and display them in the UI (ObserveOn).
The this.LockListEditorUpdates method is an extension method that comes from the eXpandFramework LockListEditorDataUpdatesController. As the name implies, it locks the editor from updating during massive data updates, and it is designed to be used from the platform-agnostic modules as well.
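The filter, throttle and marshal chain can be sketched without any XAF types. Everything named here is hypothetical (`ProjectToUi`, the predicate, the `addToListView` callback, the 500 ms span), and only the standard Buffer operator is shown because SampleResponsive is a custom one.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class UiProjection
{
    public static IDisposable ProjectToUi<T>(
        this IObservable<T> source,
        Func<T, bool> belongsToCurrentView,   // e.g. product belongs to the shown company
        IScheduler uiScheduler,               // scheduler bound to the UI thread
        Action<IList<T>> addToListView) =>
        source
            .Where(belongsToCurrentView)               // filter
            .Buffer(TimeSpan.FromMilliseconds(500))    // throttle: one UI update per batch
            .Where(batch => batch.Count > 0)           // skip quiet periods
            .ObserveOn(uiScheduler)                    // hop to the UI thread
            .Subscribe(addToListView);
}
```

In a WinForms application the scheduler could be a `SynchronizationContextScheduler` created from the UI thread's `SynchronizationContext.Current`, so that `addToListView` always runs on the UI thread.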
At this point the UI does not feel any back-pressure, but there is still a small improvement missing. The solution uses an MDI tabbed environment, which means that the user may open multiple views and the application will start subscribing to multiple sequences. To kill each subscription when a document page is deactivated we can use the ActiveDocumentViewController and modify the previous controller as follows.
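The subscription lifetime itself can be sketched independently of the controller. `ViewSubscription` is a hypothetical helper; in the real controller its `Activate` and `Deactivate` methods would be called from the document page activation and deactivation handlers.

```csharp
using System;
using System.Reactive.Disposables;

public sealed class ViewSubscription<T>
{
    // Assigning a new disposable automatically disposes the previous one.
    private readonly SerialDisposable _subscription = new SerialDisposable();
    private readonly IObservable<T> _source;
    private readonly Action<T> _onNext;

    public ViewSubscription(IObservable<T> source, Action<T> onNext)
    {
        _source = source;
        _onNext = onNext;
    }

    // Call when the document page becomes active.
    public void Activate() => _subscription.Disposable = _source.Subscribe(_onNext);

    // Call when the page is deactivated: unsubscribes, but the helper
    // can be activated again later.
    public void Deactivate() => _subscription.Disposable = Disposable.Empty;
}
```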
- Revisiting exceptions: Remember the Polly project at step 2, where we used it to hardcode a strategy inside our SendAsync method? Good news: Polly allows wrapping multiple policies together, so we can use XAF to map the IPolicy objects and allow the end user to configure the strategies from there, making the solution super flexible. Here is what I mean.
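A sketch of how user-editable settings could be turned into a wrapped policy, assuming the Polly 7 style API. `StrategySettings` and `StrategyFactory` are hypothetical; the actual solution maps the policies to XAF business objects, which is not shown.

```csharp
using System;
using System.Net.Http;
using Polly;
using Polly.Timeout;

// Hypothetical settings object that the end user could edit in the UI.
public sealed class StrategySettings
{
    public int Retries { get; set; } = 3;
    public TimeSpan RetryDelay { get; set; } = TimeSpan.FromSeconds(1);
    public int BreakAfterFailures { get; set; } = 5;
    public TimeSpan BreakDuration { get; set; } = TimeSpan.FromSeconds(30);
    public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(10);
}

public static class StrategyFactory
{
    public static IAsyncPolicy Build(StrategySettings s)
    {
        IAsyncPolicy retry = Policy
            .Handle<HttpRequestException>()
            .Or<TimeoutRejectedException>()   // thrown by the timeout policy below
            .WaitAndRetryAsync(s.Retries, _ => s.RetryDelay);

        IAsyncPolicy breaker = Policy
            .Handle<HttpRequestException>()
            .CircuitBreakerAsync(s.BreakAfterFailures, s.BreakDuration);

        // Optimistic timeout: the executed delegate must honour the
        // CancellationToken it is given for this to take effect.
        IAsyncPolicy timeout = Policy.TimeoutAsync(s.Timeout);

        // Outermost first: retry wraps the breaker, which wraps the timeout.
        return Policy.WrapAsync(retry, breaker, timeout);
    }
}
```

Because the wrap is built from plain values, changing a strategy becomes a matter of editing the settings and rebuilding the policy.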
- Monitor system health: Remember the statistics we collected at step 7? Wouldn't it be very useful to display them in the UI, allowing the user to validate the system's health? For this we can use the exact same query, but this time observe on the UI thread.
Having my stats handy in the bottom left corner of my app, exactly as described in How to: Customize Window Status Messages (WinForms), I would like to share them once more and close this long but, I hope, very interesting post.
Threads: 207, CPU: 20%, Disk: 10%, Web-Responses: 2700/min, Persisting objects: 700000/min