• Home
  • Quick Bytes
  • Algorithms
  • Java
  • iOS
  • Android
  • Certifications
  • About Me

Lets Code Them Up!

  • DDIA | Replication | Replication logs

    September 18th, 2023

    In the previous post, we learned about the replication technique of Leaders and Followers. How do they work under the hood? They use several techniques – 

    1. Statement based replication
    2. Write ahead log
    3. Logical (row based) replication
    4. Trigger based replication

    1. Statement based replication

    For every INSERT, UPDATE, DELETE operation, leader sends the statement log to followers and each follower parses and executes that SQL statement as if it had been received from a client.

    Problems – 

    • non deterministic functions like time(), rand(), if used for any value, will have different values for same row in different replicas
    • if auto-incrementing column is present then the replication on followers should be sequenced in the same manner to avoid faulty records.
    • Statements that have side effects (e.g., triggers, stored procedures, user-defined functions) may result in different side effects occurring on each replica, unless the side effects are absolutely deterministic.

    It is possible to work around those issues—for example, the leader can replace any nondeterministic function calls with a fixed return value when the statement is logged so that the followers all get the same value. However, because there are so many edge cases, other replication methods are now generally preferred.

    Was used in MySQL till version 5.1

    2. Write Ahead Log (WAL)

    In one of our previous posts on data structures which power databases, we learned about the logs, and how writes are appended to end of these logs. (eg. B-Trees, SST/LSM Trees).

    The log is an append-only sequence of bytes containing all writes to the database. We can use the exact same log to build a replica on another node: besides writing the log to disk, the leader also sends it across the network to its followers. When the follower processes this log, it builds a copy of the exact same data structures as found on the leader.

    Used in PostgreSQL and Oracle.

    The main disadvantage is that the log describes the data on a very low level: a WAL contains details of which bytes were changed in which disk blocks. This makes replication closely coupled to the storage engine.

     

    3. Logical log replication

    This one uses different log formats for replication and for the storage engine, which allows the replication log to be decoupled from the storage engine internals. This kind of replication log is called a logical log, to distinguish it from the storage engine’s (physical) data representation.

    A logical log for a relational database is usually a sequence of records describing writes to database tables at the granularity of a row:

    • For an inserted row, the log contains the new values of all columns.
    • For a deleted row, the log contains enough information to uniquely identify the row that was deleted.
    • For an updated row, the log contains enough information to uniquely identify the updated row, and the new values of all columns (or at least the new values of all columns that changed).

    A transaction that modifies several rows generates several such log records, followed by a record indicating that the transaction was committed.  Since a logical log is decoupled from the storage engine internals, it can more easily be kept backward compatible, allowing the leader and the follower to run different versions of the database software, or even different storage engines.

    Used in MySQL.

    4. Trigger based replication

    This one allows application to modify databases using triggers and stored procedure.

    A trigger lets you register custom application code that is automatically executed when a data change (write transaction) occurs in a database system. The trigger has the opportunity to log this change into a separate table, from which it can be read by an external process. That external process can then apply any necessary application logic and replicate the data change to another system. 

    Trigger-based replication typically has greater overheads than other replication methods, and is more prone to bugs and limitations than the database’s built-in replication. However, it can nevertheless be useful due to its flexibility.

    Thanks for stopping by! Hope this gives you a brief overview in to different techniques that power leaders and followers replication approach under the hood. Eager to hear your thoughts and chat, please leave comments below and we can discuss. 

  • DDIA | Replication | Leaders and Followers

    September 9th, 2023

    In previous post, we had brief history of distributed databases, from this post onwards we will look into how replication is handled in distributed databases. 

    As we have seen before replication is copying data to multiple nodes. Each node that stores the copy of the database is called a replica. How to ensure that data written to one replica gets written to all replicas?

    The first strategy is to designate one replica as Leader and all the other replicas as Followers. The writes are only done on the Leader, while reads can be done either by Leader or Followers. As soon as a write request comes to Leader, he makes update to local copy of database and sends a replication log to all the followers. The Followers then update their local copy of database on the basis on replication log.

    This mode of replication is a built-in feature of many relational databases, such as PostgreSQL (since version 9.0), MySQL, Oracle Data Guard, and SQL Server’s AlwaysOn Availability Groups . It is also used in some nonrelational databases, including MongoDB, RethinkDB, and Espresso. Finally, leader-based replication is not restricted to only databases: distributed message brokers such as Kafka and RabbitMQ highly available queues also use it. Some network filesystems and replicated block devices such as DRBD are similar.

    Synchronous Versus Asynchronous Replication

    Let’s say, there are one leader and two followers and write request comes. In synchronous replication, the client will wait until both followers have confirmed replication before sending write successful to clients. In asynchronous replication, the leader will write on local, sends the replication log to followers and send the write successful notification to clients, not waiting for it to be successfully written to the followers database copy. 

    The third approach is semi-synchronous, here there is synchronous communication between one follower and leader while asynchronous communication between leader and other followers. So when write request comes in, the leader updates local copy, sends replication log to all followers, wait till the selected follower return success and then sends the notification to client. If the leader suddenly fails, we can be sure that the data is still available on the follower. The disadvantage is that if the synchronous follower doesn’t respond (because it has crashed, or there is a network fault, or for any other reason), the write cannot be processed. The leader must block all writes and wait until the synchronous replica is available again.

    If all were synchronous, any one node down means whole system down, if all were as asynchronous then we cannot be sure that data on all nodes is consistent with the leader. However, a fully asynchronous configuration has the advantage that the leader can continue processing writes, even if all of its followers have fallen behind.

     

    Setting Up New Followers

    The process of setting up new followers without loosing availability is – 

    1. Take a consistent snapshot of the leader’s database at some point in time—if possible, without taking a lock on the entire database. Most databases have this feature, as it is also required for backups. In some cases, third-party tools are needed, such as innobackupex for MySQL.

    2. Copy the snapshot to the new follower node.

    3. The follower connects to the leader and requests all the data changes that have happened since the snapshot was taken. This requires that the snapshot is associated with an exact position in the leader’s replication log. That position has various names: for example, PostgreSQL calls it the log sequence number, and MySQL calls it the binlog coordinates.

    4. When the follower has processed the backlog of data changes since the snapshot, we say it has caught up. It can now continue to process data changes from the leader as they happen.

    Handling Node Outages

    Follower failure: Catch-up recovery

    On its local disk, each follower keeps a log of the data changes it has received from the leader. If a follower crashes and is restarted, or if the network between the leader and the follower is temporarily interrupted, the follower can recover quite easily: from its log, it knows the last transaction that was processed before the fault occurred. Thus, the follower can connect to the leader and request all the data changes that occurred during the time when the follower was disconnected. When it has applied these changes, it has caught up to the leader and can continue receiving a stream of data changes as before.

    Leader failure: Failover

    Handling a failure of the leader is trickier: one of the followers needs to be promoted to be the new leader, clients need to be reconfigured to send their writes to the new leader, and the other followers need to start consuming data changes from the new leader. This process is called failover.

    Failover can happen manually through a database administrator, or automatic. An automatic failover looks like – 

    1. Determining leader has failed, normally done by pinging system periodically, if no response, means leader has failed. 
    2. Choosing new leader, could be done through election, normally the synchronous replica is made the leader as it has most upto date data in comparison with the failed leader node. 
    3. Reconfiguring the system to use the new leader.

    Problems with failover – 

    • If asynchronous replication is used, the new leader may not have received all the writes from the old leader before it failed. If the former leader rejoins the cluster after a new leader has been chosen, what should happen to those writes? The new leader may have received conflicting writes in the meantime. The most common solution is for the old leader’s unreplicated writes to simply be discarded, which may violate clients’ durability expectations.

    • Discarding writes is especially dangerous if other storage systems outside of the database need to be coordinated with the database contents. It could cause inconsistency between systems which can lead to big failures.
    • In certain fault scenarios it could happen that two nodes both believe that they are the leader. This situation is called split brain, and it is dangerous: if both leaders accept writes, and there is no process for resolving conflicts, data is likely to be lost or corrupted.
    • What is the right timeout before the leader is declared dead? A longer timeout means a longer time to recovery in the case where the leader fails. However, if the timeout is too short, there could be unnecessary failovers.

    Thanks for stopping by! Hope this gives you a brief overview in to replication and leaders and followers approach in distributed systems. Eager to hear your thoughts and chat, please leave comments below and we can discuss.

  • DDIA | Distributed Data

    September 2nd, 2023

    So far this book dealt with how we can store data on a single system. From this post onwards we will see how to manage data if we distribute the data to between multiple systems. 

    Why distribute – 

    1. Scalability – multiple machines can be added if data load increases
    2. Low latency – data distributed to multiple regions, can be served from the closest region, means low latency.
    3. Fault tolerance/high availability – If one machine goes down, another machine can take over without loss of availability.

    Scaling to Higher Load/ Vertical Scaling/ Scaling up

    In this case, whenever needed, we increase the size of RAM/number of CPUs, etc of the machine to handle the increasing load. 

    Drawbacks – 

    1. High cost, machine with twice amount of RAM, twice CPU power or twice storage size, cost way higher than another machine of same size that you have.
    2. Low fault tolerance, single machine, single geography, any failure, you can easily replace parts but could take time.
    3. High latency, single machine, single geography, more amount of time spent in serving requests from far off

    Shared-disk architecture

    Another approach is the shared-disk architecture, which uses several machines with independent CPUs and RAM, but stores data on an array of disks that is shared between the machines, which are connected via a fast network. This architecture is used for some data warehousing workloads, but contention and the overhead of locking limit the scalability of the shared-disk approach.

    Shared-Nothing Architectures/ Horizontal Scaling/Scaling out

    Increase number of machines when needed. Each machine (also called node) has it’s own CPU, RAM, disk, etc, nothing is shared. Any coordination between nodes is done at the software level, using a conventional network.

    Replication Versus Partitioning

    Replication

    Keeping a copy of the same data on several different nodes, potentially in different locations. Replication provides redundancy: if some nodes are unavailable, the data can still be served from the remaining nodes. Replication can also help improve performance.

    Partitioning

    Splitting a big database into smaller subsets called partitions so that different partitions can be assigned to different nodes (also known as sharding).

    In the subsequent posts, we will start going into details into the shared nothing architecture and what tradeoffs are needed for implementing them.

    Thanks for stopping by! Hope this gives you a brief overview in to distributed data. Eager to hear your thoughts and chat, please leave comments below and we can discuss.

  • DDIA | Chapter 4 |  Encoding and Evolution | Modes of Data Flow

    August 27th, 2023

    In the previous post, we went through different methods used for encoding/decoding data. In this post we will go through the three common methods through which data flows between processes.

    Typically there are three ways in which data flows between processes – 

    1. Via databases
    2. Via service calls
    3. via asynchronous messaging systems

    1. Data flow through Databases

    Process that writes to database encodes the data and process that reads the database decodes the data. 

    Both backward and forward compatibility is needed for the encoders/decoders. Say, we have multiple applications accessing the database, but not all of them are on the same code version, it may happen, one application writes to database using newer version of code but another one is reading using an older version of code. In this case forward compatibility of encoding/decoding schema is necessary.

    Say we add column to the database, and only the newer version of applications know about it, but not the older one, so the older one would not write to that column. The encoding formats take care of this.

    Also if you add new columns, you can rewrite the whole database using the new schema, but this is expensive operation if we have large database. And so most relational databases allow simple schema changes, such as adding a new column with a null default value, without rewriting existing data. When an old row is read, the database fills in nulls for any columns that are missing from the encoded data on disk. LinkedIn’s document database Espresso uses Avro for storage, allowing it to use Avro’s schema evolution rules.

    Schema evolution thus allows the entire database to appear as if it was encoded with a single schema, even though the underlying storage may contain records encoded with various historical versions of the schema.

    For cases, where you are migrating data to a data warehouse for analytics purposes, it is better to rewrite data using new encoding/decoding schema which supports column oriented format. 
     

    2. Dataflow Through Services: REST and RPC

    When you have processes that need to communicate over a network, then formats like REST and RPC are used. 
     

    REST/SOAP

    REST is a design philosophy that builds upon the principles of HTTP. It emphasizes simple data formats, using URLs for identifying resources and using HTTP features for cache control, authentication, and content type negotiation. REST is often associated with microservices. 
     
    SOAP is an XML-based protocol for making network API requests. The API of a SOAP web service is described using an XML-based language called the Web Services Description Language, or WSDL. WSDL enables code generation so that a client can access a remote service using local classes and method calls (which are encoded to XML messages and decoded again by the framework). 
     

    RPC

    The RPC model tries to make a request to a remote network service look the same as calling a function or method in your programming language, within the same process (this abstraction is called location transparency). 

    Main assumption when it comes to data encoding for RPC is that all the servers will be updated first, and all the clients second. Thus, you only need backward compatibility on requests, and forward compatibility on responses. The backward and forward compatibility properties of an RPC scheme are inherited from whatever encoding it uses. 

    Thrift and Avro come with RPC support included, gRPC is an RPC implementation using Protocol Buffers, Finagle also uses Thrift, and Rest.li uses JSON over HTTP.

    Drawbacks of RPC is that local function call is different from network call, lets look at differences between local function call and network call – 

    1. A local function call is predictable and either succeeds or fails, depending only on parameters that are under your control. A network request is unpredictable: the request or response may be lost due to a network problem, or the remote machine may be slow or unavailable, and such problems are entirely outside of your control. 
    2. A local function call either returns a result, or throws an exception, or never returns (because it goes into an infinite loop or the process crashes). A network request has another possible outcome: it may return without a result, due to a timeout.  In that case, you simply don’t know what happened: if you don’t get a response from the remote service, you have no way of knowing whether the request got through or not. 
    3.  In that case, you simply don’t know what happened: if you don’t get a response from the remote service, you have no way of knowing whether the request got through or not. 
    4. Every time you call a local function, it normally takes about the same time to execute. A network request is much slower than a function call, and its latency is also wildly variable.
    5. When you call a local function, you can efficiently pass it references (pointers) to objects in local memory. When you make a network request, all those parameters need to be encoded into a sequence of bytes that can be sent over the network. That’s okay if the parameters are primitives like numbers or strings, but quickly becomes problematic with larger objects.

    6. The client and the service may be implemented in different programming languages, so the RPC framework must translate datatypes from one language into another.

    Message-Passing Dataflow

    These are are similar to RPC in that a client’s request (usually called a message) is delivered to another process with low latency. They are similar to databases in that the message is not sent via a direct network connection, but goes via an intermediary called a message broker (also called a message queue or message-oriented middleware), which stores the message temporarily.

    Using a message broker has several advantages compared to direct RPC:

    • It can act as a buffer if the recipient is unavailable or overloaded, and thus improve system reliability.

    • It can automatically redeliver messages to a process that has crashed, and thus prevent messages from being lost.

    • It avoids the sender needing to know the IP address and port number of the recipient (which is particularly useful in a cloud deployment where virtual machines often come and go).

    • It allows one message to be sent to several recipients.

    • It logically decouples the sender from the recipient (the sender just publishes messages and doesn’t care who consumes them).

    Message Brokers

    Usually, one process sends a message to a named queue or topic, and the broker ensures that the message is delivered to one or more consumers of or subscribers to that queue or topic. 

    Message brokers typically don’t enforce any particular data model—a message is just a sequence of bytes with some metadata, so you can use any encoding format. If the encoding is backward and forward compatible, you have the greatest flexibility to change publishers and consumers independently and deploy them in any order.

    Distributed actor frameworks

    The actor model is a programming model for concurrency in a single process. Rather than dealing directly with threads (and the associated problems of race conditions, locking, and deadlock), logic is encapsulated in actors. Each actor typically represents one client or entity, it may have some local state (which is not shared with any other actor), and it communicates with other actors by sending and receiving asynchronous messages. Message delivery is not guaranteed: in certain error scenarios, messages will be lost.

    In distributed actor frameworks, this programming model is used to scale an application across multiple nodes. The same message-passing mechanism is used, no matter whether the sender and recipient are on the same node or different nodes. If they are on different nodes, the message is transparently encoded into a byte sequence, sent over the network, and decoded on the other side.

    If you want to perform rolling upgrades of your actor-based application, you still have to worry about forward and backward compatibility, as messages may be sent from a node running the new version to a node running the old version, and vice versa.

    examples of distributed actor framework – Akka, Orleans, Erlang OTP

    Thanks for stopping by! Hope this gives you a brief overview in to various modes of data flow between processes. Eager to hear your thoughts and chat, please leave comments below and we can discuss.

  • DDIA | Chapter 4 |  Encoding and Evolution | Formats for Encoding Data

    August 19th, 2023

    Programs usually work with data in (at least) two different representations:

    1. In memory, data is kept in objects, structs, lists, arrays, hash tables, trees, and so on. These data structures are optimized for efficient access and manipulation by the CPU (typically using pointers).

    2. When you want to write data to a file or send it over the network, you have to encode it as some kind of self-contained sequence of bytes (for example, a JSON document). Since a pointer wouldn’t make sense to any other process, this sequence-of-bytes representation looks quite different from the data structures that are normally used in memory.

    Thus, we need some kind of translation between the two representations. The translation from the in-memory representation to a byte sequence is called encoding (also known as serialization or marshalling), and the reverse is called decoding (parsing, deserialization, unmarshalling)

    Language specific formats

    Languages have built-in support for encoding/decoding, example – Java has java.io.Serializable, Ruby has Marshal, Python has pickle, and so on. Many third-party libraries also exist, such as Kryo for Java.

    Problems – 

    1. Using language specific encoding solutions gets us tied to one language for your application. Cannot integrate with systems of other organizations if they use different language. 
    2. Introduces security vulnerabilities
    3. Usually not backward/forward compatible
    4. Not efficient, example java’s serialization is known to take up lot of CPU time.

    JSON, XML, and Binary Variants

    Known problems – 

    1. Ambiguity around encoding of numbers. XML and CSV cannot distinguish between numbers and strings. JSON cannot distinguish between numbers and floating point numbers.
    2. JSON and XML have good support for Unicode character strings (i.e., human-readable text), but they don’t support binary strings (sequences of bytes without a character encoding).
    3. There is optional schema support for both XML and JSON. These schema languages are quite powerful, and thus quite complicated to learn and implement. Use of XML schemas is fairly widespread, but many JSON-based tools don’t bother using schemas. Since the correct interpretation of data (such as numbers and binary strings) depends on information in the schema, applications that don’t use XML/JSON schemas need to potentially hardcode the appropriate encoding/decoding logic instead.
    4. CSV does not have any schema, so it is up to the application to define the meaning of each row and column. If an application change adds a new row or column, you have to handle that change manually. 

    Binary encoding

    Binary encoding takes less space than JSON and XML. Some binary encodings developed for JSON are NMessagePack, BSON, BJSON, UBJSON, BISON, and Smile, etc. And some binary encodings developed for XML are WBXML and Fast Infoset, etc. But these are not widely used. Major problems with binary encoding is that they are not human readable.

    Popular binary encoding based on schemas are – Apache Thrift (by facebook) and protocol buffers (google), Avro.

    Advantages of these binary encodings based on schems – 

    • They can be much more compact than the various “binary JSON” variants, since they can omit field names from the encoded data.
    • The schema is a valuable form of documentation, and because the schema is required for decoding, you can be sure that it is up to date (whereas manually maintained documentation may easily diverge from reality).
    • Keeping a database of schemas allows you to check forward and backward compatibility of schema changes, before anything is deployed.
    • For users of statically typed programming languages, the ability to generate code from the schema is useful, since it enables type checking at compile time.

    Thanks for stopping by! Hope this gives you a brief overview in to various encoding formats. Eager to hear your thoughts and chat, please leave comments below and we can discuss.

  • DDIA | Chapter 3 | Storage And Retrieval | Column Oriented Storage

    August 12th, 2023

    In the first post on chapter 3, we went over data structures that power our databases, and in the second got to know about OLTP and OLAP. In this post, we will look how OLAP databases use column oriented storage and what are its advantages. 

    Transactional databases which are generally relational databases store and retrieves data on basis of rows (indexing is done on rows). But when we consider data warehouses, and queries are more analytic, they are usually querying few columns from the database. And if we go with row based, then we will be returning entire row with 100s of columns for each of the records parse and filter out data. All of this takes up lot of compute resources. Therefore, indexing on columns in this case makes more sense. 

    In column based storage, columns are stored together instead of rows. And if you want the full row, you can find the record from each column and combine them. While saving in columns we need to make sure that rows are ordered in same way across all column indexes. If query needs only two columns, then we just need to load those two columns, parse and filter them. This reduces amount of data to be loaded from disk. 

    Column Compression

    As we are storing on the basis of columns, lot of data in each column is repeating. So instead of storing them as is, we can compress them using techniques like bitmap compression.

    Say you have 100,000 rows in a product_sk column, but you have only 10 types of products. So names of products in this column are repeating. Instead of storing each value separately, in bitmap compression, each product is stored as bitmap of 100000 bits where 1 represents that product presence and 0 absence. Further, if number of rows is small, these bitmaps can be stored in one bit per row, if n is bigger, then we can have n separate bitmaps, one bitmap for each distinct value and one bit for each row. 

    Column compression not only reduces amount of data to be loaded from disk, but also contributes in making efficient use of CPU cycles. For example, the query engine can take a chunk of compressed column data that fits comfortably in the CPU’s L1 cache and iterate through it in a tight loop (that is, with no function calls). A CPU can execute such a loop much faster than code that requires a lot of function calls and conditions for each record that is processed. Column compression allows more rows from a column to fit in the same amount of L1 cache. Operators, such as the bitwise AND and OR, can be designed to operate on such chunks of compressed column data directly. This technique is known as vectorized processing.

    Sort Order in Column Storage

    It is easiest to store the data in the manner they are inserted, so for adding new data, we just add to the end of each column files.All the rows need to have same sort order though, otherwise how will we know which data belongs to which row. 

    A technique generally used by databases is to have multiple sort orders. As we know data is replicated to multiple machine, it can be stored using different sort orders on each machine. So the sort order most effective for certain type of query can be used for fast results. 

    Having multiple sort orders in a column-oriented store is a bit similar to having multiple secondary indexes in a row-oriented store. But the big difference is that the row-oriented store keeps every row in one place (in the heap file or a clustered index), and secondary indexes just contain pointers to the matching rows. In a column store, there normally aren’t any pointers to data elsewhere, only columns containing values.

    Writing to Column-Oriented Storage

    Column-oriented storage, compression, and sorting all help to make those read queries faster. However, they have the downside of making writes more difficult.

    LSM-trees are used over B-Trees. All writes first go to an in-memory store, where they are added to a sorted structure and prepared for writing to disk. It doesn’t matter whether the in-memory store is row-oriented or column-oriented. When enough writes have accumulated, they are merged with the column files on disk and written to new files in bulk.

    Aggregation: Data Cubes and Materialized Views

    data warehouse queries often involve an aggregate function, such as COUNT, SUM, AVG, MIN, or MAX in SQL. If the same aggregates are used by many different queries, it can be wasteful to crunch through the raw data every time. Why not cache some of the counts or sums that queries use most often?

    One way of creating such a cache is a materialized view. In a relational data model, it is often defined like a standard (virtual) view: a table-like object whose contents are the results of some query. The difference is that a materialized view is an actual copy of the query results, written to disk, whereas a virtual view is just a shortcut for writing queries. When you read from a virtual view, the SQL engine expands it into the view’s underlying query on the fly and then processes the expanded query.

    A common special case of a materialized view is known as a data cube or OLAP cube. It is a grid of aggregates grouped by different dimensions.

    Thanks for stopping by! Hope this gives you a brief overview in to column based storage. Eager to hear your thoughts and chat, please leave comments below and we can discuss.

←Previous Page
1 2 3 4 5 … 22
Next Page→

Proudly powered by WordPress