vendor/shopware/core/Framework/Api/Sync/SyncService.php line 176

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\Api\Sync;
  3. use Doctrine\DBAL\Connection;
  4. use Doctrine\DBAL\ConnectionException;
  5. use Shopware\Core\Framework\Adapter\Database\ReplicaConnection;
  6. use Shopware\Core\Framework\Api\Converter\ApiVersionConverter;
  7. use Shopware\Core\Framework\Api\Converter\Exceptions\ApiConversionException;
  8. use Shopware\Core\Framework\Api\Exception\InvalidSyncOperationException;
  9. use Shopware\Core\Framework\Context;
  10. use Shopware\Core\Framework\DataAbstractionLayer\DefinitionInstanceRegistry;
  11. use Shopware\Core\Framework\DataAbstractionLayer\EntityDefinition;
  12. use Shopware\Core\Framework\DataAbstractionLayer\EntityRepositoryInterface;
  13. use Shopware\Core\Framework\DataAbstractionLayer\EntityWriteResult;
  14. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
  15. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenEvent;
  16. use Shopware\Core\Framework\DataAbstractionLayer\Indexing\EntityIndexerRegistry;
  17. use Shopware\Core\Framework\DataAbstractionLayer\Write\EntityWriterInterface;
  18. use Shopware\Core\Framework\DataAbstractionLayer\Write\WriteContext;
  19. use Shopware\Core\Framework\DataAbstractionLayer\Write\WriteException;
  20. use Shopware\Core\Framework\Feature;
  21. use Shopware\Core\Framework\Log\Package;
  22. use Shopware\Core\Framework\Struct\ArrayEntity;
  23. use Shopware\Core\Framework\Validation\WriteConstraintViolationException;
  24. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
  25. #[Package('core')]
  26. class SyncService implements SyncServiceInterface
  27. {
  28.     private DefinitionInstanceRegistry $definitionRegistry;
  29.     private Connection $connection;
  30.     private ApiVersionConverter $apiVersionConverter;
  31.     private EntityWriterInterface $writer;
  32.     private EventDispatcherInterface $eventDispatcher;
  33.     /**
  34.      * @internal
  35.      */
  36.     public function __construct(
  37.         DefinitionInstanceRegistry $definitionRegistry,
  38.         Connection $connection,
  39.         ApiVersionConverter $apiVersionConverter,
  40.         EntityWriterInterface $writer,
  41.         EventDispatcherInterface $eventDispatcher
  42.     ) {
  43.         $this->definitionRegistry $definitionRegistry;
  44.         $this->connection $connection;
  45.         $this->apiVersionConverter $apiVersionConverter;
  46.         $this->writer $writer;
  47.         $this->eventDispatcher $eventDispatcher;
  48.     }
  49.     /**
  50.      * @param SyncOperation[] $operations
  51.      *
  52.      * @throws ConnectionException
  53.      * @throws InvalidSyncOperationException
  54.      */
  55.     public function sync(array $operationsContext $contextSyncBehavior $behavior): SyncResult
  56.     {
  57.         ReplicaConnection::ensurePrimary();
  58.         $context = clone $context;
  59.         if (\count($behavior->getSkipIndexers())) {
  60.             $context->addExtension(EntityIndexerRegistry::EXTENSION_INDEXER_SKIP, new ArrayEntity($behavior->getSkipIndexers()));
  61.         }
  62.         if (
  63.             $behavior->getIndexingBehavior() !== null
  64.             && \in_array($behavior->getIndexingBehavior(), [EntityIndexerRegistry::DISABLE_INDEXINGEntityIndexerRegistry::USE_INDEXING_QUEUE], true)
  65.         ) {
  66.             // @deprecated tag:v6.5.0 - complete if statement will be removed, context.state should be used instead
  67.             if (!Feature::isActive('v6.5.0.0')) {
  68.                 $context->addExtension($behavior->getIndexingBehavior(), new ArrayEntity());
  69.             }
  70.             $context->addState($behavior->getIndexingBehavior());
  71.         }
  72.         // allows to execute all writes inside a single transaction and a single entity write event
  73.         // @internal (flag:FEATURE_NEXT_15815) tag:v6.5.0 - Remove "IF" condition - useSingleOperation is always true
  74.         if ($behavior->useSingleOperation()) {
  75.             $result $this->writer->sync($operationsWriteContext::createFromContext($context));
  76.             $writes EntityWrittenContainerEvent::createWithWrittenEvents($result->getWritten(), $context, []);
  77.             $deletes EntityWrittenContainerEvent::createWithWrittenEvents($result->getDeleted(), $context, []);
  78.             if ($deletes->getEvents() !== null) {
  79.                 $writes->addEvent(...$deletes->getEvents()->getElements());
  80.             }
  81.             $this->eventDispatcher->dispatch($writes);
  82.             $ids $this->getWrittenEntities($result->getWritten());
  83.             $deleted $this->getWrittenEntitiesByEvent($deletes);
  84.             $notFound $this->getWrittenEntities($result->getNotFound());
  85.             //@internal (flag:FEATURE_NEXT_15815) - second construct parameter removed - simply remove if condition and all other code below
  86.             if (Feature::isActive('FEATURE_NEXT_15815')) {
  87.                 return new SyncResult($ids$notFound$deleted);
  88.             }
  89.             return new SyncResult($idstrue$notFound$deleted);
  90.         }
  91.         //@deprecated tag:v6.5.0 (flag:FEATURE_NEXT_15815) - remove all code below and all functions which will are no longer used
  92.         Feature::triggerDeprecationOrThrow(
  93.             'v6.5.0.0',
  94.             'Sync api can only be used in single operation mode in v6.5.0.0'
  95.         );
  96.         if ($behavior->failOnError()) {
  97.             $this->connection->beginTransaction();
  98.         }
  99.         $hasError false;
  100.         $results = [];
  101.         foreach ($operations as $operation) {
  102.             $this->validateSyncOperationInput($operation);
  103.             if (!$behavior->failOnError()) {
  104.                 //begin a new transaction for every operation to provide chunk-safe operations
  105.                 $this->connection->beginTransaction();
  106.             }
  107.             $result $this->execute($operation$context);
  108.             $results[$operation->getKey()] = $result;
  109.             if ($result->hasError()) {
  110.                 $hasError true;
  111.                 if ($behavior->failOnError()) {
  112.                     foreach ($results as $result) {
  113.                         $result->resetEntities();
  114.                     }
  115.                     continue;
  116.                 }
  117.                 $this->connection->rollBack();
  118.             } elseif (!$behavior->failOnError()) {
  119.                 // Only commit if transaction not already marked as rollback
  120.                 if (!$this->connection->isRollbackOnly()) {
  121.                     $this->connection->commit();
  122.                 } else {
  123.                     $this->connection->rollBack();
  124.                 }
  125.             }
  126.         }
  127.         if ($behavior->failOnError()) {
  128.             // Only commit if transaction not already marked as rollback
  129.             if ($hasError === false && !$this->connection->isRollbackOnly()) {
  130.                 $this->connection->commit();
  131.             } else {
  132.                 $this->connection->rollBack();
  133.             }
  134.         }
  135.         return new SyncResult($results$hasError === false);
  136.     }
  137.     private function execute(SyncOperation $operationContext $context): SyncOperationResult
  138.     {
  139.         $repository $this->definitionRegistry->getRepository($operation->getEntity());
  140.         switch (mb_strtolower($operation->getAction())) {
  141.             case SyncOperation::ACTION_UPSERT:
  142.                 return $this->upsertRecords($operation$context$repository);
  143.             case SyncOperation::ACTION_DELETE:
  144.                 return $this->deleteRecords($operation$context$repository);
  145.             default:
  146.                 throw new \RuntimeException(
  147.                     sprintf(
  148.                         'provided action "%s" is not supported. Following actions are supported: %s',
  149.                         $operation->getAction(),
  150.                         implode(', '$operation->getSupportedActions())
  151.                     )
  152.                 );
  153.         }
  154.     }
  155.     private function upsertRecords(
  156.         SyncOperation $operation,
  157.         Context $context,
  158.         EntityRepositoryInterface $repository
  159.     ): SyncOperationResult {
  160.         $results = [];
  161.         $records array_values($operation->getPayload());
  162.         $definition $repository->getDefinition();
  163.         foreach ($records as $index => $record) {
  164.             try {
  165.                 $record $this->convertToApiVersion($record$definition$index);
  166.                 $result $repository->upsert([$record], $context);
  167.                 $results[$index] = [
  168.                     'entities' => $this->getWrittenEntitiesByEvent($result),
  169.                     'errors' => [],
  170.                 ];
  171.             } catch (\Throwable $exception) {
  172.                 $writeException $this->getWriteError($exception$index);
  173.                 $errors = [];
  174.                 foreach ($writeException->getErrors() as $error) {
  175.                     $errors[] = $error;
  176.                 }
  177.                 $results[$index] = [
  178.                     'entities' => [],
  179.                     'errors' => $errors,
  180.                 ];
  181.             }
  182.         }
  183.         return new SyncOperationResult($results);
  184.     }
  185.     private function deleteRecords(
  186.         SyncOperation $operation,
  187.         Context $context,
  188.         EntityRepositoryInterface $repository
  189.     ): SyncOperationResult {
  190.         $results = [];
  191.         $records array_values($operation->getPayload());
  192.         $definition $repository->getDefinition();
  193.         foreach ($records as $index => $record) {
  194.             try {
  195.                 $record $this->convertToApiVersion($record$definition$index);
  196.                 $result $repository->delete([$record], $context);
  197.                 $results[$index] = [
  198.                     'entities' => $this->getWrittenEntitiesByEvent($result),
  199.                     'errors' => [],
  200.                 ];
  201.             } catch (\Throwable $exception) {
  202.                 $writeException $this->getWriteError($exception$index);
  203.                 $errors = [];
  204.                 foreach ($writeException->getErrors() as $error) {
  205.                     $errors[] = $error;
  206.                 }
  207.                 $results[$index] = [
  208.                     'entities' => [],
  209.                     'errors' => $errors,
  210.                 ];
  211.             }
  212.         }
  213.         return new SyncOperationResult($results);
  214.     }
  215.     /**
  216.      * @param array<string, mixed|null> $record
  217.      *
  218.      * @return array<string, mixed|null>
  219.      */
  220.     private function convertToApiVersion(array $recordEntityDefinition $definitionint $writeIndex): array
  221.     {
  222.         $exception = new ApiConversionException();
  223.         $converted $this->apiVersionConverter->convertPayload($definition$record$exception"/{$writeIndex}");
  224.         $exception->tryToThrow();
  225.         return $converted;
  226.     }
  227.     private function getWriteError(\Throwable $exceptionint $writeIndex): WriteException
  228.     {
  229.         if ($exception instanceof WriteException) {
  230.             foreach ($exception->getExceptions() as $innerException) {
  231.                 if ($innerException instanceof WriteConstraintViolationException) {
  232.                     $path preg_replace('/^\/0/'"/{$writeIndex}"$innerException->getPath());
  233.                     if ($path !== null) {
  234.                         $innerException->setPath($path);
  235.                     }
  236.                 }
  237.             }
  238.             return $exception;
  239.         }
  240.         return (new WriteException())->add($exception);
  241.     }
  242.     /**
  243.      * @param array<string, EntityWriteResult[]> $grouped
  244.      *
  245.      * @return array<string, array<int, mixed>>
  246.      */
  247.     private function getWrittenEntities(array $grouped): array
  248.     {
  249.         $mapped = [];
  250.         foreach ($grouped as $entity => $results) {
  251.             foreach ($results as $result) {
  252.                 $mapped[$entity][] = $result->getPrimaryKey();
  253.             }
  254.         }
  255.         ksort($mapped);
  256.         return $mapped;
  257.     }
  258.     /**
  259.      * @return array<string, array<int, mixed>>
  260.      */
  261.     private function getWrittenEntitiesByEvent(EntityWrittenContainerEvent $result): array
  262.     {
  263.         $entities = [];
  264.         /** @var EntityWrittenEvent $event */
  265.         foreach ($result->getEvents() ?? [] as $event) {
  266.             $entity $event->getEntityName();
  267.             if (!isset($entities[$entity])) {
  268.                 $entities[$entity] = [];
  269.             }
  270.             $entities[$entity] = array_merge($entities[$entity], $event->getIds());
  271.         }
  272.         ksort($entities);
  273.         return $entities;
  274.     }
  275.     /**
  276.      * @deprecated tag:v6.5.0 - Sync Operation will be validated inside EntityWriter instead.
  277.      *
  278.      * @throws InvalidSyncOperationException
  279.      */
  280.     private function validateSyncOperationInput(SyncOperation $operation): void
  281.     {
  282.         $errors $operation->validate();
  283.         if (\count($errors)) {
  284.             throw new InvalidSyncOperationException(sprintf('Invalid sync operation. %s'implode(' '$errors)));
  285.         }
  286.     }
  287. }