Reactive Extensions with #uwpdev Part 2

In my post yesterday I talked about adding a few reactive bits inside MVVM using ReactiveUI.

Jamie Mutton mentioned that writing async task code inside a subscription can lead to side effects and he posted a gist on various options. I haven’t tried those yet – I will do so soon. I have mentioned something about not being a purist and I found it easier to inject Rx elements into MVVM whilst following known (to myself) programming style.
Being a slow learner, I prefer to inject things slowly. Add reactive bits at your own pace. You can go the whole 9 yards or go a step at a time.

Today let’s see how to add Reactive Extensions to Service layer. MVVM should really be MVVMS but its a mouthful and not a palindrome.

The Daily Mail Online app has many channels that group content as desired by the editors. Within the app, there’s a Channel list and the content is displayed within FlipView.

Channel list is bound to a RelayCommand (MVVMLight) – setting the correct ChannelIndex. The FlipView is bound to a list of ChannelViewModels with ChannelIndex as the SelectedIndex. When the channel changes, the Data Service is notified and new data is requested.

The sequence is as follows

  1. When Channel Index changes, it pushes a new Tuple through the Active Channel Subject.
  2. When Active Channel Subject publishes a new item:
    • the subscriber in the service, if data is out of data, it downloads new data set and saves it to database, publishing the tuple through to Channel Data Subject
    • the subscriber in the view model, pulls and renders data from database
  3. When Channel Data Subject publishes a new item, the view model pulls it from database and renders it
public int ChannelIndex
{
    get
    {
        return this.channelIndex;
    }

    set
    {
        try
        {
            this.RaiseAndSetIfChanged(ref this.channelIndex, value);
            this.RaisePropertyChanged(nameof(this.Channel));
            this.RaisePropertyChanged(nameof(this.ChannelViewModel));

        }
        catch { }

        var channel = this.Channel;

        SubChannel selectedSub = null;

        if (channel.HasSubChannels)
        {
            selectedSub = channel.GetDefaultSubChannel();
        }

        this.ChannelDataService.SetActiveSource(new Tuple(channel, selectedSub));
    }
}

The Channel Data Service is a monstrosity in itself but that’s another issue. What I have in the service are Reactive Subject instances to publish – in this instance a Tuple for both ActiveChannel and when a new data set is available.

Subjects are considered to be educational material – however in absence of any alternative, I decided to start with those.

public BehaviorSubject ActivateChannelSubject { get; private set; }

public BehaviorSubject ChannelDataSubject { get; private set; }

As you saw above, the ChannelIndex calls a method SetActiveSource. This merely pushes item to the Subject.

public void SetActiveSource(Tuple tuple)
{
    this.SelectedChannelTuple = tuple;

    this.ActivateChannelSubject.OnNext(tuple);
}

Within the service itself, I have a subscription to this subject. Other subscriber includes the MainViewModel for the app

this.ActivateChannelSubject.Subscribe(async tuple =>
{
    try
    {
        // checks to see if existing data is up to date.
        // If it isn't, it pulls necessary content
        await this.ProcessChannel(tuple, true);
    }
    catch { }
});
public async Task ProcessChannel(
    Tuple channelTuple,
    bool notifySubscribers,
    bool forceRefresh = false)
{
    if (!NetworkHelper.Instance.ConnectionInformation.IsInternetAvailable)
    {
        return;
    }

    try
    {
        // check if data is up to date

        // download content as json

        // inject new data into the local db

        if (notifySubscribers)
        {
            this.ChannelDataSubject.OnNext(channelTuple);
        }
    }
}

I remember the warning Jamie gave about the side effects when using async task in subscription. I promise to take a look at the alternative but this is what it is right now..
The try catch around ProcessChannel are just to make sure any exceptions do not take the app down as async lambda is async void and it can burn the house down if not very careful.

Lets took at the subscriptions in the view model’s constructor

this.ChannelDataService.ActivateChannelSubject
    .ObserveOn(RxApp.TaskpoolScheduler)
    .SubscribeOn(RxApp.TaskpoolScheduler)
    .Subscribe(async tuple =>
{
    if (tuple != null)
    {
        await this.ProcessChannelData();
    }
});

this.ChannelDataService.ChannelDataSubject
    .ObserveOn(RxApp.TaskpoolScheduler)
    .SubscribeOn(RxApp.TaskpoolScheduler)
    .Subscribe(async tuple =>
{
    if (tuple != null)
    {
        await this.ProcessChannelData();
    }
});

The View Model itself pulls the data, groups it and renders it on UI thread

private async Task ProcessChannelData()
{
    try
    {
        await semaphore.WaitAsync();

        await DispatcherHelper.ExecuteOnUIThreadAsync(() => this.IsBusy = true);

        var channelData = this.ChannelDataService.GetChannelData();
        await this.RenderChannelData(channelData);
    }
    finally
    {
        semaphore.Release();

        await DispatcherHelper.ExecuteOnUIThreadAsync(() => this.IsBusy = false);
    }
}

private async Task RenderChannelData(ChannelData channelData)
{
    if (channelData == null)
    {
        await DispatcherHelper.ExecuteOnUIThreadAsync(() => this.ResetData());

        return;
    }

    var configSettings = this.ChannelDataService.ConfigurationSettings;

    var itemData = await channelData?.ProcessItems(ChannelService.ChannelDictionary, ChannelService.ChannelList[0], this.AppSettings.Country);

    if (itemData == null)
    {
        return;
    }

    var puffData = channelData.ProcessPuffs(ChannelService.ChannelDictionary, ChannelService.ChannelList[0]);

    System.Diagnostics.Debug.WriteLine($"Item count is {itemData.Item1.Count}, Group count is {itemData.Item2.Count}");

    await DispatcherHelper.ExecuteOnUIThreadAsync(() =>
    {
        try
        {
            ViewHelper.SetStatusBar(this.Channel.StatusColor, Colors.White, this.AppSettings.ElementTheme);

            if (this.Items.Count > 0)
            {
                this.Items.Clear();
            }

            this.Items.AddRange(itemData.Item1);

            if (this.Content.Count > 0)
            {
                this.Content.Clear();
            }

            this.Content.AddRange(itemData.Item2);
        }
        catch { }

        try
        {
            if (PuffContent.Count > 0)
            {
                this.PuffContent.Clear();
            }

            this.PuffContent.AddRange(puffData);
        }
        catch { }
    });
}

This I believe is like 11/2 steps at max.. There a long way to go.. much to refine but things work. I force use of TaskPool unless I need a UI.. in that case, queue it up on the Dispatcher.

I have done it the non reactive way before and whilst it all works, it feels better with reactive model. Things split into independently observables

One thought on “Reactive Extensions with #uwpdev Part 2

  1. Pingback: Dew Drop - March 27, 2018 (#2692) - Morning Dew

Leave a comment