以下是基于Python的Amazon Mechanical Turk(MTurk)进阶教程,涵盖复杂任务设计、质量控制、批量操作等实用功能,帮助你更高效地管理众包任务。

一、复杂任务设计(XML模板进阶)

MTurk任务内容需通过XML格式定义,支持多种交互类型(如多选题、图片标注、文本输入等)。以下是几个实用案例:

1. 多选题任务
import boto3
import xml.etree.ElementTree as ET

def create_multiple_choice_question(question_text, options):
    """生成多选题XML模板"""
    root = ET.Element("QuestionForm")
    question = ET.SubElement(root, "Question")
    
    # 问题标题
    title = ET.SubElement(question, "QuestionContent")
    ET.SubElement(title, "Text").text = question_text
    
    # 选项设置
    answer_spec = ET.SubElement(question, "AnswerSpecification")
    selection_answer = ET.SubElement(answer_spec, "SelectionAnswer")
    ET.SubElement(selection_answer, "MinSelectionCount").text = "1"  # 至少选1项
    ET.SubElement(selection_answer, "MaxSelectionCount").text = "1"  # 最多选1项
    
    # 选项列表
    options_list = ET.SubElement(selection_answer, "Selections")
    for idx, option in enumerate(options):
        selection = ET.SubElement(options_list, "Selection")
        ET.SubElement(selection, "SelectionIdentifier").text = f"opt{idx}"
        ET.SubElement(selection, "Text").text = option
    
    return ET.tostring(root, encoding="unicode")

# 初始化客户端(沙盒环境)
mturk = boto3.client(
    'mturk',
    region_name='us-east-1',
    endpoint_url='https://mturk-requester-sandbox.us-east-1.amazonaws.com',
    aws_access_key_id='YOUR_ACCESS_KEY',
    aws_secret_access_key='YOUR_SECRET_KEY'
)

# 创建多选题任务
question_xml = create_multiple_choice_question(
    question_text="以下哪种动物是哺乳动物?",
    options=["鳄鱼", "蝙蝠", "企鹅", "鲨鱼"]
)

response = mturk.create_hit(
    Title="动物分类选择题",
    Description="判断哪种动物属于哺乳动物(单选)",
    Reward="0.02",
    MaxAssignments=3,  # 3个工人回答,用于交叉验证
    LifetimeInSeconds=86400,
    AssignmentDurationInSeconds=60,
    Question=question_xml
)

print(f"HIT创建成功,ID: {response['HIT']['HITId']}")
2. 图片标注任务(带URL的图像判断)
def create_image_annotation_question(image_url, question_text):
    """生成图片标注任务XML(判断图片内容)"""
    root = ET.Element("QuestionForm")
    question = ET.SubElement(root, "Question")
    
    # 问题内容(包含图片)
    content = ET.SubElement(question, "QuestionContent")
    ET.SubElement(content, "Text").text = question_text
    # 插入图片(支持JPG/PNG,需公开可访问URL)
    image = ET.SubElement(content, "Binary")
    ET.SubElement(image, "MimeType").text = "image/jpeg"
    ET.SubElement(image, "DataURL").text = image_url
    
    # 回答格式(文本输入)
    answer_spec = ET.SubElement(question, "AnswerSpecification")
    free_text = ET.SubElement(answer_spec, "FreeTextAnswer")
    ET.SubElement(free_text, "MaxLength").text = "200"  # 限制回答长度
    
    return ET.tostring(root, encoding="unicode")

# 创建图片标注任务
image_question_xml = create_image_annotation_question(
    image_url="https://picsum.photos/id/237/300/200",  # 示例图片(小狗)
    question_text="请描述图片中动物的动作和表情"
)

response = mturk.create_hit(
    Title="图片内容描述",
    Description="观察图片并描述动物状态",
    Reward="0.05",
    MaxAssignments=2,
    LifetimeInSeconds=86400,
    AssignmentDurationInSeconds=120,
    Question=image_question_xml
)

二、质量控制:资格筛选与审核

通过资格测试(Qualification)筛选高质量工人,避免低质回答。

1. 创建资格类型(筛选工人)
def create_qualification():
    """创建自定义资格类型(如"英语水平测试")"""
    response = mturk.create_qualification_type(
        Name="English_Proficiency_Test",
        Description="筛选英语水平合格的工人",
        QualificationTypeStatus="Active",
        AutoGranted=False,  # 不自动授予,需通过测试
        # 可选:设置最低分数要求
        # IntegerValueMinimum=70,  # 若为测试型资格,需达到70分以上
    )
    return response['QualificationType']['QualificationTypeId']

# 创建资格类型
qual_id = create_qualification()
print(f"资格类型创建成功,ID: {qual_id}")
2. 绑定资格到HIT(限制只有合格工人可接任务)
# 在创建HIT时添加资格要求
response = mturk.create_hit(
    Title="英语短句翻译",
    Description="将中文短句翻译成英语(仅限通过英语测试的工人)",
    Reward="0.03",
    MaxAssignments=5,
    LifetimeInSeconds=86400,
    AssignmentDurationInSeconds=60,
    Question=create_multiple_choice_question(
        question_text="翻译:'我喜欢编程'",
        options=["I like code.", "I like programming.", "I love code."]
    ),
    # 资格要求:仅允许拥有指定资格的工人接任务
    QualificationRequirements=[
        {
            "QualificationTypeId": qual_id,
            "Comparator": "Exists",  # 只要拥有该资格即可
            "ActionsGuarded": "Accept"  # 限制接任务权限
        }
    ]
)
3. 审核工人回答并授予资格
def approve_and_grant_qualification(assignment_id, worker_id, qual_id):
    """审核通过后授予工人资格"""
    # 批准任务(支付报酬)
    mturk.approve_assignment(
        AssignmentId=assignment_id,
        RequesterFeedback="回答正确,感谢参与!"
    )
    # 授予资格
    mturk.associate_qualification_with_worker(
        QualificationTypeId=qual_id,
        WorkerId=worker_id,
        IntegerValue=100,  # 资格分数(可选)
        SendNotification=True  # 通知工人
    )

# 示例:获取任务结果并审核
hit_id = "YOUR_HIT_ID"
assignments = mturk.list_assignments_for_hit(HITId=hit_id, AssignmentStatuses=["Submitted"])

for assign in assignments['Assignments']:
    worker_id = assign['WorkerId']
    assignment_id = assign['AssignmentId']
    answer = assign['Answer']  # 解析回答(XML格式)
    
    # 简化逻辑:假设所有回答都合格(实际需根据业务判断)
    approve_and_grant_qualification(assignment_id, worker_id, qual_id)

三、批量操作:高效管理大量任务

当需要发布成百上千个任务时,批量处理可大幅提升效率。

1. 批量创建HIT(使用循环+任务列表)
def batch_create_hits(task_list, reward="0.02"):
    """批量创建HIT(任务列表格式:[(问题文本, 选项列表), ...])"""
    hit_ids = []
    for idx, (question_text, options) in enumerate(task_list):
        question_xml = create_multiple_choice_question(question_text, options)
        response = mturk.create_hit(
            Title=f"批量任务 {idx+1}",
            Description="批量处理的选择题任务",
            Reward=reward,
            MaxAssignments=1,
            LifetimeInSeconds=86400,
            AssignmentDurationInSeconds=60,
            Question=question_xml
        )
        hit_ids.append(response['HIT']['HITId'])
        print(f"已创建第{idx+1}个HIT,ID: {response['HIT']['HITId']}")
    return hit_ids

# 示例任务列表
task_list = [
    ("1+1等于几?", ["1", "2", "3"]),
    ("地球是什么形状?", ["圆形", "方形", "三角形"]),
    ("以下哪个是编程语言?", ["HTML", "Python", "PDF"])
]

# 批量创建
batch_hit_ids = batch_create_hits(task_list)
2. 批量获取结果并导出到CSV
import csv
from xml.etree.ElementTree import XML

def extract_answer(answer_xml):
    """解析工人回答的XML格式"""
    root = XML(answer_xml)
    # 提取选择型回答(根据任务类型调整解析逻辑)
    for answer in root.findall(".//SelectionIdentifier"):
        return answer.text
    # 提取文本型回答
    for answer in root.findall(".//FreeText"):
        return answer.text
    return "未找到回答"

def batch_export_results(hit_ids, output_file="mturk_results.csv"):
    """批量获取HIT结果并导出到CSV"""
    with open(output_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["HIT ID", "工人ID", "回答内容", "提交时间"])
        
        for hit_id in hit_ids:
            assignments = mturk.list_assignments_for_hit(
                HITId=hit_id,
                AssignmentStatuses=["Submitted", "Approved"]
            )
            for assign in assignments.get("Assignments", []):
                answer = extract_answer(assign["Answer"])
                writer.writerow([
                    hit_id,
                    assign["WorkerId"],
                    answer,
                    assign["SubmitTime"]
                ])
    print(f"结果已导出至 {output_file}")

# 导出批量任务的结果
batch_export_results(batch_hit_ids)

四、错误处理与最佳实践

1. 异常捕获与重试机制
from botocore.exceptions import BotoCoreError, ClientError
import time

def safe_create_hit(question_xml, max_retries=3):
    """带重试机制的HIT创建函数"""
    for attempt in range(max_retries):
        try:
            return mturk.create_hit(
                Title="带错误处理的任务",
                Description="演示异常捕获",
                Reward="0.01",
                MaxAssignments=1,
                LifetimeInSeconds=86400,
                AssignmentDurationInSeconds=60,
                Question=question_xml
            )
        except (BotoCoreError, ClientError) as e:
            print(f"创建HIT失败(第{attempt+1}次尝试):{str(e)}")
            if attempt < max_retries - 1:
                time.sleep(2 **attempt)  # 指数退避重试
    raise Exception("达到最大重试次数,创建HIT失败")
2. 成本与资源控制

-** 批量删除过期HIT **:避免未完成任务占用配额

def delete_expired_hits():
    """删除已过期且未完成的HIT"""
    hits = mturk.list_hits()['HITs']
    for hit in hits:
        if hit['HITStatus'] in ["Expired", "Reviewable"]:
            mturk.delete_hit(HITId=hit['HITId'])
            print(f"已删除HIT:{hit['HITId']}")

-** 监控账户余额 **:避免超额支出

def check_balance():
    """查询MTurk账户余额(美元)"""
    response = mturk.get_account_balance()
    return float(response['AvailableBalance'])

print(f"当前余额:{check_balance()} USD")

五、官方资源与工具

1.** 沙盒测试账号 **:

2.** 调试工具 **:

3.** 合规注意事项**:

  • 任务内容需符合MTurk 可接受使用政策
  • 涉及个人数据的任务需遵守GDPR等隐私法规

通过以上进阶功能,你可以实现更灵活的任务设计、更严格的质量控制和更高效的批量管理。建议先在沙盒环境充分测试,再逐步迁移到生产环境。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐