Asynchronous Programming

December 12, 2018, 15:58

by Eduard Bargués

Brief introduction

Here at Fluendo, we love technology and are constantly pushing our limits. Our audio and video codecs are used all over the world by many trusted third parties (Microsoft, Intel, AMD...).

Fluendo has been involved in many development projects for our clients. Our latest adventure was born almost 2 years ago and it's called RiftGG, an analysis tool aimed at helping amateur and professional players unleash their potential in League of Legends (LoL). It is the first of its kind and we are... oh so proud of it!

RiftGG is a really cool tool offering gamers an all-in-one sort of holistic approach for their improvement. However, building this didn't come without challenges. Throughout its development, we had to deal with puzzling technical issues. It was hard but we learned a lot and we want to share that knowledge with you. We do believe that RiftGG and the things we learned during the process are worth sharing. Whether you are a developer like us or just someone who likes learning new things every day, we encourage you to check out this post.

This is the first of the many technical posts about RiftGG that we are preparing here at Fluendo. Hope you enjoy it!

Summary

RiftGG is a vast desktop application with too many aspects to be covered in just one post, and we don't want you to doze off! :) In this post, we will focus on asynchronous programming and how our team is using it to make RiftGG awesome.

Throughout RiftGG's development, we struggled with many situations where asynchronous programming drove us kind of crazy. We had to read and study many papers and documents on the subject, and there was also a cumbersome trial-and-error process.

Here is a collection of articles and blog posts that we found useful:

In this post, we will talk about:

  • Why is it so important to use asynchronous programming in RiftGG?
  • Main patterns used in RiftGG: MVVM, State and Service patterns
  • RiftGG as an Event-Driven application
  • Problem: When consecutive events get too close ...
  • Solution: Schedule tasks on each event arrival
    • How do services define and schedule tasks?
    • Best exception handling practices when scheduling tasks
    • How do we update the UI on each task?
  • Conclusions

Why is it so important to use asynchronous programming in RiftGG?

RiftGG is designed to be a fast desktop application with a continuously responsive User Interface (UI). Many operations happen in the background: calling APIs, loading and saving information to RiftGG databases, refreshing champion and summoner images in the UI... All these operations happen without the user noticing, and they are possible thanks to asynchronous programming, which keeps them from blocking the user interface.

Just consider the amount of data RiftGG manages: it would be prohibitive not to use asynchronous programming to develop it. The users demand immediate feedback on every operation and a fluent and responsive interface (they also want it to be cool, but... you know, that's a horse of a different color :P).

In the next sections, we are going to present some situations where asynchronous programming has proven useful and explain how these solutions have been implemented. But first things first: we need to know the basics of RiftGG's structure and understand how we, the developers, designed it.

Main patterns used in RiftGG: MVVM, State and Service patterns

RiftGG has been developed following an MVVMS pattern. This weird acronym stands for the Model-View-View Model (MVVM) pattern and the Service pattern combined. We provide a simplified definition and explanation:

MVVM is a design pattern that provides a clean separation of concerns between the UI and its logic. The View is separated from the logic by an intermediate layer called the view model (also known as the presenter in the MVP pattern), with a 2-way data binding where the view model notifies changes to the View and the View interacts with the view model through commands. This pattern enables a developer-designer workflow, makes more code reusable and makes it easier to unit test more components, since Views are usually hard to test. Also, the MVVM approach lets us completely forget about our views during development and only take care of the properties of the view models. The services and states running in the application are only aware of their view model and, once a property is updated in the view model, the change immediately propagates to the view through its binding.
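To make the binding mechanism concrete, here is a minimal sketch of a view model that notifies its view through the standard .NET INotifyPropertyChanged interface. The class and property names (SummonerViewModel, SummonerName) are hypothetical and are not taken from RiftGG.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Runtime.CompilerServices;

// Hypothetical view model: the class and property names are illustrative only.
public class SummonerViewModel : INotifyPropertyChanged
{
	string summonerName;

	// A bound view subscribes to this event and refreshes itself when it fires.
	public event PropertyChangedEventHandler PropertyChanged;

	public string SummonerName {
		get => summonerName;
		set {
			if (summonerName == value) {
				return; // Nothing changed, so there is nothing to notify.
			}
			summonerName = value;
			OnPropertyChanged ();
		}
	}

	// CallerMemberName fills in the name of the property that called this method.
	void OnPropertyChanged ([CallerMemberName] string propertyName = null)
	{
		PropertyChanged?.Invoke (this, new PropertyChangedEventArgs (propertyName));
	}
}
```

A service only needs to set SummonerName; it never touches the view, which is what makes the service testable without any UI.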

To have a better feature decoupling, RiftGG is divided into several views where the user can see different things. Roughly, each view and its logic has been implemented in a separate state so that it can be fully tested. Each state contains and manages several services which are responsible for performing most of the actions inside its state.

RiftGG as an Event-Driven application

Suppose that you have a service that is constantly running in your app (so, it does not belong to a specific state). For simplicity, we are going to suppose that this service (let's call it DownloaderService) downloads information from an API at intervals and notifies another service (called FeedBackService) each time another piece of information is available.

If we are concerned about decoupling, we should design a way for both services to communicate without knowing about each other's existence. Somehow, the FeedBackService should know each time a new piece of information is available without depending on the DownloaderService. One solution is to provide RiftGG with some sort of events broker: a class responsible for publishing events of any type that is available, as a singleton or static instance, to any service.

Here, for simplicity, we stick to the interface of this events broker. As a reminder, RiftGG services can publish and subscribe to events using an implementation of the following interface.


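public interface IEventsBroker
{
	// Registers an asynchronous handler for events of type TEvent.
	void SubscribeAsync<TEvent> (Func<TEvent, Task> action);

	// Publishes an event to every handler subscribed to TEvent.
	Task Publish<TEvent> (TEvent e);
}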
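
The post only shows the broker's interface, so the following is a sketch of what a minimal in-memory implementation could look like, not RiftGG's actual broker. The class name InMemoryEventsBroker and its internals are assumptions; the interface is repeated, with its generic parameters, so that the example is self-contained.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public interface IEventsBroker
{
	void SubscribeAsync<TEvent> (Func<TEvent, Task> action);
	Task Publish<TEvent> (TEvent e);
}

// Hypothetical in-memory broker; RiftGG's real implementation is not shown in the post.
public class InMemoryEventsBroker : IEventsBroker
{
	// Handlers grouped by event type. They are stored as Delegate because
	// each event type has a differently typed Func<TEvent, Task>.
	readonly ConcurrentDictionary<Type, List<Delegate>> handlers =
		new ConcurrentDictionary<Type, List<Delegate>> ();

	public void SubscribeAsync<TEvent> (Func<TEvent, Task> action)
	{
		List<Delegate> list = handlers.GetOrAdd (typeof (TEvent), _ => new List<Delegate> ());
		lock (list) {
			list.Add (action);
		}
	}

	public Task Publish<TEvent> (TEvent e)
	{
		if (!handlers.TryGetValue (typeof (TEvent), out List<Delegate> list)) {
			return Task.CompletedTask; // Nobody listens to this event type.
		}
		Func<TEvent, Task>[] snapshot;
		lock (list) {
			snapshot = list.Cast<Func<TEvent, Task>> ().ToArray ();
		}
		// Start every handler and complete once all of them have finished.
		return Task.WhenAll (snapshot.Select (handler => handler (e)));
	}
}
```

With this shape, the publisher and the subscriber only share the event type: neither holds a reference to the other.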
			

Problem: Overlapping tasks update the UI

So, now we have our DownloaderService that is constantly pulling pieces of information from a third party API while it's publishing events to our IEventsBroker interface. On the other side, our FeedBackService subscribes to the IEventsBroker and listens only to those same events. Every time a new event is published, our FeedBackService performs an action to update its view model and, due to its binding with a view, the information is updated for the user.

Everything seems to be performing well, but there is a possible source of errors. We do not know the time span between 2 consecutive events, because each API request may take a different amount of time. This might cause the update actions to overlap with each other, leading to an inconsistency in the UI.

This is something that actually happened in RiftGG. When the update action was a method that required asynchronous operations (maybe calling another API or waiting for a database process to finish) that might take long, the next update action overlapped the first one. This is a very unpleasant bug because it cannot be easily reproduced.

So, how do we control those events that are constantly coming and perform the update actions in a consistent way?

[Figure: Overlapping update actions]
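
The overlap can be reproduced deterministically in a few lines. The sketch below is an illustration rather than RiftGG code: OverlapDemo, Displayed and Update are hypothetical names, and TaskCompletionSource stands in for API or database calls that finish in an unpredictable order.

```csharp
using System;
using System.Threading.Tasks;

public static class OverlapDemo
{
	// Stand-in for a view model property shown in the UI.
	public static string Displayed;

	// Hypothetical update action: waits for some asynchronous work, then writes.
	static async Task Update (string value, Task pendingWork)
	{
		await pendingWork;
		Displayed = value;
	}

	public static async Task<string> Run ()
	{
		var slowWork = new TaskCompletionSource<bool> ();
		var fastWork = new TaskCompletionSource<bool> ();

		// Event 1 arrives first and event 2 right after it. Nothing serializes them.
		Task first = Update ("info #1", slowWork.Task);
		Task second = Update ("info #2", fastWork.Task);

		// The work for event 2 happens to finish before the work for event 1.
		fastWork.SetResult (true);
		await second;
		slowWork.SetResult (true);
		await first;

		// The stale value from event 1 has overwritten the newer one.
		return Displayed;
	}
}
```

Here Run() yields "info #1" even though "info #2" is the most recent information, which is exactly the kind of inconsistency described above.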

Solution: Schedule tasks on each event arrival

So, wouldn't it be enough that, when a new event is published, our FeedBackService created an Action and put it at the end of a queue where all the other actions would be waiting to be executed? This simple idea led us to implement a single-queue task scheduler: a class where our FeedBackService can enqueue tasks that are then executed one after another, without overlapping. The actual implementation of the SingleQueueTaskScheduler is provided:


public class SingleQueueTaskScheduler
{
	// We store the tasks here.
	ConcurrentQueue<ScheduledTaskInfo> tasks = new ConcurrentQueue<ScheduledTaskInfo> ();
	Task currentTask;
	string currentTaskName;

	// Event to inform outsiders that a task has finished
	public event Action<Guid> TaskFinished;

	// Exposes the Id of the current task 
	public Guid? CurrentTaskId { get; private set; }

	// The name of the scheduler. useful when debugging.
	public string Name { get; }

	// Exposes the state of the scheduler (running or paused)
	public bool Paused { get; private set; }

	// Gets the name of the tasks currently enqueued
	public List<string> TasksNames => tasks.Select (t => t.TaskName).ToList ();

	// Get the Id of the tasks currently enqueued
	public List<Guid> TaskIds => tasks.Select (t => t.TaskId).ToList ();

	public SingleQueueTaskScheduler (string name)
	{
		Name = name;
	}

	// Enqueues a task
	public Guid Enqueue (Expression<Func<Task>> task)
	{
		ScheduledTaskInfo info = new ScheduledTaskInfo (taskName: GetTaskName (task), taskId: Guid.NewGuid (), expression: task.Compile ());
		tasks.Enqueue (info);
		LogEnqueuedTaskMessage (info.TaskName, info.TaskId);

		if (IsCurrentTaskDone ()) {
			ExecuteNextTask ();
		}

		return info.TaskId;
	}

	// Pauses the scheduler
	public async Task Pause ()
	{
		Paused = true;
		if (!IsCurrentTaskDone ()) {
			await currentTask;
		}
	}

	// Starts or resumes the scheduler
	public void Run ()
	{
		if (!Paused) {
			return;
		}
		Paused = false;
		ExecuteNextTask ();
	}

	// Empties the queue of tasks
	public void Clear ()
	{
		while (tasks.TryDequeue (out ScheduledTaskInfo taskInfo)) { }
	}

	// Pauses the scheduler and empties the queue
	public async Task Stop ()
	{
		await Pause ();
		Clear ();
	}

	void ExecuteNextTask ()
	{
		bool thereIsAFinishedTask = !string.IsNullOrEmpty (currentTaskName) && CurrentTaskId != null;
		if (thereIsAFinishedTask) {
			LogFinishedTaskMessage ();
			Guid finishedTaskId = CurrentTaskId.Value;
			currentTaskName = null;
			CurrentTaskId = null;
			TaskFinished?.Invoke (finishedTaskId);
		}

		if (Paused || !tasks.Any () || !tasks.TryDequeue (out ScheduledTaskInfo candidate)) {
			return;
		}

		currentTaskName = candidate.TaskName;
		CurrentTaskId = candidate.TaskId;
		LogStartingTaskMessage ();
		currentTask = Task.Run (candidate.TaskExpression);
		currentTask.ContinueWith (t => ExecuteNextTask ());
	}

	bool IsCurrentTaskDone () => currentTask == null || currentTask.IsCanceled || currentTask.IsCompleted;

	string GetTaskName (Expression<Func<Task>> task) => task.Body is MethodCallExpression exp ? exp.Method.Name : task.Name;

	void LogStartingTaskMessage () => Log ($"{Name} - STARTED TASK : \"{currentTaskName}\" / {CurrentTaskId.Value}.");

	void LogFinishedTaskMessage () => Log ($"{Name} - FINISHED TASK : \"{currentTaskName}\" / {CurrentTaskId.Value}.");

	void LogEnqueuedTaskMessage (string taskName, Guid taskId) => Log ($"{Name} - ENQUEUED TASK : \"{taskName}\" / {taskId}.");
}
			

The scheduler contains 3 fields; the most important one is the tasks field, of type ConcurrentQueue<ScheduledTaskInfo>, where incoming tasks are stored while they wait to be executed. The Enqueue method stores the task and, if no task is currently running, processes the next available one with the method ExecuteNextTask(). As you may have noticed, all the action happens in that last method. First, if there is a finished task, the scheduler logs some information with the method LogFinishedTaskMessage() and invokes the TaskFinished event. Then, it tries to pull the next task out of the queue and, if it is paused or the queue is empty, it stops the execution. Finally, if a new task has been dequeued, the scheduler logs the start of this new task with the method LogStartingTaskMessage(), starts it and stores it in the currentTask field, and attaches a continuation that calls ExecuteNextTask() again once the task has finished.

So essentially, the scheduler is a class that recursively calls the same method to process the next available tasks until no more tasks are available. Also, it exposes some methods to add a new task to the queue, pause, start and resume itself and alert anybody when a task is completed. Pretty simple, right? :) This powerful yet simple class has helped us in many situations.
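
The core guarantee, that one task starts only after the previous one has finished, can also be sketched with a different and more compact technique: chaining each new work item onto the tail of the previous one with ContinueWith and Unwrap. This is not the SingleQueueTaskScheduler and lacks its pause, naming and logging features; SerialTaskChain and SerialTaskChainDemo are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper, not part of RiftGG: runs asynchronous work items one at a time.
public class SerialTaskChain
{
	readonly object gate = new object ();
	Task tail = Task.CompletedTask;

	public Task Enqueue (Func<Task> work)
	{
		lock (gate) {
			// Start the new item only after the previous one has completed,
			// whether it succeeded or failed.
			tail = tail.ContinueWith (_ => work (), TaskScheduler.Default).Unwrap ();
			return tail;
		}
	}
}

public static class SerialTaskChainDemo
{
	public static async Task<List<string>> Run ()
	{
		var log = new List<string> ();
		var chain = new SerialTaskChain ();

		// Builds a work item that records when it starts and when it ends.
		Func<Task> Step (string name, int milliseconds) => async () => {
			lock (log) { log.Add (name + " start"); }
			await Task.Delay (milliseconds);
			lock (log) { log.Add (name + " end"); }
		};

		// A is slow and B is fast, yet B cannot start before A has ended.
		Task a = chain.Enqueue (Step ("A", 50));
		Task b = chain.Enqueue (Step ("B", 1));
		await Task.WhenAll (a, b);
		return log;
	}
}
```

The explicit queue used by the scheduler makes it possible to inspect, pause and clear pending work, which a bare chain of continuations does not offer.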

But, how does the scheduler work in real life? Can we see it in action? Yes, of course! Let's see a simplified example extracted directly from our repository.

How do services define and schedule tasks?

Suppose, as explained before, that the DownloaderService constantly pulls information from the API, publishes an event for each new piece of information and stops once the information is complete. A rough implementation would be something like this:


public class DownloaderService
{
	public async Task DownloadInformationInBackground()
	{
		bool isInfoCompleted = false;
		while (!isInfoCompleted) {
			Information info = await DownloadInfo();
			InvokeEvent(info);
			isInfoCompleted = info.IsComplete;
		}
	}

	void InvokeEvent(Information info)
	{
		// We publish the notification to our events broker
		App.EventsBroker.Publish(new InformationEvent<Information>(info));
	}

	async Task<Information> DownloadInfo()
	{
		// Call the api ...
	}
}
				

public class InformationEvent<T>
{
	public T Data { get; set; }

	public InformationEvent(T data)
	{
		Data = data;
	}
}
				

public class Information
{
	public int Id { get; set; }

	public bool IsComplete { get; set; }

	public T Data { get; set; }
}
				

On the other side of the EventsBroker, there is the FeedBackService, which listens to those events and acts accordingly. It's worth noticing the method OnNewInformationAvailable(InformationEvent<Information> e). Here, instead of executing the operation directly when a new event arrives, the operation is enqueued and will be executed by the scheduler when its turn comes.


public class FeedBackService
{
	// The scheduler must exist before any task is enqueued.
	readonly SingleQueueTaskScheduler scheduler = new SingleQueueTaskScheduler (nameof (FeedBackService));

	public FeedBackService()
	{
		// We listen only to the events we are interested in.
		App.EventsBroker.SubscribeAsync<InformationEvent<Information>> (e => OnNewInformationAvailable(e));

		// We run the scheduler so once we start enqueueing it processes the tasks.
		scheduler.Run();
	}

	Task OnNewInformationAvailable(InformationEvent<Information> e)
	{
		// We enqueue the required task instead of immediately executing it :)
		scheduler.Enqueue(() => UpdateInformation(e.Data));
		return Task.CompletedTask;
	}

	async Task UpdateInformation(Information info)
	{
		try
		{
			Log("Update information started.");
			await WaitForSomethingToFinish(info);
			// Update the view model information in the UI thread.
			App.Invoke(delegate { ViewModel.Information = info; });
		}
		catch (Exception e)
		{
			Log($"Update information failed: {e}.");
		}
		finally
		{
			Log("Update information finished.");
		}
	}

	async Task WaitForSomethingToFinish (Information info)
	{
		// You may need to call other APIs or just wait for your database operation to conclude.
		await Task.Delay(TimeSpan.FromSeconds(2));
	}
}
				

Best exception handling practices when scheduling tasks

It is important to notice that, although the presented approach is very powerful, it comes with some risks. As you may have noticed, the tasks are enqueued and will be executed at an unknown moment, so nobody can await them explicitly. This may lead to unhandled exceptions if one task fails. To avoid that, it is important that each task manages its exceptions independently. One way to do that is to surround our logic with a try/catch/finally structure and manage the exception there, as you can see inside the method UpdateInformation (Information info).
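
When many tasks need the same protection, the try/catch can be factored out. The following sketch wraps any work item so that a failure is reported through a callback instead of being left on a task nobody awaits. SafeTasks.Guard and its onError parameter are hypothetical names, not part of RiftGG.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SafeTasks
{
	// Hypothetical helper: returns a work item that never faults, because any
	// exception thrown by the original work is handed to onError instead.
	public static Func<Task> Guard (Func<Task> work, Action<Exception> onError)
	{
		return async () => {
			try {
				await work ();
			} catch (Exception ex) {
				onError (ex);
			}
		};
	}
}
```

A service could build the guarded work item once, for example with a logging callback, and hand it to whatever runs its queued tasks, so that a single failing task is reported and does not surface later as an unobserved exception.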

How do we update the UI on each task?

Until now, we assumed we were able to somehow execute code from any thread in the main thread (also known as UI thread), but how do we do that?

You may have noticed that inside the method UpdateInformation (Information info) we are calling App.Invoke(...). This method executes delegates in the main thread. In our case, since we use Xamarin.Forms to develop RiftGG, the implementation is quite simple:


public static class App
{
	// Manages events from different sources.
	public static IEventsBroker EventsBroker { get; set; }

	// Invokes the EventHandler in the main thread.
	public static void Invoke (EventHandler handler)
	{
		// A static class has no instance, so there is no sender to pass.
		Xamarin.Forms.Device.BeginInvokeOnMainThread (() => handler (null, EventArgs.Empty));
	}
}
				

Obviously, the implementation of the Invoke(...) method will depend on the framework you are using. In our case, Xamarin provides a Device class with a BeginInvokeOnMainThread (Action action) method. If you are planning to implement a similar approach, you should check first how to do the same with the technology you are using.
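
For frameworks without such a helper, one common .NET option is System.Threading.SynchronizationContext. The sketch below assumes that the context passed to the constructor was read from SynchronizationContext.Current on the UI thread, where UI frameworks typically install a context that posts work back to that thread. MainThreadDispatcher is a hypothetical name and this is not how RiftGG does it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical framework-agnostic dispatcher built on SynchronizationContext.
public class MainThreadDispatcher
{
	readonly SynchronizationContext context;

	// Create this on the UI thread, passing SynchronizationContext.Current,
	// so that the captured context is the UI one.
	public MainThreadDispatcher (SynchronizationContext uiContext)
	{
		context = uiContext ?? throw new ArgumentNullException (nameof (uiContext));
	}

	// Queues the action on the captured context and completes once it has run.
	public Task Invoke (Action action)
	{
		var done = new TaskCompletionSource<bool> ();
		context.Post (_ => {
			try {
				action ();
				done.SetResult (true);
			} catch (Exception ex) {
				done.SetException (ex);
			}
		}, null);
		return done.Task;
	}
}
```

Unlike a fire-and-forget call, this version returns a Task, so a caller that needs to know when the UI update has been applied can await it.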

Conclusions

In this post, we have seen several things. First, we briefly introduced Fluendo and RiftGG. Second, we discussed the main design patterns used to develop RiftGG: MVVM, State, Service, Event-Driven, etc. Then, we talked about the issues that appeared when we were listening to notifications arriving from a source at unknown intervals. Finally, we presented the solution we adopted, along with pieces of code from our repositories.

RiftGG users demand the app to be responsive and quick. They do not care about secondary operations that don't provide them with added value. Asynchronous programming has proven to be a "must" in developing an app with these characteristics.

The SingleQueueTaskScheduler enables us to protect the app from events arriving from external sources at unpredictable moments. It also provides a clean and fluent interface to manage interdependent tasks that need to be executed in order. In addition, classes and a simple skeleton were provided in case you want to attempt a similar implementation. Finally, we gave some guidelines on exception handling, communication between services and schedulers, and executing actions on the UI thread.

Hope you enjoyed the post and see you soon :)!