Linux 内核 schedule时的preemption notify机制

内核进行进程切换时,先调用了__schedule,在关抢占后调用context_switch:
static void __sched __schedule(void)
{
    struct task_struct *prev, *next;
    unsigned long *switch_count;
    struct rq *rq;
    int cpu;

need_resched:
    preempt_disable();
    cpu = smp_processor_id();
    rq = cpu_rq(cpu);
...
raw_spin_lock_irq(&rq->lock);
...

context_switch(rq, prev, next);

...

sched_preempt_enable_no_resched();
static inline void
context_switch(struct rq *rq, struct task_struct *prev,
           struct task_struct *next)
{
    struct mm_struct *mm, *oldmm;

    prepare_task_switch(rq, prev, next);

    mm = next->mm;
    oldmm = prev->active_mm;
…

    finish_task_switch(this_rq(), prev);
prepare_task_switch里面调用fire_sched_out_preempt_notifiers,进而调用prev进程注册的sched_out操作,
static inline void
prepare_task_switch(struct rq *rq, struct task_struct *prev,
            struct task_struct *next)
{
    trace_sched_switch(prev, next);
    sched_info_switch(prev, next);
    perf_event_task_sched_out(prev, next);
    fire_sched_out_preempt_notifiers(prev, next);
    prepare_lock_switch(rq, next);
    prepare_arch_switch(next);
}
这里是分支:如果是新创建的进程被调度了,要调用schedule_tail:
/**
* schedule_tail - first thing a freshly forked thread must call.
* @prev: the thread we just switched away from.
*/
asmlinkage void schedule_tail(struct task_struct *prev)
    __releases(rq->lock)
{
    struct rq *rq = this_rq();

    finish_task_switch(rq, prev);

    /*
     * FIXME: do we need to worry about rq being invalidated by the
     * task_switch?
     */
    post_schedule(rq);

#ifdef __ARCH_WANT_UNLOCKED_CTXSW
    /* In this case, finish_task_switch does not reenable preemption */
    preempt_enable();
#endif
    if (current->set_child_tid)
        put_user(task_pid_vnr(current), current->set_child_tid);
}
回到主题:finish_task_switch这里会调用fire_sched_in_preempt_notifiers:
static void fire_sched_in_preempt_notifiers(struct task_struct *curr)
{
    struct preempt_notifier *notifier;
 
    hlist_for_each_entry(notifier, &curr->preempt_notifiers, link)
        notifier->ops->sched_in(notifier, raw_smp_processor_id());
}
preempt_notifiers在哪里注册的呢,对kvm来说是这里:
static int kvm_vm_ioctl_create_vcpu(struct kvm *kvm, u32 id)
{
    int r;
    struct kvm_vcpu *vcpu, *v;
 
    if (id >= KVM_MAX_VCPUS)
        return -EINVAL;
 
    vcpu = kvm_arch_vcpu_create(kvm, id);
    if (IS_ERR(vcpu))
        return PTR_ERR(vcpu);
 
    preempt_notifier_init(&vcpu->preempt_notifier, &kvm_preempt_ops);
 
    r = kvm_arch_vcpu_setup(vcpu);

kvm模块加载的时候vmx_init->kvm_init里面初始化了kvm_preempt_ops

    kvm_preempt_ops.sched_in = kvm_sched_in;
    kvm_preempt_ops.sched_out = kvm_sched_out;
所以kvm_sched_in和kvm_sched_out被调用时的上下文是关抢占的

[MOS] deadlock detection

MOS: Modern Operating Systems

deadlock detection with one resource of each type

如果每种资源都是独一份的,可以用上面这样的图来分析资源竞争情况。
图中方块代表资源,圆形代表进程;指向进程的箭头表示资源被该进程拥有,而从进程出发的箭头表示其需要的资源。
有向图中只要存在一个闭环,就存在一个死锁

找出闭环的方法有很多种,这里给出一个比较直观的方法:
0)首先把图中所有箭头记为unmarked,初始化一个空链表L,并选中一个节点(无论是进程还是资源,这里只考虑抽象出的有向图)为当前节点
1)判断该节有没有出现在L中,如果没有则加入L尾部进入下一步;如果有则说明存在一个闭环,即发现死锁
2)当前节点如果有向外的unmarked箭头就顺着它找到下一个节点,同时将该箭头标记为marked,跳到1);如果没有unmarked箭头进入下一步
3)没有unmarked的箭头表明已经进入死胡同,删掉当前节点,回溯到源节点,将其设置为当前节点,跳到2)

deadlock detection with multiple resources of each type

当一种资源有多个可用实体时,我们使用一种基于矩阵的算法
向量E表示m种资源目前被占用的情况,A表示m种资源剩余量
矩阵C表示n个进程目前分别占用各种资源的情况,每行代表一个进程,每列是一种资源对应的各个进程占用情况,m列求和就等于Em
矩阵R表示n个进程目前对各种资源的需求量,行列意义同上
定义向量X<=Y当且仅当X的每个分量都<=Y的对应分量
算法步骤:
0)所有进程标记为unmarked
1)找到一个unmarked进程i,其占用的资源Ri<=A,下一步;没有满足该条件的进程,跳到3)
2)A=A+Ci,进程i标记为marked,跳到1)
3)如果所有进程都是marked,表示都得到了执行,没有死锁;否则,剩余的所有unmarked进程构成了死锁

虚拟机对x2apic destination mode的选择

首先简单介绍下apic的destination mode:

physical mode下高32bit(destination field)是apic id
logical mode下高32bit是MDA,又分为flat和cluster两种:
flat就是一个bitmap
而cluster包含两个部分,高16bit为cluster ID,后16bit为bitmap;这里还分flat和hierarchy;这里就不深究了。

根据intel spec:Flat logicalmode is not supported in the x2APIC mode. Hence the Destination Format Register(DFR) is eliminated in x2APIC mode. 在x2apic已经没有FDR,所以在logical mode下不再支持flat mode了,只有cluster mode由于x2apic在原来apic的8bit apicid基础上扩展到32bit

所以通过ioapic发中断和发MSI(不是MSIX)中断的设备就有问题了;他们需要做IR,interrupt remapping(VT-d提供的一项能力,通过IOMMU来做)
但是,也有些例外,比如使用physical mode且cpu个数小于256时,8bit的apic id已经完全可以cover所有的cpu了,而且这8bit直接可以写入x2apic的32bit的destination field,所以这种情况下可以直接使用x2apic physical mode而无需IR能力

https://patchwork.kernel.org/patch/2826342/这个邮件内容告诉我们,使用x2apic而同时又没有IR能力的情况,只有虚拟化场景(bios不能enable VT-d这种情况好像要被禁止了,enable x2apic一定要开VT-d)

然而,虚拟化场景下为何一定要enable x2apic呢?
IBM给出了答案Red Hat Enterprise Linux 6 implements x2APIC emulation for KVM guests. Advanced programmable interrupt controllers (APICs) are used in symmetric multiprocessor (SMP) computer systems. x2APIC is a machine state register (MSR) interface to a local APIC with performance and scalability enhancements. IBM lab tests show that enabling the x2APIC support for Red Hat Enterprise Linux 6 guests can result in 2% to 5% throughput improvement for many I/O workloads. You can enable the x2APIC support by specifying the -cpu qemu64,+x2apic option on the qemu-kvm command for a KVM guest.

然而这解释当然是隔靴搔痒,真正的解释还是要show me the patch:

commit ce69a784504222c3ab6f1b3c357d09ec5772127a
Author: Gleb Natapov <gleb@redhat.com>
Date:   Mon Jul 20 15:24:17 2009 +0300

    x86/apic: Enable x2APIC without interrupt remapping under KVM

    KVM would like to provide x2APIC interface to a guest without emulating
    interrupt remapping device. The reason KVM prefers guest to use x2APIC
    is that x2APIC interface is better virtualizable and provides better
    performance than mmio xAPIC interface:

     - msr exits are faster than mmio (no page table walk, emulation)
     - no need to read back ICR to look at the busy bit
     - one 64 bit ICR write instead of two 32 bit writes
     - shared code with the Hyper-V paravirt interface

    Included patch changes x2APIC enabling logic to enable it even if IR
    initialization failed, but kernel runs under KVM and no apic id is
    greater than 255 (if there is one spec requires BIOS to move to x2apic
    mode before starting an OS).

    -v2: fix build
    -v3: fix bug causing compiler warning

    Signed-off-by: Gleb Natapov <gleb@redhat.com>
    Acked-by: Suresh Siddha <suresh.b.siddha@intel.com>
    Cc: Sheng Yang <sheng@linux.intel.com>
    Cc: "avi@redhat.com" <avi@redhat.com>
    LKML-Reference: <20090720122417.GR5638@redhat.com>
    Signed-off-by: Ingo Molnar <mingo@elte.hu>

 

topcoder: Xscoregame

这一题用递归很简单,直接深度优先搜索所有排序,取最大值即可,但是完全没有DP的思想而且求解时间不可接受;但是如果要DP,如何减少状态数?要知道本题的输入数列排序是影响最终结果的。

下面的解法是DP的,这个DP的妙处在于仍然用输入数列A的子数列组合As作为状态,但是这个状态下面仍然分一些子状态,不过不是用这个子数组的所有排序作为子状态而是用一种hash的方式,即将该子数组的可能结果(子数组每一种排序按题目算法所求得的状态值R=x+x^As[i])对64(2的6次方)求余所得作为子状态的index。

为什么可以这么做?因为题目中数组A最大的数为50(6bit,可覆盖),所以在一个状态转移到下一状态时,即某个子数组新加入一个元素的时候,其实只需要考虑状态值R的低6位不同的情况即可,因为只有低6位有后效性,只有低6位相同的所有排序只需取最大状态值(这里不用求具体排序,主要求最大值),其他都不用考虑了,而不同的低6位值可能带来不同的有效子状态。说白了,就是当前低6位不一样的,现在不能判定以后谁优谁劣,而当前低6位相同的,已经可以毫无顾忌的进行优胜劣汰了。这样一来就大大减少了需要考虑的分支,于是求解效率就呈几何级数地提高了。

class Xscoregame:
    def getscore(self, a):
        s = {(0,0):0}
        n = len(a)

        ln = 1<<n
        for i in range(0, ln):
            for j in range(0, 64):
                if s.get((i,j), -1) < 0:
                    continue
                for idx,k in enumerate(a):
                    e = 1 << idx
                    if e & i > 0:
                        continue
                    t = s[(i,j)]
                    t += t ^ k
                    ni = i|e
                    nj = t%64
                    s[(ni, nj)] = max(s.get((ni, nj),-1), t)

        ret = 0
        nn = ln - 1
        for j in range(0, 64):
            ret = max(ret, s.get((nn,j),-1))

        return ret

if __name__ == "__main__":
    print Xscoregame().getscore([1,2,3])
    print Xscoregame().getscore([10,0,0,0])
    print Xscoregame().getscore([1,1,1,1,1,1])
    print Xscoregame().getscore([1,0,1,0,1,0,1,0])
    print Xscoregame().getscore([50,0,1,0,1,0,1,0,1,0,1,0,1,0,1])

 

topcoder: YetAnotherORProblem2

这题显示只有18%的AC率,题目本身不难,估计都是败在执行时间上了;我的解法如下,典型的DP思维,子问题是:如果已知长度为n-1的情况下,和或操作相等的数列,求长度为n的情况;状态index为每个满足条件的数列的和Sn-1(因为和或相等,也是该数列的或),状态值为该index对应的数列有多少种。状态转移方程为:

if Xn | Sn-1 == Xn + Sn-1: then Stat[Xn|Sn-1] += Stat[Sn-1]

class YetAnotherORProblem2:
    def countSequences(self, n, r):
        ls = {0:1}
        c = 0
        mc = 1
        while mc < r:
            mc <<= 1
        mc -= 1

        for i in range(0, n+1):
            if i == n:
                for k,v in ls.items():
                    c += v
                return c%1000000009
            cs = {0:0}
            for k,v in ls.items():
                mask = mc^k
                # 这里是个重要的优化,大于mask+1的值不可能满足条件
                for j in range(0, mask+1):
                    if k & j == 0:
                        cs[k+j] =  cs.get(k+j, 0) + v
            ls = cs.copy()

 

topcoder: YetAnotherTwoTeamsProblem

这是一道标记为hard的题,确实比较hard

开始用了一个比较简单暴力的方法,其实也是DP思路,但是状态取的比较偷懒,直接用了所有的可能组合作为状态,虽然在求解过程中可以利用到已经搜索过的状态,但是整个搜索空间还是太大了,导致虽然结果正确,但是严重超时了:

class YetAnotherTwoTeamsProblem:
    def count(self, skill):
        ss = 0
        for i in skill:
            ss += i
        #skill.sort()
        mask = (1 << len(skill)) - 1
        stat = {0:0}
        sm = {0:100000}
        su = {}
        c = 0
        for idx,s in enumerate(skill):
            i = 1<<(idx+1)
            for j in stat.keys():
                n = i|j
                if i == j or stat[j] < 0 or stat.get(n, 1) < 0:
                    continue

                temp = s + stat[j]
                sm[n] = min(sm[j], s)
                h = su.get(temp, {}).get(sm[n], 0)
                stat[n] = temp
                if h != 0:
                    if h == -2:
                        c += 1
                        stat[mask^n] = -1
                    elif h == -1:
                        stat[mask^n] = -1
                    continue

                d = ss - temp
                if temp < d:
                    stat[n] = temp
                elif temp == d:
                    stat[n] = -1
                    stat[mask^n] = -1
                else:
                    if temp - d < sm[n]*2:
                        stat[n] = -2
                        stat[mask^n] = -1
                        c += 1
                    else:
                        stat[n] = -1
                ht = su.get(temp, {})
                if ht == {}:
                    su[temp] = {sm[n]:stat[n]}
                else:
                    ht[sm[n]] = stat[n]

        return c

又做了一次伸手党,看了解答后醒悟了,其实在我自己的解法中已经朦胧的利用了当前考察组合的skill总和作为状态。正解是采用了skill总和作为状态index。 点我展开分析内容

topcoder: ZigZag

再来水一道简单题:题目地址

想法很自然,记录下当前的方向,一旦出现方向不一致的就把总数加一,同时方向改变,否则continue,然而其实已经包含了动态规划思想

上代码:

class ZigZag:
    def longestZigZag(self, seq):
        count = 1
        d = 0
        p = seq[0]
        for i in seq[1:]:
            if p == i:
                continue
            elif p < i:
                if d == 1:
                    p = i
                    continue
                else:
                    count += 1
                d = 1
            else:
                if d == -1:
                    p = i
                    continue
                else:
                    count += 1
                d = -1
            p = i

        return count

看了下别人提交的,思路一样,只不过有人上来先把整个序列的变化趋势算一遍,即遍历seq,然后算出seq[i]-seq[i-1]的值记录下来记为V[i],其实要的就是一个正负号,然后再次遍历还是记录当前的方向,乘以当前遍历到的那个趋势V[i],乘积发生变化时就在结果上加一并改变当前方向值。

topcoder practice: ZooExchangeProgram

为了兴趣,也为了保持coding的能力,重新开始写一些算法题;为了对算法了解的更透彻,也为了锻炼表达能力,干脆在博客里面对解题代码进行详解,代码是自己写的,但也不排除参考了别人的解法,消化吸收了就是自己的,不是吗?

题面:

太长了,就不贴了,链接在此

代码及详解:

class ZooExchangeProgram:
    def getNumber(self, label, lower, upper):
        l = list(label)
        # 这个r就是我们最后要通过各种现有label组的合并达到的目标集合
        r = range(lower, upper+1)

        # 简单的将无解的情况排除
        for i in r:
            if i not in l:
                return -1

        # 按照给定的目标集合,将所有在包含于目标集合的label组都整理出来
        groups = []
        start = 0
        for i,x in enumerate(l):
            if x not in r:
                if start < i:
                    groups.append(set(l[start:i]))
                start = i + 1
            elif i == len(l) - 1:
                groups.append(set(l[start:i+1]))

        # 去除重复的组,因为我们最终求解的是达成目标集合所需要最小的组数量,答案不用给出包含具体某个组的信息,所以去重不影响结果,只会加速求解;这一段代码体现了python的简洁,一个in关键字就可以代替C语言的compare函数,虽然背后的效率肯定比C低,但是这对让程序员关注算法本身提供了极大便利。当然还有其它更加高效的list of lists去重方法,比如使用库itertools,但是本题的限制了label的数量为50个之内,所以最多有50个组,双重循环一次也还能接受。
        ng = []
        for i in groups:
            if i not in ng:
                ng.append(i)

        # tricky part: 这里开始才是本题的精髓,使用的是动态规划;状态定义为某个组所覆盖的元素集合,或者多个组的并集所覆盖的元素集合,最终状态就是目标集合,而每个组,或者每种多组的并集所构成的所有状态都是中间状态;而且,达成同一种状态可能有多种组合形式;暴力求解方法就是搜索计算所有这些组构成的全部组合形式,将其中构成最终状态的组合记录下来,然后从中找到由最少组构成的组合;这种方法所需计算量非常大,因为其搜索过程中放弃了很多中间结果,为了搜索到一组可能解,过程中走过的弯路,在下一组求解中可能仍然要重新走一遍,这是极大的浪费;动态规划思想记录下搜索过程中的任何一次尝试结果。本题中,我们要纪录的就是每次搜索到某个中间状态时所需要的组合数,下次再组合出这种状态时,比较以前记录下来的值,如果本次组合数更小则更新记录;这样,只需遍历一次所有组合,就能得到所有可能的中间状态的最小组合数,届时只需要从中选择那个最终状态对应的组合数即可。
        count = [0] * 50
        state = {0:0}
        mode = []
        mask = 0
        ret = 0

        # 这里是为了优化程序的执行时间,需要将那些必须选入组合的组排除出我们搜索的范围,反正无论如何它们都得入选,我们干嘛还要去考虑它们呢,博主一开始就是没有排除它们,在提交后被判失败,自己测试发现其中一个case居然耗费了12s之多。哪些是必选的呢,就是那些包含了别人都没有的元素的组,到这里我们所说的组都是包含于目标集合的,所以有独一无二的元素的组必须入选。
        # 先计算所有目标集合中的元素哪些是只存在于一个组中的
        for i in ng:
            for j in i:
                count[j] += 1
        # 对独一无二的元素,我们单独处理,把它们排除在搜索范围(搜索范围的组构成mode)内,然后将它们所在的组的并集mask记录下来;其它的组计算出它们的状态值作为key保存在字典state里面,对应的value记为1,即这种状态显然只需要对应的单个组自己就能构成。
        # 之所以用字典,而不用list,因为list的索引必须是连续的,即使中间很多item都没有用到,也要为他们分配内存,这样的话会浪费大量的内存,比如我们这题当中的label最大可以到50,也就是说从1到50,这50个label会有1<<50种可能的状态组合,那我们需要一个1<<50,即1*2^50,即使用最小的byte型数组(python好像不能指定类型),那也是1024TB的内存,博主开始也没有意识到这个问题,直接用了list,在测试过程中python报memory error才恍然发现了这么个大bug。而python提供了字典这么好的一个数据结构,使用字典可以做到用多少给多少,大大大的减少了内存浪费。
        for i in ng:
            temp = 0
            only = False
            for j in i:
                if count[j] == 1:
                    only = True
                temp += 1<<(j-1)
            if not only:
                mode.append(temp)
                state[temp] = 1
            else:
                mask |= temp
                ret += 1

        if mask == (1<<upper)-(1<<(lower-1)):
            return ret

        best = 50 # 不会比50个组的组合更差的情况了
        for m in mode:
            for i in state.keys():
                # 新的状态可由老状态和m组成,但未必比历史记录更优
                state[i|m] = min(state.get(i|m,50), state.get(i,50)+1)
                if mask|i|m  == (1<<upper)-(1<<(lower-1)):
                   best = min(best, ret+state[i|m])

        return best

Mac新手键盘映射探索

作为Linux老司机都是重度终端依赖症患者,unix终端对ctrl键的摧残各位老司机都懂的;换了Mac后发现键盘极其反人类,右边control没有,左边control那么小而且位置感人,ctrl+r这种分分钟用的着的快捷键在Mac上换了各种姿势都不爽,这是要把左手按残的节奏阿,左手可是很重要的,咳,我说的是敲代码。

谷歌了一把键盘映射,发现一个叫keyremap4macbook的应用,几个论坛里面都强烈推荐,现在的版本叫做karabiner;先安装了前者,居然还要我重启,为了左手的解放勉为其难的重启了下,结果还是徒劳,完全没有任何反应,估计是版本的问题。这种ugly的非App Store应用折腾了我半天还无效,干脆就放弃了。

接着再求助搜索引擎,找到了这个,虽然是短短一行,但是却提醒了我,看来还是需要再探索一下Mac系统自身设置。人肉探索了一段时间后,发现了一个可以利用的点,如图所示,虽然注释文字描述的比较抽象,但是下面的菜单列表对正常智商的人来说一目了然了,显然是个映射关系,那么我们将那个“大户”command键合control键对调一下是否合理呢,我目前看来是合理的,作为程序员最需要效率的就是在terminal中挥散汗水的时间,而其他的操作对快捷键要求不高,那么我们需要的正是让control键无处不在,触手可及!