Java Distream: continuous data stream processing, a different take on Java Stream


I have always liked the stream-style data processing of Java 8 Stream. Its map, filter and other operations made me look at Java with fresh eyes. It's like your eyes lighting up when you run into your beautiful ex-girlfriend years after the breakup (I'm talking nonsense, don't take it seriously!!!)

However, after using it for a while, I found it still a little painful (or maybe I just don't know how to use it properly). For example:

  • Operations cannot be chained continuously and concisely
  • It is hard to modify the values in a list in bulk

So, based on my own use cases, I built Distream in a simple and blunt way, hoping it can make data stream processing truly silky smooth.

Brief usage guide

1. Comprehensive case

Suppose there is a list of entities, ListFrame<Entity> entities, where Entity is defined as follows:

public class Entity {
	private String name;
	private Double value;
	private Double percent;
}

The required steps are to traverse entities and then:

  • Round value to two decimal places
  • Set name to "" when it is null
  • Set value to 0 when it is null, otherwise add 2 to it
  • Replace the # in name
  • Calculate the proportion of value and assign it to percent
  • Finally, group by name and sum percent within each group

The six steps above can then be written as follows:

List<Entity> entityList = xxxx;
MapFrame<Object,Double> groups = ListFrame.fromList(entityList )

You can also combine the expressions into one:

MapFrame<Object,Double> groups = entities
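
For comparison, here is a rough sketch of the same six steps written with the JDK only (plain loops plus java.util.stream). It is not Distream code and says nothing about how Distream works internally. The class name PlainStreamPipeline, the simplified Entity with package-visible fields, reading "replace the #" as "remove it", and reading "proportion" as value divided by the total of all values are all my own assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class PlainStreamPipeline {

    // Simplified stand-in for the Entity class above (fields left package-visible for brevity).
    static class Entity {
        String name;
        Double value;
        Double percent;

        Entity(String name, Double value) {
            this.name = name;
            this.value = value;
        }
    }

    // Runs the six steps and returns the sum of percent per name.
    static Map<String, Double> percentByName(List<Entity> entities) {
        for (Entity e : entities) {
            // Step 1: round value to two decimal places (skipped while value is null).
            if (e.value != null) {
                e.value = Math.round(e.value * 100.0) / 100.0;
            }
            // Step 2: a null name becomes "".
            if (e.name == null) {
                e.name = "";
            }
            // Step 3: a null value becomes 0, otherwise add 2.
            e.value = (e.value == null) ? 0.0 : e.value + 2;
            // Step 4: assumption: "replace the #" means removing it.
            e.name = e.name.replace("#", "");
        }
        // Step 5: assumption: proportion = value / total of all values.
        double total = entities.stream().mapToDouble(e -> e.value).sum();
        for (Entity e : entities) {
            e.percent = (total == 0.0) ? 0.0 : e.value / total;
        }
        // Step 6: group by name and sum percent (TreeMap keeps the keys sorted).
        return entities.stream().collect(
                Collectors.groupingBy(e -> e.name, TreeMap::new,
                        Collectors.summingDouble(e -> e.percent)));
    }

    public static void main(String[] args) {
        List<Entity> entities = Arrays.asList(
                new Entity("a#", 1.0), new Entity(null, null), new Entity("a", 3.0));
        System.out.println(percentByName(entities));
    }
}
```

Even in this small case the work is spread over two loops and two stream passes, which is the kind of boilerplate the chained expressions are meant to remove.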

2. Functions used within expressions

String expressions work not only on objects but also on maps. The handle method processes every element in the list.

ListFrame<Map<String, Object>> lines = xxx;
/* Convert the value of code to int and assign it to id; equivalent to line.put("id", Integer.valueOf(code)) */
lines = lines.handle("id=int(code)");

/* Convert value to double and assign it to percent; equivalent to line.put("percent", Double.valueOf(value)) */
lines = lines.handle("percent=double(value)");

/* Convert value to a string and assign it to name; equivalent to line.put("name", value + "") */
lines = lines.handle("name=string(value)");

/* Works like String.substring */
lines = lines.handle("name=substring(name,1,2)");

/* Replace "xxx" in name with "yyy" */
lines = lines.handle("name=replace(name,'xxx','yyy')");
/* Remove "xxx" from name (replace it with "") */
lines = lines.handle("name=name-'xxx'");

/* Similar to indexOf */
lines = lines.handle("id=index(name,'xxx')");

/* Round percent to two decimal places */
lines = lines.handle("percent=format(percent,2)");
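
To make the "equivalent to" comments above concrete, here is a small plain-JDK sketch of what the first three expressions amount to when applied to every map in a list. It only mirrors the equivalences stated in the comments. The class name MapExpressionSketch is made up, and I assume that code and value hold strings (or anything whose toString() parses as a number); how Distream actually parses and evaluates the expressions is not shown here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MapExpressionSketch {

    // Applies the hand-written equivalents of three expressions to every line.
    static void handleAll(List<Map<String, Object>> lines) {
        for (Map<String, Object> line : lines) {
            // "id=int(code)"
            line.put("id", Integer.valueOf(String.valueOf(line.get("code"))));
            // "percent=double(value)"
            line.put("percent", Double.valueOf(String.valueOf(line.get("value"))));
            // "name=string(value)"
            line.put("name", String.valueOf(line.get("value")));
        }
    }

    public static void main(String[] args) {
        Map<String, Object> line = new HashMap<>();
        line.put("code", "12");
        line.put("value", "3.5");
        List<Map<String, Object>> lines = new ArrayList<>();
        lines.add(line);
        handleAll(lines);
        System.out.println(line.get("id") + " " + line.get("percent") + " " + line.get("name"));
    }
}
```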

3. Finding the maximum and minimum

Maximum and minimum are only available for numeric types.

List<Integer> list = Arrays.asList(2,3,4,5,8,9,4);
ListFrame<Integer> listFrame = ListFrame.fromList(list);
int max = listFrame.max();
int min = listFrame.min();
int avg = listFrame.avg();
double sum = listFrame.sum();

Index of the maximum and minimum

/* Index position of the maximum value */
int argmax = listFrame.argmax();
/* Index position of the minimum value */
int argmin = listFrame.argmin();
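
For readers who want to see what these calls replace, here is a minimal plain-JDK sketch of the same statistics. It is only a baseline for comparison, not Distream's implementation. The class name PlainListStats is made up, and returning the first index on ties is my assumption, since the tie-breaking rule of argmax and argmin is not stated above.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;

class PlainListStats {

    // max, min, average and sum in a single pass.
    static IntSummaryStatistics summarize(List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).summaryStatistics();
    }

    // Index of the first maximum (assumption: ties resolve to the earliest index).
    static int argmax(List<Integer> values) {
        if (values.isEmpty()) {
            throw new IllegalArgumentException("empty list");
        }
        int best = 0;
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i) > values.get(best)) {
                best = i;
            }
        }
        return best;
    }

    // Index of the first minimum (same tie-breaking assumption).
    static int argmin(List<Integer> values) {
        if (values.isEmpty()) {
            throw new IllegalArgumentException("empty list");
        }
        int best = 0;
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i) < values.get(best)) {
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(2, 3, 4, 5, 8, 9, 4);
        IntSummaryStatistics s = summarize(list);
        // prints "9 2 5.0 35"
        System.out.println(s.getMax() + " " + s.getMin() + " " + s.getAverage() + " " + s.getSum());
        // prints "5 0"
        System.out.println(argmax(list) + " " + argmin(list));
    }
}
```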

4. Type conversion

List<String> list = Arrays.asList("1","2","3","4");
ListFrame<String> listFrame = ListFrame.fromList(list);
ListFrame<Integer> listInt = listFrame.asInteger();
ListFrame<Double> listDouble = listFrame.asDouble();
ListFrame<Float> listFloat = listFrame.asFloat();
ListFrame<String> listString = listFloat.asString();

5. Counting element occurrences

List<Integer> list = Arrays.asList(2,2,2,4);
MapFrame<Integer,Integer> listFrame = ListFrame.fromList(list).frequency();
/* Resulting map: {2=3, 4=1} */
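
The same count can be sketched with the JDK's Collectors for comparison. This is just a baseline, not what frequency() does internally. The class name FrequencyCount is made up, and note that Collectors.counting() yields Long values, whereas the MapFrame above is declared with Integer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

class FrequencyCount {

    // Counts how often each value occurs; TreeMap keeps the keys sorted.
    static Map<Integer, Long> frequency(List<Integer> values) {
        return values.stream().collect(
                Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // prints {2=3, 4=1}
        System.out.println(frequency(Arrays.asList(2, 2, 2, 4)));
    }
}
```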

6. Variance and standard deviation

List<Integer> list = Arrays.asList(2,2,2,4);
ListFrame<Integer> listFrame = ListFrame.fromList(list );
listFrame.standardDeviation();//standard deviation
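
As a reminder of what is being computed, here is a small plain-JDK sketch of variance and standard deviation. Whether Distream divides by n (population) or by n - 1 (sample) is not stated in this post, so the sketch takes a flag and supports both; the class name DeviationSketch and the flag are my own.

```java
import java.util.Arrays;
import java.util.List;

class DeviationSketch {

    // Variance: mean of squared deviations; divides by n - 1 when sample is true.
    static double variance(List<? extends Number> values, boolean sample) {
        int n = values.size();
        if (n == 0 || (sample && n < 2)) {
            throw new IllegalArgumentException("not enough values");
        }
        double mean = 0.0;
        for (Number v : values) {
            mean += v.doubleValue();
        }
        mean /= n;
        double squares = 0.0;
        for (Number v : values) {
            double d = v.doubleValue() - mean;
            squares += d * d;
        }
        return squares / (sample ? n - 1 : n);
    }

    // Standard deviation is the square root of the variance.
    static double standardDeviation(List<? extends Number> values, boolean sample) {
        return Math.sqrt(variance(values, sample));
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(2, 2, 2, 4);
        System.out.println(variance(list, false));          // population variance
        System.out.println(standardDeviation(list, true));  // sample standard deviation
    }
}
```

For the list 2, 2, 2, 4 the mean is 2.5 and the squared deviations add up to 3, so the population variance is 0.75 and the sample variance is 1.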

7. Deduplication

List<Integer> list = Arrays.asList(2,2,2,4);
ListFrame<Integer> listFrame = ListFrame.fromList(list ).distinct();

8. Grouping statistics

MapFrame<Object, ListFrame> agesGroup = lines.groupBy("Age");
MapFrame<Object, Integer> count = agesGroup.count();
MapFrame<Object, Double> incomeAvg = agesGroup.avg("income");
MapFrame<Object, Double> incomeSum = agesGroup.sum("income");
MapFrame<Object, ListFrame> incomeConcat = agesGroup.concat("income");
/* Chained grouping */
MapFrame<Object, MapFrame<Object, ListFrame>> incomeAgeConcat = lines.groupBy("income").groupBy("Age");
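
For comparison, the count, average and sum per group can be sketched with the JDK's Collectors.groupingBy. Again this is only a baseline and not how groupBy is implemented in Distream. The class name GroupStatsSketch and the helper methods row and income are made up for the example, and I assume income is stored as a Number.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupStatsSketch {

    // Hypothetical helper that builds one line with the two columns used below.
    static Map<String, Object> row(int age, double income) {
        Map<String, Object> line = new HashMap<>();
        line.put("Age", age);
        line.put("income", income);
        return line;
    }

    // Reads income as a double (assumes the column holds a Number).
    static double income(Map<String, Object> line) {
        return ((Number) line.get("income")).doubleValue();
    }

    static Map<Object, Long> countByAge(List<Map<String, Object>> lines) {
        return lines.stream().collect(
                Collectors.groupingBy(l -> l.get("Age"), Collectors.counting()));
    }

    static Map<Object, Double> avgIncomeByAge(List<Map<String, Object>> lines) {
        return lines.stream().collect(
                Collectors.groupingBy(l -> l.get("Age"),
                        Collectors.averagingDouble(GroupStatsSketch::income)));
    }

    static Map<Object, Double> sumIncomeByAge(List<Map<String, Object>> lines) {
        return lines.stream().collect(
                Collectors.groupingBy(l -> l.get("Age"),
                        Collectors.summingDouble(GroupStatsSketch::income)));
    }

    public static void main(String[] args) {
        List<Map<String, Object>> lines = Arrays.asList(row(20, 100.0), row(20, 300.0), row(23, 50.0));
        System.out.println(countByAge(lines));
        System.out.println(avgIncomeByAge(lines));
        System.out.println(sumIncomeByAge(lines));
    }
}
```

Each statistic needs its own collector and its own pass here, whereas the grouped MapFrame above is built once and then queried several times.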

9. Converting between maps and objects

ListFrame<Map> maps= ListFrame.readMap(path);
ListFrame<User> users = maps.toObjectList(User.class);
ListFrame<Map> map2s = users.toMapList();
/* Chained conversion */
ListFrame<User2> user2s = lines.toObjectList(User.class).toObjectList(User2.class);

10. File data reading

Read a file line by line

ListFrame<String> lines = ListFrame.readString("test.txt");

Read a CSV file as maps (or objects). Sample file content, followed by the code that reads it:

Serial number,full name,Age,income
1,Zhang San,23,5000.11
2,Li Si,22,4000.22
3,Li Ergou,20,5000.33
//ListFrame<Map<String, Object>> lines = ListFrame.readMap(path);
//ListFrame<Map<String, Object>> lines = ListFrame.readMap(path,",");
// Read and specify the data type of each column
ListFrame<Map<String, Object>> lines = ListFrame.readMap(path,new Class[]{Integer.class,String.class,Integer.class,Double.class});
lines = lines
        .handle("Serial number='0'+Serial number;full name=Serial number+full name")// prefix the serial number with "0"; set full name to serial number + full name
        .handle(new MapHandler());// when an expression is not convenient, write a custom processor that implements DataHandler
public class MapHandler implements DataHandler<Map<String, Object>> {
    public Map<String, Object> handle(Map<String, Object> line) {
        return line;
    }
}

11. Database read

// new DruidDataSource() is recommended
DataSource dataSource = xxx;
ListFrame list = new ListFrame();
lines = list.readSql("select * from xxx").handle(a->...).handle(a->...)...;

Documentation and open source address

Open source address and instructions

Suggestions are welcome, thank you!

