
@youkaichao
Created November 5, 2024 00:06
cuda ipc
import os
from typing import List
# os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
import torch
import torch.distributed as dist

dist.init_process_group(backend="gloo")
rank = local_rank = dist.get_rank()
world_size = dist.get_world_size()
torch.cuda.set_device(local_rank)

def share_tensor(A: torch.Tensor, group=None) -> List[torch.Tensor]:
    # Serialize the tensor into (rebuild_fn, args), where args carries the
    # CUDA IPC handle, then exchange the metadata with every other rank.
    from torch.multiprocessing.reductions import reduce_tensor
    A_meta = reduce_tensor(A)
    tensor_metas = [None] * world_size
    dist.all_gather_object(tensor_metas, A_meta, group=group)
    rank = dist.get_rank(group)
    all_tensors = []
    for i, obj in enumerate(tensor_metas):
        func = obj[0]
        args = list(obj[1])
        args[6] = A.device.index  # rebuild on the local device index
        if i != rank:
            all_tensors.append(func(*args))
        else:
            all_tensors.append(A)
    return all_tensors

A = torch.ones((10,), device=local_rank) * rank
all_tensors = share_tensor(A)
dist.barrier()
torch.cuda.synchronize()
if rank == 0:
    # Rank 0 writes through the IPC mappings of every rank's tensor.
    for x in all_tensors:
        x.zero_()
dist.barrier()
torch.cuda.synchronize()
for i, x in enumerate(all_tensors):
    print(f"{rank=}, {i=}, {x=}")
@youkaichao (Author)

Running with export PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True fails with an error:

RuntimeError: pidfd_getfd: Operation not permitted

This might be related to https://github.com/pytorch/pytorch/blob/3f248a57353288ac4df3a445ffa3ae0f952a6d33/c10/cuda/CUDACachingAllocator.cpp#L487 , where the caching allocator appears to retrieve the exporting process's file descriptor via pidfd_getfd.

@youkaichao (Author)

The following code compiles and runs successfully:

#define _GNU_SOURCE
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/syscall.h>
#include <errno.h>

int main() {
    // Step 1: Obtain a pidfd for the current process
    int pidfd = syscall(SYS_pidfd_open, getpid(), 0);
    if (pidfd == -1) {
        perror("pidfd_open failed");
        return 1;
    }

    // Step 2: Open a valid file descriptor, e.g., /proc/self/status
    int fd = open("/proc/self/status", O_RDONLY);
    if (fd == -1) {
        perror("open failed");
        close(pidfd);
        return 1;
    }

    // Step 3: Try to duplicate fd using pidfd_getfd
    int new_fd = syscall(SYS_pidfd_getfd, pidfd, fd, 0);
    if (new_fd == -1) {
        perror("pidfd_getfd failed");
    } else {
        printf("pidfd_getfd succeeded, new_fd: %d\n", new_fd);
        close(new_fd);  // Close the duplicated fd if successful
    }

    // Clean up
    close(fd);
    close(pidfd);
    return 0;
}

Compile: gcc test.c -o test

Run: ./test

Output: pidfd_getfd succeeded, new_fd: 5
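
One caveat (my observation): the test above calls pidfd_getfd on the calling process itself, so the kernel's ptrace access check trivially passes. What the allocator needs is the cross-process case. Below is a minimal Python sketch of my own (syscall numbers assume x86-64 Linux) in which a forked child tries to grab a file descriptor from its parent; under Yama's restricted ptrace_scope, this is expected to fail with Operation not permitted unless the parent opts in via prctl:

import ctypes
import os

SYS_pidfd_open = 434   # x86-64 syscall numbers
SYS_pidfd_getfd = 438

libc = ctypes.CDLL(None, use_errno=True)

def checked_syscall(nr, *args):
    res = libc.syscall(nr, *args)
    if res < 0:
        err = ctypes.get_errno()
        raise OSError(err, os.strerror(err))
    return res

target_fd = os.open("/proc/self/status", os.O_RDONLY)
parent_pid = os.getpid()

if os.fork() == 0:
    # Child: try to duplicate the parent's fd via pidfd_getfd.
    # The child is not an ancestor of the parent, so Yama's
    # "descendants only" rule (ptrace_scope=1) denies this.
    try:
        pidfd = checked_syscall(SYS_pidfd_open, parent_pid, 0)
        new_fd = checked_syscall(SYS_pidfd_getfd, pidfd, target_fd, 0)
        print(f"cross-process pidfd_getfd succeeded, new_fd={new_fd}")
    except OSError as e:
        print(f"cross-process pidfd_getfd failed: {e}")
    os._exit(0)

os.wait()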

@youkaichao (Author)

Running on PyTorch 2.6.0.dev20241112+cu124 still gives the same error: RuntimeError: pidfd_getfd: Operation not permitted.

@youkaichao (Author)

IPC of expandable segments was introduced in PyTorch 2.5. However, I find that some Linux systems do not allow this feature.

Example code to test the IPC functionality manually:

// sender.cpp
#include <cuda.h>
#include <cuda_runtime.h>
#include <iostream>
#include <fstream>
#include <vector>
#include <cstring>
#include <cerrno>
#include <unistd.h>
#include <sys/syscall.h>

// Define syscall numbers if not available
#ifndef SYS_pidfd_open
#define SYS_pidfd_open 434
#endif

struct ShareHeader {
    pid_t pid;
    size_t segment_size;
    size_t num_handles;
};

// Helper function to get CUDA error string
const char* getCudaErrorString(CUresult error) {
    const char* errorString;
    cuGetErrorString(error, &errorString);
    return errorString;
}

int main() {
    // Initialize CUDA
    CUresult result = cuInit(0);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to initialize CUDA: " << getCudaErrorString(result) << std::endl;
        return 1;
    }

    // Get CUDA device
    CUdevice device;
    result = cuDeviceGet(&device, 0);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to get CUDA device: " << getCudaErrorString(result) << std::endl;
        return 1;
    }

    // Create CUDA context
    CUcontext context;
    result = cuCtxCreate(&context, 0, device);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to create CUDA context: " << getCudaErrorString(result) << std::endl;
        return 1;
    }

    // Allocate memory using VMM API
    const size_t size = 20 * 1024 * 1024; // 20MB
    CUmemGenericAllocationHandle handle;

    // Set up memory allocation properties
    CUmemAllocationProp prop = {};
    prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
    prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
    prop.location.id = 0;  // Use device 0
    prop.requestedHandleTypes = CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR;  // Specify handle type for export
    prop.win32HandleMetaData = nullptr;

    // Get the minimum granularity supported for allocation
    size_t granularity = 0;
    result = cuMemGetAllocationGranularity(&granularity, &prop, CU_MEM_ALLOC_GRANULARITY_MINIMUM);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to get allocation granularity: " << getCudaErrorString(result) << std::endl;
        return 1;
    }

    // Ensure size is a multiple of granularity
    if (size % granularity) {
        std::cerr << "Allocation size is not a multiple of minimum supported granularity" << std::endl;
        return 1;
    }

    std::cout << "Creating memory handle with size: " << size << " bytes" << std::endl;
    result = cuMemCreate(&handle, size, &prop, 0);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to create memory handle: " << getCudaErrorString(result) << std::endl;
        return 1;
    }
    std::cout << "Successfully created memory handle" << std::endl;

    // Reserve address range
    CUdeviceptr ptr;
    std::cout << "Reserving address range" << std::endl;
    result = cuMemAddressReserve(&ptr, size, 0, 0, 0);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to reserve address range: " << getCudaErrorString(result) << std::endl;
        cuMemRelease(handle);
        return 1;
    }
    std::cout << "Successfully reserved address range at: " << ptr << std::endl;

    // Map the memory
    std::cout << "Mapping memory" << std::endl;
    result = cuMemMap(ptr, size, 0, handle, 0);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to map memory: " << getCudaErrorString(result) << std::endl;
        cuMemAddressFree(ptr, size);
        cuMemRelease(handle);
        return 1;
    }
    std::cout << "Successfully mapped memory" << std::endl;

    // Set access properties
    CUmemAccessDesc accessDesc = {};
    accessDesc.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
    accessDesc.location.id = 0;  // Use device 0
    accessDesc.flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE;

    std::cout << "Setting memory access properties" << std::endl;
    result = cuMemSetAccess(ptr, size, &accessDesc, 1);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to set memory access: " << getCudaErrorString(result) << std::endl;
        cuMemUnmap(ptr, size);
        cuMemAddressFree(ptr, size);
        cuMemRelease(handle);
        return 1;
    }
    std::cout << "Successfully set memory access properties" << std::endl;

    // Export handle to file descriptor
    int fd = 0;
    std::cout << "Exporting handle to file descriptor" << std::endl;
    result = cuMemExportToShareableHandle(&fd, handle, CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR, 0);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to export handle: " << getCudaErrorString(result) << std::endl;
        std::cerr << "Handle value: " << handle << std::endl;
        std::cerr << "Handle type: CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR" << std::endl;
        cuMemUnmap(ptr, size);
        cuMemAddressFree(ptr, size);
        cuMemRelease(handle);
        return 1;
    }
    std::cout << "Successfully exported handle to fd: " << fd << std::endl;

    // Write to file
    std::ofstream outfile("data.bin", std::ios::binary);
    if (!outfile) {
        std::cerr << "Failed to open output file: " << strerror(errno) << std::endl;
        cuMemUnmap(ptr, size);
        cuMemAddressFree(ptr, size);
        cuMemRelease(handle);
        return 1;
    }

    // Write header
    ShareHeader header{getpid(), size, 1};
    outfile.write(reinterpret_cast<const char*>(&header), sizeof(ShareHeader));

    // Write file descriptor
    outfile.write(reinterpret_cast<const char*>(&fd), sizeof(int));
    outfile.close();

    std::cout << "Data written to data.bin. Press Enter to continue..." << std::endl;
    std::cin.get();

    // Cleanup
    result = cuMemUnmap(ptr, size);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to unmap memory: " << getCudaErrorString(result) << std::endl;
    }

    result = cuMemAddressFree(ptr, size);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to free address range: " << getCudaErrorString(result) << std::endl;
    }

    result = cuMemRelease(handle);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to release memory handle: " << getCudaErrorString(result) << std::endl;
    }
    
    result = cuCtxDestroy(context);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to destroy CUDA context: " << getCudaErrorString(result) << std::endl;
    }

    return 0;
}

// receiver.cpp
#include <cuda.h>
#include <cuda_runtime.h>
#include <iostream>
#include <fstream>
#include <vector>
#include <cstring>
#include <cerrno>
#include <unistd.h>
#include <sys/syscall.h>

// Define syscall numbers if not available
#ifndef SYS_pidfd_open
#define SYS_pidfd_open 434
#endif
#ifndef SYS_pidfd_getfd
#define SYS_pidfd_getfd 438
#endif

struct ShareHeader {
    pid_t pid;
    size_t segment_size;
    size_t num_handles;
};

// Helper function to get CUDA error string
const char* getCudaErrorString(CUresult error) {
    const char* errorString;
    cuGetErrorString(error, &errorString);
    return errorString;
}

int main() {
    // Initialize CUDA
    CUresult result = cuInit(0);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to initialize CUDA: " << getCudaErrorString(result) << std::endl;
        return 1;
    }

    // Get CUDA device
    CUdevice device;
    result = cuDeviceGet(&device, 0);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to get CUDA device: " << getCudaErrorString(result) << std::endl;
        return 1;
    }

    // Create CUDA context
    CUcontext context;
    result = cuCtxCreate(&context, 0, device);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to create CUDA context: " << getCudaErrorString(result) << std::endl;
        return 1;
    }

    // Read from file
    std::ifstream infile("data.bin", std::ios::binary);
    if (!infile) {
        std::cerr << "Failed to open input file: " << strerror(errno) << std::endl;
        return 1;
    }

    // Read header
    ShareHeader header;
    infile.read(reinterpret_cast<char*>(&header), sizeof(ShareHeader));

    // Open pidfd
    auto pidfd = syscall(SYS_pidfd_open, header.pid, 0);
    if (pidfd == -1) {
        std::cerr << "pidfd_open failed: " << strerror(errno) << std::endl;
        return 1;
    }

    // Read file descriptor
    int fd = 0;
    infile.read(reinterpret_cast<char*>(&fd), sizeof(int));
    infile.close();

    // Get our own file descriptor
    auto myfd = syscall(SYS_pidfd_getfd, pidfd, fd, 0);
    if (myfd == -1) {
        std::cerr << "pidfd_getfd failed: " << strerror(errno) << std::endl;
        close(pidfd);
        return 1;
    }

    // Import handle
    CUmemGenericAllocationHandle handle;
    result = cuMemImportFromShareableHandle(
        &handle,
        reinterpret_cast<void*>(static_cast<uintptr_t>(myfd)),
        CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR
    );
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to import handle: " << getCudaErrorString(result) << std::endl;
        close(myfd);
        close(pidfd);
        return 1;
    }

    // Reserve address range
    CUdeviceptr ptr;
    result = cuMemAddressReserve(&ptr, header.segment_size, 0, 0, 0);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to reserve address range: " << getCudaErrorString(result) << std::endl;
        close(myfd);
        close(pidfd);
        return 1;
    }

    // Map the memory
    result = cuMemMap(ptr, header.segment_size, 0, handle, 0);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to map memory: " << getCudaErrorString(result) << std::endl;
        cuMemAddressFree(ptr, header.segment_size);
        close(myfd);
        close(pidfd);
        return 1;
    }

    // Set access properties
    CUmemAccessDesc accessDesc = {};
    accessDesc.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
    accessDesc.location.id = 0;  // Use device 0
    accessDesc.flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE;

    result = cuMemSetAccess(ptr, header.segment_size, &accessDesc, 1);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to set memory access: " << getCudaErrorString(result) << std::endl;
        cuMemUnmap(ptr, header.segment_size);
        cuMemAddressFree(ptr, header.segment_size);
        close(myfd);
        close(pidfd);
        return 1;
    }

    std::cout << "Successfully imported and mapped memory at address: " << ptr << std::endl;
    std::cout << "Press Enter to continue..." << std::endl;
    std::cin.get();

    // Cleanup
    result = cuMemUnmap(ptr, header.segment_size);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to unmap memory: " << getCudaErrorString(result) << std::endl;
    }

    result = cuMemAddressFree(ptr, header.segment_size);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to free address range: " << getCudaErrorString(result) << std::endl;
    }

    result = cuMemRelease(handle);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to release memory handle: " << getCudaErrorString(result) << std::endl;
    }

    close(myfd);
    close(pidfd);

    result = cuCtxDestroy(context);
    if (result != CUDA_SUCCESS) {
        std::cerr << "Failed to destroy CUDA context: " << getCudaErrorString(result) << std::endl;
    }

    return 0;
} 

Compile with

$ nvcc receiver.cpp -o receiver -lcuda
$ nvcc sender.cpp -o sender -lcuda

In one shell, execute ./sender, and in another shell, execute ./receiver.

On some nodes it succeeds; on others it does not.

When it fails with Operation not permitted, running the receiver with sudo works.
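
The per-node difference is consistent with the Yama LSM's ptrace_scope sysctl: pidfd_getfd requires the caller to have ptrace permission (PTRACE_MODE_ATTACH_REALCREDS) over the target process, and CAP_SYS_PTRACE (hence sudo) bypasses the restriction. A minimal sketch to check a node's setting:

# Report the Yama ptrace scope, which gates pidfd_getfd between
# otherwise-unrelated processes of the same user.
from pathlib import Path

path = Path("/proc/sys/kernel/yama/ptrace_scope")
if path.exists():
    scope = int(path.read_text())
    meanings = {
        0: "classic: any same-UID process may attach",
        1: "restricted: only descendants or a declared ptracer",
        2: "admin-only: attaching requires CAP_SYS_PTRACE",
        3: "no attach: ptrace disabled entirely",
    }
    print(f"ptrace_scope={scope} ({meanings.get(scope, 'unknown')})")
else:
    print("Yama is not enabled on this kernel")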

@youkaichao (Author)

It seems that if the sender process allows any process to ptrace it, then it works.

Add the following code to the sender:

#include <sys/prctl.h>
// Must be in effect before the receiver calls pidfd_getfd.
prctl(PR_SET_PTRACER, PR_SET_PTRACER_ANY);

Or the Python equivalent:

import ctypes

# Constants from prctl.h
PR_SET_PTRACER = 0x59616d61
PR_SET_PTRACER_ANY = -1  # lift the Yama restriction: any process passing normal credential checks may ptrace us

libc = ctypes.CDLL("libc.so.6", use_errno=True)

result = libc.prctl(PR_SET_PTRACER, PR_SET_PTRACER_ANY, 0, 0, 0)
if result != 0:
    errno = ctypes.get_errno()
    raise OSError(errno, f"prctl(PR_SET_PTRACER, ANY) failed: {ctypes.cast(libc.strerror(errno), ctypes.c_char_p).value.decode()}")
else:
    print("✅ Allowed ptrace from any same-UID process (PR_SET_PTRACER_ANY)")

then it works.
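
A narrower variant (my suggestion, not from the original gist): instead of PR_SET_PTRACER_ANY, the sender can whitelist just the receiver's pid, if it can learn that pid out of band:

import ctypes

PR_SET_PTRACER = 0x59616d61  # Yama-specific prctl option, from <sys/prctl.h>

libc = ctypes.CDLL("libc.so.6", use_errno=True)

receiver_pid = 12345  # hypothetical: the receiver's pid, exchanged out of band
if libc.prctl(PR_SET_PTRACER, receiver_pid, 0, 0, 0) != 0:
    errno = ctypes.get_errno()
    raise OSError(errno, f"prctl(PR_SET_PTRACER, {receiver_pid}) failed")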
