@@ -34,7 +34,6 @@ public class DataStructureConverter implements Function<DataStructure<String>, S
3434 public static final Schema HASH_SCHEMA = SchemaBuilder .map (Schema .STRING_SCHEMA , Schema .STRING_SCHEMA ).optional ();
3535 public static final Schema LIST_SCHEMA = SchemaBuilder .array (Schema .STRING_SCHEMA ).optional ();
3636 public static final Schema SET_SCHEMA = SchemaBuilder .array (Schema .STRING_SCHEMA ).optional ();
37- public static final Schema STREAM_SCHEMA = SchemaBuilder .array (StreamMessageConverter .VALUE_SCHEMA ).optional ();
3837 public static final Schema STRING_SCHEMA = Schema .OPTIONAL_STRING_SCHEMA ;
3938 public static final Schema ZSET_SCHEMA = SchemaBuilder .map (Schema .FLOAT64_SCHEMA , Schema .STRING_SCHEMA ).optional ();
4039 public static final Schema VALUE_SCHEMA = SchemaBuilder .struct ().field (FIELD_KEY , Schema .STRING_SCHEMA )
@@ -48,20 +47,39 @@ public Struct apply(DataStructure<String> input) {
4847 struct .put (FIELD_KEY , input .getKey ());
4948 struct .put (FIELD_TTL , input .getTtl ());
5049 struct .put (FIELD_TYPE , input .getType ());
51- struct .put (fieldName (input ), fieldValue (input ));
52- return struct ;
53- }
54-
55- @ SuppressWarnings ("unchecked" )
56- private Object fieldValue (DataStructure <String > input ) {
5750 switch (input .getType ()) {
58- case DataStructure .ZSET :
59- return zsetMap ((Collection <ScoredValue <String >>) input .getValue ());
51+ case DataStructure .HASH :
52+ struct .put (FIELD_HASH , input .getValue ());
53+ break ;
54+ case DataStructure .JSON :
55+ struct .put (FIELD_JSON , input .getValue ());
56+ break ;
57+ case DataStructure .LIST :
58+ struct .put (FIELD_LIST , input .getValue ());
59+ break ;
6060 case DataStructure .SET :
61- return new ArrayList <>(((Collection <String >) input .getValue ()));
61+ struct .put (FIELD_SET , list (input ));
62+ break ;
63+ case DataStructure .STRING :
64+ struct .put (FIELD_STRING , input .getValue ());
65+ break ;
66+ case DataStructure .ZSET :
67+ struct .put (FIELD_ZSET , zsetMap (input ));
68+ break ;
6269 default :
63- return input . getValue () ;
70+ break ;
6471 }
72+ return struct ;
73+ }
74+
75+ @ SuppressWarnings ("unchecked" )
76+ private Object list (DataStructure <String > input ) {
77+ return new ArrayList <>((Collection <String >) input .getValue ());
78+ }
79+
80+ public static Map <Double , String > zsetMap (DataStructure <String > input ) {
81+ Collection <ScoredValue <String >> value = input .getValue ();
82+ return zsetMap (value );
6583 }
6684
6785 public static Map <Double , String > zsetMap (Collection <ScoredValue <String >> value ) {
@@ -72,23 +90,4 @@ public static Map<Long, Double> timeseriesMap(Collection<Sample> samples) {
7290 return samples .stream ().collect (Collectors .toMap (Sample ::getTimestamp , Sample ::getValue ));
7391 }
7492
75- public static String fieldName (DataStructure <String > input ) {
76- switch (input .getType ()) {
77- case DataStructure .HASH :
78- return FIELD_HASH ;
79- case DataStructure .JSON :
80- return FIELD_JSON ;
81- case DataStructure .LIST :
82- return FIELD_LIST ;
83- case DataStructure .SET :
84- return FIELD_SET ;
85- case DataStructure .STRING :
86- return FIELD_STRING ;
87- case DataStructure .ZSET :
88- return FIELD_ZSET ;
89- default :
90- return FIELD_STRING ;
91- }
92- }
93-
9493}
0 commit comments