Functional reactive programming in Python

Functional reactive programming (FRP) is probably the hottest 🔥 topic in the mobile app development world right now. GitHub's list of trending repositories is packed with reactive frameworks (RxSwift, ReactiveCocoa and RxJava, to name just a few). But is the pattern's usefulness limited to iOS and Android?

For the past couple of months, I have been responsible for implementing webhooks that extend the functionality of my company's proprietary CMS platform. The code was packed with a variety of integrations, but the general concept was always the same: a RESTful service receives a JSON input, processes it using a set of different APIs and transforms it into another JSON response. During that period I fell in love ❤️ with funcy, a functional tool belt for Python. Initially, I wanted to write an article describing the library, but that would be too straightforward, wouldn't it?

The idea quickly evolved when I bumped into the RxPY repository. I had never tried using FRP outside of mobile apps, but I had tried to chain a bunch of API calls together and was never fully satisfied with the result. Having a tool to try out and a day off, I started putting together a small playground project.

Playground scenario

Let's imagine you have to hire a co-programmer for the upcoming project. GitHub would be an awesome place to start headhunting:

  • there are a lot of developers having experience with almost any technology stack,
  • you can verify candidates' skills instantly.

The problem is that selecting a couple of candidates is not that easy when there are over 20,000,000 accounts to choose from. Let's come up with a naive algorithm to pick potential co-workers:

  • As a software developer you probably bookmark repositories that are relevant to your work. Your co-worker should be familiar with the same technology stack as you are. Going through contributors of your favourite (starred) repositories might be a good idea.
  • We will definitely have a better shot picking accounts flagged as available for hire.

Draft of the algorithm to implement

The diagram above shows how to implement the algorithm using GitHub API:

  1. The first step is to find repositories starred by the selected user (in this case, our own account).
GET https://api.github.com/users/{username}/starred
  2. For every repository, download a list of contributors.
GET https://api.github.com/repos/{full_repo_path}/contributors
  3. Perform contributors filtering:
  • own contributions have to be removed,
  • duplicated contributors have to be merged,
  • contributors not available for hire should be dropped.

To find out whether a user is available for hire, we have to fetch user details from the GitHub API. The hireable boolean flag can be read at:

GET https://api.github.com/users/{username}
  4. Contributors should be sorted by the number of starred repositories they contribute to. We are most interested in people who are experts in the majority of our favourite technology stack.
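
Before touching the network, the four steps above can be sketched on plain in-memory data. Everything here is made up for illustration: the STARRED, CONTRIBUTORS and HIREABLE dictionaries stand in for the three API responses, and find_hireable is a hypothetical helper, not part of the final code.

```python
from collections import Counter

# Hypothetical stand-ins for the three GitHub API responses.
STARRED = {'me': ['org/alpha', 'org/beta']}
CONTRIBUTORS = {
    'org/alpha': ['me', 'ann', 'bob'],
    'org/beta': ['ann', 'cid'],
}
HIREABLE = {'ann': True, 'bob': False, 'cid': True}


def find_hireable(username):
    # Find repositories starred by the user.
    repos = STARRED[username]
    # Collect contributors of every starred repository in one flat list.
    logins = [login for repo in repos for login in CONTRIBUTORS[repo]]
    # Drop our own contributions.
    logins = [login for login in logins if login != username]
    # Merge duplicates and sort by the number of repositories contributed to.
    ranked = [login for login, _ in Counter(logins).most_common()]
    # Keep only users flagged as hireable.
    return [login for login in ranked if HIREABLE.get(login)]


print(find_hireable('me'))  # ['ann', 'cid']
```

The rest of the article builds the same pipeline, only with every lookup replaced by an HTTP request.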

Example API responses can be viewed in a browser:

Seems like there is a lot of data processing to do!

Dependencies

Apart from the aforementioned funcy and RxPY, we are also going to use requests - probably the most beautiful networking library I have ever come across. It will help us integrate seamlessly with the GitHub API. Let's create a requirements.txt file:

funcy==1.7.1
requests==2.11.1
Rx==1.5.2

... and install the dependencies with pip install -r requirements.txt.

ReactiveX

RxPY is part of the ReactiveX project. As the docs state, ReactiveX is an extension of the observer design pattern. For the sake of this tutorial, all you need to know is that it provides three building blocks:

  • Observable is a component that produces the data. It can emit multiple values over time (a data stream). An Observable starts emitting data as soon as another object subscribes to it.
  • Functional methods for transforming data emitted by an Observable (filtering, mapping, etc.).
  • Observer is a component that watches the changes emitted by the Observable. It can handle three different events:
    • on_next - invoked when data is received from the Observable,
    • on_completed - invoked after the Observable has emitted all the data it possibly can,
    • on_error - invoked when the Observable fails to produce valid data (for example, because of a network request failure).
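
To make the three building blocks concrete, here is a toy model written in plain Python. TinyObservable is a hypothetical class invented for this sketch; it only mimics the behaviour described above and is not how RxPY is implemented.

```python
class TinyObservable:
    """Toy stand-in for an Observable; not the RxPY class."""

    def __init__(self, subscribe):
        self._subscribe = subscribe

    def subscribe(self, on_next, on_error=print, on_completed=lambda: None):
        # Nothing is produced until this method is called.
        try:
            self._subscribe(on_next, on_completed)
        except Exception as error:
            # Any failure while producing data is reported to the observer.
            on_error(error)

    def map(self, transform):
        # Build a new observable that transforms each value on its way through.
        def subscribe(on_next, on_completed):
            self._subscribe(
                lambda value: on_next(transform(value)), on_completed)

        return TinyObservable(subscribe)


def emit_numbers(on_next, on_completed):
    for number in (1, 2, 3):
        on_next(number)
    on_completed()


events = []
doubled = TinyObservable(emit_numbers).map(lambda n: n * 2)
print(events)  # [] - nobody has subscribed yet

doubled.subscribe(
    on_next=events.append,
    on_completed=lambda: events.append('done'))
print(events)  # [2, 4, 6, 'done']
```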

Observable API

With this short theoretical introduction behind us, we can start building utility methods that expose API calls as Observables. First of all, we are going to wrap the requests.request(method, url, **kwargs) method in an Observable. To do that, we implement a subscribe function, which defines what data is transmitted to an observer and at which point in time:

import requests
import rx


def rx_request(method, url, **kwargs):
  def subscribe(observer):
    response = requests.request(method, url, **kwargs)

    try:
      response.raise_for_status()
      observer.on_next(response)
      observer.on_completed()
    except requests.HTTPError as e:
      observer.on_error(e)

    return lambda: None

  return rx.Observable.create(subscribe)

The implementation is pretty straightforward:

  1. When a new observer subscribes, we fire a network request with all of the arguments of the enclosing rx_request method.
  2. After the response is received, we need to validate its status code using raise_for_status() method. It throws HTTPError if the request was not successful.
  3. Having validated the status code, we simply pass the response to the observer with on_next(response). Since the observable has nothing more to emit, the on_completed() method is called afterwards.
  4. When HTTPError is raised (status code validation failed), we should inform the observer of the error by calling on_error(e).

The subscribe method returns something strange: a function which does nothing (lambda: None). Why is that? It is a shortcut for generating an empty Disposable object. A Disposable is a component that provides instructions for freeing resources allocated by the observable. Imagine our observable starts a threading.Thread. When the observable completes its job or reports an error, we no longer need this thread to run. The disposable should be able to stop such a thread (e.g. by calling stop() on a StoppableThread). The disposable is automatically executed by the library when needed.

For anyone working with asynchronous networking code on a daily basis, an empty disposable in such code will look like a bug. However, requests performs network calls synchronously, so by the time the disposable is returned from the subscribe method, the request has always finished.
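
For contrast, here is a sketch of a disposable that actually has something to clean up. It does not use RxPY at all: subscribe_ticker is a hypothetical subscribe function, and a threading.Event plays the role of the stop() call mentioned above.

```python
import threading
import time


def subscribe_ticker(on_next, interval=0.01):
    """Hypothetical subscribe function emitting 0, 1, 2... from a thread."""
    stop = threading.Event()

    def run():
        number = 0
        # wait() returns True once stop is set, which ends the loop.
        while not stop.wait(interval):
            on_next(number)
            number += 1

    worker = threading.Thread(target=run, daemon=True)
    worker.start()

    def dispose():
        # Ask the worker to stop and wait until it has really finished.
        stop.set()
        worker.join()

    return dispose


received = []
dispose = subscribe_ticker(received.append)
time.sleep(0.05)
dispose()

count_at_dispose = len(received)
time.sleep(0.05)
# No values arrive after dispose() has returned.
print(len(received) == count_at_dispose)
```

A synchronous request has no such background work, which is why returning a do-nothing function is acceptable there.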

There is one more thing to note. We do not have to explicitly wrap raise_for_status() in a try-except block. RxPY's observable catches all the exceptions on subscribe by default, so any exception thrown out of observable will be reported back as an error to the observer. The try-except block is there for demonstration purposes only.

JSON

We have wrapped an arbitrary networking request into an observable. Since GitHub API returns the data in JSON format, we can go a step further and implement rx_json method:

def rx_json(method, url, **kwargs):
  return rx_request(method, url, **kwargs) \
    .map(lambda r: r.json())

The implementation is as simple as mapping the requests.Response object to a JSON dict. If the mapping fails (the response is not valid JSON), a ValueError will be reported back to the observer.

For the playground application only GET requests are needed. Thus another simple wrapper can be introduced:

def rx_get_json(url, **kwargs):
  return rx_json('get', url, **kwargs)

GitHub service

As stated in the introduction, we will need three methods to access the data from GitHub API:

# Finding repositories starred by a user
def rx_starred_repositories(username):
  return rx_get_json(
    'https://api.github.com/users/{0}/starred'.format(username))

# Listing contributors for a repository at a given path
def rx_contributors(repository_path):
  return rx_get_json(
    'https://api.github.com/repos/{0}/contributors'.format(
      repository_path))

# Fetching details for a user with a given username
def rx_user_details(username):
  return rx_get_json(
    'https://api.github.com/users/{0}'.format(username))

As you can see, all of those methods simply invoke rx_get_json wrapper with a properly constructed URL.

Finding hireable users

All the basics are covered. What is left is to implement the algorithm itself. This is a fun part which enables us to unleash a functional beast. 😈 Let's start with a final version of the code. Next, we will analyse every line.

from operator import itemgetter

import funcy
import rx


class HireableFinder:
  # Returns hireable users
  def rx_find_hireable(self, initial_user):
    return self.starred_repo_names(initial_user) \
      .flat_map_latest(self.contributor_logins) \
      .map(funcy.rpartial(funcy.without, initial_user)) \
      .map(self.contributors_sorted_by_repos_contributed_to) \
      .flat_map_latest(self.filter_hireable_users)
  
  # Returns names of repositories starred by a user
  def starred_repo_names(self, user):
    return rx_starred_repositories(user) \
      .map(funcy.partial(funcy.lpluck, 'full_name'))
  
  # Returns logins of contributors for given repositories
  def contributor_logins(self, repos):
    contributors = funcy.lmap(rx_contributors, repos)
  
    return rx.Observable.concat(contributors) \
      .buffer_with_count(len(repos)) \
      .map(funcy.flatten) \
      .map(funcy.partial(funcy.pluck, 'login'))
  
  # Sorts contributors by a number of repositories contributed to
  def contributors_sorted_by_repos_contributed_to(self, contributors):
    users_by_repos = funcy.count_by(None, contributors).items()
    sorted_users = sorted(users_by_repos, key=itemgetter(1), reverse=True)
  
    return funcy.lpluck(0, sorted_users)
  
  # Filters hireable users only
  def filter_hireable_users(self, users):
    user_details = funcy.lmap(rx_user_details, users)

    return rx.Observable.concat(user_details) \
        .buffer_with_count(len(users)) \
        .map(funcy.partial(funcy.where, hireable=True)) \
        .map(funcy.partial(funcy.lpluck, 'login'))

The rx_find_hireable method reflects the algorithm line by line:

  1. starred_repo_names(u) finds full names of repositories starred by a user.
  2. contributor_logins(r) finds logins of users, who contributed to a given list of repositories.
  3. funcy.rpartial(funcy.without, u) removes a given user login from the list.
  4. contributors_sorted...(c) removes duplicate entries and sorts contributors by a number of times they were present in the list.
  5. filter_hireable_users(u) filters users which are hireable only.

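As you can see, some of the methods are triggered with a map call and others use flat_map instead (more precisely flat_map_latest, which makes no difference here because each step emits a single list). What is the difference?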

map / flat_map

Let's denote observable of X type as an Observable<X>, so that an observable str stream becomes Observable<str>:

  • The map method specifies how to transform an object contained inside a wrapper (Observable), leaving the wrapper itself untouched. For example, given an Observable<str> called strings, you can find the lengths of those strings with strings.map(lambda s: len(s)). Here map is given a str -> int transformation and produces an Observable<int>.
  • The flat_map method is given a function that turns each value into a new wrapper, X -> Observable<Y>, and flattens the results into a single Observable<Y>. In the ReactiveX world, this means there is some additional work to be done by an observable for every value, like a network request or a database query.

In our case, the flat_map method is used for the data transformations that require additional network requests (fetching contributors list by a repository name, filtering hireable users). On the other hand, map is used for no-additional-work data transforms, such as filtering out a name from the list or removing duplicates.
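
The difference is easier to see on plain lists, which are also "wrappers" around values. The sketch below is only an analogy: list_map and list_flat_map are hypothetical helpers, lists stand in for observables, and the CONTRIBUTORS dictionary stands in for a network request.

```python
def list_map(transform, items):
    # transform: X -> Y, so the result holds one Y per X.
    return [transform(item) for item in items]


def list_flat_map(transform, items):
    # transform: X -> list of Y; the inner lists are flattened into one.
    return [result for item in items for result in transform(item)]


# Hypothetical lookup standing in for a network request.
CONTRIBUTORS = {'org/alpha': ['ann', 'bob'], 'org/beta': ['cid']}
repos = ['org/alpha', 'org/beta']

print(list_map(len, repos))                    # [9, 8]
print(list_map(CONTRIBUTORS.get, repos))       # [['ann', 'bob'], ['cid']]
print(list_flat_map(CONTRIBUTORS.get, repos))  # ['ann', 'bob', 'cid']
```

Mapping with a function that itself returns a wrapper leaves you with nested wrappers; flat_map removes that extra level.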

The rx_find_hireable method relies on funcy to drop a static user name from a list of users. The without function takes a sequence and drops every item passed as an additional argument. For example:

one_two_three = [1, 2, 3]
two_three = funcy.without(one_two_three, 1)
# [2, 3]

Partial application

The map method expects a function that takes a single parameter (a list of items). On the other hand, without needs two parameters (1. a list of items and 2. the values to remove). Since we know the element to remove upfront, we have a couple of ways to transform this function:

  • Using a lambda expression: lambda l: funcy.without(l, 'user'),
  • Using partial application. A partial wraps a function in a special object, which invokes the function in question with some of the parameters "hardcoded". There are two variants of partials in funcy: partial applies parameters from the left side and rpartial does the same from the right side.

Here is a simple example of partial application:

dust_remover = funcy.rpartial(funcy.without, 'dust')
room = ['desk', 'dust', 'computer']
cleaned_room = dust_remover(room) # funcy.without(room, 'dust')
# ['desk', 'computer']
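
If you want to see what happens under the hood, the idea can be reproduced with the standard library alone. In this sketch, without and rpartial are simplified stand-ins written for illustration (they are not funcy's implementations), while functools.partial is the real standard-library counterpart of funcy.partial.

```python
from functools import partial


def without(seq, *items):
    """Simplified stand-in for funcy.without that returns a list."""
    return [value for value in seq if value not in items]


def rpartial(func, *fixed):
    """Simplified stand-in for funcy.rpartial: fixes the rightmost arguments."""
    return lambda *args: func(*args, *fixed)


room = ['desk', 'dust', 'computer']

# Two ways to build the same single-argument function.
with_lambda = lambda seq: without(seq, 'dust')
with_rpartial = rpartial(without, 'dust')

# functools.partial fixes arguments from the left, so it suits functions
# whose "hardcoded" argument comes first, such as pow(2, n).
powers_of_two = partial(pow, 2)

print(with_lambda(room))    # ['desk', 'computer']
print(with_rpartial(room))  # ['desk', 'computer']
print(powers_of_two(10))    # 1024
```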

Pluck

Another handy method from the funcy package is pluck. It maps a sequence of objects to the values stored under a given key or index, using the __getitem__() accessor. Example:

students = [
  {'id': 1, 'name': 'John Doe'},
  {'id': 2, 'name': 'Jason Smith'},
]
names = funcy.pluck('name', students)
# ['John Doe', 'Jason Smith']

points = [(1, 2), (5, 8)]
x_axis_points = funcy.pluck(0, points)
# [1, 5]

students_grades = [[4, 5, 3, 5], [2, 4, 1], [1, 3]]
students_grades_but_last = funcy.pluck(
  slice(0, -1), students_grades) # slice(0, -1) is equivalent of x[:-1]
# [[4, 5, 3], [2, 4], [1]]

As you can see, the starred_repo_names method from the HireableFinder uses lpluck instead of pluck. Starting with Python 3, the built-in functional operators (filter, map) return lazy iterators instead of lists. Laziness avoids building intermediate lists, which saves memory and time when several operations are chained. The tradeoff is that a filtered list has to be constructed explicitly:

numbers = [1, 2, 3, 4, 5]
odd = filter(lambda n: n % 2, numbers)
# <filter object at 0x1022036d8>
odd_list = list(odd)
# [1, 3, 5]

funcy adopts the approach of returning lazy iterators only. However, to make data structure construction less verbose, it provides two variants for almost every method operating on a sequence:

  • non-prefixed - returns an iterator,
  • prefixed with l - returns a list.

So lpluck('f', l) is just a shortcut for list(pluck('f', l)).
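
The practical consequence of laziness is that an iterator can be consumed only once. The sketch below shows this with simplified stand-ins for pluck and lpluck built on the standard library; they are written for illustration and are not funcy's own code.

```python
from operator import itemgetter


def pluck(key, seq):
    """Simplified stand-in for funcy.pluck: a lazy iterator."""
    return map(itemgetter(key), seq)


def lpluck(key, seq):
    """Simplified stand-in for funcy.lpluck: an eager list."""
    return list(pluck(key, seq))


students = [
    {'id': 1, 'name': 'John Doe'},
    {'id': 2, 'name': 'Jason Smith'},
]

lazy_names = pluck('name', students)
first_pass = list(lazy_names)
second_pass = list(lazy_names)  # the iterator is already exhausted

print(first_pass)                # ['John Doe', 'Jason Smith']
print(second_pass)               # []
print(lpluck('name', students))  # ['John Doe', 'Jason Smith']
```

This is why a list is the safer choice whenever the result is read more than once, or when its length is needed.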

Concatenating observables

Here comes (probably) the hardest part of the code. The contributor_logins(repos) method receives a list of repository names and transforms it into a list of users who contribute to those repositories. A single observable has to be split into a chain of observables, one per repository, and merged back.

First of all, we have to create a list of Observables, one for each repository. We do this using funcy.lmap method (which is like a standard list(map(...)) in this case). The reason we need a list instead of a generator is that the concat method we are going to use later is not compatible with generators.

Now, we have to merge all of the observables into a single one. Let's examine how concat method works:

observable = Observable.from_([1, 2])
other_observable = Observable.from_([3, 4])
Observable.concat(observable, other_observable) \
  .subscribe(on_next=lambda n: print(n))
# Output:
# 1
# 2
# 3
# 4

So, the concat method emits all of the values from the first observable, then from the second, third and so on... It is good, but not ideal for our case. We need to get all of the contributors in a single emission, not partitioned by the repositories.

Fortunately, there is a buffer method which groups emissions together. In our case buffering is really simple. We know how many emitted values should be chained together - that is a count of repositories (there is a single list of contributors emitted for every repository). Hence, we can use buffer_with_count() method:

observable = rx.Observable.from_(['a', 's', 'd', 'f'])
observable.buffer_with_count(2) \
  .subscribe(on_next=lambda n: print(n))
# ['a', 's']
# ['d', 'f']

Processing of contributors is not finished yet. First of all, the buffering operation will emit a list of values. In our case every value is a list itself, so a list of lists will be emitted. To get a flat list of contributors, the flatten method can be used. Its functionality is presented below:

not_flat = [[1, 2, 3], [4, 5], [6]]
flat = funcy.flatten(not_flat)
# [1, 2, 3, 4, 5, 6]

Since we are interested in logins only, pluck comes to the rescue once again.
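
Put together, the concat, buffer, flatten and pluck steps amount to the following transformation. This sketch replays it on plain lists with no observables involved: the responses list is made-up data, and buffer_with_count is a hypothetical list-based helper that only imitates the operator of the same name.

```python
from itertools import chain

# Hypothetical API responses: one list of contributor dicts per repository.
responses = [
    [{'login': 'ann'}, {'login': 'bob'}],  # contributors of the first repo
    [{'login': 'ann'}, {'login': 'cid'}],  # contributors of the second repo
]


def buffer_with_count(items, count):
    """Group consecutive items into lists of `count` elements."""
    return [items[i:i + count] for i in range(0, len(items), count)]


# concat: the responses arrive one after another, in repository order.
emissions = list(responses)
# buffer_with_count(len(repos)): collect them into a single emission.
buffered = buffer_with_count(emissions, len(responses))
# flatten + pluck: one flat list of logins.
logins = [user['login'] for user in chain.from_iterable(buffered[0])]

print(len(buffered))  # 1
print(logins)         # ['ann', 'bob', 'ann', 'cid']
```

Note that duplicates (ann) are still present at this point; they are merged in the sorting step.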

Sorting by repositories contributed to

By the time the contributors_sorted_by_repos_contributed_to method is invoked, we already have a list of contributors (minus the initial user). However, those contributors will probably be duplicated, because the same person can push code to multiple repositories. Moreover, one of the requirements is to put users who contributed to the most repositories at the front of the list, so we have to implement sorting.

Funcy has a method called count_by. It executes a mapping method on every element of the sequence and returns a dictionary which maps each result to a number of times it appeared. Example:

users = ['john', 'ALICIA', 'MARK']
user_angriness = funcy.count_by(lambda u: u.upper() == u, users)
# {False: 1, True: 2}

To count a number of occurrences in a list we need to use an identity method (just return the argument). Funcy provides a fancy (pun intended) shortcut by means of extended function semantics:

names = ['John', 'Steven', 'Patricia', 'John']
names_count = funcy.count_by(lambda n: n, names)
# Or the same expression using extended function semantics notation
names_count = funcy.count_by(None, names)
# {'Patricia': 1, 'Steven': 1, 'John': 2}

Having the mapping between every user and the count of repositories contributed to, we can sort its items from the most contributions to the least:

names_count = {'Patricia': 1, 'Steven': 1, 'John': 2}
# itemgetter(1) could be replaced with lambda i: i[1]
sorted(names_count.items(), key=itemgetter(1), reverse=True)
# [('John', 2), ('Patricia', 1), ('Steven', 1)]

The last missing piece of the puzzle is lpluck(0, _) which drops redundant counts leaving logins only.
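
For comparison, the standard library offers the same count-then-sort behaviour through collections.Counter. This is an alternative sketch, not what the article's code uses, and the contributors list is made-up sample data.

```python
from collections import Counter

contributors = ['john', 'alice', 'john', 'bob', 'alice', 'john']

# Counter plays the role of count_by(None, ...);
# most_common() sorts the pairs by count, highest first.
ranked = [login for login, _ in Counter(contributors).most_common()]

print(ranked)  # ['john', 'alice', 'bob']
```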

Filtering hireable users

Filtering hireable users is almost the same as fetching contributor lists. The only difference is that instead of flattening the list (this time we get one result per one emission) we have to remove entries with hireable != True. Since the input is a list of dictionaries, we can use where method to do that:

employees = [
  {'id': 1, 'name': 'John', 'position': 'developer'},
  {'id': 2, 'name': 'Smith', 'position': 'account'},
]
developers = funcy.where(employees, position='developer')
# [{'name': 'John', 'position': 'developer', 'id': 1}]

Bonus: In this example, you can see how Python (as a dynamic language) allows building really beautiful APIs. Since a function can receive its arguments as a name-value mapping, you do not have to build a dictionary of filters and can pass them to the function directly. Otherwise, the API would have to look something like this:

# This is not that beautiful
funcy.where(sequence, conditions={'position':'developer'})
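
The keyword-argument style is easy to reproduce with **kwargs. The where function below is a simplified stand-in written for this sketch (it returns a list and is not funcy's implementation), and the users list is made-up data shaped like the GitHub user details.

```python
def where(mappings, **conditions):
    """Simplified stand-in for funcy.where that returns a list."""
    return [
        mapping for mapping in mappings
        if all(key in mapping and mapping[key] == value
               for key, value in conditions.items())
    ]


users = [
    {'login': 'ann', 'hireable': True},
    {'login': 'bob', 'hireable': None},
    {'login': 'cid', 'hireable': False},
]

hireable = where(users, hireable=True)
print([user['login'] for user in hireable])  # ['ann']
```

Inside the function, conditions is an ordinary dictionary, so the caller gets the nicer syntax at no extra cost to the implementation.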

Running the application

The last step remaining is to test run the HireableFinder. Let's do it!

hireable_finder = HireableFinder()
hireable_finder.rx_find_hireable('octocat')

Aaaaaaand... Ladies and gentlemen, nothing happens! Why? Because we have not subscribed to the observable yet. By the laws of the observer pattern, none of the declared data processing operations will execute until someone subscribes. So, the test run should look like this:

def print_hireables(hireables):
  # Number the logins starting from 1, one per line
  hireables_text = '\n'.join(
    '{0}. {1}'.format(index, login)
    for index, login in enumerate(hireables, start=1))

  print(hireables_text)

hireable_finder = HireableFinder()
hireable_finder.rx_find_hireable('octocat') \
  .subscribe(on_next=print_hireables)

You should see the following output:

1. Spaceghost

Conclusions

Functional reactive programming in Python is a lot of fun. The resulting code is concise and keeps its data transformations free of side effects, which makes it easy to test. If you are a Python programmer and have never tried it, I recommend giving it a go! On the other hand, I really doubt this code makes any sense to a newcomer to the functional world...

My final recommendation: if your team is eager for a new challenge and has a bit of spare time for experimenting, then think about giving reactive a try. However, if your team consists of developers with no previous experience in functional programming and a deadline is approaching, you should definitely avoid functional reactive programming. The advantages of this approach (especially for a purely synchronous flow) are not big enough to outweigh the trouble others may have just understanding the code.
