serenity-rs/songbird

Not able to use default microphone input as LiveInput for Discord Bot

Opened this issue · 0 comments

Songbird version: { version = "0.4.3", features = ["driver"] }

Rust version (rustc -V): rustc 1.82.0 (f6e511eec 2024-10-15)

Serenity/Twilight version: { version = "0.12.2", features = ["client", "framework", "gateway", "voice"] }

Output of ffmpeg -version, yt-dlp --version (if relevant):
...

Description:
Not able to use default microphone input as LiveInput for Discord Bot

Steps to reproduce:

audio.rs

use std::sync::Arc;
use anyhow::Result;
use cpal::traits::{DeviceTrait, HostTrait};
use tokio::sync::mpsc::Receiver;
use std::io::{Read, Seek, SeekFrom, Result as IoResult};
use byteorder::{LittleEndian, WriteBytesExt};
use songbird::input::{Input, RawAdapter};
use symphonia_core::io::MediaSource;

pub struct LiveInput {
    receiver: Receiver<Vec<f32>>,
    buffer: Vec<u8>,
    position: usize,
}

impl LiveInput {
    pub fn new(receiver: Receiver<Vec<f32>>, sample_rate: u32, channel_count: u32) -> Self {
        let mut header = Vec::with_capacity(16);
        header.extend_from_slice(b"SbirdRaw");
        header.write_u32::<LittleEndian>(sample_rate).unwrap();
        header.write_u32::<LittleEndian>(channel_count).unwrap();

        Self {
            receiver,
            buffer: header,
            position: 0,
        }
    }
}
// Implementing MediaSource for LiveInput
impl MediaSource for LiveInput {
    fn is_seekable(&self) -> bool {
        false // Live input is not seekable
    }

    fn byte_len(&self) -> Option<u64> {
        None // Since it's a live stream, the length is unknown
    }
}
// Implement std::io::Read for LiveInput
impl Read for LiveInput {
    fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
        if self.position >= self.buffer.len() {
            match self.receiver.blocking_recv() {
                Some(data) => {
                    let byte_slice = bytemuck::cast_slice(&data);
                    self.buffer = byte_slice.to_vec();
                    self.position = 0;
                }
                None => return Ok(0), // No more data
            }
        }
        let remaining = self.buffer.len() - self.position;
        let to_copy = std::cmp::min(remaining, buf.len());
        buf[..to_copy].copy_from_slice(&self.buffer[self.position..self.position + to_copy]);
        self.position += to_copy;
        Ok(to_copy)
    }
}

// Implement std::io::Seek for LiveInput
impl Seek for LiveInput {
    fn seek(&mut self, _pos: SeekFrom) -> IoResult<u64> {
        // Seeking is not supported in live input
        Err(std::io::Error::new(
            std::io::ErrorKind::Unsupported,
            "Cannot seek in live input",
        ))
    }
}

pub async fn create_live_audio_input() -> Result<Input> {
    let host = cpal::default_host();

    println!("Available audio devices:");
    let devices = host.devices().expect("Failed to get audio devices");
    for (idx, device) in devices.enumerate() {
        let name = device.name().unwrap_or_else(|_| "Unknown Device".to_string());
        println!("Device {}: {}", idx, name);
    }
    let default_device = host.default_input_device().expect("No input device available");
    let default_device_name = default_device.name().unwrap_or_else(|_| "Unknown Device".to_string());
    println!("Default input device: {}", default_device_name);

    let config = default_device
        .default_input_config()
        .expect("Failed to get default input config");

    let sample_rate = config.sample_rate().0;
    let channel_count = config.channels() as u32;

    let (sender, receiver) = tokio::sync::mpsc::channel::<Vec<f32>>(32);
    let sender = Arc::new(sender);
    println!("Sample rate: {}, Channel count: {}", sample_rate, channel_count);

    // Build the CPAL input stream
    let _stream = match config.sample_format() {
        cpal::SampleFormat::F32 => {
            default_device.build_input_stream(
                &config.into(),
                move |data: &[f32], _: &cpal::InputCallbackInfo| {
                    let sender = sender.clone();
                    let mut samples = Vec::with_capacity(data.len());

                    for &sample in data {
                        samples.push(sample);
                    }

                    tokio::spawn(async move {
                        if let Err(err) = sender.send(samples).await {
                            eprintln!("Failed to send PCM data: {}", err);
                        }
                    });
                },
                move |err| {
                    eprintln!("Error in the stream: {}", err);
                },
            )?
        }
        cpal::SampleFormat::I16 => {
            default_device.build_input_stream(
                &config.into(),
                move |data: &[i16], _: &cpal::InputCallbackInfo| {
                    let sender = sender.clone();
                    let mut samples = Vec::with_capacity(data.len());

                    for &sample in data {
                        samples.push(sample as f32); // Convert i16 to f32
                    }

                    tokio::spawn(async move {
                        if let Err(err) = sender.send(samples).await {
                            eprintln!("Failed to send PCM data: {}", err);
                        }
                    });
                },
                move |err| {
                    eprintln!("Error in the stream: {}", err);
                },
            )?
        }
        cpal::SampleFormat::U16 => {
            default_device.build_input_stream(
                &config.into(),
                move |data: &[u16], _: &cpal::InputCallbackInfo| {
                    let sender = sender.clone();
                    let mut samples = Vec::with_capacity(data.len());

                    for &sample in data {
                        samples.push(sample as f32); // Convert u16 to f32
                    }

                    tokio::spawn(async move {
                        if let Err(err) = sender.send(samples).await {
                            eprintln!("Failed to send PCM data: {}", err);
                        }
                    });
                },
                move |err| {
                    eprintln!("Error in the stream: {}", err);
                },
            )?
        }
    };
    // Create the LiveInput
    let live_input = LiveInput::new(receiver, sample_rate, channel_count);
    // Wrap the LiveInput in a RawAdapter
    let raw_adapter = RawAdapter::new(
        live_input,
        sample_rate,
        channel_count,
    );
    // Create the Input from the RawAdapter
    let input = Input::from(raw_adapter);
    Ok(input)
}

Then in bot.rs

pub struct Handler {
    track_handles: Arc<Mutex<HashMap<GuildId, songbird::tracks::TrackHandle>>>, // Add track_handles
}
#[async_trait]
impl EventHandler for Handler {
    async fn message(&self, ctx: Context, msg: Message) {
        match msg.content.as_str() {
            "!ping" => {
                if let Err(why) = msg.channel_id.say(&ctx.http, "pong").await {
                    eprintln!("Error sending pong: {:?}", why);
                }
            }
            "!join" => {
                let guild_id = match msg.guild_id {
                    Some(guild_id) => guild_id,
                    None => {
                        let _ = msg.channel_id.say(&ctx.http, "Guild not found").await;
                        return;
                    }
                };
                
                let channel_id = match msg.guild(&ctx.cache).and_then(|guild| {
                    guild
                        .voice_states
                        .get(&msg.author.id)
                        .and_then(|vs| vs.channel_id)
                }) {
                    Some(channel_id) => channel_id,
                    None => {
                        let _ = msg.channel_id.say(&ctx.http, "Not in a Voice Channel").await;
                        return;
                    }
                };
                
                // Get Songbird manager
                let manager = songbird::get(&ctx)
                    .await
                    .expect("Songbird Voice client placed in at initialization.")
                    .clone();
                
                // Join the voice channel
                match manager.join(guild_id, channel_id).await {
                    Ok(handler_lock) => {
                        let mut handler = handler_lock.lock().await;
                        if let Err(why) = msg.channel_id.say(&ctx.http, &format!("Joined {}", channel_id.mention())).await {
                            eprintln!("Error joining channel: {:?}", why);
                        }
                        // Create the live audio input
                        let live_audio_input = match create_live_audio_input().await {
                            Ok(input) => input,
                            Err(err) => {
                                eprintln!("Failed to create live audio input: {:?}", err);
                                let _ = msg
                                    .channel_id
                                    .say(&ctx.http, "Failed to create live audio input")
                                    .await;
                                return;
                            }
                        };
                        // Play the input and store the TrackHandle
                        match handler.play_only_input(live_audio_input) {
                            Ok(track_handle) => {
                                // Store the TrackHandle to prevent it from being dropped
                                self.track_handles.lock().await.insert(guild_id, track_handle);
                                info!("Playing live audio input.");
                            }
                            Err(err) => {
                                eprintln!("Failed to play live audio input: {:?}", err);
                                let _ = msg
                                    .channel_id
                                    .say(&ctx.http, "Failed to play live audio input")
                                    .await;
                            }
                        }
                    }
                    Err(err) => {
                        eprintln!("Failed to join voice channel: {:?}", err);
                        let _ = msg
                            .channel_id
                            .say(&ctx.http, "Failed to join voice channel")
                            .await;
                    }
                }
            }
            }
            _ => {}
        }
    }
}

pub async fn start_bot(token: &str, keyboard_input_tx: Arc<Mutex<mpsc::Sender<KeyboardInputCommand>>>) -> serenity::Result<()> {
    let intents = GatewayIntents::GUILDS
        | GatewayIntents::GUILD_MESSAGES
        | GatewayIntents::MESSAGE_CONTENT
        | GatewayIntents::GUILD_VOICE_STATES; // Also include voice state intents
    let handler = Handler::new();
    let mut client = Client::builder(token, intents)
        .event_handler(handler)
        .register_songbird() // Register Songbird
        .await?;
    client.start().await
}

Bot is joining VC.
No output.
Console:
Failed to send PCM data: Channel Closed