Java kan parallellisera strömningsoperationer för att utnyttja flerkärniga system. Den här artikeln ger ett perspektiv och visar hur parallellström kan förbättra prestandan med lämpliga exempel.
Strömmar i Java
En ström i Java är en sekvens av objekt representerad som en kanal av data. Den har vanligtvis en källa var data finns och en destination där den sänds. Observera att en ström inte är ett arkiv; i stället arbetar den på en datakälla som på en array eller en samling. Mellanbitarna i passagen kallas faktiskt strömmen. Under överföringsprocessen går strömmen vanligtvis igenom en eller flera möjliga transformationer, såsom filtrering eller sortering, eller det kan vara vilken annan process som helst som arbetar på data. Detta anpassar originaldata till en annan form, vanligtvis efter programmerarens behov. Därför skapas en ny ström enligt operationen som tillämpas på den. Till exempel, när en ström sorteras, resulterar det i en ny ström som ger ett resultat som sedan sorteras. Detta innebär att den nya datan är en transformerad kopia av originalet snarare än att vara i originalform.
Sekventiell ström
Alla strömningsoperationer i Java, såvida de inte uttryckligen anges som parallella, bearbetas sekventiellt. De är i princip icke-parallella strömmar som använder en enda tråd för att bearbeta sin pipeline. Sekventiella strömmar drar aldrig fördel av multicore-systemet även om det underliggande systemet kan stödja parallell exekvering. Vad händer till exempel när vi använder multithreading för att bearbeta strömmen? Även då fungerar den på en enda kärna åt gången. Det kan dock hoppa från en kärna till en annan om den inte uttryckligen är fäst vid en specifik kärna. Till exempel är bearbetning i fyra olika trådar kontra fyra olika kärnor uppenbarligen olika där den förra inte matchar den senare. Det är fullt möjligt att köra flera trådar i en enda kärnmiljö men parallell bearbetning är en helt annan genre. Ett program måste designas för parallell programmering förutom att köras i en miljö som stöder det. Detta är anledningen till att parallell programmering är en komplex arena.
Låt oss prova ett exempel för att illustrera idén ytterligare.
package org.mano.example; import java.util.Arrays; import java.util.List; public class Main2 { public static oid main(String[] args) { List<Integer> list=Arrays.asList(1,2,3,4,5,6,7,8,9); list.stream().forEach(System.out::println); System.out.println(); list.parallelStream().forEach(System.out::println); } }
Utdata
123456789 685973214
Detta exempel är en illustration av q sekventiell ström såväl som q parallell ström i drift. list.stream() fungerar i sekvens på en enda tråd med println() drift. list.parallelStream() , å andra sidan, bearbetas parallellt och drar full nytta av den underliggande flerkärniga miljön. Den intressanta aspekten är resultatet av föregående program. I fallet med en sekventiell ström skrivs innehållet i listan ut i en ordnad sekvens. Utsignalen från den parallella strömmen är å andra sidan oordnad och sekvensen ändras varje gång programmet körs. Detta betyder åtminstone en sak:anropet av list.parallelStream() metoden gör println satsen fungerar i flera trådar, något som list.stream() gör i en enda tråd.
Parallell ström
Den primära motivationen bakom att använda en parallell ström är att göra strömbehandling till en del av den parallella programmeringen, även om hela programmet kanske inte är parallelliserat. Parallell ström utnyttjar flerkärniga processorer, vilket resulterar i en avsevärd ökning av prestanda. Till skillnad från all parallell programmering är de komplexa och felbenägna. Java-strömbiblioteket ger dock möjligheten att göra det enkelt och på ett tillförlitligt sätt. Hela programmet får inte parallelliseras. men åtminstone den del som hanterar strömmen kan parallelliseras. De är faktiskt ganska enkla i den meningen att vi kan åberopa några metoder och resten sköts. Det finns ett par sätt att göra det. Ett sådant sätt är att få en parallell ström genom att anropa parallelStream() metod definierad av Insamling . Ett annat sätt är att anropa parallel() metod definierad av BaseStream på en sekventiell ström. Den sekventiella strömmen parallelliseras av anropet. Observera att den underliggande plattformen måste stödja parallell programmering, till exempel med ett flerkärnigt system. Annars är det ingen mening med åkallan. Strömmen skulle bearbetas i sekvens i ett sådant fall, även om vi har gjort anropet. Om anropet görs på en redan parallell ström gör den ingenting och returnerar helt enkelt strömmen.
För att säkerställa att resultatet av parallell bearbetning som tillämpas på strömmen är detsamma som erhålls genom sekventiell bearbetning, måste parallella strömmar vara tillståndslösa, icke-störande och associativa.
Ett snabbt exempel
package org.mano.example; import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; public class Main { public static void main(String[] args) { List<Employee> employees = Arrays.asList( new Employee(1276, "FFF",2000.00), new Employee(7865, "AAA",1200.00), new Employee(4975, "DDD",3000.00), new Employee(4499, "CCC",1500.00), new Employee(9937, "GGG",2800.00), new Employee(5634, "HHH",1100.00), new Employee(9276, "BBB",3200.00), new Employee(6852, "EEE",3400.00)); System.out.println("Original List"); printList(employees); // Using sequential stream long start = System.currentTimeMillis(); List<Employee> sortedItems = employees.stream() .sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList()); long end = System.currentTimeMillis(); System.out.println("sorted using sequential stream"); printList(sortedItems); System.out.println("Total the time taken process :" + (end - start) + " milisec."); // Using parallel stream start = System.currentTimeMillis(); List<Employee> anotherSortedItems = employees .parallelStream().sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList()); end = System.currentTimeMillis(); System.out.println("sorted using parallel stream"); printList(anotherSortedItems); System.out.println("Total the time taken process :" + (end - start) + " milisec."); double totsal=employees.parallelStream() .map(e->e.getSalary()) .reduce(0.00,(a1,a2)->a1+a2); System.out.println("Total Salary expense: "+totsal); Optional<Employee> maxSal=employees.parallelStream() .reduce((Employee e1, Employee e2)-> e1.getSalary()<e2.getSalary()?e2:e1); if(maxSal.isPresent()) System.out.println(maxSal.get().toString()); } public static void printList(List<Employee> list) { for (Employee e : list) System.out.println(e.toString()); } } package org.mano.example; public class Employee { private int empid; private String name; private double salary; public Employee() { super(); } public Employee(int empid, String name, double salary) { super(); this.empid = empid; this.name = name; this.salary = salary; } public int getEmpid() { return empid; } public void setEmpid(int empid) { this.empid = empid; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getSalary() { return salary; } public void setSalary(double salary) { this.salary = salary; } @Override public String toString() { return "Employee [empid=" + empid + ", name=" + name + ", salary=" + salary + "]"; } }
I föregående kod, notera hur vi har tillämpat sortering på en ström en genom att använda sekventiell exekvering.
List<Employee> sortedItems = employees.stream() .sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList());
och parallell exekvering uppnås genom att ändra koden något.
List<Employee> anotherSortedItems = employees .parallelStream().sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList());
Vi kommer också att jämföra systemtiden för att få en uppfattning om vilken del av koden som tar längre tid. Parallell operation börjar när den parallella strömmen explicit erhålls av parallelStream() metod. Det finns en annan intressant metod, kallad reduce() . När vi tillämpar denna metod på en parallell ström kan operationen ske i olika trådar.
Men vi kan alltid växla mellan parallell och sekventiell efter behov. Om vi vill ändra den parallella strömmen till sekventiell kan vi göra det genom att anropa sequential() metod specificerad av BaseStream . Som vi såg i vårt första program kan operationen som utförs på strömmen beställas eller oordnad enligt elementens ordning. Det betyder att ordningen beror på datakällan. Detta är dock inte fallet när det gäller parallella vattendrag. För att öka prestandan bearbetas de parallellt. Eftersom detta görs utan någon sekvens, där varje partition i strömmen bearbetas oberoende av de andra partitionerna utan någon koordination, blir konsekvensen oförutsägbart oordnad. Men om vi specifikt vill utföra en operation på varje element i den parallella strömmen som ska beställas, kan vi överväga forEachOrdered() metod, som är ett alternativ till forEach() metod.
Slutsats
Stream-API:erna har varit en del av Java under lång tid, men att lägga till tweaken av parallell bearbetning är mycket välkomnande, och samtidigt en ganska spännande funktion. Detta är särskilt sant eftersom moderna maskiner är flerkärniga och det finns ett stigma att parallell programmeringsdesign är komplex. API:erna som tillhandahålls av Java ger möjligheten att införliva en nyans av parallella programmeringsjusteringar i ett Java-program som har den övergripande designen av sekventiell exekvering. Det här är kanske den bästa delen av den här funktionen.