Multiversion concurrency control implementation

In this post I want to discuss my multiversion concurrency control implementation. Written in Java, it is very straightforward. We use Java threads (which are just native threads) to simulate concurrency in a simple database. See Runner for the scenario that runs. See TransactionC for the transaction that has contention.

When a transaction begins or restarts, it is assigned an identifier which is its timestamp. This number is unique to the transaction and increments for every transaction attempt. Runner spins up 100 concurrent TransactionC instances. Each TransactionC tries to increment two numbers, A and B, which start at 1 and 2 at the beginning of the simulation. Without multiversion concurrency control, these two numbers would not finish on 101 and 102, because data races would lose updates: each increment has to see the result of the previous increment before it can add the next one.

When a transaction issues a Read, the fact that a read occurred is tracked. The read will only succeed if there is a committed value with a version timestamp less than or equal to the reading transaction’s timestamp. This essentially means that transactions can see their own writes and old committed values, but cannot see the values that are temporarily created by other transactions.

At commit time, we check the list of transactions that read key A or B. If any of them have a timestamp lower than the current transaction’s timestamp, the current transaction is aborted and restarted. Those transactions “win” the chance to commit while everyone else has to restart. There is one exception, which prevents two transactions committing at the same time: if there is an older transaction ahead of us, we let that transaction win.
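The actual implementation is in Java (see Runner and TransactionC), but here is a minimal Python sketch of the visibility and commit rules described above. The names in it are mine, not the ones used in the Java code.

# Minimal sketch of the visibility and conflict rules described above.
# The real implementation is in Java; MVCCStore and its methods are
# illustrative names only, not taken from the actual codebase.
import itertools

class MVCCStore:
    def __init__(self):
        self.next_timestamp = itertools.count(1)
        self.versions = {}       # key -> list of (commit_timestamp, value)
        self.reads = {}          # key -> list of reader timestamps

    def begin(self):
        # Every transaction attempt (including restarts) gets a fresh timestamp.
        return next(self.next_timestamp)

    def read(self, timestamp, key, local_writes):
        # Track that this transaction read the key, for commit-time checks.
        self.reads.setdefault(key, []).append(timestamp)
        # A transaction can always see its own uncommitted writes.
        if key in local_writes:
            return local_writes[key]
        # Otherwise it only sees committed versions at or before its timestamp.
        visible = [v for ts, v in self.versions.get(key, []) if ts <= timestamp]
        return visible[-1] if visible else None

    def commit(self, timestamp, local_writes):
        # Abort if another transaction with a lower timestamp read a key we
        # are about to write; it wins and we restart with a new timestamp.
        for key in local_writes:
            for reader_ts in self.reads.get(key, []):
                if reader_ts < timestamp:
                    return False  # caller restarts via begin()
        for key, value in local_writes.items():
            self.versions.setdefault(key, []).append((timestamp, value))
        return True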

Data maintenance system and data movement system

Denormalisation could just be an illusion if we have a system that tracks all the places where similar data resides and is responsible for updating it. The same system would be useful for caching: it would keep track of all the caches of a piece of information and keep them up to date.

I call this the data maintenance system. The data maintenance system is a frontend to your data: it stores data in underlying systems. It should deploy various algorithms to help index data, configurable for the data in question, and it should be no stranger to indirection of data lookup and retrieval. The data maintenance system is aware of other systems and the data inside them. You register the presence of a MySQL database and describe the structure of the data inside it, and do the same for Redis, Postgres and the other databases you have. The data maintenance system works out how to fetch data from the underlying datastore. Have an Elasticsearch cluster that needs to be populated with data from the database? That can be scheduled too.
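To make the idea concrete, here is a sketch of what registering datastores might look like. The DataMaintenanceSystem class and all of its methods and parameters are hypothetical, invented only to illustrate the shape of the API.

# A hypothetical sketch of the registration API; nothing here is a real library.
class DataMaintenanceSystem:
    def __init__(self):
        self.datastores = {}   # name -> description of the underlying system
        self.derivations = []  # derived copies the system must keep up to date

    def register(self, name, kind, **description):
        # Describe an underlying datastore and the structure of its data.
        self.datastores[name] = {"kind": kind, **description}

    def derive(self, target, source, **rules):
        # Declare that target (an index, cache or other copy) is derived from
        # source; the system becomes responsible for keeping it current.
        self.derivations.append({"target": target, "source": source, **rules})


dms = DataMaintenanceSystem()
dms.register("users_db", kind="mysql",
             tables={"users": ["id", "email", "country"]})
dms.register("search", kind="elasticsearch", url="http://search:9200")
# Populate the Elasticsearch cluster from the database on a schedule.
dms.derive("search.users_index", source="users_db.users",
           fields=["id", "email"], schedule="continuous")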

I should be able to instantiate data structures such as a B-tree or a radix trie to index records and configure them, or shard data where necessary.

I should be able to switch databases easily and have the data migrated automatically. I should be able to shift data around for efficiency. Hot keys should be detected and migrated or load balanced.

The data maintenance system could be SQL based or key-value based.

Data structure placement

Compilers have optimised instruction placement and register use. We don’t need to put further effort into optimising instruction selection. We now need to chase the remaining 90% of performance by optimising data structures. I propose data structure optimising machines. They should be aware of cache lines and VM paging.

A data structure optimiser would place data to optimise cache line usage. It analyses and rewrites code to make use of caches. For example:

import math

for item in game_entities:
  x_vector = math.sin(item["angle"])
  y_vector = math.cos(item["angle"])
  item["x"] -= x_vector * item["speed"]
  item["y"] -= y_vector * item["speed"]

In this code, we require item["angle"] and item["speed"]. So we can store the game data like so:

game_entities = [
  {"angle": math.radians(30), "speed": 1, "x": 100, "y": 100},
  {"angle": math.radians(160), "speed": 1, "x": 50, "y": 50}
]

We want game_entities.angle, game_entities.x, game_entities.y and game_entities.speed to all be near one another so they are loaded together by cache lines.

I want to be able to write the above code, even if the data layout is somewhat different. The compiler should make the conversions necessary to access all the data. For example, if we were running mass calculations across game entities, we might produce this layout:

xes = [100, 50]
yes = [100, 50]
speeds = [1, 1]
angles = [math.radians(30), math.radians(160)]
number_of_entities = 2

for item in range(0, number_of_entities):
  x = xes[item]
  y = yes[item]
  speed = speeds[item]
  angle = angles[item]
  xes[item] -= math.sin(angle) * speed
  yes[item] -= math.cos(angle) * speed

It’s almost like an automatic switchover to column-based variables rather than row-based variables, a bit like how we have column-oriented databases and row-oriented databases.

How do we grow if everything is free?

Browsers, search engines, music sharing and video sharing websites are all free. And I’m here to tell you that this is a bad thing.

Free things are very difficult to compete with, so they actually reduce competition in a market. Free is good enough. Why would anybody start a search engine to compete with Google? They cannot compete with free.

I’d like to be in a world where antitrust law kicks in and you have to pay for a search engine and a browser. There could be multiple browsers in the market, competing on performance and features. It would be like the 90s and early 2000s when software was competitive.

The computation is trivial, it’s the arrangement of objects and methods that is complicated

Two programmers can write the same program and the resulting programs will be completely different, because different variable names were used, different groupings of data were used, and so on. The classes are incompatible.

What if there was a language which could be written in such a way that, generally, if the computation is the same, then the code is the same? There’s only one way to write an addition, but there are hundreds of ways to write a B-tree.

The grouping operators (variables, classes, structs, methods, functions, types) all need to be abstracted. They all do the same thing: they group code together or decide where data is stored in memory. They’re an abstraction or syntactic sugar.

Interfaces partially solve the problem of interoperability. But it’s painful to implement 100s of methods in an interface.

Most modern programming isn’t writing number-crunching formulas (actual computation) but arranging objects of the right type that contain the right data, or arranging method calls. This bookkeeping is so complicated that software is slow to write and interoperability is hard to impossible.

I feel we haven’t found the right abstraction for writing and reasoning about code. Methods, classes, structs, variables, types are all incomplete. Everyone has strong opinions about what code is good. But one man’s beauty is another man’s garbage code. And there’s a lot of garbage code out there.

What upsets me is how unreadable mature codebases are. There’s no separation of concerns that is readable. Every concern is mushed together and sprinkled throughout the codebase. Little of a mature codebase is actually algorithmic, it’s mostly poorly managed complexity. The complexity is self-created. The abstractions are rife and terrible.

I’m still trying to think of alternative abstractions that make code interoperable by default.

I’m worried that modern computing is too complicated

Our computing heritage is obnoxiously complicated and full of nuances and edge cases. It’s a wonder that anything can be maintained and not just rewritten.

I’ve been studying the OpenJDK JVM and reading about sneaky locking and it’s an area of code that is fundamentally complicated. Very few people actually understand the JVM’s internals. This knowledge will die out unless replaced by new people who learn the JVM.

OpenJDK is one of the few interpreters that offers true parallelism. I bet the code to support threading is very complicated and rarely touched (with the exception of maybe Loom). I worry that the number of people who have the ability to implement parallelism in a JIT interpreter is very small. It would be nice if the knowledge and skill that went into the JVM could be extracted somehow. The JVM is a marvel of computer engineering and very impressive. I just wonder if parts of it could be written in a higher level language so it becomes easier to understand without losing its performance benefits.

Imagine a basis for JIT interpreters, like LLVM, with intermediate representations that anybody could improve. Or we could implement other languages on the JVM. Python on the JVM would be nice.

Code is too subtle – use plain English

You’ve got a brilliant idea to improve performance 4000x, or an algorithm that works really well. You would think open sourcing the code is enough to share that with the world, but it’s not true. Code is too subtle. Nobody knows which file the interesting stuff is in. Very few people will spend the time reading the code if they don’t have the intention of contributing. You need documentation, or better yet a design document or an ACM article describing your approach in detail. English is the winning language for technical problems.

Please write up your learnings in your blog.

Write everything in Python then transpile or compile it to native

I think languages like Rust and C++ obfuscate algorithms. They include a lot of stuff that has no bearing on the algorithm and gets in the way of understandability.

Write the core of your algorithm in Python, then compile it or transpile it to a native language for performance. You get the best of both worlds – performance and developer productivity. Python just needs C-style loops.
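As one possible version of this workflow today, a tool like Numba can JIT-compile plain Python loops to native code. This is only a sketch of the idea, not a claim that it covers every algorithm; the example data is made up.

# A sketch of "write it in Python, compile it to native" using Numba's JIT.
import numpy as np
from numba import njit

@njit  # compiles this function to native machine code on first call
def dot(xs, ys):
    total = 0.0
    for i in range(len(xs)):  # a plain C-style loop, still readable Python
        total += xs[i] * ys[i]
    return total

a = np.arange(1_000_000, dtype=np.float64)
print(dot(a, a))  # runs at native speed after JIT compilation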

Query everywhere

It’s frustrating that every system requires a data import before data can be useful. Too much data sits outside databases, so it cannot be queried. Data has to be duplicated to be queryable. For example, if you store data in S3, you have to download it before you can query it. Or if you use MySQL, you have to upload your data to Rockset to query it quickly for analytics.

Ideally we should be running queries where the data lives. You cannot run queries on S3. TiDB has a feature whereby it can push computation to the data storage layer, closer to where the data lives in TiKV (a key-value store); this is called operator pushdown.

Data is outside databases and trapped in binary formats. It would be nice to query my file system for files that were modified between dates and get results back immediately. This is where my idea to index everything comes from. Storage is cheap – we can index everything remotely and get the best of both worlds.

Graph matching

I have added graph support to hash-db. This post outlines how graph matching works.

We can match nodes in a graph by creating a matrix that represents only the connections we care about, then multiplying it with the adjacency matrix. For example, suppose we want to execute the following Cypher query:

match (start:Person)-[:FRIEND]->(end:Person), (start)-[:LIKES]->(post:Post), (end)-[:POSTED]->(post:Post), (post:Post)-[:REFERS]->(person:Person) return start, end, post, person

We need to pick a starting location, which I suspect will be (start:Person {'name': 'Samuel'}), and generate a FRIEND matrix with this start node masked.

    def create_matrix(self, adjacency_matrix, nodes):
        # Build a column vector (one row per node in the graph) with a 1 in
        # the position of every node we want to expand from.
        # Assumes numpy has been imported as np.
        multiply = np.zeros((adjacency_matrix.shape[0], 1))
        for node in nodes:
            multiply[node["position"]][0] = 1
        return multiply

...

# Then to search the matrix (a fragment from inside the main query loop;
# matches, relationship and relationship_name come from the surrounding code,
# and numpy and copy are imported elsewhere):
for match in matches:
    node = match["to_node"]
    # Mask just this node, then multiply with the adjacency matrix to find
    # every node it has an edge to.
    multiply_matrix = self.create_matrix(adjacency_matrix, [node])
    edges_from = np.matmul(adjacency_matrix, multiply_matrix)

    for item in range(0, edges_from.shape[0]):
        if edges_from[item][0] == 1:
            # Check the edge points in the right direction for this relationship.
            direction_index = "{}_{}".format(node["position"], self.nodes[item]["position"])
            if self.directions[relationship_name].get(direction_index, False) == True:
                inserted = True
                print("{} -{}-> {}".format(node["name"], relationship_name, self.nodes[item]["name"]))
                forward_relationship = {
                    "relationship": relationship_name,
                    "from_node": copy.deepcopy(node),
                    "to_node": copy.deepcopy(self.nodes[item]),
                    "source_relationship": match
                }
                relationship["matches"].append(forward_relationship)

How to decide which node to begin at is a general problem. We can start at multiple nodes at once. I am guessing I can use the order within the query, or topologically sort the variables. We also need to do an attribute match: we’re saying we’re looking for a node that has an attribute of name with a value of Samuel.

This is how we create the mask matrix which we multiply our data by to do a breadth-first search:


                multiply[node["position"]][0] = 1

How does the main loop work?

We loop over the query plan, which looks like this:

[{'attributes': {}, 'kind': 'match', 'label': 'Person', 'variable': 'start'},
 {'kind': 'relationship', 'name': 'FRIEND'},
 {'attributes': {}, 'kind': 'match', 'label': 'Person', 'variable': 'end'},
 {'kind': 'match', 'variable': 'start'},
 {'kind': 'relationship', 'name': 'LIKES'},
 {'attributes': {}, 'kind': 'match', 'label': 'Post', 'variable': 'post'},
 {'kind': 'match', 'variable': 'end'},
 {'kind': 'relationship', 'name': 'POSTED'},
 {'attributes': {}, 'kind': 'match', 'label': 'Post', 'variable': 'post'},
 {'attributes': {}, 'kind': 'match', 'label': 'Post', 'variable': 'post'},
 {'kind': 'relationship', 'name': 'REFERS'},
 {'attributes': {}, 'kind': 'match', 'label': 'Person', 'variable': 'person'}]

The match nodes handle data loading into and out of variables. When we encounter a relationship node ({"kind": "relationship"}), we run a numpy matrix multiplication with the previous relationship object’s "to_node". So every step is relative to the previous set of results. This lets us walk the query graph with very little code.
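A minimal sketch of that main loop, with load_variable and expand_relationship standing in for the real hash-db code, which I am not reproducing here:

# Sketch of the plan-walking loop described above; load_variable and
# expand_relationship are placeholders, not the actual hash-db functions.
def walk_plan(query_plan, load_variable, expand_relationship):
    previous_matches = None
    for step in query_plan:
        if step["kind"] == "match":
            # Bind nodes to the variable named in the step, filtering by
            # label and attributes where they are given.
            previous_matches = load_variable(step, previous_matches)
        elif step["kind"] == "relationship":
            # Expand from the previous results: mask those nodes, multiply
            # with the adjacency matrix and keep edges of this relationship.
            previous_matches = expand_relationship(step["name"], previous_matches)
    return previous_matches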

From the very first match statement, we generate a single object that tracks all the outcomes of matching against it. We accumulate intermediary query objects in an old_matches key. For every second match statement, we mark all the resulting node objects as having seen the variable, i.e. they are bound to the variable name.

At the end, we have an object with every part of the query evaluated and a history of relationships. We need to join the data produced. We do that by walking the history and seeing which nodes are assigned to which variables, then we accumulate them.

Now we have to join the data.

How to join data

Imagine I encounter {'kind': 'match', 'variable': 'end'} in the above query plan. This means: retrieve the data that was output for the variable end. This is a list of relationships that looks like the following:

Here’s one relationship. Tasya-[:POSTED]->Thoughts.

{'from_node': {'attributes': {'label': 'Person', 'name': 'Tasya'},
               'matches': 'end',
               'name': 'Tasya',
               'position': 1},
 'planning_index': 8,
 'relationship': 'POSTED',
 'source_relationship': ...,
 'to_node': {'attributes': {'label': 'Post', 'name': 'Thoughts'},
             'matches': 'post',
             'name': 'Thoughts',
             'position': 9}}

Notice the from_node has matches: 'end' while the to_node has matches: 'post'. This is because of the "return start, end, post, person" in the original query. Each node gets marked when it is the output of a relationship search. The source_relationship refers to the relationship that caused this relationship – the previous search results. From the source_relationship, we can get all resolved variables. The code to accumulate variables is a simple recursive function that accumulates variables into rows.

def accumulate_variable(output_row, match, seenbefore):
    # Walk back through source_relationship links, binding each node that
    # was marked with a variable name into the output row.
    seenbefore.append(match)
    if "from_node" in match and "matches" in match["from_node"]:
        output_row[match["from_node"]["matches"]] = match["from_node"]
    if "matches" in match["to_node"]:
        output_row[match["to_node"]["matches"]] = match["to_node"]
    if "source_relationship" in match:
        accumulate_variable(output_row, match["source_relationship"], seenbefore)


# Fragment from the query executor: yield one row per final relationship match.
if parser["match"]:
    for relationship in relationships:
        for match in relationship["matches"]:
            if "inserted" not in match:
                output_row = {}
                accumulate_variable(output_row, match, [])
                yield output_row

This algorithm seems to give the correct results. I don’t know if I actually need to genuinely join (hash join) or not. Oh well.