How do we grow if everything is free?

Browsers, search engines, music-sharing and video-sharing websites are all free. And I’m here to tell you that this is a bad thing.

Free things are very difficult to compete with, so they actually reduce competition in a market. Free is good enough. Why would anybody start a search engine to compete with Google? They cannot compete with free.

I’d like to be in a world where antitrust law kicks in and you have to pay for a search engine and a browser. There could be multiple browsers in the market, competing on performance and features. It would be like the 90s and early 2000s when software was competitive.

The computation is trivial, it’s the arrangement of objects and methods that is complicated

Two programmers can write the same program and the results will be completely different, because different variable names were used, data was grouped differently, and so on. The classes are incompatible.
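To make this concrete, here’s a tiny Python sketch (the class and function names are mine, invented for illustration) of two programs that hold the same data but can’t use each other’s code:

    # Programmer A groups the coordinates into a tuple attribute.
    class LabelledPoint:
        def __init__(self, label, xy):
            self.label = label
            self.xy = xy  # (x, y) tuple

    # Programmer B models the same thing with separate fields and another name.
    class NamedCoordinate:
        def __init__(self, name, x, y):
            self.name = name
            self.x = x
            self.y = y

    def distance_from_origin(p):
        # Written against programmer B's shape; it cannot accept a
        # LabelledPoint without adapter code, despite identical data.
        return (p.x ** 2 + p.y ** 2) ** 0.5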

What if there were a language in which, generally, if the computation is the same, then the code is the same? There’s only one way to write an addition, but there are hundreds of ways to write a B-tree.

The grouping operators – variables, classes, structs, methods, functions, types – all need to be abstracted. They all do the same thing: they group code together or decide where data is stored in memory. They’re abstractions, or syntactic sugar.

Interfaces partially solve the problem of interoperability. But it’s painful to implement 100s of methods in an interface.

Most modern programming isn’t writing number-crunching formulas (actual computation) but arranging objects of the right type containing the right data, or arranging method calls. This bookkeeping is so complicated that software is slow to write and interoperability is hard to impossible.

I feel we haven’t found the right abstraction for writing and reasoning about code. Methods, classes, structs, variables, types are all incomplete. Everyone has strong opinions about what code is good. But one man’s beauty is another man’s garbage code. And there’s a lot of garbage code out there.

What upsets me is how unreadable mature codebases are. There’s no separation of concerns that is readable. Every concern is mushed together and sprinkled throughout the codebase. Little of a mature codebase is actually algorithmic; it’s mostly poorly managed complexity. The complexity is self-created. The abstractions are rife and terrible.

I’m still trying to think of alternative abstractions that make code interoperable by default.

I’m worried that modern computing is too complicated

Our computing heritage is obnoxiously complicated and full of nuances and edge cases. It’s a wonder that anything can be maintained and not just rewritten.

I’ve been studying the OpenJDK JVM and reading about sneaky locking; it’s an area of code that is fundamentally complicated. Very few people actually understand the JVM’s internals. This knowledge will die out unless it is replaced by new people who learn the JVM.

OpenJDK is one of the few interpreters that offers true parallelism. I bet the code to support threading is very complicated and rarely touched (with the possible exception of Loom). I worry that the number of people who can implement parallelism in a JIT interpreter is very small. It would be nice if the knowledge and skill that went into the JVM could be extracted somehow. The JVM is a marvel of computer engineering and very impressive. I just wonder if parts of it could be written in a higher-level language so it becomes easier to understand without losing its performance benefits.

Imagine a common basis for JIT interpreters, like LLVM, with intermediate representations that anybody could improve. Or we could implement other languages on the JVM – Python on the JVM would be nice.

Code is too subtle – use plain English

You’ve got a brilliant idea to improve performance 4000x, or an algorithm that works really well. You would think open-sourcing the code is enough to share that with the world, but it’s not. Code is too subtle. Nobody knows which file has the interesting stuff in it. Very few people will spend time reading the code unless they intend to contribute. You need documentation or, better yet, a design document or an ACM article describing your approach in detail. English is the winning language for technical problems.

Please write up your learnings in your blog.

Write everything in Python then transpile or compile it to native

I think languages like Rust and C++ obfuscate algorithms. They include a lot of stuff that has no bearing on the algorithm and gets in the way of understandability.

Write the core of your algorithm in Python, then compile or transpile it to a native language for performance. You get the best of both worlds – performance and developer productivity. Python just needs C-style loops.
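As a sketch of what that might look like (plain typed Python; tools such as Cython or mypyc can compile roughly this style of code, though I’m leaving the exact toolchain open):

    def dot(a: list[float], b: list[float]) -> float:
        # The algorithm stays visible: no ownership, lifetimes or templates.
        total = 0.0
        for i in range(len(a)):  # a C-style indexed loop
            total += a[i] * b[i]
        return total

    print(dot([1.0, 2.0, 3.0], [4.0, 5.0, 6.0]))  # 32.0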

Query everywhere

It’s frustrating that every system requires a data import before data can be useful. Too much data sits outside databases, so it cannot be queried. Data has to be duplicated to be queryable. For example, if you store data in S3, you have to download it before you can query it. Or if you use MySQL, you have to upload your data to Rockset to query it quickly for analytics.

Ideally we should be running queries where the data lives. You cannot run queries directly against S3. TiDB has a feature whereby it can push computation down to the storage layer, closer to where the data lives in TiKV (a keyvalue store); this is called operator pushdown.

Data is outside databases and trapped in binary formats. It would be nice to query my file system for files that were modified between dates and get results back immediately. This is where my idea to index everything comes from. Storage is cheap – we can index everything remotely and get the best of both worlds.
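As a toy illustration of the idea (a hypothetical script, not an existing tool; the directory is made up), one slow pass over the file system can index modification times into SQLite, after which date-range queries return immediately:

    import os
    import sqlite3

    db = sqlite3.connect("file_index.db")
    db.execute("CREATE TABLE IF NOT EXISTS files (path TEXT PRIMARY KEY, mtime REAL)")

    # The slow part: walk the file system once and index every mtime.
    for root, _, names in os.walk(os.path.expanduser("~/notes")):
        for name in names:
            path = os.path.join(root, name)
            db.execute("INSERT OR REPLACE INTO files VALUES (?, ?)",
                       (path, os.stat(path).st_mtime))
    db.commit()

    # The fast part: "files modified between dates" is now a single index scan.
    for (path,) in db.execute(
            "SELECT path FROM files WHERE mtime BETWEEN ? AND ?",
            (1672531200, 1675209600)):  # January 2023, as Unix timestamps
        print(path)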

Graph matching

I have added graph support to hash-db. This post outlines how graph matching works.

We can match nodes in a graph by creating a matrix that selects only the nodes we’re interested in, then multiplying it by the adjacency matrix. For example, say we want to execute the following Cypher query:

match (start:Person)-[:FRIEND]->(end:Person), (start)-[:LIKES]->(post:Post), (end)-[:POSTED]->(post:Post), (post:Post)-[:REFERS]->(person:Person) return start, end, post, person

We need to pick a starting location, which I suspect will be (start:Person {'name': 'Samuel'}), and generate a FRIEND matrix with this start node masked.

    def create_matrix(self, adjacency_matrix, nodes):
        # Build a column vector with a 1 at each given node's position.
        multiply = np.zeros((adjacency_matrix.shape[0], 1))
        for node in nodes:
            multiply[node["position"]][0] = 1
        return multiply

...

# Then to search the matrix:
                        # For each match found so far, expand one hop along this relationship.
                        for match in matches:
                            node = match["to_node"]
                            multiply_matrix = self.create_matrix(adjacency_matrix, [node])
                            # Row i of the result is non-zero when node i is connected to `node`.
                            edges_from = np.matmul(adjacency_matrix, multiply_matrix)

                            for item in range(edges_from.shape[0]):
                                if edges_from[item][0] == 1:
                                    direction_index = "{}_{}".format(node["position"], self.nodes[item]["position"])

                                    # Only keep edges that run in the direction the query asks for.
                                    if self.directions[relationship_name].get(direction_index, False):
                                        inserted = True
                                        print("{} -{}-> {}".format(node["name"], relationship_name, self.nodes[item]["name"]))
                                        forward_relationship = {
                                            "relationship": relationship_name,
                                            "from_node": copy.deepcopy(node),
                                            "to_node": copy.deepcopy(self.nodes[item]),
                                            "source_relationship": match,
                                        }
                                        relationship["matches"].append(forward_relationship)

How to decide which node to begin at is a general problem. We could start at multiple nodes at once. I’m guessing I can use the order within the query, or topologically sort the variables. We also need to do an attribute match: we’re saying we’re looking for a node that has a name attribute with the value Samuel.

This is how we create a mask matrix which we multiply our data by to do a breadth first search:


                multiply[node["position"]][0] = 1
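Here’s a tiny worked example (hand-made data, not hash-db internals) of why the multiplication finds neighbours: row i of the result is non-zero exactly when node i is connected to the masked node.

    import numpy as np

    # 3 nodes: 0=Samuel, 1=Tasya, 2=Thoughts. adjacency[i][j] == 1 means
    # i and j are connected; direction is checked separately, as above.
    adjacency = np.array([[0, 1, 1],
                          [1, 0, 0],
                          [1, 0, 0]])

    mask = np.zeros((3, 1))
    mask[0][0] = 1            # start the search at node 0 (Samuel)

    reachable = np.matmul(adjacency, mask)
    print(reachable.ravel())  # [0. 1. 1.] -> nodes 1 and 2 are one hop away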

How does the main loop work?

We loop over the query plan, which looks like this:

[{'attributes': {}, 'kind': 'match', 'label': 'Person', 'variable': 'start'},
 {'kind': 'relationship', 'name': 'FRIEND'},
 {'attributes': {}, 'kind': 'match', 'label': 'Person', 'variable': 'end'},
 {'kind': 'match', 'variable': 'start'},
 {'kind': 'relationship', 'name': 'LIKES'},
 {'attributes': {}, 'kind': 'match', 'label': 'Post', 'variable': 'post'},
 {'kind': 'match', 'variable': 'end'},
 {'kind': 'relationship', 'name': 'POSTED'},
 {'attributes': {}, 'kind': 'match', 'label': 'Post', 'variable': 'post'},
 {'attributes': {}, 'kind': 'match', 'label': 'Post', 'variable': 'post'},
 {'kind': 'relationship', 'name': 'REFERS'},
 {'attributes': {}, 'kind': 'match', 'label': 'Person', 'variable': 'person'}]

The match nodes handle loading data into and out of variables. When we encounter a relationship node ({"kind": "relationship"}), we run a numpy matrix multiplication using the previous relationship object's "to_node". So every step is relative to the previous set of results. This lets us walk the query graph with very little code.
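Schematically, the loop looks something like this (a simplified sketch, not the actual hash-db code; nodes_with_label and expand are stand-ins for the real attribute matching and matrix-multiplication steps):

    def walk_plan(plan, graph):
        variables = {}         # variable name -> nodes bound so far
        previous_matches = []  # results of the last step
        for step in plan:
            if step["kind"] == "match":
                # First time we see a variable, load nodes by label;
                # afterwards, re-use whatever the variable is bound to.
                previous_matches = variables.setdefault(
                    step["variable"], graph.nodes_with_label(step.get("label")))
            elif step["kind"] == "relationship":
                # Matrix multiply from the previous step's "to_node"s
                # one hop along this relationship.
                previous_matches = graph.expand(step["name"], previous_matches)
        return variables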

From the very first match statement, we generate a single object that tracks all the outcomes from matching against that match. We accumulate query intermediary objects in an old_matches key. For every second match statement, we mark all the resulting node objects as having seen the variable, i.e., they are bound to the variable name.

At the end, we have an object with every part of the query evaluated and a history of relationships. We need to join the data produced. We do that by walking the history and seeing which nodes are assigned to which variables, then we accumulate them.

Now we have to join the data.

How to join data

Imagine I encounter {'kind': 'match', 'variable': 'end'} in the above query plan. This means: retrieve the data that was output for the variable end. This is a list of relationships that looks like the following:

Here’s one relationship. Tasya-[:POSTED]->Thoughts.

{'from_node': {'attributes': {'label': 'Person', 'name': 'Tasya'},
               'matches': 'end',
               'name': 'Tasya',
               'position': 1},
 'planning_index': 8,
 'relationship': 'POSTED',
 'source_relationship': ...,
 'to_node': {'attributes': {'label': 'Post', 'name': 'Thoughts'},
             'matches': 'post',
             'name': 'Thoughts',
             'position': 9}}

Notice the from_node has matches: 'end' while the to_node has matches: 'post'. This is because of the "return start, end, post, person" in the original query. Each node gets marked when it is the output of a relationship search. The source relationship refers to the relationship that caused this relationship – the previous search results. From the source_relationship, we can get all resolved variables. The code to accumulate variables is a simple recursive function that accumulates variables into rows.

        def accumulate_variable(output_row, match, seenbefore):
            # Walk back through the chain of source_relationships, collecting
            # every node that was bound to a variable into the output row.
            seenbefore.append(match)
            if "from_node" in match and "matches" in match["from_node"]:
                output_row[match["from_node"]["matches"]] = match["from_node"]
            if "matches" in match["to_node"]:
                output_row[match["to_node"]["matches"]] = match["to_node"]
            if "source_relationship" in match:
                accumulate_variable(output_row, match["source_relationship"], seenbefore)

        if parser["match"]:
            for relationship in relationships:
                for match in relationship["matches"]:
                    if "inserted" not in match:
                        output_row = {}
                        accumulate_variable(output_row, match, [])
                        yield output_row
This algorithm seems to give the correct results. I don’t know if I actually need to genuinely join (hash join) or not. Oh well.

More efficient distributed joins with keyvalue storage

I’ve been thinking about how to execute a distributed join when not all the data is on the same server. Joins can be scheduled: some data movement needs to happen for a join to work, and this movement can be planned for or scheduled.

Essentially, the join keys should be shared across the network before the join happens. It’s the join keys that need to be synced up. The materialized data can come later.

For example:

select products.price, people.people_name,
    items.search from items
    inner join people on people.id = items.people
    inner join products on items.search = products.name

The only keys that need to be synced are the people.id values, items.people, items.search and the products.name values. These are the join keys. If they fit on one machine, the join can take place in memory. To find them efficiently, we need to distribute the scan work across the servers and download all the join keys in parallel.

If you have 100 million products, 1 million people and 10 million items, you’ll have to copy 100 million + 1 million + 10 million join keys before the join can begin. We join people.id = items.people, then items.search = products.name, in pairs, so we might be able to copy the keys for one join simultaneously with the transfer for the next join. In fact, after looping through the bigger set of items to do the hash join, we don’t need the data that we have already looped past, because it’s just a join key.
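A sketch of that key-only phase in plain Python (the helper below is illustrative, not hash-db code): build a hash table on the smaller key set, stream the bigger one past it, and only ship full rows for the pairs that matched.

    def join_keys(small_keys, large_keys):
        """Hash join over (join_key, row_id) pairs only; yields matching row id pairs."""
        build = {}
        for key, row_id in small_keys:      # e.g. (people.id, people row id)
            build.setdefault(key, []).append(row_id)

        for key, row_id in large_keys:      # e.g. (items.people, items row id)
            for other_row_id in build.get(key, []):
                yield (other_row_id, row_id)
            # Once a large-side key has been probed it is never needed again,
            # so the stream can be discarded as it goes past.

    # Only the matched row id pairs travel over the network; the materialized
    # columns are fetched afterwards, and only for those pairs.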

Is there a way to distribute the join? What if the number of keys that need to be transferred is larger than system memory?

Indexing the join

Can we spend some disk space to index a join, so that non-ad-hoc queries are fast because they can use a join index?

We have rows on server A and server B. If they were together on the same machine, we could spot a join condition. On insert, we query the other server to see if it has matching data; if it does, we upload something to represent the join.

Ideally, at join query time, we just want to query each server once to get a list of matched joins. The problem occurs when the joined data is spread across two servers. We need matching pairs to be visible on the same server, but it doesn’t matter which one: we can arrange for matching join key data to always be available on any given server.

First we tell the server about the join, so it knows it has to do something special when these particular keys match.

create join on items
    inner join people on people.id = items.people
    inner join products on items.search = products.name

At insert time into items, people or products, we run a select joinValue where joinKey = joinValue against each server, and on those servers we insert the corresponding join key. For the above example, two join keys need to be inserted on every insert into items. This way the joins can be run independently on each server and they will always return all join results. We pick a random server to do the initial join; they should all return the same items. The missing data is fetched in parallel. Writes become more complicated: we have to update all join keys.

Imagine we have the following Join:

select products.price, people.people_name,
    items.search from items
    inner join people on people.id = items.people
    inner join products on items.search = products.name

If we want this join to work across multiple decentralised servers, we have to synchronize the join keys between servers.

Assuming we have already inserted people and products, when we insert into items ('Spanner', 2), we need:

  • for each server, we need to insert an items row placeholder so joins can happen on that server
  • for each server, we need to insert a people placeholder
  • for each server, we need to insert a products placeholder

The outcome is that any server can execute a full join because every server has every join key. But some data is missing from the join statement: specifically, the SELECT’s parameters – products.price and people.people_name – are missing. So we need to fetch the missing data by doing a second join against the results of the first join. This time we join on people_name and price, and each server contributes its values for these fields.
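A minimal sketch of the insert-time bookkeeping (the function, the row_id field and the insert_placeholder call are hypothetical, just to pin the idea down):

    # The join definition from the "create join" statement above.
    JOIN_KEYS = [
        ("items", "people", "people", "id"),      # items.people = people.id
        ("items", "search", "products", "name"),  # items.search = products.name
    ]

    def on_insert(servers, table, row):
        for left_table, left_col, right_table, right_col in JOIN_KEYS:
            if table not in (left_table, right_table):
                continue
            key = row[left_col] if table == left_table else row[right_col]
            # Push a placeholder carrying just the join key to every server,
            # so any one server can run the join and fetch full rows later.
            for server in servers:
                server.insert_placeholder(table, key, row["row_id"])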

Writing a horizontally scalable decentralized SQL database

I have written a simple SQL database and a distributed hash table. Now is the time to combine them into one product. This post is me walking through how it works.

We create a table row by storing a keyvalue per column in the format R.table_name.row_number.field_name (R.people.3.age). For efficiency, we also store a column-based index in the format C.table_name.field_name.row_number (C.people.age.3) – this keeps the column’s data close together in memory, so that when it goes to the CPU we get nearby entries on the cache line too. A key belongs to a server based on the consistent hash of its partition key. All columns for a row are stored on the same machine.
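For concreteness, a small sketch of that key layout (the helper names are mine):

    def row_key(table, row_number, field):
        # R.people.3.age -> the cell value, stored alongside the rest of the row
        return "R.{}.{}.{}".format(table, row_number, field)

    def column_key(table, field, row_number):
        # C.people.age.3 -> the same cell, laid out column-wise for scans
        return "C.{}.{}.{}".format(table, field, row_number)

    print(row_key("people", 3, "age"))     # R.people.3.age
    print(column_key("people", "age", 3))  # C.people.age.3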

To do a query, we have to aggregate these keyvalues into in-memory objects which become rows, so that we can run a hash join against them. We also have to do it efficiently: we don’t want to materialize the entire database into memory before we can run queries against it. We have to use a streaming approach whereby we only construct one item in memory at a time, and everything must work in lockstep with that item.
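One way to do that streaming assembly, sketched in plain Python (I’m assuming the keys arrive sorted by table and row number, which is my assumption rather than a guarantee of the store):

    from itertools import groupby

    def rows(keyvalues):
        # keyvalues: iterable of ("R.table.rownum.field", value) pairs,
        # sorted by table and row number.
        def row_of(item):
            _, table, rownum, _ = item[0].split(".", 3)
            return (table, rownum)

        for (table, rownum), cells in groupby(keyvalues, key=row_of):
            row = {"_table": table, "_row": rownum}
            for key, value in cells:
                row[key.split(".", 3)[3]] = value
            yield row  # only one row is materialized at a time

    kvs = [("R.people.3.age", 31), ("R.people.3.name", "Samuel"),
           ("R.people.4.age", 25), ("R.people.4.name", "Tasya")]
    for row in rows(kvs):
        print(row)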

We run the majority of the SQL statement where the data resides – this should be in memory – and aggregate the data from each server, since each server stores a different set of rows depending on the consistent hash.

To do an update, we have to delete all the keyvalues associated with the previous value and then repopulate the keys with the updated value.

Joins across machines are fairly ugly. Some special-case logic has to happen, and the materialized views are transferred to one server for the join to take place. This works. I would also like the option of doing a join on a single partition; this can be logically consistent if all the data is on one server.

Scalability

As long as data is distributed across a cluster of nodes, the SQL should execute efficiently because searches are done in parallel. We use tries to make prefix searches efficient. For example, to run a select * from people WHERE age = 31 query, we do a prefix query of S.people.age.31 to find keys of the form S.tablename.column.columnvalue.rownumber. This works because we have already indexed this keyvalue of the column’s data; we rely on this denormalisation to speed up searches. To join the WHERE clause data back to the database, we hash join a collection of WHERE matches, which look like {"id": 4}, against a prefix query of R.people, which is the full data row. As a result, we get all the items that match the WHERE clause.
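A rough sketch of that lookup (a plain dict stands in for both the trie and the keyvalue store; the data is made up):

    def prefix_scan(store, prefix):
        # The real store uses a trie; filtering a dict is enough for the sketch.
        return {k: v for k, v in store.items() if k.startswith(prefix)}

    store = {
        "S.people.age.31.3": True, "S.people.age.25.4": True,   # column index
        "R.people.3.age": 31, "R.people.3.name": "Samuel",      # full rows
        "R.people.4.age": 25, "R.people.4.name": "Tasya",
    }

    # WHERE age = 31: prefix query the column index to get matching row numbers.
    matches = [{"id": key.rsplit(".", 1)[1]}
               for key in prefix_scan(store, "S.people.age.31.")]

    # Hash join the matches against the full rows under the R.people prefix.
    rows = {}
    for key, value in prefix_scan(store, "R.people.").items():
        _, _, rownum, field = key.split(".", 3)
        rows.setdefault(rownum, {})[field] = value

    for match in matches:
        print(rows[match["id"]])   # {'age': 31, 'name': 'Samuel'}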

Speeding up

Row count information: we currently don’t have row count information, so we don’t efficiently loop over the smaller collection to do the hash join. This is because the data is currently streamed – we don’t know how large the collections are until we loop over them.