Be moar reactive with #uwpdev ?

This is the 4th and hopefully not the last post ever about Reactive Extensions with #uwpdev. After my 1st post, Jamie Mutton mentioned that writing async task code inside a subscription can lead to side effects and he posted a gist on various options.

I decided to try and reduce some of the async task code from subscription and into another observable following the first option Jamie suggested. Here’s what I had before.

The subscription in the constructor of the ViewModel.

this.ChannelDataService.ActivateChannelSubject
    .ObserveOn(RxApp.TaskpoolScheduler)
    .SubscribeOn(RxApp.TaskpoolScheduler)
    .Subscribe(async tuple =>
{
    if (tuple != null)
    {
        await this.ProcessChannelData();
    }
});

this.ChannelDataService.ChannelDataSubject
    .ObserveOn(RxApp.TaskpoolScheduler)
    .SubscribeOn(RxApp.TaskpoolScheduler)
    .Subscribe(async tuple =>
{
    if (tuple != null)
    {
        await this.ProcessChannelData();
    }
});

The View Model methods

private async Task ProcessChannelData()
{
    try
    {
        await semaphore.WaitAsync();

        await DispatcherHelper.ExecuteOnUIThreadAsync(() => this.IsBusy = true);

        var channelData = this.ChannelDataService.GetChannelData();
        await this.RenderChannelData(channelData);
    }
    finally
    {
        semaphore.Release();

        await DispatcherHelper.ExecuteOnUIThreadAsync(() => this.IsBusy = false);
    }
}

private async Task RenderChannelData(ChannelData channelData)
{
    if (channelData == null)
    {
        await DispatcherHelper.ExecuteOnUIThreadAsync(() => this.ResetData());

        return;
    }

    var configSettings = this.ChannelDataService.ConfigurationSettings;

    var itemData = await channelData?.ProcessItems(ChannelService.ChannelDictionary, ChannelService.ChannelList[0], this.AppSettings.Country);

    if (itemData == null)
    {
        return;
    }

    var puffData = channelData.ProcessPuffs(ChannelService.ChannelDictionary, ChannelService.ChannelList[0]);

    System.Diagnostics.Debug.WriteLine($"Item count is {itemData.Item1.Count}, Group count is {itemData.Item2.Count}");

    await DispatcherHelper.ExecuteOnUIThreadAsync(() =>
    {
        try
        {
            ViewHelper.SetStatusBar(this.Channel.StatusColor, Colors.White, this.AppSettings.ElementTheme);

            if (this.Items.Count > 0)
            {
                this.Items.Clear();
            }

            this.Items.AddRange(itemData.Item1);

            if (this.Content.Count > 0)
            {
                this.Content.Clear();
            }

            this.Content.AddRange(itemData.Item2);
        }
        catch { }

        try
        {
            if (PuffContent.Count > 0)
            {
                this.PuffContent.Clear();
            }

            this.PuffContent.AddRange(puffData);
        }
        catch { }
    });
}

I tried to split the code into two parts – Fetch and prepare data in one and render in another.

private async Task<ChannelData> ProcessChannelDataAsync()
{
    ChannelData channelData = null;
    try
    {
        await semaphore.WaitAsync();

        await DispatcherHelper.ExecuteOnUIThreadAsync(() =>
        {
            this.IsBusy = true;
        });

        await Task.Run(() =>
        {
            channelData = this.ChannelDataService.GetChannelData();

            if (channelData != null)
            {
                channelData.ProcessChannelData(ChannelService.ChannelDictionary, ChannelService.ChannelList[0], this.AppSettings.Country);
            }
        });
    }
    catch (JsonReaderException jsonEx)
    {
        Exception handledException = new Exception("Handled data parse exception", jsonEx);
        HockeyClient.Current.TrackException(handledException);
    }
    catch (SqliteException ex)
    {
        Exception handledEx = new Exception("Handled SQLite error getting channel data", ex);
        HockeyClient.Current.TrackException(handledEx);

        DialogService ds = new DialogService();
        await ds.ShowMessage("Application encountered serious error. Please reinstall the application.", SystemInformation.ApplicationName);
    }
    finally
    {
        semaphore.Release();
    }

    return channelData;
}

private async Task RenderChannelDataAsync(ChannelData processedData)
{
    try
    {
        await semaphore.WaitAsync();

        await DispatcherHelper.ExecuteOnUIThreadAsync(() =>
        {
            ViewHelper.SetStatusBar(this.Channel.StatusColor, Colors.White, this.AppSettings.ElementTheme);

            if (processedData == null)
            {
                this.ResetData();

                return;
            }

            if (this.Items.Count > 0)
            {
                this.Items.Clear();
            }

            if (processedData != null)
            {
                this.Items.AddRange(processedData.ProcessedItems);
            }

            if (this.Content.Count > 0)
            {
                this.Content.Clear();
            }

            if (processedData != null)
            {
                this.Content.AddRange(processedData.GroupedItems);
            }

            if (PuffContent.Count > 0)
            {
                this.PuffContent.Clear();
            }

            if (processedData != null)
            {
                this.PuffContent.AddRange(processedData.PuffGroupedItems);
            }

            this.IsBusy = false;
        });
    }
    catch { }
    finally
    {
        semaphore.Release();
    }
}

Now I could chain the original observation with a new observable(s) created out of async method(s) as suggested by Jamie Mutton in his gist.

The extension method to convert async method to an observable is simply ToObservable().

this.ChannelDataService.ActivateChannelSubject
    .Where(t => t != null)
    .Select(_ => Observable.Defer(() => ProcessChannelDataAsync().ToObservable()))
    .Concat()
    .Select(processedData => Observable.Defer(() => this.RenderChannelDataAsync(processedData).ToObservable()))
    .Concat()
    .Subscribe(_ =>
    {
        System.Diagnostics.Debug.WriteLine($"Active channel changed");
    });

this.ChannelDataService.ChannelDataSubject
    .Where(t => t != null)
    .Select(_ => Observable.Defer(() => ProcessChannelDataAsync().ToObservable()))
    .Concat()
    .Select(processedData => Observable.Defer(() => this.RenderChannelDataAsync(processedData).ToObservable()))
    .Concat()
    .Subscribe(processedData =>
    {
        System.Diagnostics.Debug.WriteLine($"Channel data updated");
    });

Performance isn’t much significantly different from earlier but the code is now more reactive than before. The subscription as I was told is as light as can be and there is little room for side-effects

Happy coding.

Advertisement

Reactive Extensions with #uwpdev Part 3

The way I understood Rx, it was designed to do away with how we (developers) handle events.

A ListView control provides a few ways of handling how developers deal with user interaction. You can have ItemClick and disable Item Selection. You could have Item Selection with ability to select more than one Item.
My personal favourite is to allow ItemClick and to bind a Command property in ListViewExtensions in UWP Toolkit.

There are times when you really need to handle the event in code behind – I know purists would have a day here but say you had a video previews running in a list view. You don’t want all of them running all the time – you want to ensure that only those rendered on screen are running the video.
How would one do that ?

The usual route would be to find the ScrollViewer in the ListView control and hook into the ViewChanged event.
Every time ViewChanged event is raised, OnScrollViewerViewChanged method will be called.

By every time I mean every time.. could be maybe times every second. You then start have to add hacks like lastChecked time and if Time Elapsed is greater than 100ms etc etc.

private void WireUpListViewScroll(ListView listView)
{
    if (listView == null)
    {
        return;
    }

    if (scrollViewer != null)
    {
        scrollViewer.ViewChanged -= OnScrollViewerViewChanged;
    }

    var scrollViewer = listView.FindDescendant<ScrollViewer>();

    if (scrollViewer == null)
    {
        return;
    }

    scrollViewer.ViewChanged += OnScrollViewerViewChanged;
    
    ProcessChannelUserControls(listView, true);
}

private void OnScrollViewerViewChanged(object sender, ScrollViewerViewChangedEventArgs e)
{
    this.ProcessChannelUserControls(listView);
}

So lets look up what Reactive extensions has for us. For starters, Observable.FromEventPattern allows you to create an observable from event and to subscribe to it. No need to unsubscribe from event handers etc.. just dispose the subscription.

Notice I also use Throttle with a TimeSpan.. I don’t want too many hits – once every 0.35 seconds seemed sufficiently responsive for my use and what’s what I used. the scrollViewerSub I use is a SerialDisposable – every time I add a new subscription, it disposes the old one.

private void WireUpListViewScroll(ListView listView)
{
    if (listView == null)
    {
        return;
    }

    var scrollViewer = listView.FindDescendant<ScrollViewer>();

    if (scrollViewer == null)
    {
        return;
    }

    this.scrollViewerSub.Disposable = Observable.FromEventPattern(scrollViewer, "ViewChanged")
        .Throttle(TimeSpan.FromSeconds(0.35))
        .ObserveOn(CoreDispatcherScheduler.Current)
        .SubscribeOn(TaskPoolScheduler.Default)
        .Subscribe(x =>
        {
            this.ProcessChannelUserControls(listView);
        });

    ProcessChannelUserControls(listView, true);
}

Now that the ViewChanged logic is in place, let’s look at the rest of the code for toggling video previews.

private async void ProcessChannelUserControls(ListView listview, bool initial = false)
{
    try
    {
        var isp = listview.ItemsPanelRoot as ItemsStackPanel;

        if (isp == null)
        {
            return;
        }

        await ToggleAnimatedPreviews(listview, isp, initial);
    }
    catch { }
}

private async Task ToggleAnimatedPreviews(ListView listView, ItemsStackPanel isp, bool initial)
{
    var firstGroupPos = isp.FirstVisibleIndex;
    var lastGroupPos = isp.LastVisibleIndex;

    if (firstGroupPos == -1 && lastGroupPos == -1)
    {
        firstGroupPos = 0;
        lastGroupPos = 0;
    }

    if (initial)
    {
        DependencyObject obj = null;
        while (true)
        {
            obj = listView.ContainerFromIndex(0);

            if (obj != null)
            {
                break;
            }

            await Task.Delay(100);
        }
    }

    for (int i = 0; i < listView.Items.Count; i++)
    {
        var container = listView.ContainerFromIndex(i);

        var vp = container?.FindDescendant<AnimatedPreviewPlayerUC>();

        if (i >= firstGroupPos && i <= lastGroupPos)
        {
            vp?.Play();
        }
        else
        {
            vp?.Pause();
        }
    }
}

These are some of the places I have used Reactive Extensions whilst doing #uwpdev with MVVM. I am still learning.