We invented threads to express concurrency within a program
But we now have two problems related to controlling operation order: mutual exclusion and inter-thread synchronization
Synchronization is the mechanism threads use to control the order of their operations relative to each other
We have a problem if two or more threads access the same shared data and at least one of the accesses is a write
Shared data: data accessed by more than one thread
Critical Sections: the pieces of code that access shared data
Race Condition: the outcome depends on the (uncontrolled) order in which threads execute their critical sections
Mutual Exclusion: guaranteeing that at most one thread is in a critical section at a time
For example
et = uthread_create ((void* (*)(void*)) push_driver, (void*) n);
dt = uthread_create ((void* (*)(void*)) pop_driver, (void*) n);
uthread_join (et, 0);
uthread_join (dt, 0);
assert (top==0);

void push_st (struct SE* e) {
  e->next = top;
  top = e;
}

struct SE* pop_st () {
  struct SE* e = top;
  top = (top) ? top->next : 0;
  return e;
}
Notice: it works if we say uthread_init(1), but it does not work with uthread_init(2) or higher
The bug: with more than one thread running at once, both threads can read the same value of top before either updates it; for example, two concurrent push_st calls can both set e->next to the old top, and whichever assignment to top happens last wins, losing the other element
Lock Semantics
Lock Primitives
lock: acquire the lock, waiting if necessary until it is free
unlock: release the lock, allowing a waiting thread (if any) to acquire it
Using locks for the shared stack
void push_cs (struct SE* e) {
  lock (&aLock);
  push_st (e);       // critical section is inside the lock
  unlock (&aLock);
}

struct SE* pop_cs () {
  struct SE* e;
  lock (&aLock);
  e = pop_st ();
  unlock (&aLock);

  return e;
}
Here's a first cut
int lock = 0;

void lock (int* lock) {
  while (*lock == 1) {}      // spin until the lock appears free
  *lock = 1;                 // then claim it (another thread may be doing the same!)
}

void unlock (int* lock) {
  *lock = 0;
}
We now have a race in the lock code!
The race exists even at the machine-code level: another thread can acquire the lock between the load that sees *lock == 0 and the store that sets *lock = 1
We need a new instruction that performs the read and the write as one indivisible step
Atomicity
The Atomic Memory Exchange
Name | Semantics | Assembly
---|---|---
atomic exchange | atomically: r[v] ← m[r[a]] and m[r[a]] ← r[v] (swap) | xchg (ra), rv
A Spinlock is a lock that uses busy waiting: a thread repeatedly retries the atomic exchange in a loop (it "spins") until it acquires the lock
Implementation using Atomic Exchange
      ld   $lock, r1     # r1 = &lock
      ld   $1, r0        # r0 = 1
loop: xchg (r1), r0      # atomically swap r0 and lock
      beq  r0, held      # r0 == 0: lock was free, we now hold it
      br   loop          # r0 == 1: lock was held, try again
held:
The atomic exchange cannot be implemented by the CPU alone; it is implemented with help from the memory bus, which locks out other memory accesses while the exchange's read and write complete
Because every xchg ties up the memory bus, it is better to spin first on a normal read and only attempt the exchange when the lock appears to be free
      ld   $lock, r1     # r1 = &lock
loop: ld   (r1), r0      # r0 = lock
      beq  r0, try       # goto try if lock == 0 (available)
      br   loop          # goto loop if lock != 0 (held)
try:  ld   $1, r0        # r0 = 1
      xchg (r1), r0      # atomically swap r0 and lock
      beq  r0, held      # goto held if lock was 0 before the swap
      br   loop          # try again if another thread holds the lock
held:                    # we now hold the lock
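For reference, here is a minimal C sketch of the same test-and-test-and-set spinlock, assuming GCC/Clang-style __atomic builtins (an assumption; the notes use the xchg instruction directly). The names spinlock_lock and spinlock_unlock match the ones used later in these notes, but this implementation is only illustrative.

typedef volatile int spinlock_t;

void spinlock_lock (spinlock_t* l) {
  for (;;) {
    while (__atomic_load_n (l, __ATOMIC_RELAXED) != 0)
      ;                                                 // spin on a normal read while held
    if (__atomic_exchange_n (l, 1, __ATOMIC_ACQUIRE) == 0)
      return;                                           // exchange saw 0: we now hold the lock
  }
}

void spinlock_unlock (spinlock_t* l) {
  __atomic_store_n (l, 0, __ATOMIC_RELEASE);            // release the lock
}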
Busy-waiting pros and cons
If a thread may wait a long time
it should block so that other threads can run
it will then unblock when it becomes runnable
Blocking locks for mutual exclusion
attempting to acquire a held lock BLOCKs calling thread
when releasing lock, UNBLOCK a waiting thread if there is one
Blocking for event notification
Spinlocks
busy waiting
Pros and cons: acquire and release are cheap and involve no context switch, but a waiting thread burns CPU (and memory-bus bandwidth) for as long as it spins
Use when: the lock is held only briefly and waits are expected to be short, or when blocking is not an option
Blocking Locks
blocking waiting
Pros and cons: waiting threads consume no CPU, but every block/unblock involves queue management and context switches, making acquire and release more expensive
Use when: critical sections are long or contention means waits may be long
General Problem
video playback has two parts: (1) fetch/decode and (2) playback
fetch has variable latency and so we need a buffer
Bounded Buffer and Two Independent Threads
Producer Thread
Consumer Thread
How are Producer and Consumer Connected?
Mutual exclusion plus inter-thread synchronization
Monitor / MUTEX
Condition Variable
struct uthread_mutex;
typedef struct uthread_mutex* uthread_mutex_t;
struct uthread_cond;
typedef struct uthread_cond* uthread_cond_t;

uthread_mutex_t uthread_mutex_create ();
void uthread_mutex_lock (uthread_mutex_t);
void uthread_mutex_lock_readonly (uthread_mutex_t);
void uthread_mutex_unlock (uthread_mutex_t);
void uthread_mutex_destroy (uthread_mutex_t);

uthread_cond_t uthread_cond_create (uthread_mutex_t);
void uthread_cond_wait (uthread_cond_t);
void uthread_cond_signal (uthread_cond_t);
void uthread_cond_broadcast (uthread_cond_t);
void uthread_cond_destroy (uthread_cond_t);

struct video_frame;

struct video_frame buf [N];
int buf_length = 0;
int buf_pcur = 0;
int buf_ccur = 0;

uthread_mutex_t mx;
uthread_cond_t need_frames;

void producer () {
  uthread_mutex_lock (mx);
  while (1) {
    while (buf_length < N) {              // fill the buffer while there is room
      buf [buf_pcur] = get_next_frame ();
      buf_pcur = (buf_pcur + 1) % N;
      buf_length += 1;
    }
    uthread_cond_wait (need_frames);      // buffer full: wait until frames are needed
  }
  uthread_mutex_unlock (mx);
}
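The consumer thread is named above but not shown. Here is a hedged sketch of one way it could look: it assumes a second condition variable, have_frames, and a play_frame() routine, neither of which appears in the globals above; symmetrically, the producer would also need to signal have_frames after adding frames.

uthread_cond_t have_frames;               // assumed: producer signals this after adding frames

void consumer () {
  uthread_mutex_lock (mx);
  while (1) {
    while (buf_length > 0) {              // drain frames while any are buffered
      play_frame (&buf [buf_ccur]);       // play_frame() is assumed, like get_next_frame()
      buf_ccur = (buf_ccur + 1) % N;
      buf_length -= 1;
    }
    uthread_cond_signal (need_frames);    // buffer is empty: ask the producer for more
    uthread_cond_wait (have_frames);      // and wait until frames are available again
  }
  uthread_mutex_unlock (mx);
}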
Basic formulation
uthread_mutex_lock (aMutex);
  while (!aDesiredState)
    uthread_cond_wait (aCond);
  aDesiredState = 0;
uthread_mutex_unlock (aMutex);

uthread_mutex_lock (aMutex);
  aDesiredState = 1;
  uthread_cond_signal (aCond);
uthread_mutex_unlock (aMutex);
wait releases the mutex, blocks the thread, and re-acquires the mutex before returning
signal awakens at most one waiting thread
broadcast awakens all waiting threads
The beer pitcher is a shared data structure with two operations: pour (a glass) and refill (the pitcher)
Implementation goal
Data Structure for Beer Pitcher
What if you want to refill automatically?
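As a point of comparison with the semaphore version shown later, here is a minimal sketch of the beer pitcher as a monitor, using the uthread mutex and condition-variable API above; the counter name glasses and the condition name not_empty are assumptions, not names from the notes.

uthread_mutex_t mx;          // = uthread_mutex_create ()
uthread_cond_t  not_empty;   // = uthread_cond_create (mx)
int glasses = 0;             // glasses of beer left in the pitcher

void pour () {
  uthread_mutex_lock (mx);
  while (glasses == 0)                    // wait until there is beer to pour
    uthread_cond_wait (not_empty);
  glasses--;
  uthread_mutex_unlock (mx);
}

void refill (int n) {
  uthread_mutex_lock (mx);
  glasses += n;
  uthread_cond_broadcast (not_empty);     // wake everyone waiting for beer
  uthread_mutex_unlock (mx);
}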
Blocking Read
void read (char* buf, int nbytes, int blockno) {
  uthread_mutex_lock (mx);

  scheduleRead (buf, nbytes, blockno);
  uthread_cond_wait (readComplete);

  uthread_mutex_unlock (mx);
}
Read Completion
void readCompletionHandler () {      // called when the disk read finishes
  uthread_mutex_lock (mx);

  uthread_cond_signal (readComplete);

  uthread_mutex_unlock (mx);
}

void read (char* buf, int nbytes, int blockno) {
  uthread_mutex_lock (mx);
  scheduleRead (buf, nbytes, blockno);
  uthread_cond_wait (readComplete);
  uthread_mutex_unlock (mx);
}

void read (char* buf, int nbytes, int blockno) {
  scheduleRead (buf, nbytes, blockno);
  uthread_cond_wait (readComplete);      // wait without holding the mutex
}
The Problem: the completion interrupt can fire, and signal readComplete, after scheduleRead() returns but before the thread calls uthread_cond_wait(); that signal is lost and read() blocks forever
The Solution: hold the mutex across scheduleRead() and the wait, and have the completion handler acquire the same mutex before signalling, so the signal cannot slip into that gap
Preventing the Race
Naked Notify
a naked notify is a signal delivered without holding the monitor's mutex
it's sometimes necessary — for example, an interrupt handler may not be able to block to acquire the mutex
void enqueue (uthread_queue_t* queue, uthread_t thread) {
  thread->next = 0;
  if (queue->tail)
    queue->tail->next = thread;
  queue->tail = thread;
  if (queue->head == 0)
    queue->head = queue->tail;
}

uthread_t dequeue (uthread_queue_t* queue) {
  uthread_t thread;
  if (queue->head) {
    thread = queue->head;
    queue->head = queue->head->next;
    if (queue->head == 0)
      queue->tail = 0;
    thread->next = 0;
  } else
    thread = 0;
  return thread;
}

void enqueue (uthread_queue_t* queue, uthread_t thread) {
  uthread_mutex_lock (&queue->mx);

  thread->next = 0;
  if (queue->tail)
    queue->tail->next = thread;
  queue->tail = thread;
  if (queue->head == 0)
    queue->head = queue->tail;

  uthread_cond_signal (&queue->not_empty);
  uthread_mutex_unlock (&queue->mx);
}

uthread_t dequeue (uthread_queue_t* queue) {
  uthread_t thread;
  uthread_mutex_lock (&queue->mx);
  while (queue->head == 0)                     // wait until the queue is non-empty
    uthread_cond_wait (&queue->not_empty);
  thread = queue->head;
  queue->head = queue->head->next;
  if (queue->head == 0)
    queue->tail = 0;
  thread->next = 0;
  uthread_mutex_unlock (&queue->mx);
  return thread;
}
This code seems like it would be right
void enqueue (uthread_queue_t* queue, uthread_t thread) {
  uthread_mutex_lock (&queue->mx);
  ...
  if (queue->head == 0)
    uthread_cond_signal (&queue->not_empty);
  uthread_mutex_unlock (&queue->mx);
}
But it is WRONG
Let's say there are N threads waiting in dequeue on queue->not_empty
If there are N enqueues, there must be N signals to wake up all of the dequeuers
If two enqueues occur in a row before any dequeue runs, however, the second sees a non-empty queue and does not signal
You thus get N enqueues but fewer than N signals, and some dequeuers wait forever
If we classify critical sections as either reading or writing the shared data
Then we can weaken the mutual exclusion constraint: any number of readers may be in the critical section together, but a writer must be alone
Reader-Writer Monitors
monitor state is one of: unlocked, locked for reading (with a count of readers), or locked for writing
mutex_lock()
mutex_lock_read_only()
mutex_unlock()
Policy Question: what happens to readers that arrive while a writer is waiting?
Disallowing new readers while a writer is waiting favours the writer and bounds how long it waits
Allowing new readers while a writer is waiting favours readers but can starve the writer
What should we do
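A brief usage sketch (not from the notes): readers take the monitor with uthread_mutex_lock_readonly and a writer takes it with uthread_mutex_lock; the shared hits array and the function names are assumptions.

int hits [128];                  // assumed shared data: per-page hit counts
uthread_mutex_t hits_mx;         // = uthread_mutex_create ()

int read_hits (int page) {       // many readers may run this concurrently
  uthread_mutex_lock_readonly (hits_mx);
  int n = hits [page];
  uthread_mutex_unlock (hits_mx);
  return n;
}

void count_hit (int page) {      // a writer excludes readers and other writers
  uthread_mutex_lock (hits_mx);
  hits [page] += 1;
  uthread_mutex_unlock (hits_mx);
}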
Introduced by Edsger Dijkstra
A Semaphore is an atomic counter that can never go below zero, with two operations
P(s) – wait(s): block until s > 0, then atomically decrement s
V(s) – signal(s): atomically increment s, unblocking one waiting thread if there is one
but the counter's value cannot be read directly; wait and signal are the only operations
struct uthread_sem;
typedef struct uthread_sem* uthread_sem_t;

uthread_sem_t uthread_sem_create (int initial_value);
void uthread_sem_destroy (uthread_sem_t);
void uthread_sem_wait (uthread_sem_t);
void uthread_sem_signal (uthread_sem_t);
Use a semaphore whose value counts the glasses of beer remaining in the pitcher

uthread_sem_t glasses = uthread_sem_create (0);
Pouring and refilling don't require a monitor
void pour () {
  uthread_sem_wait (glasses);          // wait for (and take) a glass of beer
}
void refill (int n) {
  for (int i = 0; i < n; i++)
    uthread_sem_signal (glasses);      // add a glass, waking one waiter
}
Implementing Monitors
Implementing Condition Variables
Blocking Synchronous Operations
Using Monitors and Condition Variables
to avoid wait-signal race, wait and signal must be done while mutex is held
void read (...) {
  uthread_mutex_lock (mx);
  scheduleRead (buf, nbytes, bno);
  uthread_cond_wait (complete);
  uthread_mutex_unlock (mx);
}

void readCompletionHandler () {
  uthread_mutex_lock (mx);
  uthread_cond_signal (complete);
  uthread_mutex_unlock (mx);
}
Using Semaphores
no critical section, and the wait-signal race problem goes away … why? (the semaphore counts signals: a signal that arrives before the wait leaves the counter at 1, so the later wait returns immediately instead of blocking forever)
void read (...) {
  scheduleRead (buf, nbytes, bno);
  uthread_sem_wait (complete);
}

void readCompletionHandler () {
  uthread_sem_signal (complete);
}
With condition variables
int dequeue (struct Q* q) {
  uthread_mutex_lock (q->mx);
  while (q->length == 0)
    uthread_cond_wait (q->notEmpty);
  ....
  uthread_mutex_unlock (q->mx);
}

void enqueue (struct Q* q, int i) {
  uthread_mutex_lock (q->mx);
  ....
  uthread_cond_signal (q->notEmpty);
  uthread_mutex_unlock (q->mx);
}
With semaphores
struct Q {
  uthread_sem_t mutex;    // initialize to 1
  uthread_sem_t length;   // initialize to 0
};

int dequeue (struct Q* q) {
  uthread_sem_wait (q->length);      // wait until there is an item

  uthread_sem_wait (q->mutex);       // then enter the critical section
  ...
  uthread_sem_signal (q->mutex);
}

void enqueue (struct Q* q, int i) {
  uthread_sem_wait (q->mutex);
  ....
  uthread_sem_signal (q->mutex);
  uthread_sem_signal (q->length);    // announce the new item
}
If you want thread A to wait for thread B
// Thread A
uthread_sem_wait (b);

// Thread B
uthread_sem_signal (b);
Rendezvous: both threads wait for each other
// Thread A
uthread_sem_signal (a);
uthread_sem_wait (b);

// Thread B
uthread_sem_signal (b);
uthread_sem_wait (a);
What if you reversed the order of wait and signal in either (or both) threads? (If both threads wait before signalling, each waits for a signal the other will never send: deadlock.)
Local, Non-Reusable Barrier
// create N threads
uthread_create (one, a);
uthread_create (two, b);
...
uthread_create (last, z);
// wait for them to finish
for (int i = 0; i < N; i++)
  uthread_sem_wait (barrier);

void* one (void* a) {
  ...
  uthread_sem_signal (barrier);
}
Global, Reusable Barrier
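The notes name this case without giving code; here is a minimal two-phase sketch using the uthread semaphore API, assuming N threads and helper names (arrival, departure, waiting) that are not from the notes. The second phase stops a fast thread from looping around and re-entering the barrier before slower threads have left it.

int n_threads = N;            // number of threads using the barrier
int waiting = 0;              // how many have arrived at the current phase
uthread_sem_t mutex;          // = uthread_sem_create (1)
uthread_sem_t arrival;        // = uthread_sem_create (0)
uthread_sem_t departure;      // = uthread_sem_create (0)

void barrier () {
  // phase 1: wait until all N threads have arrived
  uthread_sem_wait (mutex);
  if (++waiting == n_threads)
    for (int i = 0; i < n_threads; i++)
      uthread_sem_signal (arrival);          // last arrival releases everyone
  uthread_sem_signal (mutex);
  uthread_sem_wait (arrival);

  // phase 2: wait until all N threads have left phase 1, so the barrier can be reused
  uthread_sem_wait (mutex);
  if (--waiting == 0)
    for (int i = 0; i < n_threads; i++)
      uthread_sem_signal (departure);        // last one out re-arms the barrier
  uthread_sem_signal (mutex);
  uthread_sem_wait (departure);
}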
Mutex
Lock l = ...;
l.lock();
try {
  ...
} finally {
  l.unlock();
}

Lock l = ...;
try {
  l.lockInterruptibly();
  try {
    ...
  } finally {
    l.unlock();
  }
} catch (InterruptedException ie) {}

ReadWriteLock l = ...;
Lock rl = l.readLock();
Lock wl = l.writeLock();
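A brief usage sketch (not from the notes): readers take the read lock and a writer takes the write lock; the Cache class, the shared HashMap, and the method names are assumptions.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class Cache {
  private final Map<String, String> map = new HashMap<>();
  private final ReadWriteLock rw = new ReentrantReadWriteLock();
  private final Lock rl = rw.readLock();
  private final Lock wl = rw.writeLock();

  String get (String key) {               // many readers may hold rl at once
    rl.lock();
    try { return map.get(key); }
    finally { rl.unlock(); }
  }

  void put (String key, String value) {   // a writer holds wl exclusively
    wl.lock();
    try { map.put(key, value); }
    finally { wl.unlock(); }
  }
}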
Conditions
class Beer {
  Lock l = ...;
  Condition notEmpty = l.newCondition ();
  int glasses = 0;

  void pour() throws InterruptedException {
    l.lock();
    try {
      while (glasses == 0)
        notEmpty.await();
      glasses--;
    } finally {
      l.unlock();
    }
  }

  void refill (int n) throws InterruptedException {
    l.lock();
    try {
      glasses += n;
      notEmpty.signalAll();
    } finally {
      l.unlock();
    }
  }
}
Semaphore Class
class Beer {
  Semaphore glasses = new Semaphore (0);

  void pour () throws InterruptedException {
    glasses.acquire();
  }

  void refill (int n) throws InterruptedException {
    glasses.release (n);
  }
}
Lock-free Atomic Variables
AtomicX where X in {Boolean, Integer, IntegerArray, Reference, ...}
atomic operations such as getAndAdd(), compareAndSet()...
V getAndSet (V newValue)
boolean compareAndSet (V expectedValue, V newValue)
V get()
void set (V newValue)
Use these instead of mutual exclusion to eliminate data races.
void push_st (struct SE* e) {
  e->next = top;
  top = e;
}

struct SE* pop_st () {
  struct SE* e = top;
  top = (top) ? top->next : 0;
  return e;
}

class Element {
  Element next;
}

class Stack {
  AtomicReference<Element> top = new AtomicReference<> (null);

  void push () {
    Element t;
    Element e = new Element ();
    do {
      t = top.get ();                       // snapshot the current top
      e.next = t;                           // link the new element to it
    } while (!top.compareAndSet (t, e));    // retry if top changed underneath us
  }
}
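The matching lock-free pop is not shown above; here is a sketch of a method that could be added to the Stack class, in the same style. Note that, unlike push, a compare-and-set pop of a linked stack is vulnerable to the ABA problem if popped elements are reused — an issue these notes do not address.

  Element pop () {
    Element t, next;
    do {
      t = top.get ();                        // snapshot the current top
      if (t == null)
        return null;                         // stack is empty
      next = t.next;
    } while (!top.compareAndSet (t, next));  // retry if top changed underneath us
    return t;
  }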
Race Condition
competing, unsynchronized access to shared variable
solved with synchronization
Deadlock
void foo () {
  uthread_mutex_lock (mx);
  count--;
  if (count > 0)
    foo ();                       // recursive call tries to re-acquire mx
  uthread_mutex_unlock (mx);
}

void lock (struct block_lock* l) {
  spinlock_lock (&l->spinlock);
  while (l->held) {
    enqueue (&l->waiter_queue, uthread_self ());
    spinlock_unlock (&l->spinlock);
    uthread_block ();
    spinlock_lock (&l->spinlock);
  }
  l->held = 1;
  spinlock_unlock (&l->spinlock);
}
if the thread that already holds the mutex tries to lock it again, it deadlocks: it waits for itself to release the lock
a reentrant mutex fixes this by allowing the thread that holds the mutex to enter again

void uthread_mutex_lock (uthread_mutex_t mutex) {
  spinlock_lock (&mutex->spinlock);
  while (mutex->holder && mutex->holder != uthread_self ()) {
    enqueue (&mutex->waiter_queue, uthread_self ());
    spinlock_unlock (&mutex->spinlock);
    uthread_block ();
    spinlock_lock (&mutex->spinlock);
  }
  mutex->holder = uthread_self ();
  mutex->holderCount++;
  spinlock_unlock (&mutex->spinlock);
}
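The matching unlock is not shown; here is a sketch under the same assumptions about the mutex's fields, using a hypothetical uthread_unblock() to wake a dequeued waiter (the real wake-up call in the uthread library may be named differently).

void uthread_mutex_unlock (uthread_mutex_t mutex) {
  spinlock_lock (&mutex->spinlock);
  mutex->holderCount--;
  if (mutex->holderCount == 0) {                      // last nested unlock actually releases
    mutex->holder = 0;
    uthread_t waiter = dequeue (&mutex->waiter_queue);
    if (waiter)
      uthread_unblock (waiter);                       // hypothetical: wake one blocked waiter
  }
  spinlock_unlock (&mutex->spinlock);
}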