Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
73 views
in Technique[技术] by (71.8m points)

c# - What is the fastest way to do Array Table Lookup with an Integer Index?

I have a video processing application that moves a lot of data.

To speed things up, I have made a lookup table, as many calculations in essence only need to be calculated one time and can be reused.

However I'm at the point where all the lookups now takes 30% of the processing time. I'm wondering if it might be slow RAM.. However, I would still like to try to optimize it some more.

Currently I have the following:

public readonly int[] largeArray = new int[3000*2000];
public readonly int[] lookUp = new int[width*height];

I then perform a lookup with a pointer p (which is equivalent to width * y + x) to fetch the result.

int[] newResults = new int[width*height];
int p = 0;
for (int y = 0; y < height; y++) {
   for (int x = 0; x < width; x++, p++) {
      newResults[p] = largeArray[lookUp[p]];
   }
}

Note that I cannot do an entire array copy to optimize. Also, the application is heavily multithreaded.

Some progress was in shortening the function stack, so no getters but a straight retrieval from a readonly array.

I've tried converting to ushort as well, but it seemed to be slower (as I understand it's due to word size).

Would an IntPtr be faster? How would I go about that?

Attached below is a screenshot of time distribution:

enter image description here

question from:https://stackoverflow.com/questions/57991432/what-is-the-fastest-way-to-do-array-table-lookup-with-an-integer-index

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

It looks like what you're doing here is effectively a "gather". Modern CPUs have dedicated instructions for this, in particular VPGATHER** . This is exposed in .NET Core 3, and should work something like below, which is the single loop scenario (you can probably work from here to get the double-loop version);

results first:

AVX enabled: False; slow loop from 0
e7ad04457529f201558c8a53f639fed30d3a880f75e613afe203e80a7317d0cb
for 524288 loops: 1524ms

AVX enabled: True; slow loop from 1024
e7ad04457529f201558c8a53f639fed30d3a880f75e613afe203e80a7317d0cb
for 524288 loops: 667ms

code:

using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Runtime.Intrinsics;
using System.Runtime.Intrinsics.X86;

static class P
{
    static int Gather(int[] source, int[] index, int[] results, bool avx)
    {   // normally you wouldn't have avx as a parameter; that is just so
        // I can turn it off and on for the test; likewise the "int" return
        // here is so I can monitor (in the test) how much we did in the "old"
        // loop, vs AVX2; in real code this would be void return

        int y = 0;
        if (Avx2.IsSupported && avx)
        {
            var iv = MemoryMarshal.Cast<int, Vector256<int>>(index);
            var rv = MemoryMarshal.Cast<int, Vector256<int>>(results);

            unsafe
            {
                fixed (int* sPtr = source)
                {
                    // note: here I'm assuming we are trying to fill "results" in
                    // a single outer loop; for a double-loop, you'll probably need
                    // to slice the spans
                    for (int i = 0; i < rv.Length; i++)
                    {
                        rv[i] = Avx2.GatherVector256(sPtr, iv[i], 4);
                    }
                }
            }
            // move past everything we've processed via SIMD
            y += rv.Length * Vector256<int>.Count;
        }
        // now do anything left, which includes anything not aligned to 256 bits,
        // plus the "no AVX2" scenario
        int result = y;
        int end = results.Length; // hoist, since this is not the JIT recognized pattern
        for (; y < end; y++)
        {
            results[y] = source[index[y]];
        }
        return result;
    }

    static void Main()
    {
        // invent some random data
        var rand = new Random(12345);
        int size = 1024 * 512;
        int[] data = new int[size];
        for (int i = 0; i < data.Length; i++)
            data[i] = rand.Next(255);

        // build a fake index
        int[] index = new int[1024];
        for (int i = 0; i < index.Length; i++)
            index[i] = rand.Next(size);

        int[] results = new int[1024];

        void GatherLocal(bool avx)
        {
            // prove that we're getting the same data
            Array.Clear(results, 0, results.Length);
            int from = Gather(data, index, results, avx);
            Console.WriteLine($"AVX enabled: {avx}; slow loop from {from}");
            for (int i = 0; i < 32; i++)
            {
                Console.Write(results[i].ToString("x2"));
            }
            Console.WriteLine();

            const int TimeLoop = 1024 * 512;
            var watch = Stopwatch.StartNew();
            for (int i = 0; i < TimeLoop; i++)
                Gather(data, index, results, avx);
            watch.Stop();
            Console.WriteLine($"for {TimeLoop} loops: {watch.ElapsedMilliseconds}ms");
            Console.WriteLine();
        }
        GatherLocal(false);
        if (Avx2.IsSupported) GatherLocal(true);
    }
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...