This is the 4th and hopefully not the last post ever about Reactive Extensions with #uwpdev. After my 1st post, Jamie Mutton mentioned that writing async task code inside a subscription can lead to side effects and he posted a gist on various options.
I decided to move some of the async task code out of the subscription and into another observable, following the first option Jamie suggested. Here’s what I had before.
The subscription in the constructor of the ViewModel.
this.ChannelDataService.ActivateChannelSubject
    .ObserveOn(RxApp.TaskpoolScheduler)
    .SubscribeOn(RxApp.TaskpoolScheduler)
    .Subscribe(async tuple =>
    {
        if (tuple != null)
        {
            await this.ProcessChannelData();
        }
    });

this.ChannelDataService.ChannelDataSubject
    .ObserveOn(RxApp.TaskpoolScheduler)
    .SubscribeOn(RxApp.TaskpoolScheduler)
    .Subscribe(async tuple =>
    {
        if (tuple != null)
        {
            await this.ProcessChannelData();
        }
    });
The ViewModel methods.
private async Task ProcessChannelData()
{
    try
    {
        await semaphore.WaitAsync();
        await DispatcherHelper.ExecuteOnUIThreadAsync(() => this.IsBusy = true);
        var channelData = this.ChannelDataService.GetChannelData();
        await this.RenderChannelData(channelData);
    }
    finally
    {
        semaphore.Release();
        await DispatcherHelper.ExecuteOnUIThreadAsync(() => this.IsBusy = false);
    }
}

private async Task RenderChannelData(ChannelData channelData)
{
    if (channelData == null)
    {
        await DispatcherHelper.ExecuteOnUIThreadAsync(() => this.ResetData());
        return;
    }

    var configSettings = this.ChannelDataService.ConfigurationSettings;
    var itemData = await channelData?.ProcessItems(ChannelService.ChannelDictionary, ChannelService.ChannelList[0], this.AppSettings.Country);
    if (itemData == null)
    {
        return;
    }

    var puffData = channelData.ProcessPuffs(ChannelService.ChannelDictionary, ChannelService.ChannelList[0]);
    System.Diagnostics.Debug.WriteLine($"Item count is {itemData.Item1.Count}, Group count is {itemData.Item2.Count}");

    await DispatcherHelper.ExecuteOnUIThreadAsync(() =>
    {
        try
        {
            ViewHelper.SetStatusBar(this.Channel.StatusColor, Colors.White, this.AppSettings.ElementTheme);
            if (this.Items.Count > 0)
            {
                this.Items.Clear();
            }
            this.Items.AddRange(itemData.Item1);
            if (this.Content.Count > 0)
            {
                this.Content.Clear();
            }
            this.Content.AddRange(itemData.Item2);
        }
        catch { }

        try
        {
            if (PuffContent.Count > 0)
            {
                this.PuffContent.Clear();
            }
            this.PuffContent.AddRange(puffData);
        }
        catch { }
    });
}
I split the code into two parts: fetching and preparing the data in one, and rendering it in the other.
private async Task<ChannelData> ProcessChannelDataAsync()
{
    ChannelData channelData = null;
    try
    {
        await semaphore.WaitAsync();
        await DispatcherHelper.ExecuteOnUIThreadAsync(() => { this.IsBusy = true; });
        await Task.Run(() =>
        {
            channelData = this.ChannelDataService.GetChannelData();
            if (channelData != null)
            {
                channelData.ProcessChannelData(ChannelService.ChannelDictionary, ChannelService.ChannelList[0], this.AppSettings.Country);
            }
        });
    }
    catch (JsonReaderException jsonEx)
    {
        Exception handledException = new Exception("Handled data parse exception", jsonEx);
        HockeyClient.Current.TrackException(handledException);
    }
    catch (SqliteException ex)
    {
        Exception handledEx = new Exception("Handled SQLite error getting channel data", ex);
        HockeyClient.Current.TrackException(handledEx);
        DialogService ds = new DialogService();
        await ds.ShowMessage("Application encountered serious error. Please reinstall the application.", SystemInformation.ApplicationName);
    }
    finally
    {
        semaphore.Release();
    }

    return channelData;
}

private async Task RenderChannelDataAsync(ChannelData processedData)
{
    try
    {
        await semaphore.WaitAsync();
        await DispatcherHelper.ExecuteOnUIThreadAsync(() =>
        {
            ViewHelper.SetStatusBar(this.Channel.StatusColor, Colors.White, this.AppSettings.ElementTheme);
            if (processedData == null)
            {
                this.ResetData();
                return;
            }

            if (this.Items.Count > 0)
            {
                this.Items.Clear();
            }
            if (processedData != null)
            {
                this.Items.AddRange(processedData.ProcessedItems);
            }

            if (this.Content.Count > 0)
            {
                this.Content.Clear();
            }
            if (processedData != null)
            {
                this.Content.AddRange(processedData.GroupedItems);
            }

            if (PuffContent.Count > 0)
            {
                this.PuffContent.Clear();
            }
            if (processedData != null)
            {
                this.PuffContent.AddRange(processedData.PuffGroupedItems);
            }

            this.IsBusy = false;
        });
    }
    catch { }
    finally
    {
        semaphore.Release();
    }
}
With that split, I could chain the original observable with new observables created from the async methods, as Jamie Mutton suggested in his gist.
The extension method that converts an async method's task into an observable is simply ToObservable(), from the System.Reactive.Threading.Tasks namespace.
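To see what ToObservable() produces, here is a minimal sketch assuming the System.Reactive package is referenced. FetchAsync and RenderAsync are hypothetical stand-ins, not the view model's real methods. A Task<T> becomes an IObservable<T> that emits the result once and completes. A plain Task becomes an IObservable<Unit>, which is why a subscription at the end of a chain of "render" tasks receives a value it can ignore.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class ToObservableSketch
{
    // Hypothetical fetch step: returns a value after a short delay.
    private static async Task<int> FetchAsync()
    {
        await Task.Delay(10);
        return 42;
    }

    // Hypothetical render step: a Task with no result.
    private static Task RenderAsync(int value) => Task.Delay(10);

    public static async Task<int> RunAsync()
    {
        // Task<int> -> IObservable<int>: one value, then completion.
        IObservable<int> fetched = FetchAsync().ToObservable();
        int value = await fetched.ToTask();

        // Task -> IObservable<Unit>: a single Unit signals completion.
        IObservable<Unit> rendered = RenderAsync(value).ToObservable();
        await rendered.ToTask();

        return value;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync());
    }
}
```

Note that calling FetchAsync() starts the task immediately; ToObservable() only wraps a task that is already running.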
this.ChannelDataService.ActivateChannelSubject
    .Where(t => t != null)
    .Select(_ => Observable.Defer(() => ProcessChannelDataAsync().ToObservable()))
    .Concat()
    .Select(processedData => Observable.Defer(() => this.RenderChannelDataAsync(processedData).ToObservable()))
    .Concat()
    .Subscribe(_ =>
    {
        System.Diagnostics.Debug.WriteLine($"Active channel changed");
    });

this.ChannelDataService.ChannelDataSubject
    .Where(t => t != null)
    .Select(_ => Observable.Defer(() => ProcessChannelDataAsync().ToObservable()))
    .Concat()
    .Select(processedData => Observable.Defer(() => this.RenderChannelDataAsync(processedData).ToObservable()))
    .Concat()
    .Subscribe(processedData =>
    {
        System.Diagnostics.Debug.WriteLine($"Channel data updated");
    });
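The Select + Defer + Concat combination is what keeps the async work sequential. The sketch below isolates it, assuming the System.Reactive package is referenced. ProcessAsync is a hypothetical stand-in for ProcessChannelDataAsync, and the shared log exists only to make the ordering visible. Defer postpones the call to the async method until Concat subscribes to that inner observable, and Concat subscribes to the next one only after the previous one completes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class DeferConcatSketch
{
    // Hypothetical async step: records when it starts and when it ends.
    private static async Task<string> ProcessAsync(int id, List<string> log)
    {
        log.Add($"start {id}");
        await Task.Delay(50);
        log.Add($"end {id}");
        return $"data {id}";
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        await new[] { 1, 2, 3 }.ToObservable()
            // Defer: ProcessAsync is not called until the inner observable is subscribed to.
            .Select(id => Observable.Defer(() => ProcessAsync(id, log).ToObservable()))
            // Concat: subscribe to each inner observable only after the previous one completes.
            .Concat()
            .ToList()
            .ToTask();

        return log;
    }

    public static async Task Main()
    {
        foreach (var line in await RunAsync())
        {
            Console.WriteLine(line);
        }
    }
}
```

Run as written, the log should read start 1, end 1, start 2, end 2, start 3, end 3. Without Observable.Defer, each task would start as soon as Select runs, so the three calls would overlap and only their results would be delivered in order.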
Performance isn’t significantly different from before, but the code is now more reactive. The subscription, as I was told, is as light as can be, and there is little room for side effects.
Happy coding.