How to manage shared memory in a custom ring buffer data structure? /u/Long_Basket_5294 Python Education

I am writing a program for the following task:

The task is to write a program that creates two processes:

a) The ‘producer’ process reads a video frame image directly into a shared memory ring buffer (i.e., a shared memory buffer that can accommodate N frame images for a configurable N – this can be kept as a command line parameter).

b) The ‘consumer’ process reads the frame images from the ring buffer and displays each frame for a configurable amount of time (e.g., 500 ms – again a program command line parameter).

c) The producer and consumer must synchronize so that:

i) The consumer displays an image only after it’s been fully produced (i.e., the entire frame has been generated)

ii) The consumer waits for the producer if the next image to be consumed has not yet been fully produced.

iii) The producer waits for the consumer if the ring buffer doesn’t have space for the next image to be produced (i.e., the consumer has not fully consumed any of the current images in the ring buffer).
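For reference, the three waiting rules in (c) could be sketched with two counting semaphores instead of exceptions and `time.sleep`. This is only a minimal sketch under assumptions, not the required solution: `BlockingSlotRing`, `put`, `get` and the `timeout` parameter are hypothetical names, frames are treated as fixed-size byte strings, and exactly one producer and one consumer are assumed (so `head` and `tail` each have a single writer).

```python
from multiprocessing import Semaphore, Value
from multiprocessing.shared_memory import SharedMemory


class BlockingSlotRing:
    """Hypothetical ring of fixed-size slots inside one SharedMemory block."""

    def __init__(self, n_slots, slot_bytes):
        self.n_slots = n_slots
        self.slot_bytes = slot_bytes
        self.shm = SharedMemory(create=True, size=n_slots * slot_bytes)
        self.free = Semaphore(n_slots)  # slots the producer may still fill
        self.filled = Semaphore(0)      # slots the consumer may read
        self.head = Value('i', 0)       # next slot to read (consumer only)
        self.tail = Value('i', 0)       # next slot to write (producer only)

    def put(self, data, timeout=None):
        """Rule iii: wait for a free slot, then copy the whole frame in."""
        if len(data) != self.slot_bytes:
            raise ValueError('frame has the wrong size')
        if not self.free.acquire(timeout=timeout):
            return False  # still full after the timeout
        start = self.tail.value * self.slot_bytes
        self.shm.buf[start:start + self.slot_bytes] = data
        self.tail.value = (self.tail.value + 1) % self.n_slots
        self.filled.release()  # rule i: publish only after the full copy
        return True

    def get(self, timeout=None):
        """Rule ii: wait until the next frame has been fully produced."""
        if not self.filled.acquire(timeout=timeout):
            return None  # still empty after the timeout
        start = self.head.value * self.slot_bytes
        data = bytes(self.shm.buf[start:start + self.slot_bytes])  # private copy
        self.head.value = (self.head.value + 1) % self.n_slots
        self.free.release()  # the slot may now be overwritten
        return data

    def close(self):
        self.shm.close()
        self.shm.unlink()
```

With `timeout=None` both calls block indefinitely, which is the waiting behaviour the task asks for; the timeout only exists so the sketch can be tried out in a single process.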

Below is my code:

    from multiprocessing import Process, Lock, Value
    from multiprocessing.shared_memory import SharedMemory
    import sys
    import numpy as np
    import cv2
    import time


    class RingBufferFullException(Exception):
        pass


    class RingBufferEmptyException(Exception):
        pass


    class RingBuffer:
        def __init__(self, max_size, single_frame):
            self.max_size = Value('i', max_size)
            self.shm = SharedMemory(create=True, size=max_size*single_frame.nbytes)
            self.queue = np.ndarray(shape=(max_size, *single_frame.shape), dtype=single_frame.dtype, buffer=self.shm.buf)
            self.tail = Value('i', -1)
            self.head = Value('i', 0)
            self.size = Value('i', 0)
            self.lock = Lock()

        def enqueue(self, item):
            with self.lock:
                if self.size.value == self.max_size.value:
                    raise RingBufferFullException('Error: Queue is full')
                else:
                    self.tail.value = (self.tail.value + 1) % self.max_size.value
                    self.queue[self.tail.value] = item
                    self.size.value += 1

        def dequeue(self):
            with self.lock:
                if self.size.value == 0:
                    raise RingBufferEmptyException('Error: Queue is empty')
                tmp = self.queue[self.head.value]
                self.head.value = (self.head.value + 1) % self.max_size.value
                self.size.value -= 1
                return tmp

        def isFull(self):
            return self.size.value == self.max_size.value

        def isEmpty(self):
            return self.size.value == 0

        def display(self):
            if self.size.value == 0:
                print('Queue is empty')
            else:
                idx = self.head.value
                print('---------------------------------------')
                for i in range(self.size.value):
                    print(self.queue[idx])
                    idx = (idx + 1) % self.max_size.value
                print('---------------------------------------')

        def clean(self):
            self.shm.close()
            self.shm.unlink()


    def producer(buf):
        cap = cv2.VideoCapture(0)
        print(cap.isOpened())
        while True:
            buf.display()
            success, frame = cap.read()
            if not success:
                print('Failed to capture frame')
                continue
            try:
                buf.enqueue(frame)
            except RingBufferFullException:
                time.sleep(0.5)


    def consumer(buf):
        while True:
            try:
                frame = buf.dequeue()
                cv2.imshow('Frame', frame)
                cv2.waitKey(100)
            except RingBufferEmptyException:
                time.sleep(0.2)
        cv2.destroyAllWindows()


    if __name__ == '__main__':
        test = cv2.VideoCapture(0)
        if not test.isOpened():
            print('Error: Could not open camera.')
            sys.exit(1)
        success, single_frame = test.read()
        if not success:
            print('Error: Could not capture initial frame.')
            sys.exit(1)
        test.release()
        buf = RingBuffer(10, single_frame)
        test.release()
        produce = Process(target=producer, args=(buf,))
        consume = Process(target=consumer, args=(buf,))
        try:
            produce.start()
            consume.start()
            produce.join()
            consume.join()
        finally:
            buf.clean()

At first I thought I could put the ring buffer instance itself in shared memory. I searched the web and couldn’t find anything. ChatGPT said that placing a complex custom object in shared memory is not possible in Python. Then I thought I could store the instance inside a shared memory block, and ChatGPT said that is not possible either.
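A common alternative to placing the object itself in shared memory is to let each process build its own small wrapper around the same named block. The sketch below illustrates that idea under assumptions: `SharedFrames` and its attributes are hypothetical names, and the `__getstate__`/`__setstate__` pair is just one way to make sure that only the block's name, shape and dtype travel to the other process, while the NumPy view is rebuilt on arrival.

```python
import pickle

import numpy as np
from multiprocessing.shared_memory import SharedMemory


class SharedFrames:
    """Hypothetical wrapper: n_frames frames stored in a named SharedMemory block."""

    def __init__(self, n_frames, frame_shape, dtype, name=None):
        self.n_frames = n_frames
        self.frame_shape = tuple(frame_shape)
        self.dtype = np.dtype(dtype)
        if name is None:
            # Owner: create a new block large enough for all frames.
            nbytes = n_frames * int(np.prod(self.frame_shape)) * self.dtype.itemsize
            self.shm = SharedMemory(create=True, size=nbytes)
        else:
            # Other process: attach to the existing block by name.
            self.shm = SharedMemory(name=name)
        # The view is always built locally on top of the mapped block.
        self.frames = np.ndarray((n_frames, *self.frame_shape),
                                 dtype=self.dtype, buffer=self.shm.buf)

    def __getstate__(self):
        # Send only a description of the block, never the pixel data.
        return (self.n_frames, self.frame_shape, self.dtype.str, self.shm.name)

    def __setstate__(self, state):
        n_frames, frame_shape, dtype, name = state
        self.__init__(n_frames, frame_shape, dtype, name=name)

    def close(self, unlink=False):
        self.frames = None  # drop the view before closing the mapping
        self.shm.close()
        if unlink:
            self.shm.unlink()  # only the owner should unlink
```

In this sketch a pickle round trip stands in for what `Process(target=..., args=(obj,))` does when the child is spawned: the restored object attaches to the same block, so a write through one wrapper is visible through the other.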

Then I put the metadata of the ring buffer in shared memory. Now it enqueues and dequeues frames fine, but it shows only black images.

The main problem is that the cv2 window only shows black. The images are all black. When I print the dequeued frame, it is just a matrix of all zeroes. If I print the buffer inside the producer function or the consumer function, it prints an np matrix with different values. But in the cv2 window it is just black images.
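One possible explanation for all-zero frames, offered as an assumption since the platform is not stated: where the multiprocessing start method is spawn (the default on Windows and macOS), the arguments passed to `Process` are pickled, and pickling a NumPy array serialises its data by value. The array that arrives in the child would then be an independent copy rather than a view of the shared block. The hypothetical helper below reproduces just that effect in a single process.

```python
import pickle

import numpy as np
from multiprocessing.shared_memory import SharedMemory


def pickled_view_is_a_copy():
    """Show that a pickled ndarray no longer points at the shared block."""
    shm = SharedMemory(create=True, size=4)
    try:
        view = np.ndarray((4,), dtype=np.uint8, buffer=shm.buf)
        view[:] = 0

        # Stand-in for sending the array to a spawned child process.
        child_copy = pickle.loads(pickle.dumps(view))

        view[:] = 255  # the "producer" writes a frame into shared memory

        result = (
            view.tolist(),                        # what the writer sees
            child_copy.tolist(),                  # what the copy still holds
            np.shares_memory(view, child_copy),   # do they overlap in memory?
        )
        del view  # release the buffer export before closing the block
        return result
    finally:
        shm.close()
        shm.unlink()


print(pickled_view_is_a_copy())
# → ([255, 255, 255, 255], [0, 0, 0, 0], False)
```

If this is what is happening, rebuilding the array inside each process from the block's name, instead of carrying it as an attribute across the process boundary, would avoid the copy. Separately, `dequeue` returns a view into the ring rather than a copy, so a slot could in principle be overwritten while it is still being displayed.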

What am I doing wrong? Any help is appreciated. Thank you guys in advance.

submitted by /u/Long_Basket_5294
