Skip to content

Programming / vision / opencv / optical flow

TODO:

  • API
  • understand error and it's usage

Demo

  • Flow between two frames
LK cpu version
import cv2
import numpy as np
from pathlib import Path

def create_test_sequence():
    """
    Create simple test images to demonstrate optical flow
    """
    # Create first image with a white circle
    img1 = np.zeros((400, 400, 3), dtype=np.uint8)
    cv2.circle(img1, (150, 200), 30, (255, 255, 255), -1)
    cv2.rectangle(img1, (50, 100), (100, 150), (0, 255, 0), -1)

    # Create second image with moved objects
    img2 = np.zeros((400, 400, 3), dtype=np.uint8)  
    cv2.circle(img2, (200, 220), 30, (255, 255, 255), -1)  # Circle moved right+down
    cv2.rectangle(img2, (80, 120), (130, 170), (0, 255, 0), -1)  # Rectangle moved right+down

    # Save test images
    script_dir = Path(__file__).parent
    cv2.imwrite(script_dir / 'test_frame1.jpg', img1)
    cv2.imwrite(script_dir / 'test_frame2.jpg', img2)

def lucas_kanade_with_two_images(img1_path, img2_path):
    """
    Lucas-Kanade with two specific images
    """
    # Load two images
    script_dir = Path(__file__).parent
    img1 = cv2.imread(script_dir / img1_path)
    img2 = cv2.imread(script_dir / img2_path)

    if img1 is None or img2 is None:
        print("Error: Could not load images")
        return

    # Convert to grayscale
    gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
    gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)

    # Detect features in first image
    p0 = cv2.goodFeaturesToTrack(gray1, maxCorners=100, qualityLevel=0.3, minDistance=7)

    if p0 is None:
        print("No features detected")
        return

    # Calculate optical flow
    p1, status, error = cv2.calcOpticalFlowPyrLK(
        gray1, gray2, p0, None,
        winSize=(15, 15), maxLevel=2,
        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
    )


    # Filter good points
    good_new = p1[status == 1]
    good_old = p0[status == 1]


    # Draw motion vectors
    result = img2.copy()
    for new, old in zip(good_new, good_old):
        a, b = new.ravel().astype(int)
        c, d = old.ravel().astype(int)

        # Draw arrow showing motion
        cv2.arrowedLine(result, (c, d), (a, b), (0, 255, 255), 2, tipLength=0.3)
        cv2.circle(result, (a, b), 3, (0, 0, 255), -1)  # Current position
        cv2.circle(result, (c, d), 3, (255, 0, 0), -1)  # Previous position

    # Display both images
    combined = np.hstack([img1, result])
    cv2.imshow('Lucas-Kanade: Before -> After', combined)
    script_dir = Path(__file__).parent
    cv2.imwrite(script_dir / 'lk_result.jpg', result)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

if __name__ == "__main__":
    # create_test_sequence()
    lucas_kanade_with_two_images("test_frame1.jpg",
                                 "test_frame2.jpg")
frame_1 frame_2 Flow

Demo: Cuda

Create cuda version using pin memory for better performance check opencv with cuda

  • cv2.cuda.createGoodFeaturesToTrackDetector
  • cv2.cuda.SparsePyrLKOpticalFlow_create

API

detect
d_pts = det.detect(d_gray, mask=None, stream=None)
  • d_gray: cv2.cuda_GpuMat, CV_8UC1, grayscale, non-empty.
  • mask (optional): cv2.cuda_GpuMat (CV_8U), same size; non-zero = allowed region.
  • stream (optional): cv2.cuda_Stream for async; if your build errors, omit it.
  • d_pts (return): cv2.cuda_GpuMat (N×1, CV_32FC2), each is (x, y) float32.
calc
nextPts, status, err = flow.calc(prevImg, nextImg, prevPts, stream=None)
  • prevImg, nextImg: cv.cuda_GpuMat grayscale (CV_8UC1)
  • prevPts: cv.cuda_GpuMat of shape (N,1), type CV_32FC2 (points as (x,y))
  • returns: nextPts (N×1, CV_32FC2), status (N×1, CV_8U), err (N×1, CV_32F)

Code

LK cuda version
import cv2
import numpy as np
from pathlib import Path

def create_test_sequence():
    """
    Create simple test images to demonstrate optical flow
    """
    # Create first image with a white circle
    img1 = np.zeros((400, 400, 3), dtype=np.uint8)
    cv2.circle(img1, (150, 200), 30, (255, 255, 255), -1)
    cv2.rectangle(img1, (50, 100), (100, 150), (0, 255, 0), -1)

    # Create second image with moved objects
    img2 = np.zeros((400, 400, 3), dtype=np.uint8)  
    cv2.circle(img2, (200, 220), 30, (255, 255, 255), -1)  # Circle moved right+down
    cv2.rectangle(img2, (80, 120), (130, 170), (0, 255, 0), -1)  # Rectangle moved right+down

    # Save test images
    script_dir = Path(__file__).parent
    cv2.imwrite(script_dir / 'test_frame1.jpg', img1)
    cv2.imwrite(script_dir / 'test_frame2.jpg', img2)

def lucas_kanade_with_two_images(img1_path, img2_path):
    """
    Lucas-Kanade with two specific images
    """
    # Load two images
    script_dir = Path(__file__).parent
    img1 = cv2.imread(script_dir / img1_path)
    img2 = cv2.imread(script_dir / img2_path)

    if img1 is None or img2 is None:
        print("Error: Could not load images")
        return

    w, h, _ = img1.shape
    # Allocate pinned host memory for faster CPU-GPU transfers
    PAGE_LOCKED = 1
    pinned_img1 = cv2.cuda.HostMem(h, w, cv2.CV_8UC3, PAGE_LOCKED)
    hmat1 = pinned_img1.createMatHeader() 
    pinned_img2 = cv2.cuda.HostMem(h, w, cv2.CV_8UC3, PAGE_LOCKED)
    hmat2 = pinned_img2.createMatHeader() 

    gpu_img1 = cv2.cuda.GpuMat()
    gpu_img2 = cv2.cuda.GpuMat()
    gpu_gray1 = cv2.cuda.GpuMat()
    gpu_gray2 = cv2.cuda.GpuMat()

    hmat1[:] = img1
    hmat2[:] = img2

    # TODO:         self.gpu_img1.upload_async(self.pinned_img1.ptr(), self.stream)
    gpu_img1.upload(hmat1)
    gpu_img2.upload(hmat2)

    # Convert to grayscale on GPU
    gpu_gray1 = cv2.cuda.cvtColor(gpu_img1, cv2.COLOR_BGR2GRAY)
    gpu_gray2 = cv2.cuda.cvtColor(gpu_img2, cv2.COLOR_BGR2GRAY)


    gpu_detector = cv2.cuda.createGoodFeaturesToTrackDetector(
            cv2.CV_8UC1,
            maxCorners=100,
            qualityLevel=0.3,
            minDistance=7,
            blockSize=7
        )

    # Detect features in first image
    p0_gpu = gpu_detector.detect(gpu_gray1)



    # Calculate optical flow
    lk = cv2.cuda.SparsePyrLKOpticalFlow_create(
        winSize=(15, 15), 
        maxLevel=2,
        iters=10, 
        useInitialFlow=False
    )

    p1_gpu, status_gpu, error_gpu = lk.calc(
        gpu_gray1,
        gpu_gray2, 
        p0_gpu,
        None,)
    p0 = p0_gpu.download().reshape(-1, 2)
    p1 = p1_gpu.download().reshape(-1, 2)
    status = status_gpu.download().reshape(-1).astype(bool)

    if p0 is None:
        print("No features detected")
        return

    # Filter good points
    good_new = p1[status == 1]
    good_old = p0[status == 1]


    # Draw motion vectors
    result = img2.copy()
    for new, old in zip(good_new, good_old):
        a, b = new.ravel().astype(int)
        c, d = old.ravel().astype(int)

        # Draw arrow showing motion
        cv2.arrowedLine(result, (c, d), (a, b), (0, 255, 255), 2, tipLength=0.3)
        cv2.circle(result, (a, b), 3, (0, 0, 255), -1)  # Current position
        cv2.circle(result, (c, d), 3, (255, 0, 0), -1)  # Previous position

    # Display both images
    combined = np.hstack([img1, result])
    cv2.imshow('Lucas-Kanade: Before -> After', combined)
    script_dir = Path(__file__).parent
    cv2.imwrite(script_dir / 'lk_result.jpg', result)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

if __name__ == "__main__":
    # create_test_sequence()
    lucas_kanade_with_two_images("test_frame1.jpg",
                                 "test_frame2.jpg")

Demo: Cuda with pin memory and stream

Cuda version with pin memory and stream
import cv2 as cv
import numpy as np
from pathlib import Path

def create_test_sequence():
    img1 = np.zeros((400, 400, 3), dtype=np.uint8)
    cv.circle(img1, (150, 200), 30, (255, 255, 255), -1)
    cv.rectangle(img1, (50, 100), (100, 150), (0, 255, 0), -1)

    img2 = np.zeros((400, 400, 3), dtype=np.uint8)
    cv.circle(img2, (200, 220), 30, (255, 255, 255), -1)
    cv.rectangle(img2, (80, 120), (130, 170), (0, 255, 0), -1)

    script_dir = Path(__file__).parent
    cv.imwrite(str(script_dir / 'test_frame1.jpg'), img1)
    cv.imwrite(str(script_dir / 'test_frame2.jpg'), img2)

def lucas_kanade_with_two_images(img1_path, img2_path):
    script_dir = Path(__file__).parent
    img1 = cv.imread(str(script_dir / img1_path), cv.IMREAD_COLOR)
    img2 = cv.imread(str(script_dir / img2_path), cv.IMREAD_COLOR)
    if img1 is None or img2 is None:
        print("Error: Could not load images")
        return

    h, w, _ = img1.shape  # NOTE: (rows, cols, ch) -> (H, W, C)

    # --- Pinned host buffers (optional speed-up) ---
    use_hostmem = hasattr(cv, "cuda_HostMem")
    if use_hostmem:
        hmem1 = cv.cuda_HostMem(h, w, cv.CV_8UC3, 1)
        hmem2 = cv.cuda_HostMem(h, w, cv.CV_8UC3, 1)
        hmat1 = hmem1.createMatHeader(); hmat1[:] = img1
        hmat2 = hmem2.createMatHeader(); hmat2[:] = img2

    # --- GPU mats + stream ---
    stream = cv.cuda_Stream()
    d_img1 = cv.cuda_GpuMat()
    d_img2 = cv.cuda_GpuMat()
    d_gray1 = cv.cuda_GpuMat()
    d_gray2 = cv.cuda_GpuMat()

    if use_hostmem:
        d_img1.upload(hmat1, stream=stream)
        d_img2.upload(hmat2, stream=stream)
    else:
        d_img1.upload(img1, stream=stream)
        d_img2.upload(img2, stream=stream)

    # Correct cv.cuda.cvtColor signature: (src, code, dst=..., stream=...)
    d_gray1 = cv.cuda.cvtColor(d_img1, cv.COLOR_BGR2GRAY, stream=stream)
    d_gray2 = cv.cuda.cvtColor(d_img2, cv.COLOR_BGR2GRAY, stream=stream)

    # --- CUDA Good Features To Track (Shi–Tomasi) ---
    detector = cv.cuda.createGoodFeaturesToTrackDetector(
        cv.CV_8UC1, maxCorners=200, qualityLevel=0.01,
        minDistance=7, blockSize=7, useHarrisDetector=False
    )
    d_p0 = detector.detect(d_gray1, stream=stream)  # Nx1 CV_32FC2
    stream.waitForCompletion()
    if d_p0.empty():
        print("No features detected")
        return

    # --- CUDA Sparse LK ---
    lk = cv.cuda.SparsePyrLKOpticalFlow_create(
        winSize=(15, 15), maxLevel=2, iters=10, useInitialFlow=False
    )
    # Order is prev -> next:
    d_p1, d_status, d_err = lk.calc(d_gray1, d_gray2, d_p0, None, stream=stream)
    stream.waitForCompletion()

    # --- Download + filter on CPU ---
    p0 = d_p0.download().reshape(-1, 2)
    p1 = d_p1.download().reshape(-1, 2)
    st = d_status.download().reshape(-1).astype(bool)

    good_old = p0[st]
    good_new = p1[st]

    # --- Draw vectors on img2 ---
    result = img2.copy()
    for (a, b), (c, d) in zip(good_new.astype(int), good_old.astype(int)):
        cv.arrowedLine(result, (c, d), (a, b), (0, 255, 255), 2, tipLength=0.3)
        cv.circle(result, (a, b), 3, (0, 0, 255), -1)
        cv.circle(result, (c, d), 3, (255, 0, 0), -1)

    combined = np.hstack([img1, result])
    cv.imshow("Lucas-Kanade: Before -> After", combined)
    cv.imwrite(str(script_dir / "lk_result.jpg"), result)
    cv.waitKey(0)
    cv.destroyAllWindows()

if __name__ == "__main__":
    # create_test_sequence()
    lucas_kanade_with_two_images("test_frame1.jpg", "test_frame2.jpg")

Demo: Cuda with Video/Image sequence

  • upload new_point back to gpu for next iteration
Cuda version with Image sequence
"""
Generate synthetic image sequences for testing optical flow algorithms.
Create motion on x axes
Using cuda to run lucas kanade optical flow
Using cuda pinned memory
"""
import cv2
import numpy as np
import logging
from typing import NamedTuple

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)


def create_test_sequence(num_frames=300,  motion_speed=3,width=640, height=480):
    # Initial positions
    circle_start = (150, 200)    # Circle starts at left
    rect_start = (120, 100)      # Rectangle starts at left

    # Object properties
    circle_radius = 30
    rect_size = (50, 50)  # width, height
    for frame_idx in range(num_frames):
        # Create black background
        img = np.zeros((height, width, 3), dtype=np.uint8)

        # Calculate current positions (moving right)
        circle_x = circle_start[0] + (frame_idx * motion_speed)
        circle_y = circle_start[1]

        rect_x = rect_start[0] + (frame_idx * motion_speed)
        rect_y = rect_start[1]

        # Wrap around if objects go off screen (optional)
        circle_x = circle_x % (width + circle_radius * 2) - circle_radius
        rect_x = rect_x % (width + rect_size[0]) - rect_size[0]

        # Draw white circle
        if -circle_radius <= circle_x <= width + circle_radius:
            cv2.circle(img, (int(circle_x), int(circle_y)), circle_radius, (0, 0, 255), -1)

        # Draw green rectangle
        if -rect_size[0] <= rect_x <= width:
            cv2.rectangle(img, 
                        (int(rect_x), int(rect_y)), 
                        (int(rect_x + rect_size[0]), int(rect_y + rect_size[1])), 
                        (0, 255, 0), -1)

        # Draw green rectangle
        if -rect_size[0] <= rect_x <= width:
            rect_y2 = rect_y + 200 * np.sin(frame_idx / 10)  # Add vertical oscillation
            cv2.rectangle(img, 
                        (int(rect_x), int(rect_y2)), 
                        (int(rect_x + rect_size[0]), int(rect_y2 + rect_size[1])), 
                        (0, 255, 0), -1)


        yield img


class LKResult(NamedTuple):
    good_new: np.ndarray
    good_old: np.ndarray

class LK():
    def __init__(self, w, h):
        self.gpu_detector = cv2.cuda.createGoodFeaturesToTrackDetector(
            cv2.CV_8UC1,
            maxCorners=100,
            qualityLevel=0.3,
            minDistance=7,
            blockSize=7
        )

        self.lk = cv2.cuda.SparsePyrLKOpticalFlow_create(
            winSize=(15, 15), 
            maxLevel=5,
            iters=10, 
            useInitialFlow=False
        )


        # create pin memeory
        PAGE_LOCKED = 1
        pinned_img = cv2.cuda.HostMem(h, w, cv2.CV_8UC3, PAGE_LOCKED)
        self.pin_image = pinned_img.createMatHeader() 
        """ pinned memory for input image """

        self.gpu_img = cv2.cuda.GpuMat() 
        """ hold the current frame as GpuMat """
        self.gpu_prev_gray = cv2.cuda.GpuMat()
        """ hold previous frame as GpuMat """
        self.p0_gpu = cv2.cuda.GpuMat()
        """ hold previous points as GpuMat """

        self.minmun_points_to_redetect = 2

    def process_frame(self, frame: np.ndarray) -> LKResult:
        """
        Process a single frame for optical flow tracking.
        upload the frame to GPU memory from pinned memory.
        run detection if no points to track. (run on gpu)
        run lk calculation. on gpu
        """
        p1_gpu: cv2.cuda.GpuMat
        status_gpu: cv2.cuda.GpuMat
        error_gpu: cv2.cuda.GpuMat

        self.pin_image[:] = frame
        self.gpu_img.upload(self.pin_image)
        gpu_gray = cv2.cuda.cvtColor(self.gpu_img, cv2.COLOR_BGR2GRAY)

        if self.p0_gpu.empty():
            self.p0_gpu = self.gpu_detector.detect(gpu_gray)
            if self.p0_gpu.empty():
                raise RuntimeError("No features detected.")

            log.debug(f"---- Initial points detected: {self.p0_gpu.size()}")
            self.gpu_prev_gray = gpu_gray.clone()

        # Calculate optical flow
        p1_gpu, status_gpu, error_gpu = self.lk.calc(
            self.gpu_prev_gray, 
            gpu_gray,
            self.p0_gpu,
            None,)

        # Download point from gpu for fautere filter and process
        p0 = self.p0_gpu.download().reshape(-1, 2)
        p1 = p1_gpu.download().reshape(-1, 2)
        status = status_gpu.download().reshape(-1).astype(bool)
        error = error_gpu.download().reshape(-1)

        good_new: np.ndarray = p1[status]
        good_old: np.ndarray = p0[status]

        # upload new point back to gpu for the next iteration
        if len(good_new) > self.minmun_points_to_redetect:
            self.p0_gpu.upload(good_new.reshape(1, -1, 2).astype(np.float32))
        else:
            log.warning("No points to track, re-detecting.")
            self.p0_gpu = cv2.cuda.GpuMat()


        self.gpu_prev_gray = gpu_gray.clone()

        return LKResult(good_new=good_new, good_old=good_old)


if __name__ == "__main__":
    COLOR_NEW = (0, 0, 255) # RED
    COLOR_OLD = (255, 0, 0) # BLUE
    new: np.ndarray 
    old: np.ndarray

    lk = LK(640, 480)
    images = create_test_sequence()
    while (frame := next(images, None)) is not None:
        good_new, good_old = lk.process_frame(frame)

        for new, old in zip(good_new, good_old):
            a, b = new.ravel().astype(int)
            c, d = old.ravel().astype(int)
            # cv2.arrowedLine(img, (c, d), (a, b), (0, 255, 255), 2, tipLength=0.3)
            cv2.circle(frame, (a, b), 3, COLOR_NEW, -1)
            cv2.circle(frame, (c, d), 3, COLOR_OLD, -1)
        cv2.imshow("LK Optical Flow CUDA", frame)
        if cv2.waitKey(100) & 0xFF == 27:
            break