alexforencich/cocotbext-axi

using source() when dut has multiple inputs

catkira opened this issue · 30 comments

I have a complex multiplier with two AXI stream inputs. I could not figure out how to use the source in this case. My workaround is to send the data manually. Is there a better solution using the source?
My code looks like this:

@cocotb.test()
async def multiple_multiplications_(dut):
    tb = TB(dut)    
    await tb.cycle_reset()
    tb.sink.queue = deque() # remove remaining items from last test    
    test_frames = []
    for i in range(20):
        a_bytes = get_bytes(int(tb.input_width_a/8),random_data())
        b_bytes = get_bytes(int(tb.input_width_b/8),random_data())

        dut.s_axis_a_tdata <= int.from_bytes(a_bytes, byteorder='big', signed=False)
        dut.s_axis_b_tdata <= int.from_bytes(b_bytes, byteorder='big', signed=False)
        test_frame = AxiStreamFrame([a_bytes,b_bytes])
        test_frames.append(test_frame)
        await RisingEdge(dut.clk)

    for test_frame in test_frames:
        rx_frame = await tb.sink.recv()
        receivedData = (rx_frame.tdata[0]).to_bytes(byteorder='big', length=int(tb.output_width/8), signed=False)
        received_r = receivedData[int(len(receivedData)/2):len(receivedData)]
        received_i = receivedData[0:int(len(receivedData)/2)]

        model = Model(tb.input_width_a,tb.input_width_b,tb.output_width) 
        calculatedData = model.calculate(test_frame.tdata[0],test_frame.tdata[1])
        calculated_i = calculatedData[0:int(tb.output_width/8/2)]
        calculated_r = calculatedData[int(tb.output_width/8/2):int(tb.output_width/8)]
        assert received_r == calculated_r, ("real part should have been %i but was %i " % 
                            (int.from_bytes(calculated_r,byteorder='big',signed=True),int.from_bytes(received_r,byteorder='big',signed=True)))
        assert received_i == calculated_i, ("imaginary part should have been %i but was %i " % 
                            (int.from_bytes(calculated_i,byteorder='big',signed=True),int.from_bytes(received_i,byteorder='big',signed=True)))
        assert calculatedData == receivedData, ("Error, expected %s got %s" % (calculatedData.hex(), receivedData.hex()))
        await RisingEdge(dut.clk)

Edit: how can I make the code show up correctly in this post?

You need to indent the code by four spaces (I fixed it for you). Anyway, I need to see the port definition on the HDL side, as I'm not sure what you're referring to. It sounds like maybe you want to use the generic streaming interface code instead of AXI stream (define_stream in cocotbext.axi.stream). This currently isn't documented, but it's proving to be incredibly useful, so I will likely spin it off into a separate repo at some point once it's a bit more mature. Take a look at the AXI and AXI lite models for how to use this code, as it was originally developed to handle the five channels of AXI and AXI lite.
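For reference, here's a rough sketch of how that generic streaming code gets used for the AXI channels; the signal lists below are illustrative assumptions, not documented API (check cocotbext/axi/axi_channels.py for the real ones):

    from cocotbext.axi.stream import define_stream

    # define_stream() generates a family of classes (bus, transaction, source,
    # sink, monitor) for a generic valid/ready handshaked interface; the AXI
    # and AXI lite channel models are built this way
    AwBus, AwTransaction, AwSource, AwSink, AwMonitor = define_stream("AxiAW",
        signals=["awid", "awaddr", "awlen", "awvalid", "awready"],
        optional_signals=["awprot", "awuser"]
    )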

My HDL code looks like this:

module complex_multiplier
    #(parameter INPUT_WIDTH_A `VL_RD = 16, // must be multiple of 8
      parameter INPUT_WIDTH_B `VL_RD = 16, // must be multiple of 8
      parameter OUTPUT_WIDTH `VL_RD = 32,  // must be multiple of 8
      parameter STAGES `VL_RD = 3,  // minimum value is 2
      parameter BLOCKING `VL_RD = 1,
      parameter TRUNCATE `VL_RD = 1)
    (
        input  wire                             clk, nrst,
        // slave a
        input  wire signed [INPUT_WIDTH_A-1:0]  s_axis_a_tdata,
        output reg                              s_axis_a_tready,
        input  wire                             s_axis_a_tvalid,
        // slave b
        input  wire signed [INPUT_WIDTH_B-1:0]  s_axis_b_tdata,
        output reg                              s_axis_b_tready,
        input  wire                             s_axis_b_tvalid,
        // master output
        output reg  signed [OUTPUT_WIDTH-1:0]   m_axis_tdata,
        output reg                              m_axis_tvalid,
        input  wire                             m_axis_tready
        );
...

The repo can be found here https://github.com/catkira/complex_multiplier

Ok, you literally have two independent AXI stream interfaces. In that case, you should use two AXI stream source instances, one for 's_axis_a' and one for 's_axis_b', and one AXI stream sink instance for 'm_axis'.

I was thinking you had something like this:

input  wire [WIDTH-1:0] s_axis_tdata_i,
input  wire [WIDTH-1:0] s_axis_tdata_q,
input  wire s_axis_tvalid,
output wire s_axis_tready

or something along those lines, in which case you would need a different approach.

Also, don't do that:

tb.sink.queue = deque()

Replacing the object like that is not great; calling clear() on it is better. Even better, do tb.sink.clear(), which I will go add right now; not sure how I forgot that one.

When I do

    await tb.source_a.send(AxiStreamFrame(a_bytes))
    await tb.source_b.send(AxiStreamFrame(b_bytes))

instead of

    dut.s_axis_a_tdata <= int.from_bytes(a_bytes, byteorder='big', signed=False)
    dut.s_axis_a_tvalid <= 1    
    dut.s_axis_b_tdata <= int.from_bytes(b_bytes, byteorder='big', signed=False)
    dut.s_axis_b_tvalid <= 1    
    await RisingEdge(dut.clk)
    dut.s_axis_a_tvalid <= 0
    dut.s_axis_b_tvalid <= 0 

it does not work.

It works now when I do it like this

        await tb.source_a.send(AxiStreamFrame(a_bytes[::-1]))
        await tb.source_b.send(AxiStreamFrame(b_bytes[::-1]))

It was also important to set byte_size to 8, like this:

        self.source_a = AxiStreamSource(dut, "s_axis_a", dut.clk, byte_size=8)
        self.source_b = AxiStreamSource(dut, "s_axis_b", dut.clk, byte_size=8)
        self.sink = AxiStreamSink(dut, "m_axis", dut.clk)

I think it would make sense to set the default byte size to 8, because a byte is usually 8 bits.
Another thing I had to figure out was that I have to manually reverse the bytes, because I use big endian. Is it possible to set this somewhere in the source?
With the received data I do

    receivedData = (rx_frame.tdata[0]).to_bytes(byteorder='big', length=int(tb.output_width/8))

which is also a bit strange.

Bytes are all about transfer granularity. You can think of a byte as the smallest atomic unit of data that can be handled by an interface. The byte size is usually determined by len(tdata)/len(tkeep), as each wire in tkeep controls the transfer of one byte. If tkeep is not connected, then it is assumed that every clock cycle transfers one byte of size len(tdata). For moving packet data that comes in varying lengths with byte granularity, you'll have tkeep connected. For moving ADC samples or something similar, you probably won't have tkeep connected, but there is a high chance that you'll want a byte size other than 8 bits, such as 12 or 14 bits. In that case, it's more convenient to assume that the interface transfers one byte per cycle, since you have no way of transferring only part of a word.

Byte ordering: see #6. The AXI stream specification is very explicit about byte ordering - tdata[0] is the least-significant bit of the first byte of the transfer. If you want to make that the most significant byte instead, then you can do the shuffling yourself. Note that if you use an AXI stream data width converter, in general it will follow the standard AXI stream byte ordering rules, which may produce unexpected results if you're assuming a different byte ordering. Besides, Verilog is little-endian by default, so it's not clear why you would want to make your life more difficult by shoehorning things into big-endian, unless you're dealing with network packet headers that use network byte order.
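To illustrate that ordering in plain Python (assuming a 32-bit tdata with 8-bit bytes):

    frame_bytes = b'\x01\x02\x03\x04'  # first byte of the frame is 0x01

    # the source packs bytes into tdata starting from the low byte lanes,
    # which is exactly little-endian packing of the byte string
    tdata = int.from_bytes(frame_bytes, byteorder='little')
    assert tdata == 0x04030201         # 0x01 ends up in tdata[7:0]

    # big-endian packing would put the first byte in the top lane instead
    assert int.from_bytes(frame_bytes, byteorder='big') == 0x01020304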

Also, if you're doing DSP stuff, I'm not sure why you're slicing and dicing into bytes in the first place. If you're running with, say, an 18 bit data path, why not make the byte size 18 bits? That will eliminate all of the to_bytes/from_bytes stuff. 18 bits is also the width supported by DSP slices on many FPGAs, and it's also not divisible by 8 so if you arbitrarily force things into 8 bit bytes, the datapath will be much less flexible and won't be able to efficiently utilize device resources.

Also, instead of int(tb.output_width/8), you could use integer division (tb.output_width//8) or get the byte width from the simulation models (tb.sink.byte_size).

Thanks for the detailed answer. Using the // operator is quite useful, I did not know that before.
I am not used to bytes not having 8 bits; I think it's because so far I have only used AXI streams without the tkeep and tlast signals.
I have converted my design to little endian now. But now I have a small inconvenience in another location. I use the https://bitstring.readthedocs.io/en/latest/ library and initialize it with my test data to select certain bits in my model. However, this lib always assumes big endian byte order. It has an endian parameter, but that seems to only be relevant at the bit level :/

If my interface is, for example, 16 bits wide and I want to use random data for testing, then I do

    tb.source_a = AxiStreamSource(dut, "s_axis_a", dut.clk, byte_size=8)
    ...
    a_bytes = get_bytes(tb.input_width_a//8, random_data())
    await tb.source_a.send(AxiStreamFrame(a_bytes))

That seems to work and sends 16 bits in one clk cycle. If I set byte_size to 16 as you recommended, it does not work. Can you give an example of how I should do it in this case?

Well, if you set byte_size to 16, then you don't provide a bytes/bytearray; instead, you would provide a list of 16-bit ints.
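For example, something along these lines (a sketch, using the same constructor style as earlier in this thread):

    # one 16-bit "byte" per cycle; the payload is a list of ints,
    # not a bytes/bytearray object
    tb.source_a = AxiStreamSource(dut, "s_axis_a", dut.clk, byte_size=16)

    # a single 16-bit word per frame...
    await tb.source_a.send(AxiStreamFrame([0x0123]))

    # ...or several words, transferred one per valid cycle
    await tb.source_a.send(AxiStreamFrame([0x0123, 0x4567, 0x89ab]))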

As for bitstring, I don't know anything about it. I have used bitarray before, though. What I can tell you is that cocotb's BinaryValue is terrible for a number of different reasons, and the cocotb folks are looking at rewriting/replacing it with something better, although it remains to be seen exactly what that would be. Most of the existing bit-access libraries don't support x and z, which is necessary when interfacing with HDL simulators.

Okay, makes sense.
However, the Xilinx cores are still byte oriented even if the interface is n bits wide, i.e. if the data width is 12, it will be extended to 16 bits and padded with zeros. For the complex multiplier, one AXI stream contains the real and imaginary parts. Each part is padded with zeros separately to a width that is a multiple of 8. Since I want my cores to be compatible with Xilinx cores, I have to stick somewhat to the classical 8-bit byte scheme.

Oh, interesting, I have never used any of the Xilinx DSP components before; the most I have done is infer DSP slices. That being said, if there is no tkeep, then you don't have to split things into bytes; all you have to do is constrain the AXI stream data width to a multiple of 8, or perhaps a power-of-two multiple of 8. What I would recommend in that case is decoupling the AXI stream interface width from the internal data width. So you can have DATA_WIDTH = 12, and AXIS_DATA_WIDTH = ((DATA_WIDTH+7)/8)*8 or AXIS_DATA_WIDTH = 8*2**$clog2((DATA_WIDTH+7)/8) or something along those lines. And then leave out the byte_size and byte_lanes parameters, and instead operate on integers. The only reason to deal with bytes is if you have to do something strange like byte-swapping.
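In testbench terms, those two roundings look like this (plain Python, just to illustrate the width arithmetic):

    def axis_width(data_width):
        # round the internal data width up to the next multiple of 8
        return ((data_width + 7) // 8) * 8

    def axis_width_pow2(data_width):
        # round up to a power-of-two number of bytes, then back to bits
        nbytes = (data_width + 7) // 8
        return 8 * (1 << (nbytes - 1).bit_length())

    assert axis_width(12) == 16 and axis_width_pow2(12) == 16
    assert axis_width(18) == 24 and axis_width_pow2(18) == 32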

ok, thanks for helping!

No problem!

Just a quick comment: Xilinx follows the "AMBA® 4 AXI4-Stream Protocol" as defined by ARM. It specifies: "TDATA is the primary payload that is used to provide the data that is passing across the interface. The width of the data payload is an integer number of bytes." So technically only full bytes are allowed by this standard.
The Xilinx DSP IPs (I believe it's all IPs with an AXIS interface) are not, strictly speaking, "zero padding" the data from 12 bits to 16 bits, for example. They use sign-extended values, so the padding would only be zero for unsigned or positive data values. So, if you use int8, int16, or int32 in Python and make sure that the value inside the variable does not exceed the number of "real" bits, then no individual bit-flipping or bit extraction should be necessary in the Python script to test the AXIS interfaces on your code. If you only use positive values, the unsigned int16 type would do the trick.

Signed may actually be OK, at least on the source side. The AXI stream source module generates the tdata signal like so:

tdata_val = 0

for offset in range(self.byte_lanes):
    tdata_val |= (frame.tdata.pop(0) & self.byte_mask) << (offset * self.byte_size)

So whatever you pass in will get bitwise ANDed with a byte-lane-width mask, which will be the full width if byte_lanes = 1. Python will effectively sign-extend during the masking operation. So, if you call source_inst.send([-1]) on a bus with byte_lanes = 1 and byte_size = 16, it will drive 65535 onto tdata.

However, the receive side won't recover the sign bit, so if you pass that back through to the sink, you'll get 65535 instead of -1. I would be open to revising some of this functionality if it makes sense to do so, as this could make using these modules for DSP applications more convenient. Maybe this would be as simple as adding a signed parameter to recover the sign bit in the sink and monitor.
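Until something like that exists, recovering the sign on the receive side is easy enough to do in the testbench; a minimal sketch:

    def to_signed(value, bits):
        # reinterpret an unsigned value of the given width as two's complement
        if value & (1 << (bits - 1)):
            value -= 1 << bits
        return value

    assert to_signed(65535, 16) == -1
    assert to_signed(0x7fff, 16) == 32767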

I think it would be ideal to make changes that allow for the, let's say, "DSP cases". It would make this extension more versatile and interesting to more users, who would hopefully contribute.

In order to make this most generic, some other use-cases should be taken into account as well:

  • We discussed the case in which a single number is in the TDATA vector.
  • TDATA could contain multiple samples (signed numbers), e.g. two 16-bit numbers stacked into a 32-bit TDATA vector (2x 12-bit numbers would end up in the same 32 bits due to the byte requirement). I think it would be necessary to separate them in order to test DSP functions properly against a model.
  • @catkira mentioned the complex number case, which comes in two flavors: similar to the two-sample example just described, and spread over two subsequent TDATA cycles.

Xilinx document UG1037 contains some examples (starting page 82) for the most common cases: "Real Scalar Data" and "Complex Scalar Data"
https://www.xilinx.com/support/documentation/ip_documentation/axi_ref_guide/latest/ug1037-vivado-axi-reference-guide.pdf

I'm wondering if it would be best to leave the data as "binary" bytes in the TX/RX AXI stream functions and to convert it in the testbench before send and after receive as needed. I feel it would be easier to combine the Xilinx/ARM standard with all the stacked data flavors without breaking the current implementation.

I used "struct.unpack(format, buffer)" in the past for this kind of processing since it facilitates the conversion of a binary "string" into an array of signed/unsigned numbers as specified by the format description. I found it to be very powerful.

For two 16 bit samples, just set byte_lanes = 2. So long as the interface is a bunch of equal-width segments concatenated together, this works fine. Although, there is possibly a difference in terms of how the samples are handled after that: maybe presenting a list of tuples instead of a pure list would make more sense, and providing an option to support that should be straightforward.

Alternatively, maybe it makes more sense to implement modules specifically with DSP in mind, instead of adding lots of options to these modules. In that case, you have a set of modules for byte-oriented data (i.e. network packets), and a set of modules for DSP (IQ samples, etc.). If you do something like that, then you could even have the module natively process numpy arrays, complex numbers, floating point numbers, etc. and handle all of the conversions and re-packing automatically.

Actually, looking at those diagrams for moving complex numbers, it seems like you can just send separate frames, with two elements each for both cases. In the two 12 bit samples on a 32 bit bus case there, just set byte_lanes = 2, and then send([i1, q1]), send([i2, q2]), etc. If you want the sequential case, set byte_lanes = 1 and call send in the same way. But having a wrapper that would accept numpy arrays or similar would certainly make that more convenient.

Agreed. I like the idea of a wrapper that accepts numpy arrays; that would also make it very easy to create the circuit-equivalent models without the hassle of extra conversions.
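A minimal sketch of what such a wrapper could look like on the testbench side (the quantization and interleaving here are my own assumptions, not existing cocotbext-axi API):

    import numpy as np

    def complex_to_words(samples, bits):
        # quantize a numpy array of complex samples to signed 'bits'-wide
        # integers and interleave them as [i0, q0, i1, q1, ...], which a
        # source with byte_lanes = 2 can transfer one sample per cycle
        scale = 2**(bits - 1) - 1
        i = np.clip(np.round(samples.real * scale), -scale - 1, scale).astype(int)
        q = np.clip(np.round(samples.imag * scale), -scale - 1, scale).astype(int)
        words = np.empty(2 * len(samples), dtype=int)
        words[0::2] = i
        words[1::2] = q
        return words.tolist()

    # e.g.: await source.send(AxiStreamFrame(complex_to_words(np.array([0.5+0.5j]), 12)))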

I used "struct.unpack(format, buffer)" in the past for this kind of processing since it facilitates the conversion of a binary "string" into an array of signed/unsigned numbers as specified by the format description. I found it to be very powerful.

I converted it to BitString and then selected the bits I want. But using struct.unpack is probably more elegant, I will try it.

> Actually, looking at those diagrams for moving complex numbers, it seems like you can just send separate frames, with two elements each for both cases. In the two 12 bit samples on a 32 bit bus case there, just set byte_lanes = 2, and then send([i1, q1]), send([i2, q2]), etc. If you want the sequential case, set byte_lanes = 1 and call send in the same way. But having a wrapper that would accept numpy arrays or similar would certainly make that more convenient.

So for a concrete example, let's take a complex multiplier where Im and Re are 12 bits each. We assume no TLAST, so Im and Re are transferred in one bus cycle. If I had a complex number in my test environment that I could pass to send by just invoking send(c), that would be convenient. However, since complex numbers in Python are not fixed point, I usually use some wrapper for my numbers, e.g. https://pypi.org/project/fixedpoint/, to quantize them. Since the wrappers don't support complex numbers, I have to treat Im and Re as separate numbers anyway. So I think in this case a sink/source interface that accepts complex numbers would not help too much, unless it accepts some kind of complex fixed-point numbers.
The approach I would use now to send random complex data to DSP stream components is:

  1. create separate numbers for Im and Re with enough (or more) bits, e.g. some random bytes
  2. quantize and truncate them to the right number of bits, e.g. by using FixedPoint
  3. convert the FixedPoint to an int and pass it to the send routine (this conversion is usually done automatically)

Another aspect is that I want to test as much of the HDL code as possible against my model. For this reason, my model now just accepts a couple of bytes, similar to the tdata bits of the HDL code. If sink/source accepted more high-level input like complex numbers, it would only be an advantage if I also used it as input to my model. But then I could miss mistakes in my HDL code when it separates the input bits into the Re and Im numbers.

I have implemented the following function in my TB class; I use FixedPoint to do the sign extension:

    def getRandomIQSample(self, width):
        # random signed values that fit into width//2 bits each
        real = random.randint(-2**(width//2-1), 2**(width//2-1)-1)
        imag = random.randint(-2**(width//2-1), 2**(width//2-1)-1)
        real_fp = FixedPoint(real, signed=True, m=width//2, n=0)
        imag_fp = FixedPoint(imag, signed=True, m=width//2, n=0)
        # round the AXIS width up so that each part is a whole number of bytes
        axis_width = ((width+15)//16)*16
        real_bytes = int(real_fp).to_bytes(length=axis_width//8//2, byteorder='big', signed=True)
        imag_bytes = int(imag_fp).to_bytes(length=axis_width//8//2, byteorder='big', signed=True)
        return real_bytes + imag_bytes

In my test I just do

    a_bytes = tb.getRandomIQSample(tb.input_width_a)
    b_bytes = tb.getRandomIQSample(tb.input_width_b)

    # send data, ignore tready
    tb.dut.rounding_cy <= 0
    await tb.source_a.send(AxiStreamFrame(a_bytes[::-1]))
    await tb.source_b.send(AxiStreamFrame(b_bytes[::-1]))

I think it's quite elegant and works very smoothly. I have changed my test environment back to big endian, because the bit order is always big endian and mixing it with little-endian byte order makes it very hard for humans to read. So when I pass the bytes to source.send, I just reverse the endianness with a_bytes[::-1].
When I receive data I do

    rx_frame = await tb.sink.recv()
    [received_i, received_r] = tb.frameToIQ(rx_frame)

The helper function frameToIQ does the back conversion from little to big endian

    def frameToIQ(self, rx_frame):
        receivedData = (rx_frame.tdata[0]).to_bytes(byteorder='little', length=self.output_width//8)
        received_i = receivedData[len(receivedData)//2:len(receivedData)]
        received_r = receivedData[0:len(receivedData)//2]
        return [received_i[::-1], received_r[::-1]]

It's a bit weird that I have to apply [::-1] two times in this function, so it should cancel out. I have not thought about the reason yet, but it works :) This frameToIQ function can probably be simplified a bit.

Yes, I can simplify the frameToIQ like this

    def frameToIQ(self, rx_frame):
        receivedData = (rx_frame.tdata[0]).to_bytes(byteorder='big', length=self.output_width//8)
        received_r = receivedData[len(receivedData)//2:len(receivedData)]
        received_i = receivedData[0:len(receivedData)//2]
        return [received_i, received_r]

I am a bit confused about why the data received from the sink is not in little endian. Or is it because rx_frame.tdata[0] is an int, and to_bytes simply produces whichever byte order I ask for? Yes, I think that's it.
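A quick way to convince yourself (plain Python):

    val = 0x04030201                   # what ends up in rx_frame.tdata[0]

    # an int has no byte order of its own; to_bytes produces whichever
    # order you ask for, so byteorder='big' directly gives MSB-first bytes
    assert val.to_bytes(4, byteorder='little') == b'\x01\x02\x03\x04'
    assert val.to_bytes(4, byteorder='big') == b'\x04\x03\x02\x01'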

Not sure testing with a fixed-point number is really necessary for your multiplier verification. A large number of random integers as I&Q inputs that do not exceed your targeted N bits would test the multiplier sufficiently to determine functionality. Fixed-point multiplications are just two's complement numbers that are multiplied exactly the same way as signed/unsigned integers would be. The position of the decimal point is just a matter of interpretation of the result; it does not change the bits at the output.
Am I missing something?

I could see a use case in which you had a captured dataset that you would like to run through the module, but that is probably a special case.

I just use the FixedPoint class to do the sign extension to the next full byte size for me. There might be simpler solutions for sign extension, but creating one FixedPoint object is pretty simple. It's not really needed for the multiplication itself.

Python does sign extension just fine by itself, no need for any extra libraries: -1 & 0xffff = 0xffff.

Yes, you are right. I simplified it:
catkira/complex_multiplier@de1cfd0 ;)