Implementing a CPU-based real-time video analytics pipeline

With the growth in available data and compute power, the use of video analytics solutions has grown rapidly. Most real-time video analytics use cases, however, require response times in milliseconds, a level of performance that neither CPUs nor GPUs can always deliver for inference. Here we discuss the implementation of a real-time video analytics pipeline on a CPU platform using Apache Spark as the distributed computing framework. As we'll see, the inference phase presents significant challenges, which can be overcome using a CPU+FPGA platform.

Our CPU-based pipeline uses JavaCV (a Java interface to OpenCV and FFmpeg) to extract images from the input video, Megh Microservice to stream the extracted image frames, Spark Structured Streaming to read these frames into the Spark environment, and Intel Analytics Zoo (a unified analytics + AI platform for Spark) to classify the images into categories using a pre-trained, quantized SqueezeNet model. Within Analytics Zoo, we use a BigDL (a distributed deep learning library for Spark) topology to handle the deep learning workload.

The high-level architecture of the pipeline is shown in the following figure.

High-level architecture of the CPU-based video analytics pipeline.

Our application deals with computation on real-time data and has two stages:

  1. Video Decoder and Image Transformer
  2. Image Classifier

In the Video Decoder and Image Transformer stage, video streams are read over RTSP (Real Time Streaming Protocol) and the H.264-encoded video frames are decoded into RGB images using JavaCV and the FFmpeg library. The extracted image data is sent to Megh Microservice, a persistent microservice implemented with gRPC that acts as an image data collector and provides a thin buffer between the two stages.

The following code shows the JavaCV functions for extracting image frames from video. FFmpegFrameGrabber takes the RTSP URL of the input video as a parameter, decodes the stream, and extracts RGB image frames when the grabImage function is called. Java2DFrameConverter converts each frame into a BufferedImage, whose underlying pixel data is then accessed as a byte array for further processing.

// Connect to the RTSP stream and start decoding
FFmpegFrameGrabber grabber = new FFmpegFrameGrabber("rtsp://" + ffmpegConfig.get("server.ip")
    + ":" + ffmpegConfig.get("server.port") + "/" + ffmpegConfig.get("video.file"));
grabber.start();

Java2DFrameConverter converter = new Java2DFrameConverter();
Frame frame = null;
while ((frame = grabber.grabImage()) != null) {
    // Convert the decoded frame to a BufferedImage and expose its raw BGR pixel bytes
    BufferedImage image = converter.convert(frame);
    byte[] bbuf = ((DataBufferByte) image.getRaster().getDataBuffer()).getData();
    // bbuf is what gets forwarded to Megh Microservice for buffering
}
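
Note that grabImage() returns only video frames and skips any audio packets in the stream, so no explicit frame-type filtering is needed inside this loop.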

The extracted image data, in the form of byte buffers, is converted into byte arrays, which are then converted into Base64-encoded strings and sent to the persistent microservice, along with the image names, as JSON strings.

In the following code, the onResultsReceived method performs these steps and adds the resulting record to the data queue.

public void onResultsReceived(VideoAnalyticsResults videoAnalyticsResults) {
    try {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();

      String imageName = "IMAGE_" + frameCount + ".jpg";

      // Raw BGR pixel bytes received from the video decoder
      byte[] imgArray = videoAnalyticsResults.getRgb();

      // Create a BufferedImage of type BGR with the configured dimensions
      BufferedImage bufImg = new BufferedImage(Integer.parseInt(serviceConfig.get("image.width")),
          Integer.parseInt(serviceConfig.get("image.height")), BufferedImage.TYPE_3BYTE_BGR);

      // Set the pixel values in the BufferedImage
      bufImg.setData(Raster.createRaster(bufImg.getSampleModel(),
          new DataBufferByte(imgArray, imgArray.length), new Point()));

      // Encode the BufferedImage as JPEG
      ImageIO.write(bufImg, "jpg", baos);

      // Convert the JPEG bytes into a Base64-encoded string
      String data = Base64.getEncoder().encodeToString(baos.toByteArray());

      dataQueue.add(new RpcCpuRecord(imageName, data));

      frameCount += 1;
    } catch (Exception ex) {
      logger.error("Error while receiving image buffers", ex);
    }
  }

In the Image Classifier stage, Spark reads the image data from the microservice into the Spark environment through Structured Streaming. The data is then preprocessed and fed to the pre-trained, quantized SqueezeNet model (an image classification model trained on the ImageNet dataset), which categorizes each image; the classified image name is written to a file along with the top labels. Before preprocessing, the image data is converted into the Analytics Zoo custom type ImageFeature, which holds information about each image, such as the image bytes and the image name.
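
The source used to read frames out of Megh Microservice is not shown in this post. As a rough sketch of the reading side, the snippet below pulls Base64-encoded frames as JSON records through Spark's built-in socket source, which stands in here for the microservice source; the field names (imageName, data), host, and port are assumptions rather than the actual interface.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder.appName("VideoAnalyticsCPU").getOrCreate()
import spark.implicits._

// Each streamed record carries an image name and the Base64-encoded JPEG bytes
val frameSchema = new StructType()
  .add("imageName", StringType)
  .add("data", StringType)

// The socket source below is only a stand-in for the Megh Microservice source
val frames = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .select(from_json($"value", frameSchema).as("frame"))
  .select("frame.*")

The classification itself happens inside a Spark UDF, shown below, which decodes each frame, runs the preprocessing steps, and invokes the quantized SqueezeNet model.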

val predictImageUDF = udf(
      (uri: String, data: Array[Byte], latency: String) => {
        try {
          val featureSteps = featureTransformersBC.value.clonePreprocessing()
          val localModel = modelBroadCast.value
          val labels = labelBroadcast.value

          // Decode the Base64-encoded image bytes and wrap them in an ImageFeature
          val bytesData = Base64.getDecoder.decode(data)
          val imf = ImageFeature(bytesData, uri = uri)

          if (imf.bytes() == null) {
            "-2" // no image bytes: return a sentinel label
          } else {
            // Run the preprocessing steps and add a batch dimension
            val imgSet: ImageSet = ImageSet.array(Array(imf))
            var inputTensor = featureSteps(imgSet.toLocal().array.iterator).next()
            inputTensor = inputTensor.reshape(Array(1) ++ inputTensor.size())

            // Run inference and pick the class with the highest score
            val prediction = localModel
              .doPredict(inputTensor)
              .toTensor[Float]
              .squeeze()
              .toArray()
            val predictClass = prediction.zipWithIndex.maxBy(_._1)._2

            if (predictClass < 0 || predictClass > (labels.length - 1)) {
              "unknown"
            } else {
              labels(predictClass).toString()
            }
          }
        } catch {
          case e: Exception =>
            logger.error("Error while classifying image", e)
            "not found"
        }
      }: String
    )
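
How the UDF is wired into the streaming query is not shown in the post, and the broadcast variables it relies on (modelBroadCast, featureTransformersBC, labelBroadcast) are assumed to be created on the driver at startup from the loaded model, the preprocessing pipeline, and the label list. As a minimal sketch, reusing the frames DataFrame from the earlier snippet, the UDF could be applied and the results written to files along these lines (the output paths are placeholders):

import org.apache.spark.sql.functions.{col, lit}

// The UDF expects the Base64 payload as bytes, so cast the string column to binary;
// lit("0") is a placeholder for the unused latency argument
val classified = frames
  .withColumn("label", predictImageUDF(col("imageName"), col("data").cast("binary"), lit("0")))
  .select("imageName", "label")

// Append each micro-batch of (image name, predicted label) rows as CSV files
val query = classified.writeStream
  .format("csv")
  .option("path", "/tmp/classified-frames")
  .option("checkpointLocation", "/tmp/classified-checkpoint")
  .start()

query.awaitTermination()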

For measuring throughput, we created a custom listener (StructuredQueryListener, built on Spark's StreamingQueryListener) and registered it with the SparkSession. Throughput per second is saved to a file on disk and varies with the number of executors and the number of cores per executor.

val queryMonitor = new StructuredQueryListener(
  prop.getProperty("fps.out.file"),
  prop.getProperty("multiplication.factor").toInt
)
SQLContext.getOrCreate(sc).sparkSession.streams.addListener(queryMonitor)
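
The listener itself is not shown in the post. Below is a minimal sketch of what StructuredQueryListener might look like, assuming it extends Spark's StreamingQueryListener and derives frames per second from the reported processing rate; the output format and the role of multiplicationFactor are assumptions.

import java.io.{FileWriter, PrintWriter}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

class StructuredQueryListener(fpsOutFile: String, multiplicationFactor: Int)
    extends StreamingQueryListener {

  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}

  // Called after every completed micro-batch; with one frame per row,
  // processedRowsPerSecond corresponds to frames per second
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val fps = event.progress.processedRowsPerSecond * multiplicationFactor
    val out = new PrintWriter(new FileWriter(fpsOutFile, true))
    try out.println(f"$fps%.2f") finally out.close()
  }
}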

Here is the performance of our CPU-based solution.

Infrastructure: Cluster with one worker node with a Xeon Bronze processor
Video specification: 1080p, H.264 encoded
Throughput: ~22 FPS
Latency: >250 ms

Several challenges with our CPU-based implementation are apparent.

  1. Latency does not always meet real-time requirements.
  2. The throughput achieved is not well suited to real-world scenarios and does not scale linearly with the number of nodes.
  3. The throughput indicates that a single CPU can barely handle the feed from a single camera, so the only way to achieve higher throughput is to scale out to more nodes, which increases cost.

Fortunately, a field-programmable gate array (FPGA) can be used as a hardware accelerator in the inference phase, since it has I/O for direct ingestion of data and supports parallel and inline processing. Real-time workloads can be accelerated by offloading the compute-intensive deep learning tasks to the FPGA, thereby reducing CPU utilization. Total cost of ownership (TCO) is reduced by 3X, as a single FPGA can handle feeds from multiple cameras and delivers deterministic latency as the solution scales up.

We will explore a CPU+FPGA-based implementation, including performance comparison, in a future post.

For more, see: Accelerating Real Time Video Analytics on a Heterogeneous CPU + FPGA Platform.

References:

  1. https://analytics-zoo.github.io/master/
  2. https://www.infoq.com/articles/video-stream-analytics-opencv/
