We invented threads to
But, we now have two problems related to controlling operation order
Synchronization is the mechanism threads use to
We have a problem if
Shared data
Critical Sections
Race Condition
Mutual Exclusion
For example
xxxxxxxxxx51et = uthread_create ((void* (*)(void*)) push_driver, (void*) n);2dt = uthread_create ((void* (*)(void*)) pop_driver, (void*) n);3uthread_join (et, 0);4uthread_join (dt, 0);5assert (top==0); xxxxxxxxxx41void push_st (struct SE* e) {2 e->next = top;3 top = e;4} xxxxxxxxxx51struct SE* pop_st () {2 struct SE* e = top;3 top = (top) ? top->next : 0;4 return e;5}Notice: It does work if we say uthread_init(1) But it does not work with uthread_init(2) or higher
The bug
Lock Semantics
Lock Primitives
lock acquire lock, wait if necessary
Using locks for the shared stack
xxxxxxxxxx51void push_cs (struct SE* e) {2 lock (&aLock);3 push_st (e); // Critical sections are inside the lock4 unlock (&aLock);5} xxxxxxxxxx81void SE* pop_cs () {2 struct SE* e;3 lock (&aLock);4 e = pop_st ();5 unlock (&aLock);6 7 return e;8}Here's a first cut
// Fixed: the global was named "lock", colliding with the function lock();
// renamed to aLock to match the push_cs/pop_cs callers.
int aLock = 0;   // 0 = free, 1 = held

// BUGGY ON PURPOSE: between seeing *lock == 0 and storing 1, another
// thread can do the same — the race has moved into the lock itself.
void lock (int* lock) {
  while (*lock == 1) {}   // busy-wait until the lock appears free
  *lock = 1;              // ...but this test-then-set is not atomic
}

void unlock (int* lock) {
  *lock = 0;
}
// We now have a race in the lock code!
The race exists even at the machine-code level
We need a new instruction
Atomicity
The Atomic Memory Exchange
| Name | Semantics | Assembly |
|---|---|---|
| atomic exchange | r[v] ← m[r[a]]; m[r[a]] ← r[v] (both steps as one atomic action) | xchg (ra), rv |
A Spinlock is
Implementation using Atomic Exchange
xxxxxxxxxx61 ld $lock, r12 ld $1, r03loop: xchg (r1), r04 beq r0, held5 br loop6held:Can not be implemented just by CPU
Implemented by Memory Bus
Spin first on normal read
xxxxxxxxxx91 ld $lock, r1 # r1 = &lock2loop: ld (r1), r0 # r0 = lock3 beq r0, try # goto try if lock == 0 (available)4 br loop # goto loop if lock !=0 (held)5try: ld $1, r0 # r0 = 16 xchg (r1), r0 # atomically swap r0 and lock7 beq r0, held # goto held lock was 0 before swap8 br loop # try again if another lock holds lock9held: # we now hold the lockBusy-waiting pros and cons
If a thread may wait a long time
it should block so that other threads can run
it will then unblock when it becomes runnable
Blocking locks for mutual exclusion
attempting to acquire a held lock BLOCKs calling thread
when releasing lock, UNBLOCK a waiting thread if there is one
Blocking for event notification
Spinlocks
Busy Waiting
Pros and Cons
Use When
Blocking Locks
blocking waiting
Pros and Cons
Use when
General Problem
video playback has two parts: (1) fetch/decode and (2) playback
fetch has variable latency and so we need a buffer
Bounded Buffer and Two Independent Threads
Producer Thread
Consumer Thread
How are Producer and Consumer Connected?
Mutual exclusion plus inter-thread synchronization
Monitor / MUTEX
Condition Variable
xxxxxxxxxx161struct uthread_mutex;2typedef struct uthread_mutex* uthread_mutex_t;3struct uthread_cond;4typedef struct uthread_cond* uthread_cond_t;56uthread_mutex_t uthread_mutex_create ();7void uthread_mutex_lock (uthread_mutex_t);8void uthread_mutex_lock_readonly (uthread_mutex_t);9void uthread_mutex_unlock (uthread_mutex_t);10void uthread_mutex_destory (uthread_mutex_t);1112uthread_cond_t uthread_cond_create (uthread_mutex_t);13void uthread_cond_wait (uthread_cond_t);14void uthread_cond_signal (uthread_cond_t);15void uthread_cond_broadcast (uthread_cond_t);16void uthread_cond_destroy (uthread_cond_t); xxxxxxxxxx91struct video_frame;23struct video_frame buf [N];4int buf_length = 0;5int buf_pcur = 0;6int buf_ccur = 0;78uthread_mutex_t mx;9uthread_cond_t need_frames; xxxxxxxxxx121void producer() {2 uthread_lock (mx);3 while(1) {4 while (buf_length < N) {5 buf [pcur] = get_next_frame();6 buf_pcur = (pcur + 1) % N;7 buf_length +- 1;8 }9 uthread_cond_wait (need_frames);10 }11 uthread_unlock (mx);12}Basic formulation
xxxxxxxxxx51uthread_mutex_lock (aMutex);2 while(!aDesiredState)3 uthread_cond_wait(aCond);4 aDesiredState = 0;5uthread_mutex_unlock (aMutex); xxxxxxxxxx41uthread_mutex_lock (aMutex);2 aDesiredState = 1;3 uthread_cond_signal(aCond);4uthread_mutex_unlock (aMutex);wait releases the mutex and blocks thread
signal awakens at most one thread
broadcast awakens all threads
Beer pitcher is shared data structure with these operations
Implementation goal
Data Structure for Beer Pitcher
What if you want to refill automatically?
Blocking Read
xxxxxxxxxx81void read(char* buf, int nbytes, int blockno) {2 uthread_mutex_lock(mx);3 4 scheduleRead(buf, nbytes, blockno);5 uthread_cond_wait(readComplete);6 7 uthread_mutex_unlock(mx);8}Read Completion
xxxxxxxxxx71void readComplete() {2 uthread_mutex_lock(mx);3 4 uthread_cond_signal(readComplete);5 6 uthread_mutex_unlock(mx);7} xxxxxxxxxx61void read(char* buf, int nbytes, int blockno) {2 uthread_mutex_lock(mx);3 scheduleRead(buf, nbytes, blockno);4 uthread_cond_wait(readComplete);5 uthread_mutex_unlock(mx);6} xxxxxxxxxx41void read(char* buf, int nbytes, int blockno) {2 scheduleRead(buf, nbytes, blockNo);3 uthread_cond_wait(readComplete);4}The Problem
The Solution
Preventing the Race
Naked Notify
that's what we call a signal outside of a monitor
it's sometimes necessary
// Unsynchronized queue — races if used from multiple threads
void enqueue (uthread_queue_t* queue, uthread_t thread) {
  thread->next = 0;
  if (queue->tail)
    queue->tail->next = thread;
  queue->tail = thread;
  if (queue->head == 0)
    queue->head = queue->tail;
}

uthread_t dequeue (uthread_queue_t* queue) {
  uthread_t thread;
  if (queue->head) {
    thread = queue->head;
    queue->head = queue->head->next;
    if (queue->head == 0)
      queue->tail = 0;
    thread->next = 0;   // fixed: was after the else branch — null deref when queue empty
  } else
    thread = 0;
  return thread;
}

// Synchronized queue: mutex for mutual exclusion, condition variable to wait when empty
void enqueue (uthread_queue_t* queue, uthread_t thread) {
  uthread_mutex_lock (&queue->mx);
  thread->next = 0;
  if (queue->tail)
    queue->tail->next = thread;
  queue->tail = thread;
  if (queue->head == 0)
    queue->head = queue->tail;
  uthread_cond_signal (&queue->not_empty);   // fixed: missing semicolon
  uthread_mutex_unlock (&queue->mx);
}

uthread_t dequeue (uthread_queue_t* queue) {
  uthread_t thread;
  uthread_mutex_lock (&queue->mx);
  while (queue->head == 0)
    uthread_cond_wait (&queue->not_empty);   // fixed: wait was lost from the loop body
  thread = queue->head;                      // fixed: dequeue logic was lost in extraction
  queue->head = queue->head->next;
  if (queue->head == 0)
    queue->tail = 0;
  thread->next = 0;
  uthread_mutex_unlock (&queue->mx);
  return thread;
}
// This code seems like it would be right
// DELIBERATELY WRONG version discussed below: signalling only when the
// queue was empty drops wakeups when enqueues arrive back-to-back.
void enqueue (uthread_queue_t* queue, uthread_t thread) {
  uthread_mutex_lock (&queue->mx);
  ...
  if (queue->head == 0)
    uthread_cond_signal (&queue->not_empty);
  uthread_mutex_unlock (&queue->mx);
}
// But it is WRONG
Lets say there are N threads waiting in dequeue on queue->not_empty
If there are N enqueues, there MUST be N signals to wakeup all the dequeues
If you get two enqueues in a row before a dequeue runs, however
you thus get N enqueues but fewer than N signals
If we classify critical sections as
Then we can weaken the mutual exclusion constraint
Read-Writer Monitors
monitor state is one of
mutex_lock()
mutex_lock_read_only()
mutex_unlock()
Policy Question
Disallowing new readers while writer is waiting
Allowing new readers while writer is waiting
What should we do
Introduced by Edsger Dijkstra
A Semaphore is
P(s) – wait(s)
V(s) – signal(s)
but
// Semaphore API
struct uthread_sem;
typedef struct uthread_sem* uthread_sem_t;

uthread_sem_t uthread_sem_create (int initial_value);
void uthread_sem_destroy (uthread_sem_t);
void uthread_sem_wait (uthread_sem_t);     // P: decrement, blocking if the count is 0
void uthread_sem_signal (uthread_sem_t);   // V: increment, waking one waiter if any
// Use semaphore to store glasses held by pitcher
xxxxxxxxxx11uthread_sem_t glasses = uthread_sem_create (0);Pouring and refilling don't require a monitor
xxxxxxxxxx71void pour () {2 uthread_sem_wait (glasses);3}4void refill (int n) {5 for (int i = 0; i < n; i++)6 uthread_sem_signal (glasses)7}Implementing Monitors
Implementing Condition Variables
Blocking Synchronous Operations
Using Monitors and Condition Variables
to avoid wait-signal race, wait and signal must be done while mutex is held
xxxxxxxxxx61void read (...) {2 uthread_mutex_lock (mx);3 scheduleRead (buf, nbytes, bno);4 uthread_cond_wait (complete);5 uthread_mutex_unlock (mx);6} xxxxxxxxxx51void readCompletionHandler () {2 uthread_mutex_lock (mx);3 uthread_cond_signal (complete);4 uthread_mutex_unlock (mx);5}Using Semaphores
no critical section, wait-signal race problem goes away … why?
xxxxxxxxxx41void read (...) {2 scheduleRead (buf, nbyte, bno);3 uthread_sem_wait (complete);4} xxxxxxxxxx31void readCompletionHandler () {2 uthread_sem_signal (complete);3}With condition variables
xxxxxxxxxx71int dequeue (struct Q* q) {2 uthread_mutex_lock (q->mx);3 while (q->length == 0)4 uthread_cond_wait (q->notEmpty);5 ....6 uthread_mutex_unlock (q->mx);7} xxxxxxxxxx51void enqueue (struct Q* q, int i) {2 uthread_mutex_lock (q->mx);3 uthread_cond_signal (q->notEmpty);4 uthread_mutex_unlock (q->mx);5}With semaphores
xxxxxxxxxx41struct Q {2 uthread_sem_t mutex; // initialize to 13 uthread_sem_t length; // initialize to 04} xxxxxxxxxx71int dequeue (struct Q* q) {2 uthread_sem_wait (q->length);3 4 uthread_sem_wait (q->mutex);5 ...6 uthread_sem_signal (q->mutex);7} xxxxxxxxxx61void enqueue (struct Q* q, int i) {2 uthread_sem_wait (q->mutex);3 ....4 uthread_sem_signal (q->mutex);5 uthread_sem_signal (q->length);6}If you thread A to wait for thread B
// Thread A: waits for B
uthread_sem_wait (b);

// Thread B: signals A
uthread_sem_signal (b);
// Rendezvous: both threads wait for each other
// Thread A: signal first, then wait — signalling first avoids deadlock
uthread_sem_signal (a);
uthread_sem_wait (b);

// Thread B
uthread_sem_signal (b);
uthread_sem_wait (a);
// What if you reversed the order of wait and signal on either (or both) threads?
Local, Non-Reusable Barrier
xxxxxxxxxx81// create N threads2uthread_create (one, a);3uthread_create (two, b);4...5uthread_create(last, z);6// wait for them to finish7for (int i = 0; i < N; i++)8 uthread_sem_wait (barrier) xxxxxxxxxx41void* one (void* a) {2 ...3 uthread_sem_signal (barrier);4}Global, Reusable Barrier
Mutex
// Basic lock/unlock — finally guarantees release on every path
Lock l = ...;
l.lock();
try {
  ...
} finally {
  l.unlock();
}

// Interruptible acquisition
Lock l = ...;
try {
  l.lockInterruptibly();               // fixed: was "lockInterruptibility"
  try {
    ...
  } finally {
    l.unlock();
  }
} catch (InterruptedException ie) {}   // fixed: was "InterruptException"

// Reader-writer locks
ReadWriteLock l = ...;
Lock rl = l.readLock();
Lock wl = l.writeLock();
// Conditions
xxxxxxxxxx261class Beer {2 Lock l = ...;3 Condition notEmpty = l.newCondition ();4 int glasses = 0;5 6 void pour() throws InterruptedException {7 l.lock();8 try {9 while (glasses == 0)10 notEmpty.await();11 glasses--;12 } finally {13 l.unlock();14 }15 }16 17 void refill (int n) throws InterruptedException {18 l.lock();19 try {20 glasses += n;21 notEmpty.signalAll();22 } finally {23 l.unlock();24 }25 }26}Semaphore Class
xxxxxxxxxx111class Beer {2 Semaphore glasses = new Semaphore (0);3 4 void pour () throws InterruptedException {5 glasses.acquire();6 }7 8 void refill (int n) throws InterruptedException {9 glasses.release (n);10 }11}Lock-free Atomic Variables
AtomicX where X in {Boolean, Integer, IntegerArray, Reference, ...}
atomic operations such as getAndAdd(), compareAndSet()...
V getAndSet (V newValue)
boolean compareAndSet (V expectedValue, V newValue)
V get()
void set (V newValue)
Use instead of mutual exclusion to eliminate data races..
xxxxxxxxxx101void push_st (struct SE* e) {2 e->next = top;3 top = e;4}56struct SE* pop_st () {7 struct SE* e = top;8 top = (top) ? top->next : 0;9 return e;10} xxxxxxxxxx191class Element {2 Element* next;3}45clsas Stack {6 AtomicReference <Element> top;7 Stack () {8 top.set (NULL);9 }10 11 void push () {12 Element t;13 Element e = new Element ();14 do {15 t = top.get();16 e.next = t;17 } while (!top.compareAndSet(t, e));18 }19}Race Condition
competing, unsynchronized access to shared variable
solved with synchronization
Deadlock
xxxxxxxxxx71void foo () {2 uthread_mutex_lock (mx);3 count--;4 if (count > 0)5 foo();6 uthread_mutex_unlock (mx);7} xxxxxxxxxx111void lock (struct block_lock* l) {2 spinlock_lock (&l->spinlock);3 while (l->held) {4 enqueue (&waiter_queue, uthread_self());5 spinlock_unlock (&l->spinlock);6 uthread_block ();7 spinlock_lock (&l->spinlock);8 }9 l->held = 1;10 spinlock_unlock (&l->spinlock);11}if we try to lock the mutex again, it is a deadlock
allow a thread that holds the mutex to enter again
// Reentrant (recursive) mutex: the current holder may lock again without
// deadlocking; holderCount tracks nested acquisitions for matching unlocks
void uthread_mutex_lock (uthread_mutex_t mutex) {
  spinlock_lock (&mutex->spinlock);
  while (mutex->holder && mutex->holder != uthread_self()) {
    enqueue (&mutex->waiter_queue, uthread_self());
    spinlock_unlock (&mutex->spinlock);
    uthread_block ();
    spinlock_lock (&mutex->spinlock);   // fixed typo: was "&muter->spinlock"
  }
  mutex->holder = uthread_self();
  mutex->holderCount++;
  spinlock_unlock (&mutex->spinlock);
}