diff --git a/src/Gateway/LtbxdGateway.php b/src/Gateway/LtbxdGateway.php index c64f5d2..04af1ec 100644 --- a/src/Gateway/LtbxdGateway.php +++ b/src/Gateway/LtbxdGateway.php @@ -20,13 +20,13 @@ readonly class LtbxdGateway * @return LtbxdMovie[] * @throws GatewayException */ - public function parseFile(): array + public function parseFileFromPath(string $path): array { - if (!file_exists($this->fileDir)) { - throw new GatewayException(sprintf('Could not find file %s', $this->fileDir)); + if (!file_exists($path)) { + throw new GatewayException(sprintf('Could not find file %s', $path)); } - $fileContent = file_get_contents($this->fileDir); + $fileContent = file_get_contents($path); try { return $this->serializer->deserialize($fileContent, LtbxdMovie::class.'[]', 'csv'); @@ -34,4 +34,13 @@ readonly class LtbxdGateway throw new GatewayException('Error while deserializing Letterboxd data', previous: $e); } } + + /** + * @return LtbxdMovie[] + * @throws GatewayException + */ + public function parseFile(): array + { + return $this->parseFileFromPath($this->fileDir); + } } diff --git a/src/MessageHandler/ProcessImportMessageHandler.php b/src/MessageHandler/ProcessImportMessageHandler.php new file mode 100644 index 0000000..063bcd3 --- /dev/null +++ b/src/MessageHandler/ProcessImportMessageHandler.php @@ -0,0 +1,83 @@ +em->getRepository(Import::class)->find($message->importId); + if (!$import) { + $this->logger->error('Import not found', ['importId' => $message->importId]); + + return; + } + + try { + $csvContent = $this->defaultStorage->read($import->getFilePath()); + + $tmpFile = tempnam(sys_get_temp_dir(), 'import_'); + file_put_contents($tmpFile, $csvContent); + + try { + $ltbxdMovies = $this->ltbxdGateway->parseFileFromPath($tmpFile); + } finally { + unlink($tmpFile); + } + + $totalFilms = count($ltbxdMovies); + $totalBatches = (int) ceil($totalFilms / self::BATCH_SIZE); + + $import->setTotalFilms($totalFilms); + $import->setTotalBatches($totalBatches); + $import->setStatus(Import::STATUS_PROCESSING); + $this->em->flush(); + + for ($i = 0; $i < $totalBatches; $i++) { + $this->bus->dispatch(new ImportFilmsBatchMessage( + importId: $import->getId(), + offset: $i * self::BATCH_SIZE, + limit: self::BATCH_SIZE, + )); + } + } catch (\Throwable $e) { + $this->logger->error('Import processing failed', [ + 'importId' => $import->getId(), + 'error' => $e->getMessage(), + ]); + + $import->setStatus(Import::STATUS_FAILED); + $this->em->flush(); + + $notification = new Notification(); + $notification->setUser($import->getUser()); + $notification->setMessage('L\'import a échoué.'); + $this->em->persist($notification); + $this->em->flush(); + } + } +}