Jimmy Chen

A Programmer

(原创)IPCThreadState和ProcessState

  Android中的ProcessState是客户端和服务端的公共部分,作为Binder通信的基础,ProcessState是一个singleton类,每个进程只有一个对象,每个对象负责打开Binder驱动,建立线程池,让其进程里面的所有线程都能够进行binder通信。与之相关的是IPCThreadState,每个线程都有一个IPCThreadState实例登记在Linux线程的上下文附属数据中,主要负责Binder的读取、写入和请求处理。

一. ProcessState

1.1 ProcessState.h

class ProcessState : public virtual RefBase
{
public:
            // 单例模式
    static  sp<ProcessState>    self();

            void                setContextObject(const sp<IBinder>& object);
            sp<IBinder>         getContextObject(const sp<IBinder>& caller);
        
            void                setContextObject(const sp<IBinder>& object,
                                                 const String16& name);
            sp         getContextObject(const String16& name,
                                                 const sp<IBinder>& caller);
            // 启动线程池
            void                startThreadPool();
                        
    typedef bool (*context_check_func)(const String16& name,
                                       const sp<IBinder>& caller,
                                       void* userData);
            // 判断是否是ServiceManager
            bool                isContextManager(void) const;
            // 注册称为ServiceManager
            bool                becomeContextManager(
                                    context_check_func checkFunc,
                                    void* userData);

            sp<IBinder>         getStrongProxyForHandle(int32_t handle);
            wp<IBinder>         getWeakProxyForHandle(int32_t handle);
            void                expungeHandle(int32_t handle, IBinder* binder);

            void                spawnPooledThread(bool isMain);
            
            status_t            setThreadPoolMaxThreadCount(size_t maxThreads);
            void                giveThreadPoolName();

private:
    friend class IPCThreadState;
    
                                ProcessState();
                                ~ProcessState();

                                ProcessState(const ProcessState& o);
            ProcessState&       operator=(const ProcessState& o);
            String8             makeBinderThreadName();

            struct handle_entry {
                IBinder* binder;
                RefBase::weakref_type* refs;
            };

            handle_entry*       lookupHandleLocked(int32_t handle);

            // 保存打开的binder驱动的文件描述符
            int                 mDriverFD;
            void*               mVMStart;

            // Protects thread count variable below.
            pthread_mutex_t     mThreadCountLock;
            pthread_cond_t      mThreadCountDecrement;
            // Number of binder threads current executing a command.
            size_t              mExecutingThreadsCount;
            // Maximum number for binder threads allowed for this process.
            size_t              mMaxThreads;

    mutable Mutex               mLock;  // protects everything below.

            Vector<handle_entry>mHandleToObject;

            bool                mManagesContexts;
            context_check_func  mBinderContextCheckFunc;
            void*               mBinderContextUserData;

            // 映射,服务名字和IBinder对应
            KeyedVector<String16, sp<IBinder> >
                                mContexts;


            String8             mRootDir;
            // 标记线程池是否已经建立
            bool                mThreadPoolStarted;
            // 记录当前进程中启动的线程个数
volatile int32_t            mThreadPoolSeq;
};

接下来看看ProcessState类中几个重要的方法。

1.2 ProcessState::self()函数

sp<ProcessState> ProcessState::self()
{
    Mutex::Autolock _l(gProcessMutex);
    if (gProcess != NULL) {
        return gProcess;
    }
    gProcess = new ProcessState;
    return gProcess;
}

  self函数好理解,单例模式,一个进程中只有一个,如果没有创建就调用ProcessState的构造函数,下面看看ProcessState的构造函数。

1.3 ProcessState的构造函数

ProcessState::ProcessState()
    // 在这里调用open_driver函数,打开binder驱动
    : mDriverFD(open_driver())
    , mVMStart(MAP_FAILED)
    , mThreadCountLock(PTHREAD_MUTEX_INITIALIZER)
    , mThreadCountDecrement(PTHREAD_COND_INITIALIZER)
    , mExecutingThreadsCount(0)
    , mMaxThreads(DEFAULT_MAX_BINDER_THREADS)
    , mManagesContexts(false)
    , mBinderContextCheckFunc(NULL)
    , mBinderContextUserData(NULL)
    , mThreadPoolStarted(false)
    , mThreadPoolSeq(1)
{
    // 确认Binder驱动已经打开并获得文件描述符
    if (mDriverFD >= 0) {
        // XXX Ideally, there should be a specific define for whether we
        // have mmap (or whether we could possibly have the kernel module
        // availabla).
#if !defined(HAVE_WIN32_IPC)
        // 调用mmap获取内存映射
        // mmap the binder, providing a chunk of virtual address space to receive transactions.
        mVMStart = mmap(0, BINDER_VM_SIZE, PROT_READ, MAP_PRIVATE | MAP_NORESERVE, mDriverFD, 0);
        if (mVMStart == MAP_FAILED) {
            // *sigh*
            ALOGE("Using /dev/binder failed: unable to mmap transaction memory.\n");
            close(mDriverFD);
            mDriverFD = -1;
        }
#else
        mDriverFD = -1;
#endif
    }

    LOG_ALWAYS_FATAL_IF(mDriverFD < 0, "Binder driver could not be opened.  Terminating.");
}

1.4 open_driver()函数

static int open_driver()
{
    // 打开binder驱动,获取文件描述符
    int fd = open("/dev/binder", O_RDWR);
    if (fd >= 0) {
    // FD_CLOEXEC意思是close on exec,即调用exec创建线程就会关闭这个文件描述符,但是如果调用fork就不会
        fcntl(fd, F_SETFD, FD_CLOEXEC);
        int vers = 0;
        // 获取binder的版本信息
        status_t result = ioctl(fd, BINDER_VERSION, &vers);
        if (result == -1) {
            ALOGE("Binder ioctl to obtain version failed: %s", strerror(errno));
            close(fd);
            fd = -1;
        }
        if (result != 0 || vers != BINDER_CURRENT_PROTOCOL_VERSION) {
            ALOGE("Binder driver protocol does not match user space protocol!");
            close(fd);
            fd = -1;
        }
        // 设置进行进程binder最大线程数
        size_t maxThreads = DEFAULT_MAX_BINDER_THREADS;
        result = ioctl(fd, BINDER_SET_MAX_THREADS, &maxThreads);
        if (result == -1) {
            ALOGE("Binder ioctl to set max threads failed: %s", strerror(errno));
        }
    } else {
        ALOGW("Opening '/dev/binder' failed: %s\n", strerror(errno));
    }
    return fd;
}

1.5 startThreadPool()函数

void ProcessState::startThreadPool()
{
    AutoMutex _l(mLock);
    if (!mThreadPoolStarted) {
        mThreadPoolStarted = true;
        spawnPooledThread(true);
    }
}

  代码比较短,这里继续调用到spawnPooledThread函数,传进去的是true,这里稍微记一下。下面看看spawnPooledThread函数

1.6 spawnPooledThread()函数

void ProcessState::spawnPooledThread(bool isMain)
{
    // mThreadPoolStarted已经在startThreadPool函数中设置为true了
    if (mThreadPoolStarted) {
        String8 name = makeBinderThreadName();
        ALOGV("Spawning new pooled thread, name=%s\n", name.string());
        // 创建PoolThread实例,传入true,并调用其run方法
        sp<Thread> t = new PoolThread(isMain);
        t->run(name.string());
    }
}

  接下来看PoolThread类发现PoolThread类是继承了Thread类的,所以当调用其run方法是会自动调用其threadLoop方法,下面看PoolThread类

1.7 PoolThread类

class PoolThread : public Thread
{
public:
    PoolThread(bool isMain)
        : mIsMain(isMain)
    {
    }

protected:
    virtual bool threadLoop()
    {
        // 这里调用IPCThreadState的self方法获取IPCThreadState实例,并调用其joinThreadPool方法,传入true
        IPCThreadState* ipc = IPCThreadState::self();
        if(ipc)
            ipc->joinThreadPool(mIsMain);
        //IPCThreadState::self()->joinThreadPool(mIsMain);
        return false;
    }

    const bool mIsMain;
};

  这里先看到这里,其余的函数大概看一下基本也能明白是用来干什么的,在后续的binder文章中应该会看到其中的大部分。

二. IPCThreadState

2.1 IPCThreadState.h

class IPCThreadState
{
public:
    // 单例模式,获取实例对象
    static  IPCThreadState*     self();
    static  IPCThreadState*     selfOrNull();  // self(), but won't instantiate
    
            sp<ProcessState>    process();
            
            status_t            clearLastError();

            pid_t               getCallingPid() const;
            uid_t               getCallingUid() const;

            void                setStrictModePolicy(int32_t policy);
            int32_t             getStrictModePolicy() const;

            void                setLastTransactionBinderFlags(int32_t flags);
            int32_t             getLastTransactionBinderFlags() const;

            int64_t             clearCallingIdentity();
            void                restoreCallingIdentity(int64_t token);
            
            int                 setupPolling(int* fd);
            status_t            handlePolledCommands();
            void                flushCommands();

            // 加入线程池
            void                joinThreadPool(bool isMain = true);
            
            // Stop the local process.
            void                stopProcess(bool immediate = true);
            
            // 进行数据传输
            status_t            transact(int32_t handle,
                                         uint32_t code, const Parcel& data,
                                         Parcel* reply, uint32_t flags);

            void                incStrongHandle(int32_t handle);
            void                decStrongHandle(int32_t handle);
            void                incWeakHandle(int32_t handle);
            void                decWeakHandle(int32_t handle);
            status_t            attemptIncStrongHandle(int32_t handle);
    static  void                expungeHandle(int32_t handle, IBinder* binder);
            status_t            requestDeathNotification(   int32_t handle,
                                                            BpBinder* proxy); 
            status_t            clearDeathNotification( int32_t handle,
                                                        BpBinder* proxy); 

    static  void                shutdown();

    // Call this to disable switching threads to background scheduling when
    // receiving incoming IPC calls.  This is specifically here for the
    // Android system process, since it expects to have background apps calling
    // in to it but doesn't want to acquire locks in its services while in
    // the background.
    static  void                disableBackgroundScheduling(bool disable);

            // Call blocks until the number of executing binder threads is less than
            // the maximum number of binder threads threads allowed for this process.
            void                blockUntilThreadAvailable();

private:
                                IPCThreadState();
                                ~IPCThreadState();
            // 发送回复数据
            status_t            sendReply(const Parcel& reply, uint32_t flags);
            // 等待回应数据
            status_t            waitForResponse(Parcel *reply,
                                                status_t *acquireResult=NULL);
            // 进行数据传输
            status_t            talkWithDriver(bool doReceive=true);
            status_t            writeTransactionData(int32_t cmd,
                                                     uint32_t binderFlags,
                                                     int32_t handle,
                                                     uint32_t code,
                                                     const Parcel& data,
                                                     status_t* statusBuffer);
            status_t            getAndExecuteCommand();
            // 执行指定command
            status_t            executeCommand(int32_t command);
            void                processPendingDerefs();

            void                clearCaller();

    static  void                threadDestructor(void *st);
    static  void                freeBuffer(Parcel* parcel,
                                           const uint8_t* data, size_t dataSize,
                                           const binder_size_t* objects, size_t objectsSize,
                                           void* cookie);
    
    const   sp<ProcessState>    mProcess;
    const   pid_t               mMyThreadId;
            Vector<BBinder*>    mPendingStrongDerefs;
            Vector<RefBase::weakref_type*> mPendingWeakDerefs;

            Parcel              mIn;
            Parcel              mOut;
            status_t            mLastError;
            pid_t               mCallingPid;
            uid_t               mCallingUid;
            int32_t             mStrictModePolicy;
            int32_t             mLastTransactionBinderFlags;
};

2.2 IPCThreadState::self()函数

IPCThreadState* IPCThreadState::self()
{
    if (gHaveTLS) {
restart:
        const pthread_key_t k = gTLS;
        IPCThreadState* st = (IPCThreadState*)pthread_getspecific(k);
        if (st) return st;
        return new IPCThreadState;
    }

#ifdef _MTK_ENG_BUILD_
    if (gShutdown) {
        IPCThreadState* st = (IPCThreadState*)pthread_getspecific(gTLS);
        ALOGD("IPCThreadState 0x%p, gTLS:%d gHaveTLS:%d\n", &st, gTLS, gHaveTLS);
        return NULL;
    }
#else
    if (gShutdown) return NULL;
#endif

    pthread_mutex_lock(&gTLSMutex);
    if (!gHaveTLS) {
        if (pthread_key_create(&gTLS, threadDestructor) != 0) {
            pthread_mutex_unlock(&gTLSMutex);
            return NULL;
        }
        gHaveTLS = true;
    }
    pthread_mutex_unlock(&gTLSMutex);
    goto restart;
}

  pthread_getspecific、pthread_setspecific和pthread_key_create都和线程局部性有关。函数pthread_setspecific将pointer的值与key相关联。而函数pthread_getspecific则是将与key相关联的数据读取出来。返回的数据类型都是void *,所以可以指向任何的数据类型。同时因为是线程相关的,所以不同线程调用pthread_setspecific设置与key关联的值后,并不会影响到其他的线程。举例来说就是,在线程A中调用pthread_setspecific设置与key相关的值为a,线程B中调用pthread_setspecific设置与key相关的值为b,那么A线程中调用pthread_getspecific获取与key关联的值会得到a,并不会因为在线程B中设置为b了而影响到A线程中的值。

  那么具体的与key关联的值是在IPCThreadState的构造函数中设置的

IPCThreadState::IPCThreadState()
    : mProcess(ProcessState::self()),
      mMyThreadId(gettid()),
      mStrictModePolicy(0),
      mLastTransactionBinderFlags(0)
{
    pthread_setspecific(gTLS, this);
    clearCaller();
    mIn.setDataCapacity(256);
    mOut.setDataCapacity(256);
}

其中与gTLS相关联的值就是IPCThreadState自己了。

2.3 joinThreadpool函数

  在ProcessState的PoolThread中最后会调用到IPCThreadState的joinThreadPool函数,下面就看看这个函数做了些什么事情。

void IPCThreadState::joinThreadPool(bool isMain)
{
    LOG_THREADPOOL("**** THREAD %p (PID %d) IS JOINING THE THREAD POOL\n", (void*)pthread_self(), getpid());

    // 创建Binder线程
    mOut.writeInt32(isMain ? BC_ENTER_LOOPER : BC_REGISTER_LOOPER);
    
    // This thread may have been spawned by a thread that was in the background
    // scheduling group, so first we will make sure it is in the foreground
    // one to avoid performing an initial transaction in the background.
    // 设置调度策略
    set_sched_policy(mMyThreadId, SP_FOREGROUND);
        
    status_t result;
    do {
        processPendingDerefs();
        // now get the next command to be processed, waiting if necessary
        // 处理下一条指令
        result = getAndExecuteCommand();

        if (result < NO_ERROR && result != TIMED_OUT && result != -ECONNREFUSED && result != -EBADF) {
            ALOGE("getAndExecuteCommand(fd=%d) returned unexpected error %d, aborting",
                  mProcess->mDriverFD, result);
            abort();
        }
        
        // Let this thread exit the thread pool if it is no longer
        // needed and it is not the main process thread.
        // 超时而且不是主线程就退出循环
        if(result == TIMED_OUT && !isMain) {
            break;
        }
    } while (result != -ECONNREFUSED && result != -EBADF);

    LOG_THREADPOOL("**** THREAD %p (PID %d) IS LEAVING THE THREAD POOL err=%p\n",
        (void*)pthread_self(), getpid(), (void*)result);
    // 退出线程
    mOut.writeInt32(BC_EXIT_LOOPER);
    // false代表bwr数据的read_buffer为空
    talkWithDriver(false);
}

mOut.writeInt32(isMain ? BC_ENTER_LOOPER : BC_REGISTER_LOOPER);根据这行代码可以得出这两个点:

  • 如果isMain为true,那么写到mOut中的command为BC_ENTER_LOOPER,代表该线程是主线程,再根据if(result == TIMED_OUT && !isMain),可以得知主线程是不会退出循环的。
  • 如果isMain为false,那么写到mOut中的command为BC_REGISTER_LOOPER代表向binder驱动中注册一个线程,如果该线程在处理command时超时,那么就会退出线程循环。

2.4 getAndExecuteCommand函数

status_t IPCThreadState::getAndExecuteCommand()
{
    status_t result;
    int32_t cmd;

    // 与binder驱动交互
    result = talkWithDriver();
    if (result >= NO_ERROR) {
        // 判断mIn中的数据并读取cmd值
        size_t IN = mIn.dataAvail();
        if (IN < sizeof(int32_t)) return result;
        cmd = mIn.readInt32();
        IF_LOG_COMMANDS() {
            alog << "Processing top-level Command: "
                 << getReturnString(cmd) << endl;
        }

        pthread_mutex_lock(&mProcess->mThreadCountLock);
        mProcess->mExecutingThreadsCount++;
        pthread_mutex_unlock(&mProcess->mThreadCountLock);

        // 执行相应的command值
        result = executeCommand(cmd);

        pthread_mutex_lock(&mProcess->mThreadCountLock);
        mProcess->mExecutingThreadsCount--;
        pthread_cond_broadcast(&mProcess->mThreadCountDecrement);
        pthread_mutex_unlock(&mProcess->mThreadCountLock);

        // After executing the command, ensure that the thread is returned to the
        // foreground cgroup before rejoining the pool.  The driver takes care of
        // restoring the priority, but doesn't do anything with cgroups so we
        // need to take care of that here in userspace.  Note that we do make
        // sure to go in the foreground after executing a transaction, but
        // there are other callbacks into user code that could have changed
        // our group so we want to make absolutely sure it is put back.
        set_sched_policy(mMyThreadId, SP_FOREGROUND);
    }

    return result;
}

2.5 talkWithDriver函数

status_t IPCThreadState::talkWithDriver(bool doReceive)
{
    if (mProcess->mDriverFD <= 0) {
        return -EBADF;
    }
    
    // 创建binder_write_read结构体
    binder_write_read bwr;
    
    // Is the read buffer empty?
    // 判断是否有数据未读取完
    const bool needRead = mIn.dataPosition() >= mIn.dataSize();
    
    // 判断是否有数据需要写到bwr中
    const size_t outAvail = (!doReceive || needRead) ? mOut.dataSize() : 0;
    
    bwr.write_size = outAvail;
    bwr.write_buffer = (uintptr_t)mOut.data();

    // This is what we'll read.
    if (doReceive && needRead) {
        bwr.read_size = mIn.dataCapacity();
        bwr.read_buffer = (uintptr_t)mIn.data();
    } else {
        bwr.read_size = 0;
        bwr.read_buffer = 0;
    }

    ……………..
    
    // Return immediately if there is nothing to do.
    // 如果所有数据都处理完了,就直接返回
    if ((bwr.write_size == 0) && (bwr.read_size == 0)) return NO_ERROR;

    bwr.write_consumed = 0;
    bwr.read_consumed = 0;
    status_t err;
    do {
#if defined(HAVE_ANDROID_OS)
        // 会调用到Binder驱动的ioctl操作
        if (ioctl(mProcess->mDriverFD, BINDER_WRITE_READ, &bwr) >= 0)
            err = NO_ERROR;
        else {
            err = -errno;
        }
#else
        err = INVALID_OPERATION;
#endif
        if (mProcess->mDriverFD <= 0) {
            err = -EBADF;
        }
    } while (err == -EINTR);

    …………..

    if (err >= NO_ERROR) {
        if (bwr.write_consumed > 0) {
            if (bwr.write_consumed < mOut.dataSize())
                mOut.remove(0, bwr.write_consumed);
            else
                mOut.setDataSize(0);
        }
        if (bwr.read_consumed > 0) {
            mIn.setDataSize(bwr.read_consumed);
            mIn.setDataPosition(0);
        }
        ………….
        return NO_ERROR;
    }
    
    return err;
}

  这里最后会调用到Binder驱动的相应功能,具体的函数这里先略过,后面计划在理解Binder数据传输流程的时候再看吧。

  回到getAndExecuteCommand函数,然后会对返回的结果进行解析获取对应的cmd值,然后再执行executeCommand函数。

2.6 executeCommand函数

这里只看BC_ENTER_LOOPER返回的相应值

status_t IPCThreadState::executeCommand(int32_t cmd)
{
    BBinder* obj;
    RefBase::weakref_type* refs;
    status_t result = NO_ERROR;

    ………………………………..  
     
    case BR_SPAWN_LOOPER:
        mProcess->spawnPooledThread(false);
        break;
        
    default:
#ifdef _MTK_ENG_BUILD_
        ALOGD("*** BAD COMMAND %d received from Binder driver\n", cmd);
#else
        printf("*** BAD COMMAND %d received from Binder driver\n", cmd);
#endif
        result = UNKNOWN_ERROR;
        break;
    }

    if (result != NO_ERROR) {
#ifdef _MTK_ENG_BUILD_
        ALOGD("EXECMD cmd %d return %d\n", cmd, (int32_t)result);
#endif
        mLastError = result;
    }
    
    return result;
}

  这里最后会调用到ProcessState里面的spawnPooledThread函数,无非也是在里面执行start开启一个线程

发表评论

电子邮件地址不会被公开。 必填项已用*标注

This site uses Akismet to reduce spam. Learn how your comment data is processed.

%d 博主赞过: