1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.messaging.simp.stomp;
18
19 import java.nio.ByteBuffer;
20
21 import reactor.function.Consumer;
22 import reactor.function.Function;
23 import reactor.io.Buffer;
24 import reactor.io.encoding.Codec;
25
26 import org.springframework.messaging.Message;
27 import org.springframework.util.Assert;
28
29
30
31
32
33
34
35
36 public class Reactor11StompCodec implements Codec<Buffer, Message<byte[]>, Message<byte[]>> {
37
38 private final StompDecoder stompDecoder;
39
40 private final StompEncoder stompEncoder;
41
42 private final Function<Message<byte[]>, Buffer> encodingFunction;
43
44
45 public Reactor11StompCodec() {
46 this(new StompEncoder(), new StompDecoder());
47 }
48
49 public Reactor11StompCodec(StompEncoder encoder, StompDecoder decoder) {
50 Assert.notNull(encoder, "'encoder' is required");
51 Assert.notNull(decoder, "'decoder' is required");
52 this.stompEncoder = encoder;
53 this.stompDecoder = decoder;
54 this.encodingFunction = new EncodingFunction(this.stompEncoder);
55 }
56
57 @Override
58 public Function<Buffer, Message<byte[]>> decoder(final Consumer<Message<byte[]>> messageConsumer) {
59 return new DecodingFunction(this.stompDecoder, messageConsumer);
60 }
61
62 @Override
63 public Function<Message<byte[]>, Buffer> encoder() {
64 return this.encodingFunction;
65 }
66
67
68 private static class EncodingFunction implements Function<Message<byte[]>, Buffer> {
69
70 private final StompEncoder encoder;
71
72 private EncodingFunction(StompEncoder encoder) {
73 this.encoder = encoder;
74 }
75
76 @Override
77 public Buffer apply(Message<byte[]> message) {
78 byte[] bytes = this.encoder.encode(message);
79 return new Buffer(ByteBuffer.wrap(bytes));
80 }
81 }
82
83 private static class DecodingFunction implements Function<Buffer, Message<byte[]>> {
84
85 private final StompDecoder decoder;
86
87 private final Consumer<Message<byte[]>> messageConsumer;
88
89 public DecodingFunction(StompDecoder decoder, Consumer<Message<byte[]>> next) {
90 this.decoder = decoder;
91 this.messageConsumer = next;
92 }
93
94 @Override
95 public Message<byte[]> apply(Buffer buffer) {
96 for (Message<byte[]> message : this.decoder.decode(buffer.byteBuffer())) {
97 this.messageConsumer.accept(message);
98 }
99 return null;
100 }
101 }
102 }