PHP,DDD,CQRS,Event Sourcing,Kubernetes,Docker,Golang

0%

[译]架构风格 - 《Domain-Driven Design in PHP》第2章

本篇博文由本博客(http://www.veitor.net)经原文翻译,转载请注明出处。

为了能够构建一个复杂的应用程序,其中一个关键点就是需要有一个适合该程序的架构设计。领域驱动设计的一个优势就是它不受任何架构风格的约束。相反,我们可以自由的去为核心领域内的每一个限界上下文选择最合适的架构,为每一个领域问题提供多种架构选择。

例如,订单处理系统可以使用事件溯源(Event Sourcing)去追踪所有不同订单的操作;一个产品目录系统能够使用CQRS去将产品详情展示给不同客户端;一个内容管理系统能使用六边形架构(Hexagonal Architecture)来展示如博客、静态页等需求。

本章将介绍PHP相关的每种架构风格,从传统PHP代码演变为复杂的架构。请注意虽然还有很多其他现有的架构如Data Fabric和SOA,我们发现他们其中一些从PHP角度来介绍太过于复杂。

曾经的美好时光

在PHP4发布之前,该语言还不支持面向对象。那时,写程序的通用做法是使用过程式编程和全局状态。在PHP社区中像关注点分离(Separation of Concerns,SoC)和MVC这些概念还是比较陌生的。

下面的例子是以这种传统方式编写的应用程序,由许多与HTML代码混合的前端控制器组成。在这时,基础设施、UI和领域层代码混在一起:

<?php
include __DIR__ . '/bootstrap.php';
$link = mysql_connect('localhost', 'a_username', '4_p4ssw0rd');
if (!$link) {
    die('Could not connect: ' . mysql_error());
}
mysql_set_charset('utf8', $link);
mysql_select_db('my_database', $link);
$errormsg = null;
if (isset($_POST['submit'] && isValid($_POST['post'])) {
    $post = getFrom($_POST['post']);
    mysql_query('START TRANSACTION', $link);
    $sql = sprintf("INSERT INTO posts (title, content) VALUES ('%s','%s')", mysql_real_escape_string($post['title']), mysql_real_escape_string($post['content']));
    $result = mysql_query($sql, $link);
    if ($result) {
        mysql_query('COMMIT', $link);
    } else {
        mysql_query('ROLLBACK', $link);
        $errormsg = 'Post could not be created! :(';
    }
}
$result = mysql_query('SELECT id, title, content FROM posts', $link);
?>
<html>
<head></head>
<body>
<?php if (null !== $errormsg): ?>
<div class="alert error"><?php echo $errormsg; ?></div>
<?php
else: ?>
<div class="alert success">
Bravo! Post was created successfully!
</div>
<?php
endif; ?>
<table>
<thead><tr><th>ID</th><th>TITLE</th>
<th>ACTIONS</th></tr></thead>
<tbody>
<?php while ($post = mysql_fetch_assoc($result)): ?>
<tr>
<td><?php echo $post['id']; ?></td>
<td><?php echo $post['title']; ?></td>
<td><?php editPostUrl($post['id']); ?></td>
</tr>
<?php
endwhile; ?>
</tbody>
</table>
</body>
</html>
<?php mysql_close($link); ?>

这种编码方式通常被称为第1章中提到的“大泥球”。然而,这种风格改进似乎是将网页头部和底部封装到各自的文件中,然后在该页面中再引入进来。这就避免了重复劳动并且能有效复用。

<?php
include __DIR__ . '/bootstrap.php';
$link = mysql_connect('localhost', 'a_username', '4_p4ssw0rd');
if (!$link) {
    die('Could not connect: ' . mysql_error());
}
mysql_set_charset('utf8', $link);
mysql_select_db('my_database', $link);
$errormsg = null;
if (isset($_POST['submit'] && isValid($_POST['post'])) {
    $post = getFrom($_POST['post']);
    mysql_query('START TRANSACTION', $link);
    $sql = sprintf("INSERT INTO posts(title, content) VALUES('%s','%s')", mysql_real_escape_string($post['title']), mysql_real_escape_string($post['content']));
    $result = mysql_query($sql, $link);
    if ($result) {
        mysql_query('COMMIT', $link);
    } else {
        mysql_query('ROLLBACK', $link);
        $errormsg = 'Post could not be created! :(';
    }
    $result = mysql_query('SELECT id, title, content FROM posts', $link);
?>
<?php include __DIR__ . '/header.php'; ?>
<?php if (null !== $errormsg): ?>
<div class="alert error"><?php echo $errormsg; ?></div>
<?php
    else: ?>
<div class="alert success">
Bravo! Post was created successfully!
</div>
<?php
    endif; ?>
<table>
<thead>
<tr>
<th>ID</th>
<th>TITLE</th>
<th>ACTIONS</th>
</tr>
</thead>
<tbody>
<?php while ($post = mysql_fetch_assoc($result)): ?>
<tr>
<td><?php echo $post['id']; ?></td>
<td><?php echo $post['title']; ?></td>
<td><?php editPostUrl($post['id']); ?></td>
</tr>
<?php
    endwhile; ?>
</tbody>
</table>
<?php include __DIR__ . '/footer.php'; ?>

如今,尽管这样非常的不推荐,但仍然有这种过程式编程的程序存在。这种架构风格的主要缺点就是没有做到关注点分离,用这种方式开发的程序在维护成本上比我们现在所知道并经过验证的架构高多了。

分层架构

从代码可维护性和复用性角度来看,让代码更容易维护的最好方法就是分解概念,也就是为每个不同的关注点创建一个层。在我们前面的例子中,很容易形成不同的分层:一个用于封装数据访问和操作,另一个处理基础设施问题,还有最后一个负责前两者的协调调用。分层架构的一个基本规则就是每一层必须与其下一层紧密耦合,如下图所示:

QQ截图20180419221418.jpg

分层架构真正寻求的是分离应用程序的不同组件。比如前一个例子,一篇博文的展现必须与这篇博文的概念实体完全分离。一个博文实体可以与多个展现相关联,而不是与特定的展现紧耦合。这通常被称为关注点分离(Separation of Concerns)。

另一个寻求相同目的的架构模式是MVC模式。它最初被认为广泛用于构建桌面GUI应用程序的,现在主要用于web应用程序,这要归功于像Symfony、Yii、CodeIgniter等这样的流行web框架。

Model-View-Controller

MVC架构模式将应用分成了3层,具体描述如下:

  • 模型(Model):捕获和集中所有领域模型的行为。这一层独立于数据展示,来管理所有的数据、逻辑和业务规则。模型层被称为是MVC应用的灵魂与核心。
  • 控制器(Controller):协调其他层的交互,触发模型上的动作来更改模型的状态,并且刷新与该模型关联的展现。除此之外,控制器可以将消息发送到是视图层来更改指定模型的展示。
  • 视图(View):将模型层的不同展示表现出来,并且提供一个方式去改变模型状态。
    QQ截图20180419224352.jpg

分层架构示例

模型

继续前面的例子,我们提到不同关注点应该被分离。为了做到这一点,所有的层应该在我们杂乱的代码中进行确定。在这过程中,我们需要特别注意符合我们模型层的代码,其将成为我们应用的核心。

<?php
class Post {
    private $title;
    private $content;
    public static function writeNewFrom($title, $content) {
        return new static($title, $content);
    }
    private function __construct($title, $content) {
        $this->setTitle($title);
        $this->setContent($content);
    }
    private function setTitle($title) {
        if (empty($title)) {
            throw new RuntimeException('Title cannot be empty');
        }
        $this->title = $title;
    }
    private function setContent($content) {
        if (empty($content)) {
            throw new RuntimeException('Content cannot be empty');
        }
        $this->content = $content;
    }
}
class PostRepository {
    private $db;
    public function __construct() {
        $this->db = new PDO(
            'mysql:host=localhost;dbname=my_database',
            'a_username',
            '4_p4ssw0rd',
            [
                PDO::MYSQL_ATTR_INIT_COMMAND =>
                                'SET NAMES utf8mb4'
            ]
        );
    }
    public function add(Post $post) {
        $this->db->beginTransaction();
        try {
            $stm = $this->db->prepare(
                  'INSERT INTO posts (title, content) VALUES (?, ?)'
            );
            $stm->execute([
                $post->title(),
                $post->content()
            ]);
            $this->db->commit();
        } catch (Exception $e) {
            $this->db->rollback();
            throw new UnableToCreatePostException($e);
        }
    }
}

模型层现在已经由Post类和PostRepository类来定义。Post类代表一篇博文,PostRepository类代表整个博文的集合。另外,模型内部还需要另一个层(协调和编排领域模型行为的层)。进入应用层(Application Layer):

<?php
class PostService {
    public function createPost($title, $content) {
        $post = Post::writeNewFrom($title, $content);
        (new PostRepository())->add($post);
        return $post;
    }
}

PostService类是所谓的应用服务(Application Service),它的目的是协调和编排领域行为。换句话说,Application Service是让事情发生的东西,它是领域模型的直接客户端。没有其他对象类型能够直接与模型层的内部层进行交流。

视图

视图层可以同时从模型层或者控制器层发送和接收消息。它的主要目的是在UI层面将模型展示给用户,而且每次模型更新它都将刷新UI展示。一般来说,视图层接收一个对象(通常是数据转换对象,Data Transfer Object,DTO)而不是模型层的实例对象,从而将收集到所需的信息成功展示。对于PHP,这有一些模板引擎能够很好的帮助将模型展示与模型和控制器分离,目前较流行的是Twig。让我们看一下使用Twig的视图层是怎么样的。

使用DTO而不是模型实例?
这是一个旧的活跃话题。为什么是创建一个DTO而不是直接将模型实例对象给视图层?主要原因和简要的回答还是:关注点分离。让视图检查和使用模型实例将导致视图层和模型层的紧耦合。事实上,模型层的一个改动可能将影响所有使用该模型实例的视图。

{% extends "base.html.twig" %} {% block content %} {% if errormsg is defined %}
    
{ { errormsg }}
{% else %}
Bravo! Post was created successfully!
{% endif %} {% for post in posts %} {% endfor %}
ID TITLE ACTIONS
{ { post.id }} { { post.title }} Edit Post
{% endblock %}

大多数时间,当模型触发更改状态时,它也会通知相关的视图刷新UI。在典型的web场景中,由于客户端和服务端的特性,模型和其展示在同步变化时有些困难。在这类环境中,一些javascript定义的交互需要来维持这种同步变化,所以像下面的几个javascript的MVC框架近几年开始流行起来:

  • AngularJS
  • Ember.ks
  • Marionette.js
  • React

控制器

控制器负责组织和协调视图和模型,它从视图层接收信息并触发模型的行为来做一些需要的操作。此外,它还发送信息到视图层以展示模型。这两个操作都由应用层来执行,应用层负责编排、组织和封装领域行为。

就PHP中的web程序而言,控制器通常会被理解成一组class类来实现其目的。它们接收HTTP请求并返回一个HTTP响应。

<?php
class PostsController {
    public function updateAction(Request $request) {
        if ($request->request->has('submit') && Validator::validate($request->request->post)) {
            $postService = new PostService();
            try {
                $postService->createPost($request->request->get('title'), $request->request->get('content'));
                $this->addFlash('notice', 'Post has been created successfully!');
            }
            catch(Exception $e) {
                $this->addFlash('error', 'Unable to create the post!');
            }
        }
        return $this->render('posts/update-result.html.twig');
    }
}

反转依赖:六边形架构

遵循分层架构的基本规则,在实现包含基础设施的领域接口(interface)时会存在风险。

例如,在MVC中,前一个例子中的PostRepository类应该放在领域模型中。但是,直接将基础设施细节部分放在领域中会违反关注点分离原则,这是有问题的;很难去避免违反分层架构的基本规则,那将导致领域过多的了解到技术实现的部分,使得代码难以测试。

依赖倒置原则(The Dependency Inversion Principle,DIP)

我们如何去处理刚才的问题?由于领域模型层依赖于具体的基础设施实现,所以可以通过将基础设施层放到其他三个层之上,这样就用上了依赖倒置原则。

依赖倒置原则
高级模块不应该依赖于低级模块,两者都应该依赖于抽象。
抽象不应该依赖于细节,细节也不应该依赖于抽象。

通过使用依赖倒置原则,架构体系将发生改变,基础设施层(可称之为低级模块)现在依赖于视图UI层,应用层和领域层(这些是高级模块)。依赖性已经被倒置过来。

但什么是六边形架构?六边形架构(也称之为端口和适配器)由Alistair Cockburn在他的《六边形架构》一书中定义。六边形架构将应用程序描述为六边形,其中每一个边都代表带有一个或多个适配器的端口(译者注:原文单词为Port),一个端口是一个带有可插入适配器的连接器,该连接器将外部输入信息转换成应用程序可以理解的内部信息。就DIP而言,一个端口将是一个高级模块,一个适配器将是一个低级模块。此外,如果应用程序需要将消息发送到外部,将使用带有适配器的端口来发送,并将信息转换为外部可理解的内容。因此,六边形架构在应用中提出了对称性的概念,这也是架构体系发生改变的主要原因。该架构通常被表示为六边形,因为谈论顶层和底层将不再具有意义,而是主要谈论外部和内部。

应用六边形架构

继续使用博客的例子,我们需要的第一个概念是外部与应用程序交流的端口。在这种情况下,我们将使用HTTP端口和对应的适配器,外部将使用该端口给应用程序发送信息。博客示例使用数据库来存储所有博客文章的集合,为了允许应用程序能从数据库检索文章,需要这么一个端口:

interface PostRepository
{
  public function byId(PostId $id);
  public function add(Post $post);
}

这个接口(interface)告诉了我们一个端口,应用程序将通过该端口获取博文信息,并且将该端口放置在了领域层,现在需要一个该端口的适配器。适配器负责使用特定技术来检索博文。

<?php
class PDOPostRepository implements PostRepository {
    private $db;
    public function __construct(PDO $db) {
        $this->db = $db;
    }
    public function byId(PostId $id) {
        $stm = $this->db->prepare('SELECT * FROM posts WHERE id = ?');
        $stm->execute([$id->id() ]);
        return recreateFrom($stm->fetch());
    }
    public function add(Post $post) {
        $stm = $this->db->prepare('INSERT INTO posts (title, content) VALUES (?, ?)');
        $stm->execute([$post->title(), $post->content(), ]);
    }
}

一旦我们有了端口和对应的适配器,最后一步就是重构PostService来使用它们。通过依赖注入可以轻松的获得到:

<?php
class PostService {
    private $postRepository;
    public function __construct(PostRepositor $postRepository) {
        $this->postRepository = $postRepository;
    }
    public function createPost($title, $content) {
        $post = Post::writeNewFrom($title, $content);
        $this->postRepository->add($post);
        return $post;
    }
}

这只是六边形架构的一个简单例子,这是一个灵活的架构,可以促进关注点的分离。由于通过端口,内部应用程序可以与外部通信,因此六边形架构还促进了对称性。从现在开始,我们将使用这个架构作为基础架构,来继续解释CQRS和事件溯源。

关于更多该架构的示例,你可以查看附录(译者注:暂无链接,后期会添加上)。有关更详细的示例,你应该跳到第11章应用程序(Application Service),这一章解释了事务性和其他横切关注点等高级话题。

命令查询责任分离(Command Query Responsibility Segregation,CQRS)

六边形架构是一个很好的基础架构,但是也有一些局限性。例如,复杂的用户UI界面可能需要以多种形式来展示聚合信息(第8章,聚合),或者需要从多个聚合获取数据。在这种情况下,我们可能在仓储类中有很多finder方法(可能与UI界面一样多)。或者我们可能决定将这种复杂性的东西放到应用服务中,使用复杂结构来从多个聚合聚集所需要的数据。以下是一个示例:

<?php
interface PostRepository
{
    public function save(Post $post);
    public function byId(PostId $id);
    public function all();
    public function byCategory(CategoryId $categoryId);
    public function byTag(TagId $tagId);
    public function withComments(PostId $id);
    public function groupedByMonth();
// ...
}

当这些技术被滥用时,UI视图的构建会变的相当痛苦。我们是让应用服务直接返回领域模型实例还是返回一类DTO?对此该做一些权衡。如果是后者,我们避免领域模型与基础设施代码(web控制器、命令行控制器等)的紧密耦合。

幸运的是,我们还有另一种方法。如果有多个并且不同的视图,我们将其从领域模型中去除,并看做是基础设施问题。命令查询分离(CQS)是一个基于设计原则的可选项,由Bertrand Meyer定义的,并且这催生了一个新的架构模式,称之为命令查询责任分离(CQRS),由Greg Young定义。

命令查询分离(Command Query Separation, CQS)
这个设计原则规定,每个方法应该是执行操作的命令,或者是一个返回数据给调用者的查询。不能同时是这两种。

CQRS寻求一个更严格的关注点分离,将模型分为了两部分:

  • _写模型_:也被称为_命令模型_,它负责写入并对真正的领域行为负责。
  • _读模型_:它负责在应用中的读取内容,并将这些内容认为是领域模型外的。

每当有人触发写模型的命令时,就会执行写入数据的操作。此外,这还将触发读模型的更新,为了在读模型上展示最新的更改。

严格的分离将导致另一个问题:最终一致性。读模型的一致性受写模型命令的影响,换句话说,读模型是最终一致性。也就是当每一次的写模型执行一个命令,它将根据最新的更改拉起一个过程来负责更新读模型,这就将有一段时间内,用户UI界面是展示的旧的信息,这经常发生,因为我们受当前技术的限制。

思考一下web应用的缓存系统,每当数据库更新了新的信息,缓存里的数据可能还是旧的,所以每当数据更新,将有一个更新缓存的过程,缓存系统是最终一致性的。

这一类过程用CQRS术语来说就是写模型投影,或者叫投影。我们将写模型投影到读模型上,这个过程可以是同步的也可以是异步的,取决于你的需求。这将归功于另一个有用的战术设计模式——领域事件(Domain Events),本书稍后将做详细解释。写模型投影的基础是收集所有已经发布的领域事件,并使用来自这些事件中的信息去更新读模型。

写模型

这是领域行为真正的持有者。继续我们的例子,Repository接口将被简化为:

<?php
interface PostRepository
{
    public function save(Post $post);
    public function byId(PostId $id);
}

byId方法负责通过id加载聚合以便能操作该聚合,除了这个方法之外,现在PostRepository类已经从所有的读取问题中解放出来了。一旦这个完成了,所有的查询方法也从Post模型中剥离了出来,只留下了命令方法,这意味着我们有效的摆脱了所有关于获取Post获取数据的方法。相反的,通过订阅领域事件,当事件发生时将会触发写模型投影。

<?php
class AggregateRoot
{
    private $recordedEvents = [];
    protected function recordApplyAndPublishThat(DomainEvent $domainEvent)
    {
        $this->recordThat($domainEvent);
        $this->applyThat($domainEvent);
        $this->publishThat($domainEvent);
    }
    protected function recordThat(DomainEvent $domainEvent)
    {
        $this->recordedEvents[] = $domainEvent;
    }
    protected function applyThat(DomainEvent $domainEvent)
    {
        $modifier = 'apply' . get_class($domainEvent);
        $this->{$modifier}($domainEvent);
    }
    protected function publishThat(DomainEvent $domainEvent)
    {
        DomainEventPublisher::getInstance()->publish($domainEvent);
    }
    public function recordedEvents()
    {
        return $this->recordedEvents;
    }
    public function clearEvents()
    {
        $this->recordedEvents = [];
    }
}
class Post extends AggregateRoot
{
    private $id;
    private $title;
    private $content;
    private $published = false;
    private $categories;
    private function __construct(PostId $id)
    {
        $this->id = $id;
        $this->categories = new Collection();
    }
    public static function writeNewFrom($title, $content)
    {
        $postId = PostId::create();
        $post = new static($postId);
        $post->recordApplyAndPublishThat(new PostWasCreated($postId, $title, $content));
    }
    public function publish()
    {
        $this->recordApplyAndPublishThat(new PostWasPublished($this->id));
    }
    public function categorizeIn(CategoryId $categoryId)
    {
        $this->recordApplyAndPublishThat(new PostWasCategorized($this->id, $categoryId));
    }
    public function changeContentFor($newContent)
    {
        $this->recordApplyAndPublishThat(new PostContentWasChanged($this->id, $newContent));
    }
    public function changeTitleFor($newTitle)
    {
        $this->recordApplyAndPublishThat(new PostTitleWasChanged($this->id, $newTitle));
    }
}

所有触发状态改变的动作都是通过领域事件来实现的,对于每一个发布的领域事件,都有一个方法来反映出状态的改变。

<?php
class Post extends AggregateRoot
{
    // ...
    protected function applyPostWasCreated(PostWasCreated $event)
    {
        $this->id = $event->id();
        $this->title = $event->title();
        $this->content = $event->content();
    }
    protected function applyPostWasPublished(PostWasPublished $event)
    {
        $this->published = true;
    }
    protected function applyPostWasCategorized(PostWasCategorized $event)
    {
        $this->categories->add($event->categoryId());
    }
    protected function applyPostContentWasChanged(PostContentWasChanged $event)
    {
        $this->content = $event->content();
    }
    protected function applyPostTitleWasChanged(PostTitleWasChanged $event)
    {
        $this->title = $event->title();
    }
}

读模型

读模型也被称为查询模型,是脱离领域的反规范化(denormalized)的数据模型。事实上,对于CQRS,所有的读取问题都被视为报告过程,一个基础设施问题。一般来说,当使用CQRS,读取模型需要满足UI界面的需求和视图混合的复杂度。在根据关系数据库定义读模型的情况下,最简单的方法是在数据库表和UI视图之间设置一对一的关系,其他写入的地方会发布领域事件,触发使用写入模型投影来更新数据表和UI视图。

-- 带有评论的单篇博文的视图的定义
-- Definition of a UI view of a single post with its comments
CREATE TABLE single_post_with_comments (
    id INTEGER NOT NULL,
    post_id INTEGER NOT NULL,
    post_title VARCHAR(100) NOT NULL,
    post_content TEXT NOT NULL,
    post_created_at DATETIME NOT NULL,
    comment_content TEXT NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- Set up some data
INSERT INTO single_post_with_comments VALUES
    (1, 1, "Layered" , "Some content", NOW(), "A comment"),
    (2, 1, "Layered" , "Some content", NOW(), "The comment"),
    (3, 2, "Hexagonal" , "Some content", NOW(), "No comment"),
    (4, 2, "Hexagonal", "Some content", NOW(), "All comments"),
    (5, 3, "CQRS", "Some content", NOW(), "This comment"),
    (6, 3, "CQRS", "Some content", NOW(), "That comment");
-- Query it
SELECT * FROM single_post_with_comments WHERE post_id = 1;

这种结构风格的一个重要特征是,由于应用程序的状态被写模型处理了,所以使用读模型完全是一次性的。这意味着读模型能被需要时使用写模型投影来移除和重建。

在这里,我们可以博客程序中看到一些可能的视图示例:

SELECT * FROM
    posts_grouped_by_month_and_year
ORDER BY month DESC,year ASC;
SELECT * FROM
    posts_by_tags
WHERE tag = "ddd";
SELECT * FROM
    posts_by_author
WHERE author_id = 1;

需要重点指出的是,CQRS不会强制将读模型的定义与其实现限制为关系数据库。它完全取决于你程序的需求,可以是使用关系数据库、面向文档的数据库、key-value键值对数据库以及任何适合你程序的数据库。继续博客程序,我们将使用Elasticsearch(一个面向文档的数据库)来实现读模型。

<?php
class PostsController {
    public function listAction() {
        $client = new ElasticsearchClientBuilder::create()->build();
        $response = $client->search(['index' => 'blog-engine', 'type' => 'posts', 'body' => ['sort' => ['created_at' => ['order' => 'desc']]]]);
        return ['posts' => $response];
    }
}

读模型的代码已经针对Elasticsearch单个索引彻底的简化了。

这表明了读模型不需要一个对象关系映射器(ORM),但是,写模型可能会从对象关系映射器的使用中获得好处,因为这将允许你根据程序的需求来组织和构建读模型。

同步写模型和读模型

棘手的部分来了,我们怎么将读模型和写模型同步?我们已经说过,我们将使用获取的领域事件来完成同步。对于每一个获取的领域事件,一个特定的投影将被执行。所以领域事件和投影之间存在一对一的关系。

让我们看一个配置投影的例子,以便更好的理解。首先,我们需要为投影定义一个骨架:

<?php
interface Projection
{
    public function listensTo();
    public function project($event);
}

因此为PostWasCreated事件定义一个Elasticsearch投影将像这样简单:

<?php
namespace InfrastructureProjectionElasticsearch;
use ElasticsearchClient;
use PostWasCreated;
class PostWasCreatedProjection implements Projection {
    private $client;
    public function __construct(Client $client) {
        $this->client = $client;
    }
    public function listensTo() {
        return PostWasCreated::class;
    }
    public function project($event) {
        $this->client->index(['index' => 'posts', 'type' => 'post', 'id' => $event->getPostId(), 'body' => ['content' => $event->getPostContent(),
        // ...
        ]]);
    }
}

投影器的实现是一种专门的领域事件监听器,它和默认的领域事件监听器的区别在于,投影器对一组领域事件作出反应,而不是只对一个领域事件作出反应。

<?php
namespace InfrastructureProjection;
class Projector {
    private $projections = [];
    public function register(array $projections) {
        foreach ($projections as $projection) {
            $this->projections[$projection->eventType() ] = $projection;
        }
    }
    public function project(array $events) {
        foreach ($events as $event) {
            if (isset($this->projections[get_class($event) ])) {
                $this->projections[get_class($event) ]->project($event);
            }
        }
    }
}

以下代码展示了投影器和事件之间的流程:

<?php
$client = new ElasticsearchClientBuilder::create()->build();
$projector = new Projector();
$projector->register([new InfrastructureProjectionElasticsearchPostWasCreatedProjection($client), new InfrastructureProjectionElasticsearchPostWasPublishedProjection($client), new InfrastructureProjectionElasticsearchPostWasCategorizedProjection($client), new InfrastructureProjectionElasticsearchPostContentWasChangedProjection($client), new InfrastructureProjectionElasticsearchPostTitleWasChangedProjection($client), ]);
$events = [new PostWasCreated( /* ... */
), new PostWasPublished( /* ... */
), new PostWasCategorized( /* ... */
), new PostContentWasChanged( /* ... */
), new PostTitleWasChanged( /* ... */
), ];
$projector->project($event);

这个代码是同步的,但是如果需要也可以成为异步。通过在视图层展示一些提示信息来让用户了解这些没有同步更新的数据。

对于下一个例子,我们将使结合ReactPHP来使用amqplib PHP扩展:

<?php
// Connect to an AMQP broker
$cnn = new AMQPConnection();
$cnn->connect();
// Create a channel
$ch = new AMQPChannel($cnn);
// Declare a new exchange
$ex = new AMQPExchange($ch);
$ex->setName('events');
$ex->declare();
// Create an event loop
$loop = ReactEventLoopFactory::create();
// Create a producer that will send any waiting messages every half a
second $producer = new GosComponentReactAMQPProducer($ex, $loop, 0.5);
$serializer = JMSSerializerSerializerBuilder::create()->build();
$projector = new AsyncProjector($producer, $serializer);
$events = [new PostWasCreated( /* ... */
), new PostWasPublished( /* ... */
), new PostWasCategorized( /* ... */
), new PostContentWasChanged( /* ... */
), new PostTitleWasChanged( /* ... */
), ];
$projector->project($event);

为使该代码能运行,我们需要一个异步投影器。这有一个简单的实现:

<?php
// Connect to an AMQP broker
$cnn = new AMQPConnection();
$cnn->connect();
// Create a channel
$ch = new AMQPChannel($cnn);
// Declare a new exchange
$ex = new AMQPExchange($ch);
$ex->setName('events');
$ex->declare();
// Create an event loop
$loop = ReactEventLoopFactory::create();
// Create a producer that will send any waiting messages every half a
second $producer = new GosComponentReactAMQPProducer($ex, $loop, 0.5);
$serializer = JMSSerializerSerializerBuilder::create()->build();
$projector = new AsyncProjector($producer, $serializer);
$events = [new PostWasCreated( /* ... */
), new PostWasPublished( /* ... */
), new PostWasCategorized( /* ... */
), new PostContentWasChanged( /* ... */
), new PostTitleWasChanged( /* ... */
), ];
$projector->project($event);

而RabbitMQ事件消费者的代码像这样:

<?php
// Connect to an AMQP broker
$cnn = new AMQPConnection();
$cnn->connect();
// Create a channel
$ch = new AMQPChannel($cnn);
// Create a new queue
$queue = new AMQPQueue($ch);
$queue->setName('events');
$queue->declare();
// Create an event loop
$loop = ReactEventLoopFactory::create();
$serializer = JMSSerializerSerializerBuilder::create()->build();
$client = new ElasticsearchClientBuilder::create()->build();
$projector = new Projector();
$projector->register([new InfrastructureProjectionElasticsearchPostWasCreatedProjection($client), new InfrastructureProjectionElasticsearchPostWasPublishedProjection($client), new InfrastructureProjectionElasticsearchPostWasCategorizedProjection($client), new InfrastructureProjectionElasticsearchPostContentWasChangedProjection($client), new InfrastructureProjectionElasticsearchPostTitleWasChangedProjection($client), ]);
// Create a consumer
$consumer = new GosComponentReactAMQPConsumer($queue, $loop, 0.5, 10);
// Check for messages every half a second and consume up to 10 at a time.
$consumer->on('consume', function ($envelope, $queue) use($projector, $serializer) {
    $event = $serializer->unserialize($envelope->getBody(), 'json');
    $projector->project($event);
});
$loop->run();

从现在开始,所有需要的Repository消费一个投影器实例,并使得Repository调用投影过程:

<?php
class DoctrinePostRepository implements PostRepository {
    private $em;
    private $projector;
    public function __construct(EntityManager $em, Projector $projector) {
        $this->em = $em;
        $this->projector = $projector;
    }
    public function save(Post $post) {
        $this->em->transactional(function (EntityManager $em) use($post) {
            $em->persist($post);
            foreach ($post->recordedEvents() as $event) {
                $em->persist($event);
            }
        });
        $this->projector->project($post->recordedEvents());
    }
    public function byId(PostId $id) {
        return $this->em->find($id);
    }
}

Post实例和记录的事件在同一个事务中被触发和持久化,这确保了不会有事件丢失,如果事务执行成功,我们将这些事件投影到读模型。因此,写模型和读模型之间不会不一致。

使用ORM还是不使用ORM?
在实现CQRS时最常见的一个问题是:是否真的需要使用ORM?我们坚信在写模型中使用ORM是非常好的,并且具有使用工具的所有有点,这有助于我们使用关系数据库的情况下节省大量的工作。但是我们不应该忘记,我们仍然需要在关系数据库中持久化和检索写模型的状态。

事件溯源

CQRS是一个强大而灵活的架构,在收集和保存领域事件(在聚合操作中发生的)方面有一个额外的好处,可以让你详细的了解领域中发生的事情。因为领域事件描述了过去所发生的事情,这在领域中是具有重要意义的,所以领域事件是战术模式的关键点之一,

小心记录太多的事件
越来越多的事件意味着在领域中大量使用事件,这很可能是由业务来激发的。但作为一个经验法则,记住保持简单为好。

通过使用CQRS,我们能够记录领域层中发生的所有相关事件。领域模型的状态可以通过之前记录的事件来重现。我们只要一个工具以一致的方式来存储这些事件,所以我们需要一个事件仓库。

事件溯源背后的基本思想是,将聚合的状态表示为事件的顺序变化。

使用CQRS,我们部分实现了以下功能:Post实体通过领域事件来更改其状态,由于之前已经解释过了,通过将对象映射到关系数据库的方式,Post实体已被持久化。

离事件溯源更近一步,如果我们使用一张数据表来存储所有博客文章的状态,另一张表存储所有文章的评论等。使用事件溯源让我们使用单个表:存储所有由聚合发布的领域事件,单一的做追加。是的,你没看错,一张数据表。

有了这样的模型,像ORM这样的工具不再需要了。唯一需要的工具是一个简单的数据库抽象层,通过它可以追加事件进去:

<?php
interface EventSourcedAggregateRoot {
    public static function reconstitute(EventStream $events);
}
class Post extends AggregateRoot implements EventSourcedAggregateRoot {
    public static function reconstitute(EventStream $history) {
        $post = new static ($history->getAggregateId());
        foreach ($events as $event) {
            $post->applyThat($event);
        }
        return $post;
    }
}

现在Post聚合有一个方法,当给定一组事件时(换句话说,一个事件流),他能够在保存之前,一步一步的重现状态直到成为当前状态。下一步将构建PostRepository端口的适配器,该端口将从Post聚合提取所有发布的事件,并将它们追加到存储事件的表中,这就是我们所说的事件仓库:

<?php
class EventStorePostRepository implements PostRepository {
    private $eventStore;
    private $projector;
    public function __construct($eventStore, $projector) {
        $this->eventStore = $eventStore;
        $this->projector = $projector;
    }
    public function save(Post $post) {
        $events = $post->recordedEvents();
        $this->eventStore->append(new EventStream($post->id(), $events));
        $post->clearEvents();
        $this->projector->project($events);
    }
}

这是当我们使用事件仓库来保存Post聚合发布的所有事件时的实现,现在我们需要一个方式去从事件历史中恢复一个聚合。由Post聚合实现的一个方法,来从事件中重建一个博客文章状态:

<?php
class EventStorePostRepository implements PostRepository {
    public function byId(PostId $id) {
        return Post::reconstitute($this->eventStore->getEventsFor($id));
    }
}

事件仓库是负责保存和恢复事件流所有责任的主力。其公开API有两个简单的方法组成:它们是appendgetEventsFrom,前者将事件流追加到事件仓库中,后者加载事件流来重建聚合。

我们可以使用键值对形式实现去存储所有事件:

<?php
class EventStore {
    private $redis;
    private $serializer;
    public function __construct($redis, $serializer) {
        $this->redis = $redis;
        $this->serializer = $serializer;
    }
    public function append(EventStream $eventstream) {
        foreach ($eventstream as $event) {
            $data = $this->serializer->serialize($event, 'json');
            $date = (new DateTimeImmutable())->format('YmdHis');
            $this->redis->rpush('events:' . $event->getAggregateId(), $this->serializer->serialize(['type' => get_class($event), 'created_on' => $date, 'data' => $data], 'json'));
        }
    }
    public function getEventsFor($id) {
        $serializedEvents = $this->redis->lrange('events:' . $id, 0, -1);
        $eventStream = [];
        foreach ($serializedEvents as $serializedEvent) {
            $eventData = $this->serializerdeserialize($serializedEvent, 'array', 'json');
            $eventStream[] = $this->serializer->deserialize($eventData['data'], $eventData['type'], 'json');
        }
        return new EventStream($id, $eventStream);
    }
}

这个事件仓库的实现是基于Redis的,使用前缀将事件追加到列表中:另外,在持久化事件之前,我们提取一些元数据,像事件类名或创建日期等,因为稍后会派上用场。

显然,就性能而言,聚合总是要经历一遍事件历史才能达到最终的状态,这代价是非常昂贵的。事件流如果包含数百甚至上千事件更是如此。处理这种情况的最佳方法是从聚合中获取快照,并且仅重现快照之后发生的事件流中的事件。快照只是在任何特定时刻聚合状态的简单序列化形式,它可以基于聚合事件流的事件数量,也可以基于时间。采用第一种方法,每触发n个事件(如每隔50、100或200个事件)就会记录一张快照,第二种方法是每隔n秒记录一次快照。

我们将使用第一种记录快照的方式,在事件的元数据中,我们存储了一个额外的字段——version,该字段表示我们开始重现聚合历史的开始位置。

<?php
class SnapshotRepository {
    public function byId($id) {
        $key = 'snapshots:' . $id;
        $metadata = $this->serializer->unserialize($this->redis->get($key));
        if (null === $metadata) {
            return;
        }
        return new Snapshot($metadata['version'], $this->serializer->unserialize($metadata['snapshot']['data'], $metadata['snapshot']['type'], 'json'));
    }
    public function save($id, Snapshot $snapshot) {
        $key = 'snapshots:' . $id;
        $aggregate = $snapshot->aggregate();
        $snapshot = ['version' => $snapshot->version(), 'snapshot' => ['type' => get_class($aggregate), 'data' => $this->serializer->serialize($aggregate, 'json') ]];
        $this->redis->set($key, $snapshot);
    }
}

现在我们需要重构EventStore类,以便开始使用SnapshotRepository类在可接受的性能时间内去加载聚合。

<?php
class EventStorePostRepository implements PostRepository {
    public function byId(PostId $id) {
        $snapshot = $this->snapshotRepository->byId($id);
        if (null === $snapshot) {
            return Post::reconstitute($this->eventStore->getEventsFrom($id));
        }
        $post = $snapshot->aggregate();
        $post->replay($this->eventStore->fromVersion($id, $snapshot->version()));
        return $post;
    }
}

我们只需要定期进行记录聚合的快照,我们可以通过负责异步或同步监听事件仓库的进程来执行快照的记录。以下是一个演示聚合快照实施的简单示例:

<?php
class EventStorePostRepository implements PostRepository {
    public function save(Post $post) {
        $id = $post->id();
        $events = $post->recordedEvents();
        $post->clearEvents();
        $this->eventStore->append(new EventStream($id, $events));
        $countOfEvents = $this->eventStore->countEventsFor($id);
        $version = $countOfEvents / 100;
        if (!$this->snapshotRepository->has($post->id(), $version)) {
            $this->snapshotRepository->save($id, new Snapshot($post, $version));
        }
        $this->projector->project($events);
    }
}

使用ORM还是不使用ORM?
从这种架构风格的用例我们可以清楚的看到,使用ORM来持久和获取事件会过度设计。即使我们使用关系数据库来存储事件,我们也只需从数据库中持久化和获取事件。

总结

由于可选的架构风格有很多,因此你可能会对于本章节的内容感到困惑。你需要在众多选择中去权衡优缺点,来做出正确的选择。有一点很清楚:大泥球的方式不应该选择,因为代码会很快烂掉。分层架构是一个更好的选择,但是它也存在一些缺点,如各层之间的紧耦合。可以说,最均衡的选择是六边形架构,因为它可以用作基础架构,并且可以促进程序内部和外部高度解耦和对称性。这是我们为大多数场景下推荐的选择。

我们也看到CQRS和事件溯源是相对灵活的架构,可以帮助你应对复杂性。CQRS和事件溯源都有自己的定位,但也不要让这个因素来分散你的注意你。因为它们都有一些开销,你应该有一个来使用它们的技术理由。这些架构确实非常有用,可以在CQRS的Repository仓储的finder方法的数量上和事件溯源触发的事件数量上得到启发。如果finder方法开始增加,并且仓储变得难以维护,那么是时候考虑CQRS,以便拆分读写问题。如果每个聚合的事件数量趋于增长,并且业务对更细粒度的信息关注,那么可以考虑的一个选择是事件溯源。

从Brian Foote和Joseph Yoder论文中提取的:大泥球(BIG BALL OF MUD)是一个杂乱的结构、蔓延无序的、胶带似的、下水管道似的、意大利面条似的代码丛林。(译者注:只可意会)