Hide sidebar

Find Median from Data Stream

HeapDesign
HardLeetCode #295
25 min

Problem Statement

The median is the middle value in an ordered integer list. If the size of the list is even, there is no middle value, and the median is the mean of the two middle values. Design a data structure that supports the following two operations: addNum which adds an integer number from the data stream to the data structure, and findMedian which returns the median of all elements so far.

Example

Example 1:

addNum(1)
addNum(2)
findMedian() -> 1.5
addNum(3)
findMedian() -> 2.0

Solution (Two Heaps)

The problem can be solved efficiently using two heaps: a max-heap to store the smaller half of the numbers and a min-heap to store the larger half. This allows us to access the median in O(1) time.

Algorithm Steps

  • Maintain two heaps: a max-heap for the smaller half and a min-heap for the larger half.
  • When adding a new number, add it to the max-heap first. Then, move the largest element from the max-heap to the min-heap.
  • If the max-heap has fewer elements than the min-heap, move the smallest element from the min-heap to the max-heap.
  • The median is either the top of the max-heap (if the total number of elements is odd) or the average of the tops of both heaps (if even).

Data Stream

1
2
3
4
5
6

Max-Heap (Small Half)

Min-Heap (Large Half)

Start: Initialize two heaps, a max-heap (small) and a min-heap (large).

Find Median from Data Stream Solution

import heapq

class MedianFinder:
    def __init__(self):
        self.small = []  # Max-heap
        self.large = []  # Min-heap

    def addNum(self, num: int) -> None:
        heapq.heappush(self.small, -1 * num)
        
        if (self.small and self.large and (-1 * self.small[0]) > self.large[0]):
            val = -1 * heapq.heappop(self.small)
            heapq.heappush(self.large, val)
            
        if len(self.small) > len(self.large) + 1:
            val = -1 * heapq.heappop(self.small)
            heapq.heappush(self.large, val)
            
        if len(self.large) > len(self.small) + 1:
            val = heapq.heappop(self.large)
            heapq.heappush(self.small, -1 * val)

    def findMedian(self) -> float:
        if len(self.small) > len(self.large):
            return -1 * self.small[0]
        if len(self.large) > len(self.small):
            return self.large[0]
        return (-1 * self.small[0] + self.large[0]) / 2.0