Be moar reactive with #uwpdev ?

This is the 4th and hopefully not the last post ever about Reactive Extensions with #uwpdev. After my 1st post, Jamie Mutton mentioned that writing async task code inside a subscription can lead to side effects and he posted a gist on various options.

I decided to try and reduce some of the async task code from subscription and into another observable following the first option Jamie suggested. Here’s what I had before.

The subscription in the constructor of the ViewModel.

this.ChannelDataService.ActivateChannelSubject
    .ObserveOn(RxApp.TaskpoolScheduler)
    .SubscribeOn(RxApp.TaskpoolScheduler)
    .Subscribe(async tuple =>
{
    if (tuple != null)
    {
        await this.ProcessChannelData();
    }
});

this.ChannelDataService.ChannelDataSubject
    .ObserveOn(RxApp.TaskpoolScheduler)
    .SubscribeOn(RxApp.TaskpoolScheduler)
    .Subscribe(async tuple =>
{
    if (tuple != null)
    {
        await this.ProcessChannelData();
    }
});

The View Model methods

private async Task ProcessChannelData()
{
    try
    {
        await semaphore.WaitAsync();

        await DispatcherHelper.ExecuteOnUIThreadAsync(() => this.IsBusy = true);

        var channelData = this.ChannelDataService.GetChannelData();
        await this.RenderChannelData(channelData);
    }
    finally
    {
        semaphore.Release();

        await DispatcherHelper.ExecuteOnUIThreadAsync(() => this.IsBusy = false);
    }
}

private async Task RenderChannelData(ChannelData channelData)
{
    if (channelData == null)
    {
        await DispatcherHelper.ExecuteOnUIThreadAsync(() => this.ResetData());

        return;
    }

    var configSettings = this.ChannelDataService.ConfigurationSettings;

    var itemData = await channelData?.ProcessItems(ChannelService.ChannelDictionary, ChannelService.ChannelList[0], this.AppSettings.Country);

    if (itemData == null)
    {
        return;
    }

    var puffData = channelData.ProcessPuffs(ChannelService.ChannelDictionary, ChannelService.ChannelList[0]);

    System.Diagnostics.Debug.WriteLine($"Item count is {itemData.Item1.Count}, Group count is {itemData.Item2.Count}");

    await DispatcherHelper.ExecuteOnUIThreadAsync(() =>
    {
        try
        {
            ViewHelper.SetStatusBar(this.Channel.StatusColor, Colors.White, this.AppSettings.ElementTheme);

            if (this.Items.Count > 0)
            {
                this.Items.Clear();
            }

            this.Items.AddRange(itemData.Item1);

            if (this.Content.Count > 0)
            {
                this.Content.Clear();
            }

            this.Content.AddRange(itemData.Item2);
        }
        catch { }

        try
        {
            if (PuffContent.Count > 0)
            {
                this.PuffContent.Clear();
            }

            this.PuffContent.AddRange(puffData);
        }
        catch { }
    });
}

I tried to split the code into two parts – Fetch and prepare data in one and render in another.

private async Task<ChannelData> ProcessChannelDataAsync()
{
    ChannelData channelData = null;
    try
    {
        await semaphore.WaitAsync();

        await DispatcherHelper.ExecuteOnUIThreadAsync(() =>
        {
            this.IsBusy = true;
        });

        await Task.Run(() =>
        {
            channelData = this.ChannelDataService.GetChannelData();

            if (channelData != null)
            {
                channelData.ProcessChannelData(ChannelService.ChannelDictionary, ChannelService.ChannelList[0], this.AppSettings.Country);
            }
        });
    }
    catch (JsonReaderException jsonEx)
    {
        Exception handledException = new Exception("Handled data parse exception", jsonEx);
        HockeyClient.Current.TrackException(handledException);
    }
    catch (SqliteException ex)
    {
        Exception handledEx = new Exception("Handled SQLite error getting channel data", ex);
        HockeyClient.Current.TrackException(handledEx);

        DialogService ds = new DialogService();
        await ds.ShowMessage("Application encountered serious error. Please reinstall the application.", SystemInformation.ApplicationName);
    }
    finally
    {
        semaphore.Release();
    }

    return channelData;
}

private async Task RenderChannelDataAsync(ChannelData processedData)
{
    try
    {
        await semaphore.WaitAsync();

        await DispatcherHelper.ExecuteOnUIThreadAsync(() =>
        {
            ViewHelper.SetStatusBar(this.Channel.StatusColor, Colors.White, this.AppSettings.ElementTheme);

            if (processedData == null)
            {
                this.ResetData();

                return;
            }

            if (this.Items.Count > 0)
            {
                this.Items.Clear();
            }

            if (processedData != null)
            {
                this.Items.AddRange(processedData.ProcessedItems);
            }

            if (this.Content.Count > 0)
            {
                this.Content.Clear();
            }

            if (processedData != null)
            {
                this.Content.AddRange(processedData.GroupedItems);
            }

            if (PuffContent.Count > 0)
            {
                this.PuffContent.Clear();
            }

            if (processedData != null)
            {
                this.PuffContent.AddRange(processedData.PuffGroupedItems);
            }

            this.IsBusy = false;
        });
    }
    catch { }
    finally
    {
        semaphore.Release();
    }
}

Now I could chain the original observation with a new observable(s) created out of async method(s) as suggested by Jamie Mutton in his gist.

The extension method to convert async method to an observable is simply ToObservable().

this.ChannelDataService.ActivateChannelSubject
    .Where(t => t != null)
    .Select(_ => Observable.Defer(() => ProcessChannelDataAsync().ToObservable()))
    .Concat()
    .Select(processedData => Observable.Defer(() => this.RenderChannelDataAsync(processedData).ToObservable()))
    .Concat()
    .Subscribe(_ =>
    {
        System.Diagnostics.Debug.WriteLine($"Active channel changed");
    });

this.ChannelDataService.ChannelDataSubject
    .Where(t => t != null)
    .Select(_ => Observable.Defer(() => ProcessChannelDataAsync().ToObservable()))
    .Concat()
    .Select(processedData => Observable.Defer(() => this.RenderChannelDataAsync(processedData).ToObservable()))
    .Concat()
    .Subscribe(processedData =>
    {
        System.Diagnostics.Debug.WriteLine($"Channel data updated");
    });

Performance isn’t much significantly different from earlier but the code is now more reactive than before. The subscription as I was told is as light as can be and there is little room for side-effects

Happy coding.

Advertisements