wanglingsong/JsonSurfer

Add Support For Knowing Parsing Completed

Closed this issue · 8 comments

Right now on trying to convert this Api to Reactive Streams we don't have any way to
know that parsing completed, hence can con signal onComplete event in Reactive Streams. So please add support for same.
If I am missing something then correct me.

The parsing complete when the surfing API exit.

        JsonSurfer surfer = JsonSurferGson.INSTANCE;
        surfer.configBuilder()
                .bind("$.store.book[*]", new JsonPathListener() {
                    @Override
                    public void onValue(Object value, ParsingContext context) {
                        System.out.println(value);
                    }
                })
                .buildAndSurf(sample);
        // Parsing complete here

Thanks, for helping.
Can, I use this snippet of Spring Reactor Flux with Spring Reactor WebFlux Controllers, Will it be non blocking??
public static Flux jsonPath(String json, String jsonPath) {
JsonSurfer surfer = JsonSurferJackson.INSTANCE;
return Flux.create(sink -> {
surfer.configBuilder().bind(jsonPath, (value, context) -> {
sink.next(value);
}).buildAndSurf(json);
sink.complete();
});
}

I think so. Just try it and see what happen

Hi, @ankurpathak . Does it work?

Yes Its working with both Mono and Flux,
Here is a code I am using.

//Mono

import org.jsfr.json.*;
import org.jsfr.json.provider.JacksonProvider;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.MediaType;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Collections;

import static java.util.Collections.emptyMap;
import static org.springframework.core.ResolvableType.forClass;

public class MonoUtil {

    private MonoUtil() {
    }

    public static <T> Flux<DataBuffer> toDataBuffer(DataBufferFactory bufferFactory, T t, Class<T> type, Jackson2JsonEncoder encoder) {
        ResolvableType elementType = ResolvableType.forClass(type);
        return encoder.encode(Mono.just(t), bufferFactory, elementType, MediaType.APPLICATION_JSON, emptyMap());
    }


    public static <T> Mono<T> fromDataBuffer(Flux<DataBuffer> body, Class<T> type, Jackson2JsonDecoder decoder) {
        ResolvableType elementType = forClass(type);
        return decoder.decodeToMono(body, elementType, MediaType.APPLICATION_JSON, Collections.emptyMap()).cast(type);
    }


    public static <T> Mono jsonPath(String json, String jsonPath) {
        JsonSurfer surfer = JsonSurferJackson.INSTANCE;
        return Mono.create(sink -> {
            surfer.configBuilder().bind(jsonPath, (value, context) -> sink.success(value)).buildAndSurf(json);
        });
    }

   
}

//Flux
import org.jsfr.json.JsonSurfer;
import org.jsfr.json.JsonSurferJackson;
import org.jsfr.json.ParsingContext;
import org.jsfr.json.TypedJsonPathListener;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.MediaType;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Collection;
import java.util.Collections;

import static java.util.Collections.emptyMap;
import static org.springframework.core.ResolvableType.forClass;

public class FluxUtil {

    private FluxUtil(){}


    public static <T> Flux<DataBuffer> toDataBuffer(DataBufferFactory bufferFactory, Collection<T> collection, Class<T> type, Jackson2JsonEncoder encoder){
        ResolvableType elementType = ResolvableType.forClass(type);
        return encoder.encode(Flux.fromIterable(collection), bufferFactory, elementType, MediaType.APPLICATION_JSON, emptyMap());
    }


    public static <T> Flux<T> fromDataBuffer(Flux<DataBuffer> body, Class<T> type, Jackson2JsonDecoder decoder){
        ResolvableType elementType = forClass(type);
        return decoder.decode(body, elementType, MediaType.APPLICATION_JSON, Collections.emptyMap()).cast(type);
    }
    public static <T> Flux<T> jsonPath(String json, Class<T> type, String jsonPath) {
        JsonSurfer surfer = JsonSurferJackson.INSTANCE;
        return Flux.create(sink -> {
            surfer.configBuilder().bind(jsonPath, type, (t,context) -> sink.next(t)).buildAndSurf(json);
            sink.complete();
        });
    }
}

Can't we have inbuild support for Reactive Streams(RxJava and Project Reactor)??

I don't think it's good idea to include extra dependency to support it in JsonSurfer, because not everyone uses JsonSurer with Reactive Streams and It looks like not too difficult to create Reactive Streams with JsonSurfer according to your code snippet. However, I can add one more example in ReadMe to show your use case. Anyway, Thanks for your suggestion.

Ya sure, Adding the example is also a good idea.