How to use PyAudio, a third-party Python library, to build a recording tool

Hello, I'm [🌑 This is the back of the moon]. Today I'd like to share a recording tool written in Python with PyAudio. The contents of this article are as follows:

  • Application platform
  • Audio recording section
  • Audio playback part
  • Part of the attribute value code required by the GUI window
  • Keyboard monitoring with pynput
  • Summary

Recently, while using screen recording software to record my desktop, I had an idea: could I build a screen recording tool in Python and get some hands-on practice along the way? So I'm writing a series of articles on building a screen recording tool with Python:

  • Recording the screen to make a video
  • Recording audio
  • Merging video and audio
  • Building a GUI window with PyQt5

I hope to finish all four parts as soon as possible. In the previous article, I built the screen recording part with OpenCV. This article continues the series with recording audio in Python.

Application platform

  • Windows 10
  • Python 3.7

Audio recording section

Audio recording is similar to video recording: the audio is also captured and saved as a sequence of data frames. The main code uses the third-party package PyAudio together with the built-in wave module. Install PyAudio first:

pip install PyAudio

If the installation fails, download the whl file that matches your environment: cp37 stands for Python 3.7, and 64 stands for a 64-bit system (the file has to match the architecture of your Python interpreter). If the whl file does not match your environment, the installation will fail. After the download completes, change to the directory containing the whl file in a cmd window and run pip install PyAudio-xx.whl (where xx stands for the rest of the file name) to complete the installation.
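
To find out which whl file matches your environment, you can ask the interpreter itself. The following is a small sketch that uses only the standard library; the function name interpreter_tags is made up for this example:

```python
import struct
import sys


def interpreter_tags():
    """Return the CPython tag (for example 'cp37') and the interpreter bitness."""
    cp_tag = f"cp{sys.version_info.major}{sys.version_info.minor}"
    bits = struct.calcsize("P") * 8  # size of a pointer, in bits
    return cp_tag, bits


cp_tag, bits = interpreter_tags()
print(f"Look for a whl file tagged {cp_tag}, built for {bits}-bit Python")
```

The bitness reported here is that of the running Python interpreter, which is what the whl file has to match.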

Audio recording main code:

import time
from pyaudio import PyAudio, paInt16, paContinue, paComplete

# Set fixed parameters
chunk = 1024  # Frames per buffer
format_sample = paInt16  # Sample format: 16-bit integers
channels = 2  # Channels: 1 = mono, 2 = stereo
fps = 44100  # Sampling rate in Hz

stop_recording = False  # Set to True elsewhere (e.g. by a hotkey) to end the recording

# Here, the audio is recorded through a callback
def callback(in_data, frame_count, time_info, status):
    """Recording callback function"""
    wf.writeframes(in_data)  # wf is the wave file opened for writing
    if not stop_recording:  # Keep recording while the condition holds
        return in_data, paContinue
    else:
        return in_data, paComplete

# Instantiate PyAudio
p = PyAudio()
stream = p.open(format=format_sample,
                channels=channels,
                rate=fps,
                frames_per_buffer=chunk,
                input=True,
                input_device_index=None,  # Input device index; None selects the default device
                stream_callback=callback  # Callback function
                )
# Start streaming recording
stream.start_stream()
# Wait while the stream is active
while stream.is_active():
    time.sleep(0.1)  # Polling interval in seconds
# After recording, close the stream and the instance
stream.stop_stream()
stream.close()
p.terminate()
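
As a rough guide to what each call of the callback receives, the fixed parameters above translate into buffer sizes as follows. This is only arithmetic on the values used in this article; the name SAMPLE_WIDTH is introduced here for illustration (2 bytes corresponds to 16-bit samples, i.e. paInt16):

```python
CHUNK = 1024        # frames per buffer
CHANNELS = 2        # stereo
SAMPLE_WIDTH = 2    # bytes per sample for 16-bit audio
RATE = 44100        # frames per second

bytes_per_frame = CHANNELS * SAMPLE_WIDTH    # one sample per channel
bytes_per_buffer = CHUNK * bytes_per_frame   # expected size of in_data
buffer_ms = CHUNK / RATE * 1000              # audio time covered by one buffer
callbacks_per_second = RATE / CHUNK

print(bytes_per_frame)                 # 4
print(bytes_per_buffer)                # 4096
print(round(buffer_ms, 1))             # 23.2
print(round(callbacks_per_second, 1))  # 43.1
```

So with these settings the callback is expected to run roughly 43 times per second, each time handing over about 4 KB of raw audio.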

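To record with the callback function, the audio file must be created before the stream starts, because the callback writes into it. Use wave to create a new audio file opened in binary write mode. These lines need to run after p = PyAudio() and before stream.start_stream():

import wave
wf = wave.open('test.wav', 'wb')
wf.setnchannels(channels)
wf.setsampwidth(p.get_sample_size(format_sample))
wf.setframerate(fps)
# Call wf.close() once the recording has finished so that the file header is completed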
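
The wave part can be tried without a microphone. The sketch below is not the recording code itself: it replaces the microphone input with a generated sine tone (the helper names fake_chunk and write_demo are made up for this example) and writes it buffer by buffer, the same way the callback calls writeframes():

```python
import math
import os
import struct
import tempfile
import wave

RATE = 44100
CHANNELS = 2
SAMPLE_WIDTH = 2  # bytes per sample, i.e. 16-bit audio
CHUNK = 1024


def fake_chunk(start_frame, n_frames, freq=440.0):
    """Build n_frames of 16-bit stereo sine data (stand-in for microphone input)."""
    frames = bytearray()
    for n in range(start_frame, start_frame + n_frames):
        sample = int(32767 * 0.3 * math.sin(2 * math.pi * freq * n / RATE))
        frames += struct.pack("<hh", sample, sample)  # left and right channel
    return bytes(frames)


def write_demo(path, n_chunks=10):
    """Write n_chunks buffers to a WAV file, one writeframes() call per buffer."""
    wf = wave.open(path, "wb")
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(SAMPLE_WIDTH)
    wf.setframerate(RATE)
    for i in range(n_chunks):
        wf.writeframes(fake_chunk(i * CHUNK, CHUNK))
    wf.close()  # completes the header with the real frame count


demo_path = os.path.join(tempfile.gettempdir(), "pyaudio_wave_demo.wav")
write_demo(demo_path)
print("wrote", demo_path)
```

Opening the resulting file in any audio player should give a short 440 Hz tone.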

To make the code easier to reuse later, wrap it in a class that inherits from PyAudio:

from pyaudio import PyAudio

class AudioRecord(PyAudio):

    def __init__(self):
        super().__init__()
        # The remaining attributes and methods are in the full source code

The complete source code is given at the end of the article.

Audio playback part

The playback part differs little from the recording part. The core is:

wf = wave.open('test.wav', 'rb')
output_device_index = None  # None selects the default output device

def callback(in_data, frame_count, time_info, status):
    """Playback callback function"""
    data = wf.readframes(frame_count)
    return data, paContinue

stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True,
                output_device_index=output_device_index,  # Output device index
                stream_callback=callback  # Callback function for output
                )
stream.start_stream()
while stream.is_active():
    time.sleep(0.1)
# After playback, close the stream and the file
stream.stop_stream()
stream.close()
wf.close()
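
The playback callback always returns paContinue, yet the loop still ends. According to the PyAudio documentation, an output callback that returns fewer than frame_count frames is treated as complete, and that is what happens when readframes() reaches the end of the file. The sketch below only demonstrates the readframes() side of this, using an in-memory WAV file; the function name chunk_sizes is made up for this example:

```python
import io
import wave

CHUNK = 1024


def chunk_sizes(total_frames, chunk=CHUNK):
    """Return how many frames each readframes(chunk) call yields for a WAV file."""
    buf = io.BytesIO()
    with wave.open(buf, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(44100)
        wf.writeframes(b"\x00\x00" * total_frames)  # silence
    buf.seek(0)
    sizes = []
    with wave.open(buf, "rb") as rf:
        frame_size = rf.getsampwidth() * rf.getnchannels()
        while True:
            data = rf.readframes(chunk)
            if not data:  # nothing left to read
                break
            sizes.append(len(data) // frame_size)
    return sizes


print(chunk_sizes(2500))  # [1024, 1024, 452]
```

The last buffer is shorter than the requested 1024 frames, which is the point at which the stream would be considered finished.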

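So far, recording and playback have been tested with files named .wav and .mp3, and both work normally. Note that the wave module always writes WAV data, so a file saved as test.mp3 still contains WAV audio and only the extension differs. Other file types can be tried with the same code, but wave can only read uncompressed WAV data, so a real MP3 file would have to be decoded first.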
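
One thing worth checking when testing file formats is what wave actually puts on disk. The wave module writes RIFF/WAVE data whatever the file is called, which the following sketch verifies by reading the first 12 bytes of a file; the function name looks_like_wav and the temporary file name are made up for this example:

```python
import os
import tempfile
import wave


def looks_like_wav(path):
    """Check the 12-byte RIFF/WAVE signature at the start of a file."""
    with open(path, "rb") as f:
        header = f.read(12)
    return header[:4] == b"RIFF" and header[8:12] == b"WAVE"


# Write a short silent clip with wave, but give it an .mp3 extension
named_mp3 = os.path.join(tempfile.gettempdir(), "pyaudio_named_demo.mp3")
with wave.open(named_mp3, "wb") as wf:
    wf.setnchannels(2)
    wf.setsampwidth(2)
    wf.setframerate(44100)
    wf.writeframes(b"\x00" * 4 * 100)  # 100 silent stereo frames

print(looks_like_wav(named_mp3))  # True
```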

Part of the attribute value code required by the GUI window

Since a GUI window can present input and output values in a more user-friendly way, this part of the code prepares the values the window will need: the audio duration and the available input and output devices.

# Audio duration in seconds
duration = wf.getnframes() / wf.getframerate()
# Get the input / output devices currently installed in the system
for i in range(p.get_device_count()):
    dev_info = p.get_device_info_by_index(i)
    default_rate = int(dev_info['defaultSampleRate'])
    # Keep devices of host API 0 whose default rate equals fps; skip names containing 'Mapper'
    if not dev_info['hostApi'] and default_rate == fps and 'Mapper' not in dev_info['name']:
        if dev_info['maxInputChannels']:
            print('Input device:', dev_info['name'])
        elif dev_info['maxOutputChannels']:
            print('Output device:', dev_info['name'])
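
The filtering rule can be tried without any audio hardware. In the sketch below, the device list is hypothetical sample data shaped like the dicts that get_device_info_by_index() returns (only the keys used above are included), and split_devices is a name made up for this example:

```python
FPS = 44100

# Hypothetical sample data; real values depend on the machine
SAMPLE_DEVICES = [
    {"name": "Microsoft Sound Mapper - Input", "hostApi": 0,
     "defaultSampleRate": 44100.0, "maxInputChannels": 2, "maxOutputChannels": 0},
    {"name": "Microphone (Example Audio)", "hostApi": 0,
     "defaultSampleRate": 44100.0, "maxInputChannels": 2, "maxOutputChannels": 0},
    {"name": "Speakers (Example Audio)", "hostApi": 0,
     "defaultSampleRate": 44100.0, "maxInputChannels": 0, "maxOutputChannels": 2},
    {"name": "Speakers (Example Audio)", "hostApi": 1,
     "defaultSampleRate": 48000.0, "maxInputChannels": 0, "maxOutputChannels": 2},
]


def split_devices(devices, fps=FPS):
    """Map device names to indexes, using the same filter as the code above."""
    input_dict, output_dict = {}, {}
    for index, info in enumerate(devices):
        default_rate = int(info["defaultSampleRate"])
        if not info["hostApi"] and default_rate == fps and "Mapper" not in info["name"]:
            if info["maxInputChannels"]:
                input_dict[info["name"]] = index
            elif info["maxOutputChannels"]:
                output_dict[info["name"]] = index
    return input_dict, output_dict


inputs, outputs = split_devices(SAMPLE_DEVICES)
print(inputs)   # {'Microphone (Example Audio)': 1}
print(outputs)  # {'Speakers (Example Audio)': 2}
```

Because of the elif, a device that reports both input and output channels would be listed as an input device only.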

Keyboard monitoring with pynput

For now, this part of the code uses pynput to listen to the keyboard so that a key press can stop the recording. The keyboard monitoring code from the previous article can be reused.

def hotkey(self):
    """Hotkey monitor"""
    with keyboard.Listener(on_press=self.on_press) as listener:
        listener.join()

def on_press(self, key):
    try:
        if key.char == 't':  # t key, recording ends and audio is saved
            self.flag = True
        elif key.char == 'k':  # k key, recording abort, delete file
            self.flag = True
            self.kill = True
    except AttributeError:
        pass  # Special keys such as Shift or Ctrl have no .char attribute

This works like the version in the previous article, so it is not explained again here.
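
The try/except in on_press is there because special keys do not carry a character. The sketch below imitates this without pynput: FakeCharKey and FakeSpecialKey are stand-ins invented for this example (they are not pynput classes), and HotkeyState mirrors only the flag logic shown above:

```python
class FakeCharKey:
    """Stand-in for a character key: has a .char attribute."""

    def __init__(self, char):
        self.char = char


class FakeSpecialKey:
    """Stand-in for a special key such as Shift: no .char attribute."""


class HotkeyState:
    """Holds the two flags that the hotkeys set."""

    def __init__(self):
        self.flag = False  # True: stop recording
        self.kill = False  # True: also delete the file

    def on_press(self, key):
        char = getattr(key, "char", None)  # None for special keys
        if char == "t":    # t key: recording ends and audio is saved
            self.flag = True
        elif char == "k":  # k key: recording is aborted and the file deleted
            self.flag = True
            self.kill = True


state = HotkeyState()
state.on_press(FakeSpecialKey())   # ignored
state.on_press(FakeCharKey("t"))   # stop and save
print(state.flag, state.kill)      # True False
```

Using getattr with a default is one way to ignore special keys quietly instead of printing an error for each of them.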

Summary

Hello, I'm [🌑 This is the back of the moon]. That covers using PyAudio to call Windows audio devices for recording and playback. This article also gave an overall look at how to use a class and inheritance. What is shown here is only the tip of the iceberg; there is much more waiting for us to explore together!

Made on December 20, 2021

Source code:

import wave
import time
from pathlib import Path
from threading import Thread
from pyaudio import PyAudio, paInt16, paContinue, paComplete
from pynput import keyboard  # pip install pynput


class AudioRecord(PyAudio):

    def __init__(self, channels=2):
        super().__init__()
        self.chunk = 1024  # Number of frames per buffer
        self.format_sample = paInt16  # Sample format: 16-bit integers
        self.channels = channels  # Channels: 1 = mono, 2 = stereo
        self.fps = 44100  # Sampling rate in Hz
        self.input_dict = None
        self.output_dict = None
        self.stream = None
        self.filename = '~test.wav'
        self.duration = 0   # Audio duration
        self.flag = False
        self.kill = False

    def __call__(self, filename):
        """Set a new file name"""
        self.filename = filename

    def callback_input(self, in_data, frame_count, time_info, status):
        """Recording callback function"""
        self.wf.writeframes(in_data)
        if not self.flag:
            return in_data, paContinue
        else:
            return in_data, paComplete

    def callback_output(self, in_data, frame_count, time_info, status):
        """Playback callback function"""
        data = self.wf.readframes(frame_count)
        return data, paContinue

    def open_stream(self, name):
        """Open recording stream"""
        input_device_index = self.get_device_index(name, True) if name else None
        return self.open(format=self.format_sample,
                         channels=self.channels,
                         rate=self.fps,
                         frames_per_buffer=self.chunk,
                         input=True,
                         input_device_index=input_device_index,  # Enter device index
                         stream_callback=self.callback_input
                         )

    def audio_record_run(self, name=None):
        """Record audio until a hotkey sets self.flag"""
        self.wf = self.save_audio_file(self.filename)
        self.stream = self.open_stream(name)
        self.stream.start_stream()
        while self.stream.is_active():
            time.sleep(0.1)
        self.duration = self.get_duration(self.wf)  # Read the duration before closing the file
        self.wf.close()
        if self.kill:
            Path(self.filename).unlink()  # Recording aborted: delete the file
        print(self.duration)
        self.terminate_run()

    def run(self, filename=None, name=None, record=True):
        """Start the hotkey thread and the recording or playback thread"""
        thread_1 = Thread(target=self.hotkey, daemon=True)
        if record:
            # Recording
            if filename:
                self.filename = filename
            thread_2 = Thread(target=self.audio_record_run, args=(name,))
        else:
            # Playback
            if not filename:
                raise ValueError('No audio file name was given, so nothing can be played. Please enter one and try again!')
            thread_2 = Thread(target=self.read_audio, args=(filename, name,))
        thread_1.start()
        thread_2.start()
        return thread_2  # Lets the caller join() the worker thread

    def read_audio(self, filename, name=None):
        """Audio playback"""
        output_device_index = self.get_device_index(name, False) if name else None
        with wave.open(filename, 'rb') as self.wf:
            self.duration = self.get_duration(self.wf)
            self.stream = self.open(format=self.get_format_from_width(self.wf.getsampwidth()),
                                    channels=self.wf.getnchannels(),
                                    rate=self.wf.getframerate(),
                                    output=True,
                                    output_device_index=output_device_index,  # Output device index
                                    stream_callback=self.callback_output
                                    )
            self.stream.start_stream()
            while self.stream.is_active():
                time.sleep(0.1)
        print(self.duration)
        self.terminate_run()

    @staticmethod
    def get_duration(wf):
        """Get audio duration"""
        return round(wf.getnframes() / wf.getframerate(), 2)

    def get_in_out_devices(self):
        """Get system I / O device"""
        self.input_dict = {}
        self.output_dict = {}
        for i in range(self.get_device_count()):
            dev_info = self.get_device_info_by_index(i)
            default_rate = int(dev_info['defaultSampleRate'])
            if not dev_info['hostApi'] and default_rate == self.fps and 'Mapper' not in dev_info['name']:
                if dev_info['maxInputChannels']:
                    self.input_dict[dev_info['name']] = i
                elif dev_info['maxOutputChannels']:
                    self.output_dict[dev_info['name']] = i

    def get_device_index(self, name, input_in=True):
        """Gets the index of the selected device"""
        if input_in and self.input_dict:
            return self.input_dict.get(name, -1)
        elif not input_in and self.output_dict:
            return self.output_dict.get(name, -1)

    def save_audio_file(self, filename):
        """Audio file saving"""
        wf = wave.open(filename, 'wb')
        wf.setnchannels(self.channels)
        wf.setsampwidth(self.get_sample_size(self.format_sample))
        wf.setframerate(self.fps)
        return wf

    def terminate_run(self):
        """End streaming recording or streaming playback"""
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
        self.terminate()

    def hotkey(self):
        """Hotkey monitor"""
        with keyboard.Listener(on_press=self.on_press) as listener:
            listener.join()

    def on_press(self, key):
        try:
            if key.char == 't':  # t key, recording ends and audio is saved
                self.flag = True
            elif key.char == 'k':  # k key, recording abort, delete file
                self.flag = True
                self.kill = True
        except AttributeError:
            pass  # Special keys such as Shift or Ctrl have no .char attribute


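if __name__ == '__main__':
    # Recording: press "t" to stop and save, or "k" to abort and delete the file
    audio_record = AudioRecord()
    audio_record.get_in_out_devices()
    print(audio_record.input_dict)
    worker = audio_record.run('test.wav')
    if worker is not None:
        worker.join()  # Wait until the recording has finished
    # Playback: use a fresh instance, because the first one is terminated after recording
    audio_player = AudioRecord()
    audio_player.get_in_out_devices()
    print(audio_player.output_dict)
    audio_player.run('test.wav', record=False)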
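
The source code signals the end of a recording through plain attributes (self.flag) set from the hotkey thread, and it runs recording and playback in worker threads. As a possible alternative, not the method used above, the standard library's threading.Event can carry the stop signal, and join() can be used to wait for the worker. In this sketch record_loop is a made-up stand-in that collects dummy buffers instead of real audio:

```python
import threading
import time


def record_loop(stop_event, chunks):
    """Stand-in for a recording loop: collect dummy buffers until stop_event is set."""
    while True:
        chunks.append(b"\x00" * 4096)  # pretend this is one audio buffer
        if stop_event.wait(0.01):      # returns True as soon as the event is set
            break


stop_event = threading.Event()
chunks = []
worker = threading.Thread(target=record_loop, args=(stop_event, chunks))
worker.start()

time.sleep(0.05)   # "recording" runs for a moment
stop_event.set()   # what a hotkey handler would do
worker.join()      # block until the worker has really finished

print(worker.is_alive())  # False
print(len(chunks) >= 1)   # True
```

Waiting with join() before starting playback avoids reading a file that is still being written.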

Added by ether on Thu, 13 Jan 2022 13:46:49 +0200