Skip to content

Instantly share code, notes, and snippets.

@zhongwm
Created November 6, 2014 09:34
Show Gist options
  • Save zhongwm/612ec40b32034575ad43 to your computer and use it in GitHub Desktop.
Save zhongwm/612ec40b32034575ad43 to your computer and use it in GitHub Desktop.
gist for synchronization
package icej1
import Ice.Current
import apps.Item
import scala.collection.mutable
import scala.collection.JavaConverters._
/**
*
* Created by zwm on 2014/11/6.
*/
object ConcurrentCase extends App {
// var invMap = new mutable.HashMap[String, Item] with mutable.SynchronizedMap[String, Item]
// var invList = new mutable.ArrayBuffer[Item] with mutable.SynchronizedBuffer[Item]
var invList = new java.util.concurrent.ConcurrentLinkedQueue[Item]().asScala // actually not reliable!
var invMap = new java.util.concurrent.ConcurrentHashMap[String, Item].asScala // actully not reliable!
println(System.identityHashCode(invList))
println("==============new init============")
def addItems(items: Array[Item]): Unit = {
// invList.synchronized {
// the sequential way
//invList ++= items
//items.foreach( item => invMap(item.sku) = item)
invList ++= items
println(System.identityHashCode(invList))
invMap ++= items.toList.foldLeft(mutable.Map[String, Item]())((map, item) => map + (item.sku -> item))
println(invList.size)
println("invMap: " + invMap.size)
println(System.identityHashCode(invMap))
// }
}
def submit(): Unit = {
println("Submitted")
//println(this)`````
}
class WorkerThread extends Runnable {
override def run(): Unit = {
var items = Array[Item](new Item("Jiaozi", 33), new Item("Roast duck", 38))
items = items :+ new Item("Noodle", 28)
addItems(items)
submit()
}
}
(1 to 4).toList.toParArray.foreach(x => {
val t: Thread = new Thread(new WorkerThread)
println(x)
t.start()
t.join()
})
}
@Centaur
Copy link

Centaur commented Nov 6, 2014

帮你把代码整理了一下,并增加了一些调度上的随机性,有助于你理解问题。你需要知道以下几点:

  1. addItems 不是线程安全的。
  2. invList 的类型是 Iterable[Item],它有 ++ 方法,invList ++= ItemsinvList = invList ++ Items 的语法糖,因此它会更改 invList 所指向的实例。
  3. invMap的类型是 scala.collection.concurrent.Map[String, Item],它有++=方法,不是语法糖,因此 invMap ++= items.map(item => item.sku -> item).toMap 不会改变 invMap 所指向的实例。
import scala.collection.JavaConverters._

object ConcurrentCase extends App {
  case class Item(sku: String, age: Int)

  var invList = new java.util.concurrent.ConcurrentLinkedQueue[Item]().asScala // actually not reliable!
  var invMap = new java.util.concurrent.ConcurrentHashMap[String, Item].asScala // actully not reliable!
  println("==============new init============")

  def addItems(items: List[Item], id: Int): Unit = {
    println(s"thread id=$id addItem started")
    println(s"thread id=$id before invList ++= items, id of invList = ${System.identityHashCode(invList)} , items = $items")
    invList ++= items
    println(s"thread id=$id after invList ++= items, id of invList = ${System.identityHashCode(invList)} , invList.size = ${invList.size}")
    println(s"thread id=$id before invMap ++= items map, id of invMap = ${System.identityHashCode(invMap)} , invMap.size = ${invMap.size} ")
    invMap ++= items.map(item => item.sku -> item).toMap
    println(s"thread id=$id after invMap ++= items map, id of invMap = ${System.identityHashCode(invMap)} , invMap.size = ${invMap.size} ")
    println(s"thread id=$id addItem ended")
  }

  class WorkerThread(val id: Int) extends Runnable {
    override def run(): Unit = {
      var items = Array[Item](new Item("Jiaozi", 33), new Item("Roast duck", 38))
      items = items :+ new Item("Noodle", 28)
      Thread.sleep((math.random*100).toInt)
      addItems(items.toList, id)
    }
  }

  (1 to 4).foreach(x => {
    val t: Thread = new Thread(new WorkerThread(x))
    println(s"thread id=$x started")
    t.start()
  })
}

@zhongwm
Copy link
Author

zhongwm commented Nov 6, 2014

var invList = new java.util.concurrent.ConcurrentLinkedQueue[Item]().asScala// actually not reliable!

这养写真的不是线程安全的。

import scala.collection.JavaConverters._
import scala.collection.mutable

object SimpleDirectParallelWay extends App {

  case class Item(sku: String, age: Int)

  var invList = new java.util.concurrent.ConcurrentLinkedQueue[Item]().asScala// actually not reliable!
  println("==============new init============")


  class WorkerThread(val id: Int) extends Runnable {
    override def run(): Unit = {
      var items = Array[Item](new Item("Jiaozi", 33), new Item("Roast duck", 38))
      items = items :+ new Item("Noodle", 28)
      invList ++= items
      println(invList)
    }
  }

  var thl = new mutable.ArrayBuffer[Thread] with mutable.SynchronizedBuffer[Thread]
  (1 to 4).toList.par.foreach(x => {
    val t: Thread = new Thread(new WorkerThread(x))
    println(s"thread id=$x started")
    t.start()
    thl += t
    printf("Master thread id: %d%n", Thread.currentThread().getId)
  })
  println("Number of threads: " + thl.size)
  thl.foreach(_.join())
  println(invList)
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment