Netflix/zeno

Bug, prepareForNextCycle() is broken, when writing deltas

okrische opened this issue · 1 comments

My test has two state engines, a producer and a consumer:

The producer:

  • writes a value and then a snapshot
  • writes several values and a delta after each value
  • after each write it calls ONCE prepareForNextCycle

After each write, the consumer is updated with the written snapshot/delta.

My test checks, if value/version are the same on producer and consumer.

Has anybody an idea, what is going on? It actually fails on each second delta.

The workaround for me is right now to call prepareForNextCycle twice (its commented in the test, if you uncomment it, the test will work

This is the test:

    @Test
    public void testFastBlobStateEngine() throws Exception {
        final String typeName = new LongSerializer().getName();
        final SerializerFactory producerFactory = () -> new NFTypeSerializer[] { new LongSerializer() };
        final SerializerFactory consumerFactory = () -> new NFTypeSerializer[] { new LongSerializer() };
        final FastBlobStateEngine producer = new FastBlobStateEngine(producerFactory);
        final FastBlobStateEngine consumer = new FastBlobStateEngine(consumerFactory);

        // producer: write value "-1", bump up latest engine version
        producer.prepareForNextCycle();
        Long value = -1L;
        producer.add(typeName, value);
        producer.prepareForWrite();
        producer.setLatestVersion(""+-1);

        // producer: write snapshot
        final FastBlobWriter snapshotWriter = new FastBlobWriter(producer);
        final ByteArrayOutputStream snapshotStream = new ByteArrayOutputStream();
        snapshotWriter.writeSnapshot(snapshotStream);
        snapshotStream.close();
        producer.prepareForNextCycle();
        // BUG: always need a second call to cycle
        // producer.prepareForNextCycle();

        // consumer: read snapshot
        final FastBlobReader snapshotReader = new FastBlobReader(consumer);
        snapshotReader.readSnapshot(new ByteArrayInputStream(snapshotStream.toByteArray()));

        // test, if producer/consumer have same value/version
        assertEquals(producer.getLatestVersion(), consumer.getLatestVersion());
        assertEquals(value, consumer.getTypeDeserializationState(typeName).get(0));

        // - producer: write deltas to byte array, by setting values from 0 to n
        // - consumer: read delta to keep the consumer up to date with latest value
        // test, if producer/consumer have same value
        final int n = 10;
        for(long i = 0; i < n; i++) { 
            // producer: write value
            value = i;
            producer.add(typeName, value);
            producer.setLatestVersion(""+value);
            producer.prepareForWrite();         

            // producer: write delta stream
            final FastBlobWriter deltaWriter = new FastBlobWriter(producer);
            final ByteArrayOutputStream deltaStream = new ByteArrayOutputStream();
            deltaWriter.writeDelta(deltaStream);
            deltaStream.close();

            producer.prepareForNextCycle();
            // BUG: always need a second call to cycle
            // producer.prepareForNextCycle();

            // consumer: read delta stream
            final FastBlobReader deltaReader = new FastBlobReader(consumer);
            deltaReader.readDelta(new ByteArrayInputStream(deltaStream.toByteArray()));

            // test, if value AND version are the same on producer/consumer
            assertEquals(producer.getLatestVersion(), consumer.getLatestVersion());
            assertEquals(value, consumer.getTypeDeserializationState(typeName).get(0));
        }
    }

(tested with v2.22.3 and v2.16)