These samples demonstrate functionality available when using the hdfs sink and its partitioning features. There are nine different test scenarios with sample source data files. See the hdfs sink documentation for more details.
- Output from ‘hadoop fs ls’ has been manually modified to show only size, time and path.
- The ctrl-1 and ctrl-2 special characters used in some tests don’t show up in ‘hadoop fs cat’ output when pasted into this document. They are visible from the command line, though.
- To find the reactor-ip source TCP address, use “runtime modules” and “runtime containers” to match the container id and the host where the source is running. The default port is 3000. You can send any data to it by piping it via netcat, e.g. “cat hash-100000.txt | nc node3 3000”. If running a singlenode instance, the address is always localhost.
- In these instructions, tests are run using Spring XD in distributed mode with three hdfs sinks. However, it’s perfectly all right to run these using a singlenode installation. If you want to have only one hdfs sink, leave out the --properties option when using the stream deploy command. With singlenode you just get slightly different output when listing files.
With this test you can see that new partitioned files are opened when we pass the next minute boundary. The pattern used in dateFormat is resolved with the current timestamp.
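A minimal Java sketch of that resolution, assuming plain SimpleDateFormat semantics (the ‘/’ characters are literals in the pattern):

// Hypothetical illustration: the current (message) timestamp formatted
// with the partition path pattern, e.g. "2014/05/22/19/25".
String pathPart = new java.text.SimpleDateFormat("yyyy/MM/dd/HH/mm")
        .format(new java.util.Date());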
xd:>stream create --name test1 --definition "time|hdfs --idleTimeout=10000 --partitionPath=dateFormat('yyyy/MM/dd/HH/mm')"
xd:>stream deploy --name test1 --properties "module.hdfs.count=3"
Keep the stream running and list the created files. Files with a .tmp suffix are currently open for writing.
xd:>hadoop fs ls --recursive true --dir /xd/test1
0 2014-05-22 19:25 /xd/test1/2014
0 2014-05-22 19:25 /xd/test1/2014/05
0 2014-05-22 19:25 /xd/test1/2014/05/22
0 2014-05-22 19:26 /xd/test1/2014/05/22/19
0 2014-05-22 19:26 /xd/test1/2014/05/22/19/25
140 2014-05-22 19:26 /xd/test1/2014/05/22/19/25/test1-0.txt
140 2014-05-22 19:26 /xd/test1/2014/05/22/19/25/test1-1.txt
140 2014-05-22 19:26 /xd/test1/2014/05/22/19/25/test1-2.txt
0 2014-05-22 19:26 /xd/test1/2014/05/22/19/26
0 2014-05-22 19:26 /xd/test1/2014/05/22/19/26/test1-0.txt.tmp
0 2014-05-22 19:26 /xd/test1/2014/05/22/19/26/test1-1.txt.tmp
0 2014-05-22 19:26 /xd/test1/2014/05/22/19/26/test1-2.txt.tmp
Each of the three XD containers has its own writer, and the rolling part in the file name is initialized on demand, so we can write into the same directory even without the uuid identifier enabled. The idle timeout closes old files when we pass the minute boundaries (or whatever is denoted by the partition path).
xd:>stream destroy --name test1
xd:>hadoop fs ls --recursive true --dir /xd/test1
0 2014-05-22 19:25 /xd/test1/2014
0 2014-05-22 19:25 /xd/test1/2014/05
0 2014-05-22 19:25 /xd/test1/2014/05/22
0 2014-05-22 19:26 /xd/test1/2014/05/22/19
0 2014-05-22 19:26 /xd/test1/2014/05/22/19/25
140 2014-05-22 19:26 /xd/test1/2014/05/22/19/25/test1-0.txt
140 2014-05-22 19:26 /xd/test1/2014/05/22/19/25/test1-1.txt
140 2014-05-22 19:26 /xd/test1/2014/05/22/19/25/test1-2.txt
0 2014-05-22 19:26 /xd/test1/2014/05/22/19/26
140 2014-05-22 19:26 /xd/test1/2014/05/22/19/26/test1-0.txt
140 2014-05-22 19:26 /xd/test1/2014/05/22/19/26/test1-1.txt
140 2014-05-22 19:26 /xd/test1/2014/05/22/19/26/test1-2.txt
With this test we can see that new rolled files are opened without using partitionPath. If you set fixedDelay and idleTimeout so that the writers hit timeouts, you’ll see how a new rolling part is opened even with distributed writers. If using a singlenode installation you’d see only one file open at any time.
xd:>stream create --name test2 --definition "time --fixedDelay=5|hdfs --idleTimeout=10000"
xd:>stream deploy --name test2 --properties "module.hdfs.count=3"
Keep the stream running and list created files.
xd:>hadoop fs ls --recursive true --dir /xd/test2
40 2014-05-22 19:46 /xd/test2/test2-0.txt
20 2014-05-22 19:46 /xd/test2/test2-1.txt
0 2014-05-22 19:45 /xd/test2/test2-2.txt.tmp
0 2014-05-22 19:46 /xd/test2/test2-3.txt.tmp
0 2014-05-22 19:46 /xd/test2/test2-4.txt.tmp
Same as test2, but we use a uuid in the file names. The uuid makes the file name unique per writer, so the rolling part is incremented within a writer itself, not globally.
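As a minimal sketch of the resulting layout (the names below mirror the listing further down, not an actual API), file names take the form <stream>-<writer uuid>-<rolling part>.txt:

// Hypothetical illustration of a per-writer file name with fileUuid=true;
// each writer increments its own rolling part.
String fileName = String.format("%s-%s-%d.txt",
        "test3", java.util.UUID.randomUUID(), 0);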
xd:>stream create --name test3 --definition "time --fixedDelay=5|hdfs --idleTimeout=10000 --fileUuid=true"
xd:>stream deploy --name test3 --properties "module.hdfs.count=3"
Keep the stream running and list created files.
xd:>hadoop fs ls --recursive true --dir /xd/test3
0 2014-05-22 20:03 /xd/test3/test3-0a659f17-7a98-4f1b-8528-66eb2d3a33a2-0.txt.tmp
20 2014-05-22 20:03 /xd/test3/test3-351baa09-a175-433a-b176-c15ff594471c-0.txt
0 2014-05-22 20:03 /xd/test3/test3-351baa09-a175-433a-b176-c15ff594471c-1.txt.tmp
40 2014-05-22 20:03 /xd/test3/test3-96858e6a-7840-4c65-90eb-42dc42ab953e-0.txt
In this test we use time to get a dummy tick, transform to create random content, and finally path() and dateFormat() to construct a path from the payload. This is a sample where the data is delimited with the ctrl-1 and ctrl-2 special characters and the first field is the app id used for partitioning.
xd:>stream create --name test4 --definition "time | transform --expression=\"'APP'+T(Math).round(T(Math).random()*5)+'\u0001foo\u0002bar'\" | hdfs --idleTimeout=10000 --partitionPath=path(dateFormat('yyyy/MM/dd/HH'),payload.split('\u0001')[0])"
xd:>stream deploy --name test4 --properties "module.hdfs.count=3"
You should see partitioning by 6 different app ids, plus rolling parts because different writers are writing into the same directory.
xd:>hadoop fs ls --recursive true --dir /xd/test4
0 2014-05-22 20:16 /xd/test4/2014
0 2014-05-22 20:16 /xd/test4/2014/05
0 2014-05-22 20:16 /xd/test4/2014/05/22
0 2014-05-22 20:16 /xd/test4/2014/05/22/20
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP0
13 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP0/test4-0.txt
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP0/test4-1.txt.tmp
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP1
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP1/test4-0.txt.tmp
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP1/test4-1.txt.tmp
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP1/test4-2.txt.tmp
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP2
26 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP2/test4-0.txt
26 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP2/test4-1.txt
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP2/test4-2.txt.tmp
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP3
39 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP3/test4-0.txt
13 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP3/test4-1.txt
13 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP3/test4-2.txt
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP3/test4-3.txt.tmp
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP4
26 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP4/test4-0.txt
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP4/test4-1.txt.tmp
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP4/test4-2.txt.tmp
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP4/test4-3.txt.tmp
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP5
13 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP5/test4-0.txt
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP5/test4-1.txt.tmp
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP5/test4-2.txt.tmp
0 2014-05-22 20:16 /xd/test4/2014/05/22/20/APP5/test4-3.txt.tmp
Let’s just check that content is routed correctly.
xd:>hadoop fs cat /xd/test4/2014/05/22/20/APP0/test4-0.txt
APP0foobar
xd:>hadoop fs cat /xd/test4/2014/05/22/20/APP2/test4-1.txt
APP2foobar
APP2foobar
We use the ctrl-delimited data pre-written in the app1to10-*.txt files and feed it into the hdfs sinks via the reactor-ip source. See the notes above on how to find the reactor TCP port and how to feed data into it. We simply use dateFormat and list partitioning to collect entries from APP1-APP5 into 1TO5 and APP6-APP10 into 6TO10. We enable fileUuid and use idleTimeout to close files at some point while the stream is still deployed.
Note: If a defined list is not resolved, a simple ‘list’ is used instead of ‘XXX_list’. This works as a fallback for data outside of the specified lists.
The path part with list() partitioning is suffixed with “_list”; these suffixes were chosen to mimic kitesdk.
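A minimal Java sketch of the described lookup, using hypothetical names (the real list() function is a SpEL expression configured inline, as shown below):

import java.util.List;
import java.util.Map;

// Hypothetical sketch of list() partitioning with the fallback noted above:
// return "<name>_list" when the key appears in a named list, else "list".
static String listBucket(String key, Map<String, List<String>> lists) {
    for (Map.Entry<String, List<String>> e : lists.entrySet()) {
        if (e.getValue().contains(key)) {
            return e.getKey() + "_list";    // e.g. "1TO5_list" for "APP3"
        }
    }
    return "list";                          // fallback for unmatched keys
}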
xd:>stream create --name test5 --definition "reactor-ip | hdfs --idleTimeout=30000 --fileUuid=true --partitionPath=path(dateFormat('yyyy/MM/dd'),list(payload.split('\u0001')[0],{{'1TO5','APP1','APP2','APP3','APP4','APP5'},{'6TO10','APP6','APP7','APP8','APP9','APP10'}}))"
xd:>stream deploy --name test5 --properties "module.hdfs.count=3,module.reactor-ip.count=1"
Send sample data to the reactor-ip source.
# cat app1to10-1000000.txt | nc localhost 3000
When all data has been ingested into the sinks, we should eventually see the files closed and the data partitioned into 6 different files. You could have more files if the stream chokes and a timeout occurs with a writer.
xd:>hadoop fs ls --recursive true --dir /xd/test5
0 2014-05-22 20:47 /xd/test5/2014
0 2014-05-22 20:47 /xd/test5/2014/05
0 2014-05-22 20:47 /xd/test5/2014/05/22
0 2014-05-22 20:56 /xd/test5/2014/05/22/1TO5_list
2225054 2014-05-22 20:56 /xd/test5/2014/05/22/1TO5_list/test5-376d862c-9fd9-4639-8e12-3d6e604985d5-0.txt
2034890 2014-05-22 20:56 /xd/test5/2014/05/22/1TO5_list/test5-7be2abd1-3ac8-41f4-8668-451b25fc1068-0.txt
2232880 2014-05-22 20:56 /xd/test5/2014/05/22/1TO5_list/test5-c31e86f1-cf72-4a16-94dd-c4d72a8e3244-0.txt
0 2014-05-22 20:56 /xd/test5/2014/05/22/6TO10_list
2261695 2014-05-22 20:56 /xd/test5/2014/05/22/6TO10_list/test5-376d862c-9fd9-4639-8e12-3d6e604985d5-0.txt
2070202 2014-05-22 20:56 /xd/test5/2014/05/22/6TO10_list/test5-7be2abd1-3ac8-41f4-8668-451b25fc1068-0.txt
2275437 2014-05-22 20:56 /xd/test5/2014/05/22/6TO10_list/test5-c31e86f1-cf72-4a16-94dd-c4d72a8e3244-0.txt
Let’s just check one of these files to see that the data for APP1-APP5 was partitioned correctly.
xd:>hadoop fs copyToLocal --from /xd/test5/2014/05/22/1TO5_list/test5-376d862c-9fd9-4639-8e12-3d6e604985d5-0.txt --to /tmp
$ tail -10 /tmp/test5-376d862c-9fd9-4639-8e12-3d6e604985d5-0.txt
APP5foobar
APP3foobar
APP5foobar
APP3foobar
APP1foobar
APP3foobar
APP3foobar
APP2foobar
APP2foobar
APP2foobar
In this sample we take simple counter data from 1 to 7500 prefixed with ‘XXX’ and partition it by an Integer range:
XXX1
…
XXX1234
…
XXX7500
The range() partition function takes a key as its first argument and a list as its second argument. Behind the scenes this uses the JVM’s binarySearch, which works on an Object level, so we can pass in anything. However, a meaningful range match only works if the passed-in Objects are of the same type, like Integers. The range is defined by the binarySearch itself, so mostly a value is matched against an upper bound, except for the last range in the list. Having a list of {1000,3000,5000}
means that everything above 3000 will be matched with 5000. If that is an issue, simply adding Integer.MAX_VALUE as the last range would overflow everything above 5000 into a new partition.
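A minimal Java sketch of this bucket selection, assuming binarySearch semantics with clamping to the last range as described above:

import java.util.Arrays;

// Hypothetical sketch of range() partitioning: pick the first range that
// is >= key, clamping anything above the last range to the last range.
static int rangeBucket(int key, int[] ranges) {
    int idx = Arrays.binarySearch(ranges, key);
    if (idx < 0) {
        idx = -idx - 1;              // insertion point: first range >= key
    }
    if (idx >= ranges.length) {
        idx = ranges.length - 1;     // above the last range: clamp to it
    }
    return ranges[idx];
}
// rangeBucket(500,  new int[] {1000, 3000, 5000}) -> 1000
// rangeBucket(3500, new int[] {1000, 3000, 5000}) -> 5000
// rangeBucket(9999, new int[] {1000, 3000, 5000}) -> 5000 (clamped)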
The path part with range() partitioning is suffixed with “_range”.
xd:>stream create --name test6 --definition "reactor-ip | hdfs --idleTimeout=30000 --fileUuid=true --partitionPath=path(dateFormat('yyyy/MM/dd'),range(T(Integer).parseInt(payload.substring(3)),{1000,3000,5000}))"
xd:>stream deploy --name test6 --properties "module.hdfs.count=3,module.reactor-ip.count=1"
Send sample data to the reactor-ip source.
# cat counters-7500.txt | nc localhost 3000
These 7500 data items then go into 3 different partitions, and every writer naturally has its own partition files, totalling 9 files with 3 containers.
xd:>hadoop fs ls --recursive true --dir /xd/test6
0 2014-05-22 21:50 /xd/test6/2014
0 2014-05-22 21:50 /xd/test6/2014/05
0 2014-05-22 21:50 /xd/test6/2014/05/22
0 2014-05-22 21:51 /xd/test6/2014/05/22/1000_range
2126 2014-05-22 21:51 /xd/test6/2014/05/22/1000_range/test6-17590e6b-78e6-42c3-a8be-419a4376e3c9-0.txt
2313 2014-05-22 21:51 /xd/test6/2014/05/22/1000_range/test6-43044089-0f93-423a-afae-f191f50e7bd8-0.txt
2454 2014-05-22 21:51 /xd/test6/2014/05/22/1000_range/test6-4fc6a25d-d24f-413f-aeb2-0e33efde037a-0.txt
0 2014-05-22 21:51 /xd/test6/2014/05/22/3000_range
4576 2014-05-22 21:51 /xd/test6/2014/05/22/3000_range/test6-17590e6b-78e6-42c3-a8be-419a4376e3c9-0.txt
5744 2014-05-22 21:51 /xd/test6/2014/05/22/3000_range/test6-43044089-0f93-423a-afae-f191f50e7bd8-0.txt
5680 2014-05-22 21:51 /xd/test6/2014/05/22/3000_range/test6-4fc6a25d-d24f-413f-aeb2-0e33efde037a-0.txt
0 2014-05-22 21:51 /xd/test6/2014/05/22/5000_range
9816 2014-05-22 21:51 /xd/test6/2014/05/22/5000_range/test6-17590e6b-78e6-42c3-a8be-419a4376e3c9-0.txt
12616 2014-05-22 21:51 /xd/test6/2014/05/22/5000_range/test6-43044089-0f93-423a-afae-f191f50e7bd8-0.txt
13568 2014-05-22 21:51 /xd/test6/2014/05/22/5000_range/test6-4fc6a25d-d24f-413f-aeb2-0e33efde037a-0.txt
In this test we take simple counter data with two fields separated by a comma; the first field is an app id (APP1 - APP100) and the second field a counter (1 - 10000):
APP2,1
APP88,2
APP42,3
APP8,4
…
APP47,9998
APP57,9999
APP33,10000
The focus here is to have an even distribution of partition files, so that when the files are processed we don’t have a mix of small and large files. We know that there are 10000 items in this file and 100 different app ids. Let’s try to partition with the Object hashCode modulo 7 buckets.
Hashing uses “Object.hashCode() % buckets”. The path part with hash() partitioning is suffixed with “_hash”.
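A minimal Java sketch of the hash bucket; the handling of negative hashCode() values is an assumption here, not taken from the source:

// Hypothetical sketch of hash() partitioning: the result 0..6 becomes
// the "<n>_hash" path part. Math.abs guards against negative hash codes.
static int hashBucket(Object key, int buckets) {
    return Math.abs(key.hashCode() % buckets);
}
// hashBucket("APP42", 7) -> a bucket in 0..6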
xd:>stream create --name test7 --definition "reactor-ip | hdfs --idleTimeout=30000 --fileUuid=true --partitionPath=path(dateFormat('yyyy/MM/dd'),hash(payload.split(',')[0],7))"
xd:>stream deploy --name test7 --properties "module.hdfs.count=3,module.reactor-ip.count=1"
Send sample data to the reactor-ip source.
# cat hash-10000.txt | nc localhost 3000
Checking the file sizes on the left side shows that if the bucket count is chosen wisely for the data, we get a pretty good distribution into 7 buckets across 3 writers, for a total of 21 files.
xd:>hadoop fs ls --recursive true --dir /xd/test7
0 2014-05-22 22:37 /xd/test7/2014
0 2014-05-22 22:37 /xd/test7/2014/05
0 2014-05-22 22:37 /xd/test7/2014/05/22
0 2014-05-22 22:38 /xd/test7/2014/05/22/0_hash
5627 2014-05-22 22:38 /xd/test7/2014/05/22/0_hash/test7-50d88b45-9870-4c32-93ce-14ce01d46937-0.txt
5213 2014-05-22 22:38 /xd/test7/2014/05/22/0_hash/test7-567a420d-b2a5-45af-bc26-15199d220ebc-0.txt
5194 2014-05-22 22:38 /xd/test7/2014/05/22/0_hash/test7-9dae96c5-bde7-4e0d-b7bb-c82ff4d1115d-0.txt
0 2014-05-22 22:38 /xd/test7/2014/05/22/1_hash
6033 2014-05-22 22:38 /xd/test7/2014/05/22/1_hash/test7-50d88b45-9870-4c32-93ce-14ce01d46937-0.txt
5274 2014-05-22 22:38 /xd/test7/2014/05/22/1_hash/test7-567a420d-b2a5-45af-bc26-15199d220ebc-0.txt
5516 2014-05-22 22:38 /xd/test7/2014/05/22/1_hash/test7-9dae96c5-bde7-4e0d-b7bb-c82ff4d1115d-0.txt
0 2014-05-22 22:38 /xd/test7/2014/05/22/2_hash
4836 2014-05-22 22:38 /xd/test7/2014/05/22/2_hash/test7-50d88b45-9870-4c32-93ce-14ce01d46937-0.txt
4356 2014-05-22 22:38 /xd/test7/2014/05/22/2_hash/test7-567a420d-b2a5-45af-bc26-15199d220ebc-0.txt
4469 2014-05-22 22:38 /xd/test7/2014/05/22/2_hash/test7-9dae96c5-bde7-4e0d-b7bb-c82ff4d1115d-0.txt
0 2014-05-22 22:38 /xd/test7/2014/05/22/3_hash
5478 2014-05-22 22:38 /xd/test7/2014/05/22/3_hash/test7-50d88b45-9870-4c32-93ce-14ce01d46937-0.txt
4934 2014-05-22 22:38 /xd/test7/2014/05/22/3_hash/test7-567a420d-b2a5-45af-bc26-15199d220ebc-0.txt
4562 2014-05-22 22:38 /xd/test7/2014/05/22/3_hash/test7-9dae96c5-bde7-4e0d-b7bb-c82ff4d1115d-0.txt
0 2014-05-22 22:38 /xd/test7/2014/05/22/4_hash
5152 2014-05-22 22:38 /xd/test7/2014/05/22/4_hash/test7-50d88b45-9870-4c32-93ce-14ce01d46937-0.txt
4644 2014-05-22 22:38 /xd/test7/2014/05/22/4_hash/test7-567a420d-b2a5-45af-bc26-15199d220ebc-0.txt
5252 2014-05-22 22:38 /xd/test7/2014/05/22/4_hash/test7-9dae96c5-bde7-4e0d-b7bb-c82ff4d1115d-0.txt
0 2014-05-22 22:38 /xd/test7/2014/05/22/5_hash
5531 2014-05-22 22:38 /xd/test7/2014/05/22/5_hash/test7-50d88b45-9870-4c32-93ce-14ce01d46937-0.txt
4690 2014-05-22 22:38 /xd/test7/2014/05/22/5_hash/test7-567a420d-b2a5-45af-bc26-15199d220ebc-0.txt
4974 2014-05-22 22:38 /xd/test7/2014/05/22/5_hash/test7-9dae96c5-bde7-4e0d-b7bb-c82ff4d1115d-0.txt
0 2014-05-22 22:38 /xd/test7/2014/05/22/6_hash
5651 2014-05-22 22:38 /xd/test7/2014/05/22/6_hash/test7-50d88b45-9870-4c32-93ce-14ce01d46937-0.txt
5195 2014-05-22 22:38 /xd/test7/2014/05/22/6_hash/test7-567a420d-b2a5-45af-bc26-15199d220ebc-0.txt
5475 2014-05-22 22:38 /xd/test7/2014/05/22/6_hash/test7-9dae96c5-bde7-4e0d-b7bb-c82ff4d1115d-0.txt
In this test we take simple counter data with two fields separated by a comma; the first field is a date yyyy-MM-dd and the second field is a counter 1 - 10000:
1970-01-01,1
1970-01-01,2
1970-01-01,3
…
1970-01-06,9998
1970-01-06,9999
1970-01-06,10000
By default, if the second argument passed to the dateFormat() function is a String, it is treated as a timestamp and converted using the first argument’s pattern, instead of resolving the conversion from the SI message timestamp.
Note: See how the partition paths resolve to the 70s instead of the present day.
xd:>stream create --name test8 --definition "reactor-ip | hdfs --idleTimeout=30000 --fileUuid=true --partitionPath=path(dateFormat('yyyy/MM/dd',payload.split(',')[0]))"
xd:>stream deploy --name test8 --properties "module.hdfs.count=3,module.reactor-ip.count=1"
Send sample data to the reactor-ip source.
# cat date-counters-default-10000.txt | nc localhost 3000
List files and see how data is partitioned.
xd:>hadoop fs ls --recursive true --dir /xd/test8
0 2014-05-23 12:41 /xd/test8
0 2014-05-23 12:41 /xd/test8/1970
0 2014-05-23 12:41 /xd/test8/1970/01
0 2014-05-23 12:42 /xd/test8/1970/01/01
11888 2014-05-23 12:42 /xd/test8/1970/01/01/test8-0bbc109f-5261-4fdc-a3d5-0b84a7f04c8d-0.txt
7177 2014-05-23 12:42 /xd/test8/1970/01/01/test8-9658625c-b6ac-443c-b691-7db5c1605e60-0.txt
6308 2014-05-23 12:42 /xd/test8/1970/01/01/test8-99de2617-c0c5-4d7e-a873-786d50ffff3a-0.txt
0 2014-05-23 12:42 /xd/test8/1970/01/02
11776 2014-05-23 12:42 /xd/test8/1970/01/02/test8-0bbc109f-5261-4fdc-a3d5-0b84a7f04c8d-0.txt
8320 2014-05-23 12:42 /xd/test8/1970/01/02/test8-9658625c-b6ac-443c-b691-7db5c1605e60-0.txt
7552 2014-05-23 12:42 /xd/test8/1970/01/02/test8-99de2617-c0c5-4d7e-a873-786d50ffff3a-0.txt
0 2014-05-23 12:42 /xd/test8/1970/01/03
10704 2014-05-23 12:42 /xd/test8/1970/01/03/test8-0bbc109f-5261-4fdc-a3d5-0b84a7f04c8d-0.txt
9280 2014-05-23 12:42 /xd/test8/1970/01/03/test8-9658625c-b6ac-443c-b691-7db5c1605e60-0.txt
7664 2014-05-23 12:42 /xd/test8/1970/01/03/test8-99de2617-c0c5-4d7e-a873-786d50ffff3a-0.txt
0 2014-05-23 12:42 /xd/test8/1970/01/04
9424 2014-05-23 12:42 /xd/test8/1970/01/04/test8-0bbc109f-5261-4fdc-a3d5-0b84a7f04c8d-0.txt
8912 2014-05-23 12:42 /xd/test8/1970/01/04/test8-9658625c-b6ac-443c-b691-7db5c1605e60-0.txt
9312 2014-05-23 12:42 /xd/test8/1970/01/04/test8-99de2617-c0c5-4d7e-a873-786d50ffff3a-0.txt
0 2014-05-23 12:42 /xd/test8/1970/01/05
10272 2014-05-23 12:42 /xd/test8/1970/01/05/test8-0bbc109f-5261-4fdc-a3d5-0b84a7f04c8d-0.txt
8640 2014-05-23 12:42 /xd/test8/1970/01/05/test8-9658625c-b6ac-443c-b691-7db5c1605e60-0.txt
8736 2014-05-23 12:42 /xd/test8/1970/01/05/test8-99de2617-c0c5-4d7e-a873-786d50ffff3a-0.txt
0 2014-05-23 12:42 /xd/test8/1970/01/06
8385 2014-05-23 12:42 /xd/test8/1970/01/06/test8-0bbc109f-5261-4fdc-a3d5-0b84a7f04c8d-0.txt
7008 2014-05-23 12:42 /xd/test8/1970/01/06/test8-9658625c-b6ac-443c-b691-7db5c1605e60-0.txt
7536 2014-05-23 12:42 /xd/test8/1970/01/06/test8-99de2617-c0c5-4d7e-a873-786d50ffff3a-0.txt
Same as test8, but the first field in the data has a different format. In this test we take simple counter data with two fields separated by a comma; the first field is a date yyyy-MM-dd_HH:mm:ss and the second field is a counter 1 - 10000:
1970-01-01_01:00:50,1
1970-01-01_01:01:40,2
1970-01-01_01:02:30,3
…
1970-01-06_01:02:30,9998
1970-01-06_19:52:30,9999
1970-01-06_19:53:20,10000
The third parameter of dateFormat() can be a custom format used to parse the value from the second parameter. This allows parsing any supported date/timestamp out of a message payload and translating it into a partition path based on, e.g., a log entry’s timestamp instead of the SI Message timestamp.
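A minimal Java sketch of that three-argument behaviour, assuming plain SimpleDateFormat parsing and formatting:

import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical illustration: parse the payload field with the input
// format, then re-format it with the partition path pattern.
String field = "1970-01-06_19:53:20";
Date ts = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss").parse(field);  // throws ParseException
String pathPart = new SimpleDateFormat("yyyy/MM/dd").format(ts);     // "1970/01/06"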
xd:>stream create --name test9 --definition "reactor-ip | hdfs --idleTimeout=30000 --fileUuid=true --partitionPath=path(dateFormat('yyyy/MM/dd',payload.split(',')[0],'yyyy-MM-dd_HH:mm:ss'))"
xd:>stream deploy --name test9 --properties "module.hdfs.count=3,module.reactor-ip.count=1"
Send sample data to the reactor-ip source.
# cat date-counters-custom-10000.txt | nc localhost 3000
List files and see how data is partitioned.
xd:>hadoop fs ls --recursive true --dir /xd
0 2014-05-23 11:18 /xd/app
2789 2014-05-23 11:18 /xd/app/modules.yml
3451 2014-05-23 11:18 /xd/app/servers.yml
18657 2014-05-23 11:18 /xd/app/spring-xd-yarn-1.0.0.BUILD-SNAPSHOT.zip
60819 2014-05-23 11:18 /xd/app/spring-xd-yarn-appmaster-1.0.0.BUILD-SNAPSHOT.jar
0 2014-05-23 13:32 /xd/test9
0 2014-05-23 13:32 /xd/test9/1970
0 2014-05-23 13:32 /xd/test9/1970/01
0 2014-05-23 13:33 /xd/test9/1970/01/01
16912 2014-05-23 13:33 /xd/test9/1970/01/01/test9-33e19563-cb6b-4bbd-a058-736f4d486640-0.txt
11500 2014-05-23 13:33 /xd/test9/1970/01/01/test9-3e3a5d16-ac8f-4274-bff8-4203b338880d-0.txt
11856 2014-05-23 13:33 /xd/test9/1970/01/01/test9-a192f6d7-9312-4fec-b730-6c4e5206232c-0.txt
0 2014-05-23 13:33 /xd/test9/1970/01/02
16400 2014-05-23 13:33 /xd/test9/1970/01/02/test9-33e19563-cb6b-4bbd-a058-736f4d486640-0.txt
13675 2014-05-23 13:33 /xd/test9/1970/01/02/test9-3e3a5d16-ac8f-4274-bff8-4203b338880d-0.txt
13125 2014-05-23 13:33 /xd/test9/1970/01/02/test9-a192f6d7-9312-4fec-b730-6c4e5206232c-0.txt
0 2014-05-23 13:33 /xd/test9/1970/01/03
14175 2014-05-23 13:33 /xd/test9/1970/01/03/test9-33e19563-cb6b-4bbd-a058-736f4d486640-0.txt
14775 2014-05-23 13:33 /xd/test9/1970/01/03/test9-3e3a5d16-ac8f-4274-bff8-4203b338880d-0.txt
14250 2014-05-23 13:33 /xd/test9/1970/01/03/test9-a192f6d7-9312-4fec-b730-6c4e5206232c-0.txt
0 2014-05-23 13:33 /xd/test9/1970/01/04
18350 2014-05-23 13:33 /xd/test9/1970/01/04/test9-33e19563-cb6b-4bbd-a058-736f4d486640-0.txt
12425 2014-05-23 13:33 /xd/test9/1970/01/04/test9-3e3a5d16-ac8f-4274-bff8-4203b338880d-0.txt
12425 2014-05-23 13:33 /xd/test9/1970/01/04/test9-a192f6d7-9312-4fec-b730-6c4e5206232c-0.txt
0 2014-05-23 13:33 /xd/test9/1970/01/05
17675 2014-05-23 13:33 /xd/test9/1970/01/05/test9-33e19563-cb6b-4bbd-a058-736f4d486640-0.txt
14025 2014-05-23 13:33 /xd/test9/1970/01/05/test9-3e3a5d16-ac8f-4274-bff8-4203b338880d-0.txt
11500 2014-05-23 13:33 /xd/test9/1970/01/05/test9-a192f6d7-9312-4fec-b730-6c4e5206232c-0.txt
0 2014-05-23 13:33 /xd/test9/1970/01/06
16051 2014-05-23 13:33 /xd/test9/1970/01/06/test9-33e19563-cb6b-4bbd-a058-736f4d486640-0.txt
9875 2014-05-23 13:33 /xd/test9/1970/01/06/test9-3e3a5d16-ac8f-4274-bff8-4203b338880d-0.txt
9900 2014-05-23 13:33 /xd/test9/1970/01/06/test9-a192f6d7-9312-4fec-b730-6c4e5206232c-0.txt